diff --git a/Cargo.lock b/Cargo.lock index f0f7e6f8c..bfaa55d93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3051,7 +3051,7 @@ dependencies = [ "chrono", "getrandom 0.2.17", "http", - "rand 0.8.5", + "rand 0.8.6", "reqwest", "serde", "serde_json", diff --git a/architecture/gateway-single-node.md b/architecture/gateway-single-node.md index 01b69b2f5..d7874a01a 100644 --- a/architecture/gateway-single-node.md +++ b/architecture/gateway-single-node.md @@ -70,6 +70,7 @@ Development task entrypoints split bootstrap behavior: For `mise run cluster`, `.env` acts as local source-of-truth for `GATEWAY_NAME`, `GATEWAY_PORT`, and `OPENSHELL_GATEWAY`. Missing keys are appended; existing values are preserved. If `GATEWAY_PORT` is missing, the task selects a free local port and persists it. Fast mode ensures a local registry (`127.0.0.1:5000`) is running and configures k3s to mirror pulls via `host.docker.internal:5000`, so the cluster task can push/pull local component images consistently. +Sandbox service routing is always configured for gateway deployments. Bootstrap derives a default service base domain from the gateway name, and fast deploy passes the same domain through Helm so incremental deploys do not reset it. ## Bootstrap Sequence Diagram diff --git a/crates/openshell-bootstrap/src/docker.rs b/crates/openshell-bootstrap/src/docker.rs index c18c938aa..4ef134e08 100644 --- a/crates/openshell-bootstrap/src/docker.rs +++ b/crates/openshell-bootstrap/src/docker.rs @@ -493,6 +493,7 @@ pub async fn ensure_image( /// because the container was originally created with a different port. // Refactoring this signature would touch many call sites across the workspace. 
#[allow(clippy::too_many_arguments)] +#[allow(clippy::fn_params_excessive_bools)] pub async fn ensure_container( docker: &Docker, name: &str, @@ -502,6 +503,7 @@ pub async fn ensure_container( gateway_port: u16, disable_tls: bool, disable_gateway_auth: bool, + service_base_domains: &[String], registry_username: Option<&str>, registry_token: Option<&str>, device_ids: &[String], @@ -748,6 +750,10 @@ pub async fn ensure_container( // gateway port for remote clusters must match. env_vars.push(format!("SSH_GATEWAY_PORT={gateway_port}")); } + env_vars.push(format!( + "SERVICE_BASE_DOMAINS={}", + service_base_domains.join(",") + )); // Pass image configuration to the cluster entrypoint. // The effective tag is resolved from the runtime IMAGE_TAG env var (if set) diff --git a/crates/openshell-bootstrap/src/lib.rs b/crates/openshell-bootstrap/src/lib.rs index 5d2a17afd..5d4b07d4b 100644 --- a/crates/openshell-bootstrap/src/lib.rs +++ b/crates/openshell-bootstrap/src/lib.rs @@ -86,6 +86,7 @@ impl RemoteOptions { pub const DEFAULT_GATEWAY_PORT: u16 = 8080; #[derive(Debug, Clone)] +#[allow(clippy::struct_excessive_bools)] pub struct DeployOptions { pub name: String, pub image_ref: Option, @@ -104,6 +105,8 @@ pub struct DeployOptions { /// Disable gateway authentication (mTLS client certificate requirement). /// Ignored when `disable_tls` is true. pub disable_gateway_auth: bool, + /// Base domains accepted for sandbox service routing. + pub service_base_domains: Vec, /// Registry authentication username. Defaults to `__token__` when a /// `registry_token` is provided but no username is set. Only needed /// for private registries — public GHCR repos pull without auth. 
@@ -150,6 +153,7 @@ impl DeployOptions { gateway_host: None, disable_tls: false, disable_gateway_auth: false, + service_base_domains: Vec::new(), registry_username: None, registry_token: None, gpu: vec![], @@ -199,6 +203,16 @@ impl DeployOptions { self } + #[must_use] + pub fn with_service_base_domains(mut self, domains: I) -> Self + where + I: IntoIterator, + S: Into, + { + self.service_base_domains = domains.into_iter().map(Into::into).collect(); + self + } + /// Set the registry authentication username. #[must_use] pub fn with_registry_username(mut self, username: impl Into) -> Self { @@ -332,6 +346,11 @@ where let gateway_host = options.gateway_host; let disable_tls = options.disable_tls; let disable_gateway_auth = options.disable_gateway_auth; + let service_base_domains = if options.service_base_domains.is_empty() { + vec![openshell_core::config::default_service_base_domain_for_gateway(&name)] + } else { + options.service_base_domains + }; let registry_username = options.registry_username; let registry_token = options.registry_token; let gpu = options.gpu; @@ -475,6 +494,10 @@ where { sans.push(host.clone()); } + for base_domain in &service_base_domains { + sans.push(base_domain.clone()); + sans.push(format!("*.{base_domain}")); + } (sans, gateway_host) }; @@ -524,6 +547,7 @@ where port, disable_tls, disable_gateway_auth, + &service_base_domains, registry_username.as_deref(), registry_token.as_deref(), &device_ids, diff --git a/crates/openshell-cli/src/main.rs b/crates/openshell-cli/src/main.rs index ccad7a099..05bc68e03 100644 --- a/crates/openshell-cli/src/main.rs +++ b/crates/openshell-cli/src/main.rs @@ -200,6 +200,7 @@ const HELP_TEMPLATE: &str = "\ \x1b[1mSANDBOX COMMANDS\x1b[0m sandbox: Manage sandboxes + service: Expose sandbox services forward: Manage port forwarding to a sandbox logs: View sandbox logs policy: Manage sandbox policy @@ -415,6 +416,13 @@ enum Commands { command: Option, }, + /// Expose sandbox services. 
+ #[command(help_template = SUBCOMMAND_HELP_TEMPLATE)] + Service { + #[command(subcommand)] + command: Option, + }, + /// View sandbox logs. #[command(alias = "lg", after_help = LOGS_EXAMPLES, help_template = LEAF_HELP_TEMPLATE, next_help_heading = "FLAGS")] Logs { @@ -826,6 +834,11 @@ enum GatewayCommands { #[arg(long)] disable_gateway_auth: bool, + /// Base domain accepted for sandbox service routes. May be repeated. + /// Defaults to `.openshell.localhost`. + #[arg(long = "service-base-domain")] + service_base_domains: Vec, + /// Username for authenticating with the container image registry. /// /// Defaults to `__token__` when `--registry-token` is set (the @@ -1774,6 +1787,24 @@ enum ForwardCommands { List, } +#[derive(Subcommand, Debug)] +enum ServiceCommands { + /// Expose an HTTP service running inside a sandbox. + #[command(help_template = LEAF_HELP_TEMPLATE, next_help_heading = "FLAGS")] + Expose { + /// Sandbox name. + #[arg(add = ArgValueCompleter::new(completers::complete_sandbox_names))] + sandbox: String, + + /// Service name. + service: String, + + /// Loopback TCP port inside the sandbox. + #[arg(long)] + target_port: u16, + }, +} + #[tokio::main] #[allow(clippy::large_stack_frames)] // CLI dispatch holds many futures; OK at top level. 
async fn main() -> Result<()> { @@ -1836,6 +1867,7 @@ async fn main() -> Result<()> { recreate, plaintext, disable_gateway_auth, + service_base_domains, registry_username, registry_token, gpu, @@ -1862,6 +1894,7 @@ async fn main() -> Result<()> { recreate, plaintext, disable_gateway_auth, + service_base_domains, registry_username.as_deref(), registry_token.as_deref(), gpu, @@ -2118,6 +2151,22 @@ async fn main() -> Result<()> { } }, + // ----------------------------------------------------------- + // Service exposure + // ----------------------------------------------------------- + Some(Commands::Service { + command: + Some(ServiceCommands::Expose { + sandbox, + service, + target_port, + }), + }) => { + let ctx = resolve_gateway(&cli.gateway, &cli.gateway_endpoint)?; + let mut tls = tls.with_gateway_name(&ctx.name); + apply_auth(&mut tls, &ctx.name); + run::service_expose(&ctx.endpoint, &sandbox, &service, target_port, &tls).await?; + } // ----------------------------------------------------------- // Top-level logs (was `sandbox logs`) // ----------------------------------------------------------- @@ -2869,6 +2918,13 @@ async fn main() -> Result<()> { .print_help() .expect("Failed to print help"); } + Some(Commands::Service { command: None }) => { + Cli::command() + .find_subcommand_mut("service") + .expect("service subcommand exists") + .print_help() + .expect("Failed to print help"); + } Some(Commands::Policy { command: None }) => { Cli::command() .find_subcommand_mut("policy") diff --git a/crates/openshell-cli/src/run.rs b/crates/openshell-cli/src/run.rs index eaadf7908..2b81932d9 100644 --- a/crates/openshell-cli/src/run.rs +++ b/crates/openshell-cli/src/run.rs @@ -25,13 +25,14 @@ use openshell_bootstrap::{ use openshell_core::proto::{ ApproveAllDraftChunksRequest, ApproveDraftChunkRequest, ClearDraftChunksRequest, CreateProviderRequest, CreateSandboxRequest, DeleteProviderRequest, DeleteSandboxRequest, - ExecSandboxRequest, GetClusterInferenceRequest, 
GetDraftHistoryRequest, GetDraftPolicyRequest, - GetGatewayConfigRequest, GetProviderRequest, GetSandboxConfigRequest, GetSandboxLogsRequest, - GetSandboxPolicyStatusRequest, GetSandboxRequest, HealthRequest, ListProvidersRequest, - ListSandboxPoliciesRequest, ListSandboxesRequest, PolicySource, PolicyStatus, Provider, - RejectDraftChunkRequest, Sandbox, SandboxPhase, SandboxPolicy, SandboxSpec, SandboxTemplate, - SetClusterInferenceRequest, SettingScope, SettingValue, UpdateConfigRequest, - UpdateProviderRequest, WatchSandboxRequest, exec_sandbox_event, setting_value, + ExecSandboxRequest, ExposeServiceRequest, GetClusterInferenceRequest, GetDraftHistoryRequest, + GetDraftPolicyRequest, GetGatewayConfigRequest, GetProviderRequest, GetSandboxConfigRequest, + GetSandboxLogsRequest, GetSandboxPolicyStatusRequest, GetSandboxRequest, HealthRequest, + ListProvidersRequest, ListSandboxPoliciesRequest, ListSandboxesRequest, PolicySource, + PolicyStatus, Provider, RejectDraftChunkRequest, Sandbox, SandboxPhase, SandboxPolicy, + SandboxSpec, SandboxTemplate, SetClusterInferenceRequest, SettingScope, SettingValue, + UpdateConfigRequest, UpdateProviderRequest, WatchSandboxRequest, exec_sandbox_event, + setting_value, }; use openshell_core::settings::{self, SettingValueKind}; use openshell_core::{ObjectId, ObjectName}; @@ -1579,6 +1580,7 @@ fn print_failure_diagnosis(diagnosis: &openshell_bootstrap::errors::GatewayFailu /// Provision or start a gateway (local or remote). 
#[allow(clippy::too_many_arguments)] // user-facing CLI command +#[allow(clippy::fn_params_excessive_bools)] pub async fn gateway_admin_deploy( name: &str, remote: Option<&str>, @@ -1588,6 +1590,7 @@ pub async fn gateway_admin_deploy( recreate: bool, disable_tls: bool, disable_gateway_auth: bool, + service_base_domains: Vec, registry_username: Option<&str>, registry_token: Option<&str>, gpu: Vec, @@ -1649,6 +1652,7 @@ pub async fn gateway_admin_deploy( .with_port(effective_port) .with_disable_tls(disable_tls) .with_disable_gateway_auth(disable_gateway_auth) + .with_service_base_domains(service_base_domains) .with_gpu(gpu) .with_recreate(recreate); if let Some(opts) = remote_opts { @@ -3712,6 +3716,57 @@ fn parse_credential_pairs(items: &[String]) -> Result> { Ok(map) } +pub async fn service_expose( + server: &str, + sandbox: &str, + service: &str, + target_port: u16, + tls: &TlsOptions, +) -> Result<()> { + let mut client = grpc_client(server, tls).await?; + let response = client + .expose_service(ExposeServiceRequest { + sandbox: sandbox.to_string(), + service: service.to_string(), + target_port: u32::from(target_port), + domain: true, + }) + .await + .map_err(|status| miette::miette!("expose service failed: {status}"))? 
+ .into_inner(); + + println!( + "{} Exposed service {} on sandbox {} -> 127.0.0.1:{}", + "✓".green().bold(), + service.bold(), + sandbox.bold(), + target_port, + ); + if !response.url.is_empty() { + let url = service_url_for_gateway(&response.url, server); + println!(" URL: {}", url.cyan()); + } + Ok(()) +} + +fn service_url_for_gateway(service_url: &str, gateway_endpoint: &str) -> String { + let (Ok(mut service_url), Ok(gateway_endpoint)) = ( + url::Url::parse(service_url), + url::Url::parse(gateway_endpoint), + ) else { + return service_url.to_string(); + }; + + if service_url.set_scheme(gateway_endpoint.scheme()).is_err() { + return service_url.to_string(); + } + if service_url.set_port(gateway_endpoint.port()).is_err() { + return service_url.to_string(); + } + + service_url.to_string() +} + pub async fn provider_create( server: &str, name: &str, @@ -5717,8 +5772,8 @@ mod tests { gateway_type_label, git_sync_files, http_health_check, image_requests_gpu, inferred_provider_type, parse_cli_setting_value, parse_credential_pairs, plaintext_gateway_is_remote, provisioning_timeout_message, ready_false_condition_message, - resolve_gateway_control_target_from, sandbox_should_persist, shell_escape, - source_requests_gpu, validate_gateway_name, validate_ssh_host, + resolve_gateway_control_target_from, sandbox_should_persist, service_url_for_gateway, + shell_escape, source_requests_gpu, validate_gateway_name, validate_ssh_host, }; use crate::TEST_ENV_LOCK; use hyper::StatusCode; @@ -5964,6 +6019,28 @@ mod tests { assert!(!source_requests_gpu("base")); } + #[test] + fn service_url_for_gateway_uses_external_gateway_port() { + assert_eq!( + service_url_for_gateway( + "https://quiet-flamingo--openclaw.navigator.openshell.localhost:8080/", + "https://127.0.0.1:31886" + ), + "https://quiet-flamingo--openclaw.navigator.openshell.localhost:31886/" + ); + } + + #[test] + fn service_url_for_gateway_omits_default_external_port() { + assert_eq!( + service_url_for_gateway( + 
"http://quiet-flamingo--openclaw.navigator.openshell.localhost:8080/", + "https://gateway.example.com" + ), + "https://quiet-flamingo--openclaw.navigator.openshell.localhost/" + ); + } + #[test] fn ready_false_condition_message_prefers_reason_and_message() { let status = SandboxStatus { diff --git a/crates/openshell-cli/tests/ensure_providers_integration.rs b/crates/openshell-cli/tests/ensure_providers_integration.rs index a5a485735..bd40718e3 100644 --- a/crates/openshell-cli/tests/ensure_providers_integration.rs +++ b/crates/openshell-cli/tests/ensure_providers_integration.rs @@ -190,6 +190,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/mtls_integration.rs b/crates/openshell-cli/tests/mtls_integration.rs index 77d33f7b0..8fe12f08f 100644 --- a/crates/openshell-cli/tests/mtls_integration.rs +++ b/crates/openshell-cli/tests/mtls_integration.rs @@ -143,6 +143,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/provider_commands_integration.rs b/crates/openshell-cli/tests/provider_commands_integration.rs index d151e5a1c..fed005efe 100644 --- a/crates/openshell-cli/tests/provider_commands_integration.rs +++ b/crates/openshell-cli/tests/provider_commands_integration.rs @@ -140,6 +140,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + 
_request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs b/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs index e69d06f4f..895d248e9 100644 --- a/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs +++ b/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs @@ -203,6 +203,15 @@ impl OpenShell for TestOpenShell { })) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs b/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs index 7d6a9536a..09026a430 100644 --- a/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs +++ b/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs @@ -177,6 +177,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-core/src/config.rs b/crates/openshell-core/src/config.rs index 8d59cb79b..96dd9cad8 100644 --- a/crates/openshell-core/src/config.rs +++ b/crates/openshell-core/src/config.rs @@ -31,6 +31,12 @@ pub const DEFAULT_SSH_HANDSHAKE_SKEW_SECS: u64 = 300; /// Default Podman bridge network name. pub const DEFAULT_NETWORK_NAME: &str = "openshell"; +/// Default root used to derive browser-facing sandbox service domains. 
+pub const DEFAULT_SERVICE_BASE_DOMAIN_ROOT: &str = "openshell.localhost"; + +/// Default gateway name used when no gateway-specific service base domain exists. +pub const DEFAULT_GATEWAY_NAME: &str = "openshell"; + /// Default OCI image for the openshell-sandbox supervisor binary. pub const DEFAULT_SUPERVISOR_IMAGE: &str = "openshell/supervisor:latest"; @@ -234,6 +240,19 @@ pub struct Config { /// allowing them to reach services running on the Docker host. #[serde(default)] pub host_gateway_ip: String, + + /// Browser-facing sandbox service routing configuration. + #[serde(default)] + pub service_routing: ServiceRoutingConfig, +} + +/// Browser-facing sandbox service routing configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServiceRoutingConfig { + /// Base domains accepted for `sandbox--service.` routes. + /// The first domain is used when the gateway prints endpoint URLs. + #[serde(default = "default_service_base_domains")] + pub service_base_domains: Vec, } /// TLS configuration. @@ -346,6 +365,7 @@ impl Config { ssh_session_ttl_secs: default_ssh_session_ttl_secs(), client_tls_secret_name: String::new(), host_gateway_ip: String::new(), + service_routing: ServiceRoutingConfig::default(), } } @@ -483,6 +503,27 @@ impl Config { self } + /// Configure browser-facing sandbox service base domains. + #[must_use] + pub fn with_service_base_domains(mut self, domains: I) -> Self + where + I: IntoIterator, + S: Into, + { + let domains: Vec = domains + .into_iter() + .filter_map(|domain| normalize_service_base_domain(domain.into())) + .collect(); + self.service_routing = ServiceRoutingConfig { + service_base_domains: if domains.is_empty() { + default_service_base_domains() + } else { + domains + }, + }; + self + } + /// Set the OIDC configuration for JWT-based authentication. 
#[must_use] pub fn with_oidc(mut self, oidc: OidcConfig) -> Self { @@ -491,10 +532,33 @@ impl Config { } } +impl Default for ServiceRoutingConfig { + fn default() -> Self { + Self { + service_base_domains: default_service_base_domains(), + } + } +} + fn default_bind_address() -> SocketAddr { "0.0.0.0:8080".parse().expect("valid default address") } +fn default_service_base_domains() -> Vec { + vec![default_service_base_domain_for_gateway( + DEFAULT_GATEWAY_NAME, + )] +} + +pub fn default_service_base_domain_for_gateway(name: &str) -> String { + format!("{name}.{DEFAULT_SERVICE_BASE_DOMAIN_ROOT}") +} + +fn normalize_service_base_domain(domain: String) -> Option { + let domain = domain.trim().trim_matches('.'); + (!domain.is_empty()).then(|| domain.to_string()) +} + fn default_log_level() -> String { "info".to_string() } diff --git a/crates/openshell-core/src/metadata.rs b/crates/openshell-core/src/metadata.rs index 90566dcfd..813cf34ba 100644 --- a/crates/openshell-core/src/metadata.rs +++ b/crates/openshell-core/src/metadata.rs @@ -5,7 +5,7 @@ //! //! These traits provide uniform access to `ObjectMeta` fields across all resource types. -use crate::proto::{InferenceRoute, ObjectForTest, Provider, Sandbox, SshSession}; +use crate::proto::{InferenceRoute, ObjectForTest, Provider, Sandbox, ServiceEndpoint, SshSession}; use std::collections::HashMap; /// Provides access to the object's unique identifier. 
@@ -80,6 +80,25 @@ impl ObjectLabels for SshSession { } } +// Implementations for ServiceEndpoint +impl ObjectId for ServiceEndpoint { + fn object_id(&self) -> &str { + self.metadata.as_ref().map_or("", |m| m.id.as_str()) + } +} + +impl ObjectName for ServiceEndpoint { + fn object_name(&self) -> &str { + self.metadata.as_ref().map_or("", |m| m.name.as_str()) + } +} + +impl ObjectLabels for ServiceEndpoint { + fn object_labels(&self) -> Option> { + self.metadata.as_ref().map(|m| m.labels.clone()) + } +} + // Implementations for InferenceRoute impl ObjectId for InferenceRoute { fn object_id(&self) -> &str { diff --git a/crates/openshell-sandbox/src/lib.rs b/crates/openshell-sandbox/src/lib.rs index fbd7460e2..a4b49d526 100644 --- a/crates/openshell-sandbox/src/lib.rs +++ b/crates/openshell-sandbox/src/lib.rs @@ -685,7 +685,7 @@ pub async fn run_sandbox( sandbox_id.as_ref(), ssh_socket_path.as_ref(), ) { - supervisor_session::spawn(endpoint.clone(), id.clone(), socket.clone()); + supervisor_session::spawn(endpoint.clone(), id.clone(), socket.clone(), ssh_netns_fd); info!("supervisor session task spawned"); } diff --git a/crates/openshell-sandbox/src/ssh.rs b/crates/openshell-sandbox/src/ssh.rs index 9434d0a16..9939e61d7 100644 --- a/crates/openshell-sandbox/src/ssh.rs +++ b/crates/openshell-sandbox/src/ssh.rs @@ -588,7 +588,7 @@ impl SshHandler { /// the calling thread's network namespace permanently — a tokio blocking-pool /// thread could be reused for unrelated tasks and must not be contaminated. /// On non-Linux platforms (no network namespace support), we connect directly. 
-async fn connect_in_netns( +pub async fn connect_in_netns( addr: &str, netns_fd: Option, ) -> std::io::Result { diff --git a/crates/openshell-sandbox/src/supervisor_session.rs b/crates/openshell-sandbox/src/supervisor_session.rs index 490a0cba7..b6ad40a70 100644 --- a/crates/openshell-sandbox/src/supervisor_session.rs +++ b/crates/openshell-sandbox/src/supervisor_session.rs @@ -10,18 +10,19 @@ //! daemon. The supervisor is a dumb byte bridge — it has no protocol awareness //! of the SSH or NSSH1 bytes flowing through. +use std::os::fd::RawFd; use std::time::Duration; use openshell_core::proto::open_shell_client::OpenShellClient; use openshell_core::proto::{ GatewayMessage, RelayFrame, RelayInit, SupervisorHeartbeat, SupervisorHello, SupervisorMessage, - gateway_message, supervisor_message, + gateway_message, relay_open, supervisor_message, }; use openshell_ocsf::{ ActivityId, Endpoint, NetworkActivityBuilder, OcsfEvent, SandboxContext, SeverityId, StatusId, ocsf_emit, }; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc; use tokio_stream::StreamExt; use tonic::transport::Channel; @@ -158,14 +159,21 @@ pub fn spawn( endpoint: String, sandbox_id: String, ssh_socket_path: std::path::PathBuf, + netns_fd: Option, ) -> tokio::task::JoinHandle<()> { - tokio::spawn(run_session_loop(endpoint, sandbox_id, ssh_socket_path)) + tokio::spawn(run_session_loop( + endpoint, + sandbox_id, + ssh_socket_path, + netns_fd, + )) } async fn run_session_loop( endpoint: String, sandbox_id: String, ssh_socket_path: std::path::PathBuf, + netns_fd: Option, ) { let mut backoff = INITIAL_BACKOFF; let mut attempt: u64 = 0; @@ -173,7 +181,7 @@ async fn run_session_loop( loop { attempt += 1; - match run_single_session(&endpoint, &sandbox_id, &ssh_socket_path).await { + match run_single_session(&endpoint, &sandbox_id, &ssh_socket_path, netns_fd).await { Ok(()) => { let event = 
session_closed_event(crate::ocsf_ctx(), &endpoint, &sandbox_id); ocsf_emit!(event); @@ -194,6 +202,7 @@ async fn run_single_session( endpoint: &str, sandbox_id: &str, ssh_socket_path: &std::path::Path, + netns_fd: Option, ) -> Result<(), Box> { // Connect to the gateway. The same `Channel` is used for both the // long-lived control stream and all data-plane `RelayStream` calls, so @@ -263,6 +272,7 @@ async fn run_single_session( sandbox_id, ssh_socket_path, &channel, + netns_fd, ); } _ = heartbeat_interval.tick() => { @@ -284,6 +294,7 @@ fn handle_gateway_message( sandbox_id: &str, ssh_socket_path: &std::path::Path, channel: &Channel, + netns_fd: Option, ) { match &msg.payload { Some(gateway_message::Payload::Heartbeat(_)) => { @@ -291,6 +302,7 @@ fn handle_gateway_message( } Some(gateway_message::Payload::RelayOpen(open)) => { let channel_id = open.channel_id.clone(); + let target = RelayTarget::from_open(open); let sandbox_id = sandbox_id.to_string(); let channel = channel.clone(); let ssh_socket_path = ssh_socket_path.to_path_buf(); @@ -299,7 +311,9 @@ fn handle_gateway_message( ocsf_emit!(event); tokio::spawn(async move { - match handle_relay_open(&channel_id, &ssh_socket_path, channel).await { + match handle_relay_open(&channel_id, target, &ssh_socket_path, channel, netns_fd) + .await + { Ok(()) => { let event = relay_closed_event(crate::ocsf_ctx(), &channel_id); ocsf_emit!(event); @@ -329,6 +343,24 @@ fn handle_gateway_message( } } +#[derive(Clone, Copy, Debug)] +enum RelayTarget { + Ssh, + Tcp { port: u16 }, +} + +impl RelayTarget { + fn from_open(open: &openshell_core::proto::RelayOpen) -> Self { + match open.target.as_ref() { + Some(relay_open::Target::Tcp(target)) => u16::try_from(target.port) + .ok() + .filter(|port| *port > 0) + .map_or(Self::Ssh, |port| Self::Tcp { port }), + Some(relay_open::Target::Ssh(_)) | None => Self::Ssh, + } + } +} + /// Handle a `RelayOpen` by initiating a `RelayStream` RPC on the gateway and /// bridging that stream to the 
local SSH daemon. /// @@ -337,8 +369,10 @@ fn handle_gateway_message( /// frames carry raw SSH bytes in `data`. async fn handle_relay_open( channel_id: &str, + target: RelayTarget, ssh_socket_path: &std::path::Path, channel: Channel, + netns_fd: Option, ) -> Result<(), Box> { let mut client = OpenShellClient::new(channel); @@ -363,24 +397,48 @@ async fn handle_relay_open( .relay_stream(outbound) .await .map_err(|e| format!("relay_stream RPC failed: {e}"))?; - let mut inbound = response.into_inner(); - - // Connect to the local SSH daemon on its Unix socket. - let ssh = tokio::net::UnixStream::connect(ssh_socket_path).await?; - let (mut ssh_r, mut ssh_w) = ssh.into_split(); + let inbound = response.into_inner(); + + if let RelayTarget::Tcp { port } = target { + let service = connect_target_port(port, netns_fd).await?; + debug!(channel_id = %channel_id, port = port, "relay bridge: connected to local TCP service"); + bridge_relay_stream(channel_id, inbound, out_tx, service).await + } else { + let ssh = tokio::net::UnixStream::connect(ssh_socket_path).await?; + debug!( + channel_id = %channel_id, + socket = %ssh_socket_path.display(), + "relay bridge: connected to local SSH daemon" + ); + bridge_relay_stream(channel_id, inbound, out_tx, ssh).await + } +} - debug!( - channel_id = %channel_id, - socket = %ssh_socket_path.display(), - "relay bridge: connected to local SSH daemon" - ); +async fn connect_target_port( + port: u16, + netns_fd: Option, +) -> std::io::Result { + let addr = format!("127.0.0.1:{port}"); + crate::ssh::connect_in_netns(&addr, netns_fd).await +} - // SSH → gRPC (out_tx): read local SSH, forward as `RelayFrame::data`. 
+async fn bridge_relay_stream( + _channel_id: &str, + mut inbound: tonic::Streaming, + out_tx: mpsc::Sender, + service: S, +) -> Result<(), Box> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let (mut service_r, mut service_w) = tokio::io::split(service); + + // Service → gRPC (out_tx): read local service, forward as `RelayFrame::data`. let out_tx_writer = out_tx.clone(); - let ssh_to_grpc = tokio::spawn(async move { + let service_to_grpc = tokio::spawn(async move { let mut buf = vec![0u8; RELAY_CHUNK_SIZE]; loop { - match ssh_r.read(&mut buf).await { + match service_r.read(&mut buf).await { Ok(0) | Err(_) => break, Ok(n) => { let chunk = RelayFrame { @@ -396,7 +454,7 @@ async fn handle_relay_open( } }); - // gRPC (inbound) → SSH: drain inbound chunks into the local SSH socket. + // gRPC (inbound) → service: drain inbound chunks into the local service socket. let mut inbound_err: Option = None; while let Some(next) = inbound.next().await { match next { @@ -409,8 +467,8 @@ async fn handle_relay_open( if data.is_empty() { continue; } - if let Err(e) = ssh_w.write_all(&data).await { - inbound_err = Some(format!("write to ssh failed: {e}")); + if let Err(e) = service_w.write_all(&data).await { + inbound_err = Some(format!("write to service failed: {e}")); break; } } @@ -421,13 +479,13 @@ async fn handle_relay_open( } } - // Half-close the SSH socket's write side so the daemon sees EOF. - let _ = ssh_w.shutdown().await; + // Half-close the service socket's write side so the service sees EOF. + let _ = service_w.shutdown().await; // Dropping out_tx closes the outbound gRPC stream, letting the gateway // observe EOF on its side too. 
drop(out_tx); - let _ = ssh_to_grpc.await; + let _ = service_to_grpc.await; if let Some(e) = inbound_err { return Err(e.into()); @@ -567,4 +625,22 @@ mod ocsf_event_tests { .expect_err("eof should force reconnect"); assert_eq!(err.to_string(), "gateway closed stream"); } + + #[tokio::test] + async fn connect_target_port_connects_to_loopback_without_netns() { + let listener = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0)) + .await + .expect("listener should bind"); + let port = listener.local_addr().unwrap().port(); + + let accept = tokio::spawn(async move { + let _ = listener.accept().await.expect("listener should accept"); + }); + + let stream = connect_target_port(port, None) + .await + .expect("target port should connect"); + drop(stream); + accept.await.expect("accept task should finish"); + } } diff --git a/crates/openshell-server/src/cli.rs b/crates/openshell-server/src/cli.rs index 6e64b596e..043dddfb3 100644 --- a/crates/openshell-server/src/cli.rs +++ b/crates/openshell-server/src/cli.rs @@ -218,6 +218,14 @@ struct Args { #[arg(long, env = "OPENSHELL_DISABLE_GATEWAY_AUTH")] disable_gateway_auth: bool, + /// Base domains accepted for sandbox service routing. + #[arg( + long = "service-base-domain", + env = "OPENSHELL_SERVICE_BASE_DOMAINS", + value_delimiter = ',' + )] + service_base_domains: Vec, + /// OIDC issuer URL for JWT-based authentication. /// When set, the server validates `authorization: Bearer` tokens on gRPC /// requests against the issuer's JWKS endpoint. 
@@ -352,7 +360,8 @@ async fn run_from_args(args: Args) -> Result<()> { .with_ssh_gateway_port(args.ssh_gateway_port) .with_ssh_connect_path(args.ssh_connect_path) .with_sandbox_ssh_port(args.sandbox_ssh_port) - .with_ssh_handshake_skew_secs(args.ssh_handshake_skew_secs); + .with_ssh_handshake_skew_secs(args.ssh_handshake_skew_secs) + .with_service_base_domains(args.service_base_domains); if let Some(image) = args.sandbox_image { config = config.with_sandbox_image(image); diff --git a/crates/openshell-server/src/grpc/mod.rs b/crates/openshell-server/src/grpc/mod.rs index 89e639ac9..3344b2f19 100644 --- a/crates/openshell-server/src/grpc/mod.rs +++ b/crates/openshell-server/src/grpc/mod.rs @@ -6,6 +6,7 @@ pub mod policy; mod provider; mod sandbox; +mod service; mod validation; use openshell_core::proto::{ @@ -14,19 +15,21 @@ use openshell_core::proto::{ CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, EditDraftChunkRequest, EditDraftChunkResponse, ExecSandboxEvent, ExecSandboxRequest, - GatewayMessage, GetDraftHistoryRequest, GetDraftHistoryResponse, GetDraftPolicyRequest, - GetDraftPolicyResponse, GetGatewayConfigRequest, GetGatewayConfigResponse, GetProviderRequest, - GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxLogsRequest, - GetSandboxLogsResponse, GetSandboxPolicyStatusRequest, GetSandboxPolicyStatusResponse, + ExposeServiceRequest, GatewayMessage, GetDraftHistoryRequest, GetDraftHistoryResponse, + GetDraftPolicyRequest, GetDraftPolicyResponse, GetGatewayConfigRequest, + GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + GetSandboxConfigResponse, GetSandboxLogsRequest, GetSandboxLogsResponse, + GetSandboxPolicyStatusRequest, GetSandboxPolicyStatusResponse, GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, 
ListProvidersRequest, ListProvidersResponse, ListSandboxPoliciesRequest, ListSandboxPoliciesResponse, ListSandboxesRequest, ListSandboxesResponse, ProviderResponse, PushSandboxLogsRequest, PushSandboxLogsResponse, RejectDraftChunkRequest, RejectDraftChunkResponse, RelayFrame, ReportPolicyStatusRequest, ReportPolicyStatusResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, - SandboxStreamEvent, ServiceStatus, SubmitPolicyAnalysisRequest, SubmitPolicyAnalysisResponse, - SupervisorMessage, UndoDraftChunkRequest, UndoDraftChunkResponse, UpdateConfigRequest, - UpdateConfigResponse, UpdateProviderRequest, WatchSandboxRequest, open_shell_server::OpenShell, + SandboxStreamEvent, ServiceEndpointResponse, ServiceStatus, SubmitPolicyAnalysisRequest, + SubmitPolicyAnalysisResponse, SupervisorMessage, UndoDraftChunkRequest, UndoDraftChunkResponse, + UpdateConfigRequest, UpdateConfigResponse, UpdateProviderRequest, WatchSandboxRequest, + open_shell_server::OpenShell, }; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -222,6 +225,13 @@ impl OpenShell for OpenShellService { sandbox::handle_create_ssh_session(&self.state, request).await } + async fn expose_service( + &self, + request: Request, + ) -> Result, Status> { + service::handle_expose_service(&self.state, request).await + } + async fn revoke_ssh_session( &self, request: Request, diff --git a/crates/openshell-server/src/grpc/sandbox.rs b/crates/openshell-server/src/grpc/sandbox.rs index ed1b4cdfc..d4a8bd137 100644 --- a/crates/openshell-server/src/grpc/sandbox.rs +++ b/crates/openshell-server/src/grpc/sandbox.rs @@ -475,7 +475,12 @@ pub(super) async fn handle_exec_sandbox( // typically called during normal operation (not right after create). 
let (channel_id, relay_rx) = state .supervisor_sessions - .open_relay(sandbox.object_id(), std::time::Duration::from_secs(15)) + .open_relay( + sandbox.object_id(), + crate::supervisor_session::RelayTarget::Ssh, + None, + std::time::Duration::from_secs(15), + ) .await .map_err(|e| Status::unavailable(format!("supervisor relay failed: {e}")))?; @@ -706,8 +711,7 @@ fn build_remote_exec_command(req: &ExecSandboxRequest) -> Result /// /// This is the relay equivalent of `stream_exec_over_ssh`. Instead of dialing a /// sandbox endpoint directly, the SSH transport runs over a `DuplexStream` that -/// is bridged to the supervisor's local SSH daemon via a reverse HTTP CONNECT -/// tunnel. +/// is bridged to the supervisor's local SSH daemon through `RelayStream`. #[allow(clippy::too_many_arguments)] async fn stream_exec_over_relay( tx: mpsc::Sender>, diff --git a/crates/openshell-server/src/grpc/service.rs b/crates/openshell-server/src/grpc/service.rs new file mode 100644 index 000000000..011b1e1b9 --- /dev/null +++ b/crates/openshell-server/src/grpc/service.rs @@ -0,0 +1,131 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; +use std::sync::Arc; + +use openshell_core::ObjectId; +use openshell_core::proto::datamodel::v1::ObjectMeta; +use openshell_core::proto::{ + ExposeServiceRequest, Sandbox, ServiceEndpoint, ServiceEndpointResponse, +}; +use tonic::{Request, Response, Status}; +use uuid::Uuid; + +use crate::ServerState; +use crate::service_routing; + +const MAX_SERVICE_NAME_LEN: usize = 28; +const MAX_SANDBOX_NAME_LEN: usize = 28; + +pub(super) async fn handle_expose_service( + state: &Arc, + request: Request, +) -> Result, Status> { + let req = request.into_inner(); + validate_endpoint_name("sandbox", &req.sandbox, MAX_SANDBOX_NAME_LEN)?; + validate_endpoint_name("service", &req.service, MAX_SERVICE_NAME_LEN)?; + if req.target_port == 0 || req.target_port > u32::from(u16::MAX) { + return Err(Status::invalid_argument("target_port must be in 1..=65535")); + } + + let sandbox = state + .store + .get_message_by_name::(&req.sandbox) + .await + .map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))? 
+ .ok_or_else(|| Status::not_found("sandbox not found"))?; + + let now = + super::current_time_ms().map_err(|e| Status::internal(format!("clock error: {e}")))?; + let key = service_routing::endpoint_key(&req.sandbox, &req.service); + let id = match state + .store + .get_message_by_name::(&key) + .await + { + Ok(Some(existing)) => existing.object_id().to_string(), + Ok(None) => Uuid::new_v4().to_string(), + Err(e) => return Err(Status::internal(format!("fetch endpoint failed: {e}"))), + }; + + let endpoint = ServiceEndpoint { + metadata: Some(ObjectMeta { + id, + name: key, + created_at_ms: now, + labels: HashMap::from([("sandbox".to_string(), req.sandbox.clone())]), + }), + sandbox_id: sandbox.object_id().to_string(), + sandbox_name: req.sandbox.clone(), + service_name: req.service.clone(), + target_port: req.target_port, + domain: true, + }; + + state + .store + .put_message(&endpoint) + .await + .map_err(|e| Status::internal(format!("persist endpoint failed: {e}")))?; + + let url = service_routing::endpoint_url(&state.config, &req.sandbox, &req.service) + .unwrap_or_default(); + + Ok(Response::new(ServiceEndpointResponse { + endpoint: Some(endpoint), + url, + })) +} + +#[allow(clippy::result_large_err)] +fn validate_endpoint_name(field: &str, value: &str, max_len: usize) -> Result<(), Status> { + if value.is_empty() { + return Err(Status::invalid_argument(format!("{field} is required"))); + } + if value.len() > max_len { + return Err(Status::invalid_argument(format!( + "{field} must be at most {max_len} characters for sandbox service routing" + ))); + } + if value.contains("--") { + return Err(Status::invalid_argument(format!( + "{field} must not contain '--'" + ))); + } + if !is_dns_label(value) { + return Err(Status::invalid_argument(format!( + "{field} must be a lowercase DNS label" + ))); + } + Ok(()) +} + +fn is_dns_label(value: &str) -> bool { + if value.starts_with('-') || value.ends_with('-') { + return false; + } + value + .bytes() + .all(|b| 
b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-') +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn validates_good_endpoint_name() { + validate_endpoint_name("service", "web-api", 28).unwrap(); + } + + #[test] + fn rejects_separator_in_endpoint_name() { + assert!(validate_endpoint_name("service", "web--api", 28).is_err()); + } + + #[test] + fn rejects_uppercase_endpoint_name() { + assert!(validate_endpoint_name("service", "Web", 28).is_err()); + } +} diff --git a/crates/openshell-server/src/http.rs b/crates/openshell-server/src/http.rs index 7650c2339..26ab314fa 100644 --- a/crates/openshell-server/src/http.rs +++ b/crates/openshell-server/src/http.rs @@ -3,7 +3,14 @@ //! HTTP health endpoints using Axum. -use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get}; +use axum::{ + Json, Router, + extract::{Request, State}, + http::StatusCode, + middleware::{self, Next}, + response::IntoResponse, + routing::get, +}; use metrics_exporter_prometheus::PrometheusHandle; use serde::Serialize; use std::sync::Arc; @@ -61,5 +68,22 @@ async fn render_metrics(State(handle): State) -> impl IntoResp pub fn http_router(state: Arc) -> Router { crate::ssh_tunnel::router(state.clone()) .merge(crate::ws_tunnel::router(state.clone())) - .merge(crate::auth::router(state)) + .merge(crate::auth::router(state.clone())) + .layer(middleware::from_fn_with_state( + state, + sandbox_service_routing_first, + )) +} + +async fn sandbox_service_routing_first( + State(state): State>, + req: Request, + next: Next, +) -> impl IntoResponse { + if crate::service_routing::is_sandbox_service_request(&req, &state.config.service_routing) { + return crate::service_routing::proxy_sandbox_service_request(state, req) + .await + .into_response(); + } + next.run(req).await.into_response() } diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 18243dc5a..1f88df01f 100644 --- a/crates/openshell-server/src/lib.rs +++ 
b/crates/openshell-server/src/lib.rs @@ -30,6 +30,7 @@ mod persistence; pub(crate) mod policy_store; mod sandbox_index; mod sandbox_watch; +mod service_routing; mod ssh_tunnel; pub mod supervisor_session; mod tls; diff --git a/crates/openshell-server/src/service_routing.rs b/crates/openshell-server/src/service_routing.rs new file mode 100644 index 000000000..2d28139f6 --- /dev/null +++ b/crates/openshell-server/src/service_routing.rs @@ -0,0 +1,481 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Browser-facing HTTP routing for sandbox service endpoints. + +use axum::{body::Body, response::IntoResponse}; +use http::{HeaderMap, HeaderValue, Method, Request, Response, StatusCode, header}; +use hyper_util::rt::TokioIo; +use openshell_core::ObjectId; +use openshell_core::config::ServiceRoutingConfig; +use openshell_core::proto::{Sandbox, SandboxPhase, ServiceEndpoint}; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tracing::warn; + +use crate::ServerState; +use crate::persistence::{ObjectType, Store}; +use crate::supervisor_session::RelayTarget; + +const ENDPOINT_OBJECT_TYPE: &str = "service_endpoint"; + +impl ObjectType for ServiceEndpoint { + fn object_type() -> &'static str { + ENDPOINT_OBJECT_TYPE + } +} + +pub fn endpoint_key(sandbox: &str, service: &str) -> String { + format!("{sandbox}--{service}") +} + +pub fn endpoint_url( + config: &openshell_core::Config, + sandbox: &str, + service: &str, +) -> Option { + let host = endpoint_host(&config.service_routing, sandbox, service)?; + let scheme = if config.tls.is_some() { + "https" + } else { + "http" + }; + let port = config.bind_address.port(); + let include_port = !matches!((scheme, port), ("https", 443) | ("http", 80)); + Some(if include_port { + format!("{scheme}://{host}:{port}/") + } else { + format!("{scheme}://{host}/") + }) +} + +fn endpoint_host(config: 
&ServiceRoutingConfig, sandbox: &str, service: &str) -> Option { + let base_domain = config.service_base_domains.first()?; + Some(format!("{sandbox}--{service}.{base_domain}")) +} + +pub fn parse_host(host: &str, config: &ServiceRoutingConfig) -> Option<(String, String)> { + let host = host.split_once(':').map_or(host, |(name, _)| name); + for base_domain in &config.service_base_domains { + let expected_suffix = format!(".{base_domain}"); + let Some(encoded) = host.strip_suffix(&expected_suffix) else { + continue; + }; + let (sandbox, service) = encoded.split_once("--")?; + if sandbox.is_empty() + || service.is_empty() + || sandbox.contains("--") + || service.contains("--") + { + return None; + } + return Some((sandbox.to_string(), service.to_string())); + } + None +} + +pub fn is_sandbox_service_request(req: &Request, config: &ServiceRoutingConfig) -> bool { + request_host(req).is_some_and(|host| parse_host(host, config).is_some()) +} + +pub async fn proxy_sandbox_service_request( + state: Arc, + req: Request, +) -> impl IntoResponse { + let Some(host) = request_host(&req) else { + return StatusCode::NOT_FOUND.into_response(); + }; + let Some((sandbox_name, service_name)) = parse_host(host, &state.config.service_routing) else { + return StatusCode::NOT_FOUND.into_response(); + }; + + match proxy_to_endpoint(state, req, sandbox_name, service_name).await { + Ok(response) => response.into_response(), + Err(status) => status.into_response(), + } +} + +async fn proxy_to_endpoint( + state: Arc, + mut req: Request, + sandbox_name: String, + service_name: String, +) -> Result, StatusCode> { + let endpoint = load_endpoint(&state.store, &sandbox_name, &service_name).await?; + if !endpoint.domain || endpoint.target_port == 0 || endpoint.target_port > u32::from(u16::MAX) { + return Err(StatusCode::NOT_FOUND); + } + + let sandbox = state + .store + .get_message::(&endpoint.sandbox_id) + .await + .map_err(|err| { + warn!(error = %err, sandbox_id = %endpoint.sandbox_id, "sandbox 
service routing: failed to load sandbox"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + if SandboxPhase::try_from(sandbox.phase).ok() != Some(SandboxPhase::Ready) { + return Err(StatusCode::PRECONDITION_FAILED); + } + let target_port = u16::try_from(endpoint.target_port).map_err(|_| StatusCode::NOT_FOUND)?; + + let websocket_upgrade = is_websocket_upgrade(&req); + let downstream_upgrade = websocket_upgrade.then(|| hyper::upgrade::on(&mut req)); + + let (_channel_id, relay_rx) = state + .supervisor_sessions + .open_relay( + sandbox.object_id(), + RelayTarget::loopback_tcp(target_port), + Some(endpoint.object_id().to_string()), + Duration::from_secs(15), + ) + .await + .map_err(|err| { + warn!(error = %err, sandbox_id = %endpoint.sandbox_id, "sandbox service routing: supervisor relay unavailable"); + StatusCode::BAD_GATEWAY + })?; + + let relay = tokio::time::timeout(Duration::from_secs(10), relay_rx) + .await + .map_err(|_| StatusCode::BAD_GATEWAY)? + .map_err(|_| StatusCode::BAD_GATEWAY)?; + + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .handshake(TokioIo::new(relay)) + .await + .map_err(|err| { + warn!(error = %err, "sandbox service routing: failed to start upstream HTTP client"); + StatusCode::BAD_GATEWAY + })?; + + if websocket_upgrade { + tokio::spawn(async move { + if let Err(err) = conn.with_upgrades().await { + warn!(error = %err, "sandbox service routing: upstream WebSocket connection failed"); + } + }); + } else { + tokio::spawn(async move { + if let Err(err) = conn.await { + warn!(error = %err, "sandbox service routing: upstream HTTP connection failed"); + } + }); + } + + let upstream = build_upstream_request(req, target_port, websocket_upgrade)?; + let mut response = sender.send_request(upstream).await.map_err(|err| { + warn!(error = %err, "sandbox service routing: upstream HTTP request failed"); + StatusCode::BAD_GATEWAY + })?; + + if websocket_upgrade && response.status() == 
StatusCode::SWITCHING_PROTOCOLS { + let upstream_upgrade = hyper::upgrade::on(&mut response); + let downstream_upgrade = downstream_upgrade.ok_or(StatusCode::BAD_GATEWAY)?; + tokio::spawn(async move { + match (downstream_upgrade.await, upstream_upgrade.await) { + (Ok(downstream), Ok(upstream)) => { + let mut downstream = TokioIo::new(downstream); + let mut upstream = TokioIo::new(upstream); + let _ = tokio::io::copy_bidirectional(&mut downstream, &mut upstream).await; + let _ = downstream.shutdown().await; + let _ = upstream.shutdown().await; + } + (Err(err), _) => { + warn!(error = %err, "sandbox service routing: downstream WebSocket upgrade failed"); + } + (_, Err(err)) => { + warn!(error = %err, "sandbox service routing: upstream WebSocket upgrade failed"); + } + } + }); + + let (parts, _) = response.into_parts(); + return Ok(Response::from_parts(parts, Body::empty())); + } + + let (parts, body) = response.into_parts(); + Ok(Response::from_parts(parts, Body::new(body))) +} + +async fn load_endpoint( + store: &Store, + sandbox_name: &str, + service_name: &str, +) -> Result { + let key = endpoint_key(sandbox_name, service_name); + store + .get_message_by_name::(&key) + .await + .map_err(|err| { + warn!(error = %err, endpoint = %key, "sandbox service routing: failed to load service endpoint"); + StatusCode::INTERNAL_SERVER_ERROR + })? 
+ .ok_or(StatusCode::NOT_FOUND) +} + +fn build_upstream_request( + req: Request, + target_port: u16, + preserve_upgrade_headers: bool, +) -> Result, StatusCode> { + let (parts, body) = req.into_parts(); + let path = parts.uri.path_and_query().map_or("/", |path| path.as_str()); + let uri = path + .parse::() + .map_err(|_| StatusCode::BAD_REQUEST)?; + + let mut builder = Request::builder() + .method(parts.method) + .uri(uri) + .version(http::Version::HTTP_11); + + let headers = builder + .headers_mut() + .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?; + for (name, value) in &parts.headers { + if (is_hop_by_hop_header(name) + && !(preserve_upgrade_headers && is_websocket_hop_by_hop_header(name))) + || is_gateway_auth_header(name) + { + continue; + } + if name == header::COOKIE { + if let Some(cookie) = sanitize_cookie_header(value) { + headers.append(name, cookie); + } + continue; + } + headers.append(name, value.clone()); + } + headers.insert( + header::HOST, + format!("127.0.0.1:{target_port}").parse().unwrap(), + ); + + builder + .body(body) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) +} + +fn host_header(headers: &HeaderMap) -> Option<&str> { + headers.get(header::HOST)?.to_str().ok() +} + +fn request_host(req: &Request) -> Option<&str> { + host_header(req.headers()).or_else(|| req.uri().authority().map(http::uri::Authority::as_str)) +} + +fn is_websocket_upgrade(req: &Request) -> bool { + req.method() == Method::GET + && header_value_is(req.headers(), header::UPGRADE, "websocket") + && header_contains_token(req.headers(), header::CONNECTION, "upgrade") +} + +fn header_value_is(headers: &HeaderMap, name: header::HeaderName, expected: &str) -> bool { + headers + .get(name) + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| value.eq_ignore_ascii_case(expected)) +} + +fn header_contains_token(headers: &HeaderMap, name: header::HeaderName, token: &str) -> bool { + headers + .get(name) + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| { + 
value + .split(',') + .any(|part| part.trim().eq_ignore_ascii_case(token)) + }) +} + +fn is_hop_by_hop_header(name: &header::HeaderName) -> bool { + matches!( + name.as_str(), + "connection" + | "host" + | "keep-alive" + | "proxy-authenticate" + | "proxy-authorization" + | "te" + | "trailer" + | "transfer-encoding" + | "upgrade" + ) +} + +fn is_websocket_hop_by_hop_header(name: &header::HeaderName) -> bool { + matches!(name.as_str(), "connection" | "upgrade") +} + +fn is_gateway_auth_header(name: &header::HeaderName) -> bool { + matches!( + name.as_str(), + "authorization" + | "cf-access-jwt-assertion" + | "x-forwarded-client-cert" + | "x-ssl-client-cert" + | "x-client-cert" + ) +} + +fn sanitize_cookie_header(value: &HeaderValue) -> Option { + let value = value.to_str().ok()?; + let cookies = value + .split(';') + .filter_map(|cookie| { + let cookie = cookie.trim(); + let (name, _) = cookie.split_once('=')?; + (!is_gateway_auth_cookie(name.trim())).then_some(cookie) + }) + .collect::>(); + + if cookies.is_empty() { + return None; + } + + HeaderValue::from_str(&cookies.join("; ")).ok() +} + +fn is_gateway_auth_cookie(name: &str) -> bool { + name.eq_ignore_ascii_case("CF_Authorization") || name.eq_ignore_ascii_case("cf-authorization") +} + +#[cfg(test)] +mod tests { + use super::*; + + fn config() -> ServiceRoutingConfig { + ServiceRoutingConfig { + service_base_domains: vec![ + "dev.openshell.localhost".to_string(), + "svc.gateway.localhost".to_string(), + ], + } + } + + #[test] + fn parses_sandbox_service_host() { + assert_eq!( + parse_host("my-sandbox--web.dev.openshell.localhost", &config()), + Some(("my-sandbox".to_string(), "web".to_string())) + ); + } + + #[test] + fn parses_sandbox_service_host_with_port() { + assert_eq!( + parse_host("my-sandbox--web.dev.openshell.localhost:8080", &config()), + Some(("my-sandbox".to_string(), "web".to_string())) + ); + } + + #[test] + fn parses_alternate_service_base_domain() { + assert_eq!( + 
parse_host("my-sandbox--web.svc.gateway.localhost", &config()), + Some(("my-sandbox".to_string(), "web".to_string())) + ); + } + + #[test] + fn rejects_unknown_base_domain() { + assert_eq!( + parse_host("my-sandbox--web.prod.openshell.localhost", &config()), + None + ); + } + + #[test] + fn identifies_sandbox_service_request_from_host_header() { + let request = Request::builder() + .uri("/") + .header(header::HOST, "my-sandbox--web.dev.openshell.localhost") + .body(Body::empty()) + .unwrap(); + assert!(is_sandbox_service_request(&request, &config())); + } + + #[test] + fn identifies_sandbox_service_request_from_http2_authority() { + let request = Request::builder() + .uri("https://my-sandbox--web.dev.openshell.localhost/") + .body(Body::empty()) + .unwrap(); + assert!(is_sandbox_service_request(&request, &config())); + } + + #[test] + fn ignores_non_sandbox_service_request() { + let request = Request::builder() + .uri("/") + .header(header::HOST, "127.0.0.1:8080") + .body(Body::empty()) + .unwrap(); + assert!(!is_sandbox_service_request(&request, &config())); + } + + #[test] + fn strips_gateway_auth_headers_from_upstream_request() { + let request = Request::builder() + .uri("https://my-sandbox--web.dev.openshell.localhost/path") + .header(header::AUTHORIZATION, "Bearer gateway-token") + .header("cf-access-jwt-assertion", "edge-token") + .header("x-forwarded-client-cert", "cert") + .header( + header::COOKIE, + "theme=dark; CF_Authorization=edge-cookie; app=session", + ) + .header("x-app-header", "kept") + .body(Body::empty()) + .unwrap(); + + let upstream = build_upstream_request(request, 8080, false).unwrap(); + + assert_eq!(upstream.uri(), "/path"); + assert!(!upstream.headers().contains_key(header::AUTHORIZATION)); + assert!(!upstream.headers().contains_key("cf-access-jwt-assertion")); + assert!(!upstream.headers().contains_key("x-forwarded-client-cert")); + assert_eq!( + upstream.headers()[header::COOKIE], + "theme=dark; app=session" + ); + 
assert_eq!(upstream.headers()["x-app-header"], "kept"); + } + + #[test] + fn detects_websocket_upgrade_request() { + let request = Request::builder() + .method(Method::GET) + .uri("/chat?session=main") + .header(header::CONNECTION, "keep-alive, Upgrade") + .header(header::UPGRADE, "websocket") + .body(Body::empty()) + .unwrap(); + + assert!(is_websocket_upgrade(&request)); + } + + #[test] + fn preserves_websocket_upgrade_headers_for_upstream_request() { + let request = Request::builder() + .method(Method::GET) + .uri("https://my-sandbox--web.dev.openshell.localhost/chat?session=main") + .header(header::CONNECTION, "Upgrade") + .header(header::UPGRADE, "websocket") + .header("sec-websocket-key", "abc") + .body(Body::empty()) + .unwrap(); + + let upstream = build_upstream_request(request, 8080, true).unwrap(); + + assert_eq!(upstream.uri(), "/chat?session=main"); + assert_eq!(upstream.headers()[header::CONNECTION], "Upgrade"); + assert_eq!(upstream.headers()[header::UPGRADE], "websocket"); + assert_eq!(upstream.headers()["sec-websocket-key"], "abc"); + assert_eq!(upstream.headers()[header::HOST], "127.0.0.1:8080"); + } +} diff --git a/crates/openshell-server/src/ssh_tunnel.rs b/crates/openshell-server/src/ssh_tunnel.rs index bd317d53f..6bf2e820a 100644 --- a/crates/openshell-server/src/ssh_tunnel.rs +++ b/crates/openshell-server/src/ssh_tunnel.rs @@ -16,6 +16,7 @@ use tracing::{info, warn}; use crate::ServerState; use crate::persistence::{ObjectType, Store}; +use crate::supervisor_session::RelayTarget; const HEADER_SANDBOX_ID: &str = "x-sandbox-id"; const HEADER_TOKEN: &str = "x-sandbox-token"; @@ -158,7 +159,7 @@ async fn ssh_connect( // direct-connect path tolerated ~34s here for similar reasons. 
let (channel_id, relay_rx) = match state .supervisor_sessions - .open_relay(&sandbox_id, Duration::from_secs(30)) + .open_relay(&sandbox_id, RelayTarget::Ssh, None, Duration::from_secs(30)) .await { Ok(pair) => pair, diff --git a/crates/openshell-server/src/supervisor_session.rs b/crates/openshell-server/src/supervisor_session.rs index 94c352ba5..5af4f32de 100644 --- a/crates/openshell-server/src/supervisor_session.rs +++ b/crates/openshell-server/src/supervisor_session.rs @@ -13,8 +13,8 @@ use tracing::{info, warn}; use uuid::Uuid; use openshell_core::proto::{ - GatewayMessage, RelayFrame, RelayInit, RelayOpen, Sandbox, SessionAccepted, SupervisorMessage, - gateway_message, supervisor_message, + GatewayMessage, RelayFrame, RelayInit, RelayOpen, Sandbox, SessionAccepted, SshRelayTarget, + SupervisorMessage, TcpRelayTarget, gateway_message, relay_open, supervisor_message, }; use crate::ServerState; @@ -79,9 +79,32 @@ pub struct SupervisorSessionRegistry { struct PendingRelay { sender: RelayStreamSender, sandbox_id: String, + target: RelayTarget, + service_id: Option, created_at: Instant, } +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum RelayTarget { + Ssh, + Tcp { port: u16 }, +} + +impl RelayTarget { + pub fn loopback_tcp(port: u16) -> Self { + Self::Tcp { port } + } + + fn proto_target(&self) -> relay_open::Target { + match self { + Self::Ssh => relay_open::Target::Ssh(SshRelayTarget {}), + Self::Tcp { port } => relay_open::Target::Tcp(TcpRelayTarget { + port: u32::from(*port), + }), + } + } +} + impl std::fmt::Debug for SupervisorSessionRegistry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let session_count = self.sessions.lock().unwrap().len(); @@ -212,8 +235,8 @@ impl SupervisorSessionRegistry { /// stream. /// /// Sends `RelayOpen` over the supervisor's gRPC session and returns a - /// oneshot receiver that resolves once the supervisor opens its reverse - /// HTTP CONNECT to `/relay/{channel_id}`. 
+ /// oneshot receiver that resolves once the supervisor opens a `RelayStream` + /// and claims the channel with a `RelayInit` frame. /// /// If the session is not currently registered, this method waits up to /// `session_wait_timeout` for it to appear. A session may be temporarily @@ -233,6 +256,8 @@ impl SupervisorSessionRegistry { pub async fn open_relay( &self, sandbox_id: &str, + target: RelayTarget, + service_id: Option, session_wait_timeout: Duration, ) -> Result<(String, oneshot::Receiver), Status> { let tx = self @@ -267,6 +292,8 @@ impl SupervisorSessionRegistry { PendingRelay { sender: relay_tx, sandbox_id: sandbox_id.to_string(), + target: target.clone(), + service_id: service_id.clone(), created_at: Instant::now(), }, ); @@ -275,6 +302,8 @@ impl SupervisorSessionRegistry { let msg = GatewayMessage { payload: Some(gateway_message::Payload::RelayOpen(RelayOpen { channel_id: channel_id.clone(), + target: Some(target.proto_target()), + service_id: service_id.unwrap_or_default(), })), }; @@ -329,9 +358,19 @@ impl SupervisorSessionRegistry { pub async fn replay_pending_relays(&self, sandbox_id: &str, tx: &mpsc::Sender) { for channel_id in self.pending_channel_ids(sandbox_id) { + let (target, service_id) = self + .pending_relays + .lock() + .unwrap() + .get(&channel_id) + .map_or((RelayTarget::Ssh, None), |pending| { + (pending.target.clone(), pending.service_id.clone()) + }); let msg = GatewayMessage { payload: Some(gateway_message::Payload::RelayOpen(RelayOpen { channel_id: channel_id.clone(), + target: Some(target.proto_target()), + service_id: service_id.unwrap_or_default(), })), }; if tx.send(msg).await.is_err() { @@ -855,7 +894,7 @@ mod tests { registry.register("sbx".to_string(), "s1".to_string(), tx, make_shutdown()); let (channel_id, _relay_rx) = registry - .open_relay("sbx", Duration::from_secs(1)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(1)) .await .expect("open_relay should succeed when session is live"); @@ -863,6 +902,8 
@@ mod tests { match msg.payload { Some(gateway_message::Payload::RelayOpen(open)) => { assert_eq!(open.channel_id, channel_id); + assert!(matches!(open.target, Some(relay_open::Target::Ssh(_)))); + assert!(open.service_id.is_empty()); } other => panic!("expected RelayOpen, got {other:?}"), } @@ -872,7 +913,7 @@ mod tests { async fn open_relay_times_out_without_session() { let registry = SupervisorSessionRegistry::new(); let err = registry - .open_relay("missing", Duration::from_millis(50)) + .open_relay("missing", RelayTarget::Ssh, None, Duration::from_millis(50)) .await .expect_err("open_relay should time out"); assert_eq!(err.code(), tonic::Code::Unavailable); @@ -897,7 +938,9 @@ mod tests { ); }); - let result = registry.open_relay("sbx", Duration::from_secs(2)).await; + let result = registry + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(2)) + .await; assert!( result.is_ok(), "open_relay should succeed when session arrives mid-wait: {result:?}" @@ -915,7 +958,7 @@ mod tests { drop(rx); let err = registry - .open_relay("sbx", Duration::from_secs(1)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(1)) .await .expect_err("open_relay should fail when mpsc is closed"); assert_eq!(err.code(), tonic::Code::Unavailable); @@ -947,6 +990,8 @@ mod tests { PendingRelay { sender: oneshot_tx, sandbox_id: sandbox_id.to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now(), }, ); @@ -954,7 +999,7 @@ mod tests { } let err = registry - .open_relay("sbx-a", Duration::from_millis(50)) + .open_relay("sbx-a", RelayTarget::Ssh, None, Duration::from_millis(50)) .await .expect_err("open_relay should reject once global cap is reached"); assert_eq!(err.code(), tonic::Code::ResourceExhausted); @@ -976,6 +1021,8 @@ mod tests { PendingRelay { sender: oneshot_tx, sandbox_id: "sbx".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now(), }, ); @@ -983,7 +1030,7 @@ mod tests { } let err = 
registry - .open_relay("sbx", Duration::from_millis(50)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_millis(50)) .await .expect_err("open_relay should reject when per-sandbox cap is reached"); assert_eq!(err.code(), tonic::Code::ResourceExhausted); @@ -998,7 +1045,12 @@ mod tests { make_shutdown(), ); registry - .open_relay("sbx-other", Duration::from_millis(50)) + .open_relay( + "sbx-other", + RelayTarget::Ssh, + None, + Duration::from_millis(50), + ) .await .expect("different sandbox should still accept new relays"); } @@ -1030,7 +1082,7 @@ mod tests { ); let (_channel_id, _relay_rx) = registry - .open_relay("sbx", Duration::from_secs(1)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(1)) .await .expect("open_relay should succeed"); @@ -1090,7 +1142,12 @@ mod tests { ); let (channel_id, _relay_rx) = registry - .open_relay("sbx", Duration::from_secs(1)) + .open_relay( + "sbx", + RelayTarget::loopback_tcp(8080), + Some("svc-1".to_string()), + Duration::from_secs(1), + ) .await .expect("open_relay should succeed"); @@ -1122,6 +1179,13 @@ mod tests { match replayed.payload { Some(gateway_message::Payload::RelayOpen(open)) => { assert_eq!(open.channel_id, channel_id); + assert_eq!(open.service_id, "svc-1"); + match open.target { + Some(relay_open::Target::Tcp(target)) => { + assert_eq!(target.port, 8080); + } + other => panic!("expected TCP target on replay, got {other:?}"), + } } other => panic!("expected RelayOpen on replay, got {other:?}"), } @@ -1177,6 +1241,8 @@ mod tests { PendingRelay { sender: relay_tx, sandbox_id: "sbx-test".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now(), }, ); @@ -1195,6 +1261,8 @@ mod tests { PendingRelay { sender: relay_tx, sandbox_id: "sbx-test".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now() .checked_sub(Duration::from_secs(60)) .expect("test instant subtraction underflow"), @@ -1225,6 +1293,8 @@ mod tests { PendingRelay { 
sender: relay_tx, sandbox_id: "sbx-test".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now(), }, ); @@ -1244,6 +1314,8 @@ mod tests { PendingRelay { sender: relay_tx, sandbox_id: "sbx-test".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now(), }, ); @@ -1275,6 +1347,8 @@ mod tests { PendingRelay { sender: relay_tx, sandbox_id: "sbx-test".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now() .checked_sub(Duration::from_secs(60)) .expect("test instant subtraction underflow"), @@ -1300,6 +1374,8 @@ mod tests { PendingRelay { sender: relay_tx, sandbox_id: "sbx-test".to_string(), + target: RelayTarget::Ssh, + service_id: None, created_at: Instant::now(), }, ); diff --git a/crates/openshell-server/tests/auth_endpoint_integration.rs b/crates/openshell-server/tests/auth_endpoint_integration.rs index 12f302b63..9c877b8d4 100644 --- a/crates/openshell-server/tests/auth_endpoint_integration.rs +++ b/crates/openshell-server/tests/auth_endpoint_integration.rs @@ -477,6 +477,16 @@ impl openshell_core::proto::open_shell_server::OpenShell for TestOpenShell { )) } + async fn expose_service( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> + { + Ok(tonic::Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _: tonic::Request, diff --git a/crates/openshell-server/tests/edge_tunnel_auth.rs b/crates/openshell-server/tests/edge_tunnel_auth.rs index 15df2f9d8..3af057100 100644 --- a/crates/openshell-server/tests/edge_tunnel_auth.rs +++ b/crates/openshell-server/tests/edge_tunnel_auth.rs @@ -143,6 +143,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( 
&self, _request: tonic::Request, diff --git a/crates/openshell-server/tests/multiplex_integration.rs b/crates/openshell-server/tests/multiplex_integration.rs index dd14c63ec..45533c5ca 100644 --- a/crates/openshell-server/tests/multiplex_integration.rs +++ b/crates/openshell-server/tests/multiplex_integration.rs @@ -101,6 +101,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-server/tests/multiplex_tls_integration.rs b/crates/openshell-server/tests/multiplex_tls_integration.rs index 83ba76988..1ded271e1 100644 --- a/crates/openshell-server/tests/multiplex_tls_integration.rs +++ b/crates/openshell-server/tests/multiplex_tls_integration.rs @@ -114,6 +114,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/crates/openshell-server/tests/supervisor_relay_integration.rs b/crates/openshell-server/tests/supervisor_relay_integration.rs index 85d263223..56718d047 100644 --- a/crates/openshell-server/tests/supervisor_relay_integration.rs +++ b/crates/openshell-server/tests/supervisor_relay_integration.rs @@ -27,7 +27,7 @@ use openshell_core::proto::{ open_shell_client::OpenShellClient, open_shell_server::{OpenShell, OpenShellServer}, }; -use openshell_server::supervisor_session::SupervisorSessionRegistry; +use openshell_server::supervisor_session::{RelayTarget, SupervisorSessionRegistry}; use openshell_server::{MultiplexedService, health_router}; use 
tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; @@ -142,6 +142,12 @@ impl OpenShell for RelayGateway { ) -> Result, Status> { Err(Status::unimplemented("unused")) } + async fn expose_service( + &self, + _: tonic::Request, + ) -> Result, Status> { + Err(Status::unimplemented("unused")) + } async fn revoke_ssh_session( &self, _: tonic::Request, @@ -373,7 +379,7 @@ async fn relay_round_trips_bytes() { let mut session_rx = register_session(®istry, "sbx"); let (channel_id, relay_rx) = registry - .open_relay("sbx", Duration::from_secs(2)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(2)) .await .expect("open_relay"); @@ -403,7 +409,7 @@ async fn relay_closes_cleanly_when_gateway_drops() { let mut session_rx = register_session(®istry, "sbx"); let (channel_id, relay_rx) = registry - .open_relay("sbx", Duration::from_secs(2)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(2)) .await .expect("open_relay"); let _ = session_rx.recv().await.expect("RelayOpen"); @@ -428,7 +434,7 @@ async fn relay_sees_eof_when_supervisor_closes() { let mut session_rx = register_session(®istry, "sbx"); let (channel_id, relay_rx) = registry - .open_relay("sbx", Duration::from_secs(2)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(2)) .await .expect("open_relay"); let _ = session_rx.recv().await.expect("RelayOpen"); @@ -473,7 +479,12 @@ async fn open_relay_times_out_when_no_session() { let _channel = spawn_gateway(Arc::clone(®istry)).await; let err = registry - .open_relay("missing", Duration::from_millis(100)) + .open_relay( + "missing", + RelayTarget::Ssh, + None, + Duration::from_millis(100), + ) .await .expect_err("should time out"); assert_eq!(err.code(), tonic::Code::Unavailable); @@ -486,13 +497,13 @@ async fn concurrent_relays_multiplex_independently() { let mut session_rx = register_session(®istry, "sbx"); let (id_a, rx_a) = registry - .open_relay("sbx", Duration::from_secs(2)) + .open_relay("sbx", 
RelayTarget::Ssh, None, Duration::from_secs(2)) .await .expect("open_relay a"); let _ = session_rx.recv().await.expect("RelayOpen a"); let (id_b, rx_b) = registry - .open_relay("sbx", Duration::from_secs(2)) + .open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(2)) .await .expect("open_relay b"); let _ = session_rx.recv().await.expect("RelayOpen b"); @@ -539,7 +550,8 @@ async fn open_relay_enforces_per_sandbox_cap_under_concurrent_burst() { for _ in 0..64 { let r = Arc::clone(®istry); handles.push(tokio::spawn(async move { - r.open_relay("sbx", Duration::from_secs(1)).await + r.open_relay("sbx", RelayTarget::Ssh, None, Duration::from_secs(1)) + .await })); } @@ -566,7 +578,7 @@ async fn open_relay_enforces_per_sandbox_cap_under_concurrent_burst() { // leak onto unrelated tenants. let _other_rx = register_session_with_capacity(®istry, "sbx-other", 8); registry - .open_relay("sbx-other", Duration::from_secs(1)) + .open_relay("sbx-other", RelayTarget::Ssh, None, Duration::from_secs(1)) .await .expect("other sandbox should not be affected by sbx cap"); } diff --git a/crates/openshell-server/tests/ws_tunnel_integration.rs b/crates/openshell-server/tests/ws_tunnel_integration.rs index 949c0200a..f509ad88d 100644 --- a/crates/openshell-server/tests/ws_tunnel_integration.rs +++ b/crates/openshell-server/tests/ws_tunnel_integration.rs @@ -137,6 +137,15 @@ impl OpenShell for TestOpenShell { Ok(Response::new(CreateSshSessionResponse::default())) } + async fn expose_service( + &self, + _request: tonic::Request, + ) -> Result, Status> { + Ok(Response::new( + openshell_core::proto::ServiceEndpointResponse::default(), + )) + } + async fn revoke_ssh_session( &self, _request: tonic::Request, diff --git a/deploy/docker/cluster-entrypoint.sh b/deploy/docker/cluster-entrypoint.sh index 9287adc48..9f2af4c86 100644 --- a/deploy/docker/cluster-entrypoint.sh +++ b/deploy/docker/cluster-entrypoint.sh @@ -468,9 +468,6 @@ if [ -f "$HELMCHART" ]; then fi sed -i 
"s|__IMAGE_PULL_POLICY__|${IMAGE_PULL_POLICY_VALUE}|g" "$HELMCHART" - SANDBOX_IMAGE_PULL_POLICY_VALUE="${SANDBOX_IMAGE_PULL_POLICY:-\"\"}" - sed -i "s|__SANDBOX_IMAGE_PULL_POLICY__|${SANDBOX_IMAGE_PULL_POLICY_VALUE}|g" "$HELMCHART" - DB_URL_VALUE="${DB_URL:-\"sqlite:/var/openshell/openshell.db\"}" sed -i "s|__DB_URL__|${DB_URL_VALUE}|g" "$HELMCHART" fi @@ -535,6 +532,11 @@ if [ -f "$HELMCHART" ]; then else sed -i "s|__DISABLE_TLS__|false|g" "$HELMCHART" fi + + SERVICE_BASE_DOMAINS_VALUE="${SERVICE_BASE_DOMAINS:-openshell.openshell.localhost}" + SERVICE_BASE_DOMAINS_YAML="[\"$(printf "%s" "${SERVICE_BASE_DOMAINS_VALUE}" | sed 's|,|","|g')\"]" + echo "Enabling sandbox service routing for: ${SERVICE_BASE_DOMAINS_VALUE}" + sed -i "s|__SERVICE_BASE_DOMAINS__|${SERVICE_BASE_DOMAINS_YAML}|g" "$HELMCHART" fi # Inject host gateway IP into the HelmChart manifest so sandbox pods can diff --git a/deploy/helm/openshell/templates/statefulset.yaml b/deploy/helm/openshell/templates/statefulset.yaml index 86f6dc3ed..3e17abdbb 100644 --- a/deploy/helm/openshell/templates/statefulset.yaml +++ b/deploy/helm/openshell/templates/statefulset.yaml @@ -64,10 +64,6 @@ spec: value: {{ .Values.server.sandboxNamespace | quote }} - name: OPENSHELL_SANDBOX_IMAGE value: {{ .Values.server.sandboxImage | quote }} - {{- if .Values.server.sandboxImagePullPolicy }} - - name: OPENSHELL_SANDBOX_IMAGE_PULL_POLICY - value: {{ .Values.server.sandboxImagePullPolicy | quote }} - {{- end }} - name: OPENSHELL_GRPC_ENDPOINT value: {{ if .Values.server.disableTls }}{{ .Values.server.grpcEndpoint | replace "https://" "http://" | quote }}{{ else }}{{ .Values.server.grpcEndpoint | quote }}{{ end }} {{- if .Values.server.sshGatewayHost }} @@ -82,6 +78,8 @@ spec: - name: OPENSHELL_HOST_GATEWAY_IP value: {{ .Values.server.hostGatewayIP | quote }} {{- end }} + - name: OPENSHELL_SERVICE_BASE_DOMAINS + value: {{ join "," .Values.server.serviceBaseDomains | quote }} - name: OPENSHELL_SSH_HANDSHAKE_SECRET valueFrom: 
secretKeyRef: diff --git a/deploy/helm/openshell/values.yaml b/deploy/helm/openshell/values.yaml index 18e671375..1d844e7b1 100644 --- a/deploy/helm/openshell/values.yaml +++ b/deploy/helm/openshell/values.yaml @@ -74,10 +74,6 @@ server: sandboxNamespace: openshell dbUrl: "sqlite:/var/openshell/openshell.db" sandboxImage: "ghcr.io/nvidia/openshell-community/sandboxes/base:latest" - # Kubernetes imagePullPolicy for sandbox pods. Empty = Kubernetes default - # (Always for :latest, IfNotPresent otherwise). Set to "Always" for dev - # clusters so new images are picked up without manual eviction. - sandboxImagePullPolicy: "" # gRPC endpoint for sandboxes to callback to OpenShell (must be reachable from pods) grpcEndpoint: "https://openshell.openshell.svc.cluster.local:8080" # Public host/port returned to CLI clients for SSH proxy CONNECT requests. @@ -103,6 +99,8 @@ server: # Disable TLS entirely — the server listens on plaintext HTTP. # Set to true when a reverse proxy / tunnel terminates TLS at the edge. 
disableTls: false + serviceBaseDomains: + - "openshell.openshell.localhost" tls: # K8s secret (type kubernetes.io/tls) with tls.crt and tls.key for the server certSecretName: openshell-server-tls diff --git a/deploy/kube/manifests/openshell-helmchart.yaml b/deploy/kube/manifests/openshell-helmchart.yaml index 81743746a..0934a8345 100644 --- a/deploy/kube/manifests/openshell-helmchart.yaml +++ b/deploy/kube/manifests/openshell-helmchart.yaml @@ -30,7 +30,6 @@ spec: pullPolicy: __IMAGE_PULL_POLICY__ server: sandboxImage: ghcr.io/nvidia/openshell-community/sandboxes/base:latest - sandboxImagePullPolicy: __SANDBOX_IMAGE_PULL_POLICY__ dbUrl: __DB_URL__ sshGatewayHost: __SSH_GATEWAY_HOST__ sshGatewayPort: __SSH_GATEWAY_PORT__ @@ -38,6 +37,7 @@ spec: hostGatewayIP: __HOST_GATEWAY_IP__ disableGatewayAuth: __DISABLE_GATEWAY_AUTH__ disableTls: __DISABLE_TLS__ + serviceBaseDomains: __SERVICE_BASE_DOMAINS__ oidc: issuer: "__OIDC_ISSUER__" audience: "__OIDC_AUDIENCE__" diff --git a/docs/reference/browser-certificates.mdx b/docs/reference/browser-certificates.mdx new file mode 100644 index 000000000..9b56997d2 --- /dev/null +++ b/docs/reference/browser-certificates.mdx @@ -0,0 +1,138 @@ +--- +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +title: "Browser Certificates" +description: "Trust an OpenShell gateway CA and install the client certificate for browser access to sandbox services." +keywords: "Generative AI, Cybersecurity, Browser Certificates, Firefox, mTLS, Local Domain" +position: 2 +--- + +OpenShell sandbox service URLs use the same gateway mTLS model as the CLI. Your browser needs two certificate changes before it can open `https://--..openshell.localhost:/`: + +- Trust the gateway CA so the browser accepts the gateway server certificate. +- Install the client certificate so the gateway accepts the browser connection. 
+ +The CLI stores gateway certificates under `~/.config/openshell/gateways/<gateway-name>/mtls/`: + +| File | Browser use | +|---|---| +| `ca.crt` | Import as a trusted certificate authority. | +| `tls.crt` and `tls.key` | Convert to PKCS#12, then import as the browser client certificate. | + +## Create a Client Certificate Bundle + +Browsers import client certificates as PKCS#12 (`.p12`) bundles. Create one from the OpenShell PEM files: + +```shell +GATEWAY=navigator # replace with your gateway name +MTLS_DIR="$HOME/.config/openshell/gateways/$GATEWAY/mtls" + +openssl pkcs12 -export \ + -inkey "$MTLS_DIR/tls.key" \ + -in "$MTLS_DIR/tls.crt" \ + -certfile "$MTLS_DIR/ca.crt" \ + -name "OpenShell $GATEWAY client" \ + -out "$MTLS_DIR/openshell-$GATEWAY-client.p12" \ + -passout pass: +``` + +This writes `openshell-<gateway-name>-client.p12` with an empty import password. If you set a password, enter it when the browser asks during import. + +## Firefox + +Firefox uses its own NSS certificate database and does not always use the macOS keychain. + +### Manual Import + +Open Firefox settings and import both certificates: + +1. Open `about:preferences#privacy`. +2. Scroll to **Certificates** and select **View Certificates**. +3. In **Authorities**, select **Import**, choose `~/.config/openshell/gateways/<gateway-name>/mtls/ca.crt`, and enable **Trust this CA to identify websites**. +4. In **Your Certificates**, select **Import**, choose `~/.config/openshell/gateways/<gateway-name>/mtls/openshell-<gateway-name>-client.p12`, and enter the PKCS#12 password. If you used the command above, leave it blank. +5. Restart Firefox. 
+ +### CLI Import + +Install the NSS tools, close Firefox, then import into the active Firefox profile: + +```shell +brew install nss + +GATEWAY=navigator # replace with your gateway name +MTLS_DIR="$HOME/.config/openshell/gateways/$GATEWAY/mtls" +PROFILE="$HOME/Library/Application Support/Firefox/Profiles/<profile-directory>" + +certutil -A \ + -d "sql:$PROFILE" \ + -n "OpenShell $GATEWAY CA" \ + -t "CT,C,C" \ + -i "$MTLS_DIR/ca.crt" + +pk12util \ + -d "sql:$PROFILE" \ + -i "$MTLS_DIR/openshell-$GATEWAY-client.p12" \ + -W "" +``` + +Find the active profile directory in `about:profiles` before running the commands. Restart Firefox after importing. If Firefox prompts for a client certificate when you visit the sandbox service URL, choose the `OpenShell <gateway-name> client` certificate. + +## Chrome, Edge, and Safari on macOS + +Chrome, Edge, and Safari generally use the macOS keychain. Import the CA into your login keychain and mark it trusted: + +```shell +GATEWAY=navigator # replace with your gateway name +MTLS_DIR="$HOME/.config/openshell/gateways/$GATEWAY/mtls" + +security add-trusted-cert \ + -d \ + -r trustRoot \ + -k "$HOME/Library/Keychains/login.keychain-db" \ + "$MTLS_DIR/ca.crt" +``` + +macOS Keychain may reject the default OpenSSL 3 PKCS#12 bundle. Create a Keychain-compatible bundle: + +```shell +GATEWAY=navigator # replace with your gateway name +MTLS_DIR="$HOME/.config/openshell/gateways/$GATEWAY/mtls" + +openssl pkcs12 -export -legacy \ + -inkey "$MTLS_DIR/tls.key" \ + -in "$MTLS_DIR/tls.crt" \ + -certfile "$MTLS_DIR/ca.crt" \ + -name "OpenShell $GATEWAY client" \ + -out "$MTLS_DIR/openshell-$GATEWAY-client-keychain.p12" \ + -passout pass:openshell +``` + +Import it for Safari from the command line: + +```shell +security import "$MTLS_DIR/openshell-$GATEWAY-client-keychain.p12" \ + -k "$HOME/Library/Keychains/login.keychain-db" \ + -P "openshell" \ + -f pkcs12 \ + -T "/Applications/Safari.app" +``` + +Or import the client `.p12` into Keychain Access: + +1. Open **Keychain Access**. 
+2. Select the **login** keychain. +3. Choose **File > Import Items**. +4. Select `~/.config/openshell/gateways/<gateway-name>/mtls/openshell-<gateway-name>-client-keychain.p12`. +5. Enter the PKCS#12 password. If you used the command above, enter `openshell`. + +Restart the browser after importing certificates. + +## Troubleshooting + +If the browser still shows a certificate warning, recreate gateways created before sandbox service routing so their server certificate includes service base domain SANs: + +```shell +openshell gateway start --recreate +``` + +If the gateway rejects the browser connection, confirm the client certificate is installed in **Your Certificates** in Firefox or in the login keychain on macOS. diff --git a/docs/sandboxes/manage-gateways.mdx b/docs/sandboxes/manage-gateways.mdx index a20a27c20..cac075bb4 100644 --- a/docs/sandboxes/manage-gateways.mdx +++ b/docs/sandboxes/manage-gateways.mdx @@ -161,6 +161,7 @@ openshell gateway info --name my-remote-cluster | Flag | Purpose | |---|---| | `--gpu` | Enable NVIDIA GPU passthrough. Requires NVIDIA drivers and the Container Toolkit on the host. OpenShell auto-selects CDI when enabled on the daemon and falls back to Docker's NVIDIA GPU request path (`--gpus all`) otherwise. | +| `--service-base-domain` | Add a base domain for sandbox service routing. Defaults to `<gateway-name>.openshell.localhost`. May be repeated. | | `--plaintext` | Listen on HTTP instead of mTLS. Use behind a TLS-terminating reverse proxy. | | `--disable-gateway-auth` | Skip mTLS client certificate checks. Use when a reverse proxy cannot forward client certs. | | `--registry-username` | Username for registry authentication. Defaults to `__token__` when `--registry-token` is set. Only needed for private registries. Also configurable with `OPENSHELL_REGISTRY_USERNAME`. 
| diff --git a/docs/sandboxes/manage-sandboxes.mdx b/docs/sandboxes/manage-sandboxes.mdx index 4b83fb4bb..2a7ad46b7 100644 --- a/docs/sandboxes/manage-sandboxes.mdx +++ b/docs/sandboxes/manage-sandboxes.mdx @@ -123,6 +123,27 @@ OpenShell allocates a TTY automatically when both stdin and stdout are terminals ## Monitor and Debug +## Expose HTTP Services + +Sandbox service routing is enabled for gateways by default. Recreate gateways created before this routing mode so their generated TLS certificate includes the service base domain SANs: + +```shell +openshell gateway start --recreate +``` + +Expose a service that listens on loopback inside the sandbox: + +```shell +openshell service expose my-sandbox web --target-port 8080 +``` + +OpenShell prints a URL in the form `https://--..openshell.localhost:/`. HTTP traffic enters the same gateway listener as gRPC and routes by the `Host` header before gateway routes such as `/auth`, so application paths are preserved for the sandbox service. + + +Import the gateway CA and client certificate before using mTLS-protected browser access; see [Browser Certificates](/reference/browser-certificates). Endpoint deletion, renewal, and remote/public domains are follow-up work. + + + List all sandboxes: ```shell diff --git a/proto/openshell.proto b/proto/openshell.proto index 75490f338..82930a3ea 100644 --- a/proto/openshell.proto +++ b/proto/openshell.proto @@ -36,6 +36,9 @@ service OpenShell { // Create a short-lived SSH session for a sandbox. rpc CreateSshSession(CreateSshSessionRequest) returns (CreateSshSessionResponse); + // Create or update a sandbox HTTP service endpoint for local-domain routing. + rpc ExposeService(ExposeServiceRequest) returns (ServiceEndpointResponse); + // Revoke a previously issued SSH session. rpc RevokeSshSession(RevokeSshSessionRequest) returns (RevokeSshSessionResponse); @@ -105,11 +108,10 @@ service OpenShell { // on its ConnectSupervisor stream. 
The first RelayFrame carries a // RelayInit with the channel_id to associate the new HTTP/2 stream with // the pending relay slot on the gateway. Subsequent frames carry raw bytes in either - // direction between the gateway-side waiter (ssh_tunnel / exec handler) - // and the supervisor-side local SSH daemon bridge. + // direction between the gateway-side waiter and the supervisor-side local + // target bridge. // - // This rides the same TCP+TLS+HTTP/2 connection as ConnectSupervisor — - // no new TLS handshake, no reverse HTTP CONNECT. + // This rides the same TCP+TLS+HTTP/2 connection as ConnectSupervisor. rpc RelayStream(stream RelayFrame) returns (stream RelayFrame); // Watch a sandbox and stream updates. @@ -379,6 +381,40 @@ message CreateSshSessionResponse { int64 expires_at_ms = 8; } +// Request to expose an HTTP service running inside a sandbox. +message ExposeServiceRequest { + // Sandbox name. + string sandbox = 1; + // Service name within the sandbox. + string service = 2; + // Loopback TCP port inside the sandbox. + uint32 target_port = 3; + // Whether to print/use the local-domain URL. + bool domain = 4; +} + +// Persisted sandbox service endpoint. +message ServiceEndpoint { + // Kubernetes-style metadata. + openshell.datamodel.v1.ObjectMeta metadata = 1; + // Sandbox object ID. + string sandbox_id = 2; + // Sandbox name. + string sandbox_name = 3; + // Service name within the sandbox. + string service_name = 4; + // Loopback TCP port inside the sandbox. + uint32 target_port = 5; + // Whether local-domain routing is enabled for this endpoint. + bool domain = 6; +} + +// Response containing a service endpoint and, when available, its local URL. +message ServiceEndpointResponse { + ServiceEndpoint endpoint = 1; + string url = 2; +} + // Revoke SSH session request. message RevokeSshSessionRequest { // Session token to revoke. 
@@ -839,10 +875,27 @@ message GatewayHeartbeat {} // On receiving this, the supervisor should initiate a RelayStream RPC to // the gateway, sending a RelayInit in the first RelayFrame to associate // the new HTTP/2 stream with the pending relay slot. The supervisor -// bridges that stream to the local SSH daemon. +// bridges that stream to the requested local target. message RelayOpen { // Gateway-allocated channel identifier (UUID). string channel_id = 1; + // Target the supervisor should dial inside the sandbox. + // If absent, supervisors treat the relay as SSH for compatibility. + oneof target { + SshRelayTarget ssh = 2; + TcpRelayTarget tcp = 3; + } + // Optional service identifier for audit/correlation. + string service_id = 5; +} + +// Built-in SSH relay target. +message SshRelayTarget {} + +// TCP target dialed by the supervisor from inside the sandbox. +message TcpRelayTarget { + // Target port. Must fit in u16 and be non-zero. + uint32 port = 1; } // Initial RelayStream frame sent by the supervisor to claim a pending relay. diff --git a/tasks/scripts/cluster-deploy-fast.sh b/tasks/scripts/cluster-deploy-fast.sh index da3e6494f..1bea935c8 100755 --- a/tasks/scripts/cluster-deploy-fast.sh +++ b/tasks/scripts/cluster-deploy-fast.sh @@ -454,6 +454,7 @@ if [[ "${needs_helm_upgrade}" == "1" ]]; then --set server.tls.certSecretName=openshell-server-tls \ --set server.tls.clientCaSecretName=openshell-server-client-ca \ --set server.tls.clientTlsSecretName=openshell-client-tls \ + --set-string server.serviceBaseDomains[0]=${CLUSTER_NAME}.openshell.localhost \ ${HOST_GATEWAY_ARGS} \ ${OIDC_HELM_ARGS} \ ${helm_wait_args}"