diff --git a/src/cli.rs b/src/cli.rs index 36b2a899..9e85ee5d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -472,6 +472,8 @@ pub enum DaemonAction { #[arg(long)] no_stop: bool, }, + /// Restart the installed daemon service (e.g. after a version mismatch) + Restart, /// Print daemon service/socket status Status, } diff --git a/src/daemon.rs b/src/daemon.rs index 6bd98001..5f3627a9 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,5 +1,5 @@ #[cfg(unix)] -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Write; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; @@ -115,6 +115,15 @@ pub struct DaemonHandshake { pub timings: bool, pub allow_init: bool, pub client_identity: DaemonClientIdentity, + /// Version of the tracedecay binary that opened this connection. + /// + /// `#[serde(default)]` keeps mixed-version pairs interoperable: a new + /// daemon reads handshakes from old clients (missing field → empty), and + /// old daemons ignore the extra field. The daemon uses it to detect and + /// log version skew, e.g. a stale daemon still serving after + /// `tracedecay update` replaced the binary. + #[serde(default)] + pub client_version: String, } impl DaemonHandshake { @@ -130,6 +139,7 @@ impl DaemonHandshake { timings, allow_init, client_identity: DaemonClientIdentity::current()?, + client_version: binary_version().to_string(), }) } @@ -149,6 +159,24 @@ impl DaemonHandshake { } } +/// Version of this tracedecay binary, advertised in daemon handshakes and +/// compared against peers to detect stale daemons after `tracedecay update`. +fn binary_version() -> &'static str { + env!("CARGO_PKG_VERSION") +} + +/// The client version to report as skewed, or `None` when the versions match. +/// +/// Old clients send no version (empty string); that is indistinguishable from +/// "same version before this field existed", so it never counts as skew. +#[cfg(unix)] +fn client_version_skew(client_version: &str, daemon_version: &str) -> Option { + if client_version.is_empty() || client_version == daemon_version { + return None; + } + Some(client_version.to_string()) +} + #[cfg(unix)] pub async fn notify_hook_event(project_path: &Path, event: DaemonHookEvent) { let _ = timeout( @@ -588,6 +616,52 @@ async fn connect_with_restart_grace( } } +/// Decides at `tracedecay serve` startup whether to proxy to the daemon. +/// +/// A missing socket usually means "no daemon", but `tracedecay update` +/// restarts the daemon service and shutdown unlinks the socket before the new +/// daemon rebinds it; a serve process starting inside that window would +/// otherwise silently commit to in-process mode for its whole lifetime. When +/// a daemon service is installed for this socket, wait out that window with +/// the same grace used for per-request connects before falling back. +#[cfg(unix)] +pub async fn should_proxy_serve_to_daemon(socket_path: &Path) -> bool { + let installed_socket = installed_service_socket_path().ok().flatten(); + should_proxy_serve_to_daemon_with( + socket_path, + installed_socket.as_deref(), + DAEMON_RESTART_GRACE, + DAEMON_RESTART_POLL_INTERVAL, + ) + .await +} + +#[cfg(unix)] +async fn should_proxy_serve_to_daemon_with( + socket_path: &Path, + installed_service_socket: Option<&Path>, + grace: Duration, + poll_interval: Duration, +) -> bool { + if socket_path.exists() { + return true; + } + // Only wait when an installed service is expected to rebind this exact + // socket; otherwise in-process startup must stay instant. + if installed_service_socket != Some(socket_path) { + return false; + } + connect_with_restart_grace(socket_path, grace, poll_interval) + .await + .is_ok() +} + +/// Non-unix builds have no daemon; `proxy_stdio_to_daemon` would error anyway. +#[cfg(not(unix))] +pub async fn should_proxy_serve_to_daemon(socket_path: &Path) -> bool { + socket_path.exists() +} + #[cfg(unix)] pub async fn run_foreground(socket_path: PathBuf) -> Result<()> { run_foreground_unix(socket_path).await @@ -638,6 +712,9 @@ async fn proxy_request_line_to_daemon( match send_daemon_request_line(socket_path, handshake, line).await { Ok(responses) => { + if let Some(warning) = daemon_version_skew_warning(line, &responses, binary_version()) { + eprintln!("[tracedecay] warning: {warning}"); + } for response in responses { transport.write_line(&response).await?; if !response.ends_with('\n') { @@ -718,6 +795,48 @@ async fn send_daemon_request_line( Ok(responses) } +/// Extracts the daemon's advertised version from a proxied `initialize` +/// response (`result.serverInfo.version`, which daemons have always sent). +/// +/// This works against daemons older than the handshake version field, so a +/// freshly-updated client can still detect a stale daemon left running by a +/// non-systemd setup or a plain `tracedecay upgrade`. +#[cfg(unix)] +fn daemon_version_from_initialize_response( + request_line: &str, + responses: &[String], +) -> Option { + let request = serde_json::from_str::(request_line).ok()?; + if request.method != "initialize" { + return None; + } + responses.iter().find_map(|line| { + serde_json::from_str::(line) + .ok()? + .pointer("/result/serverInfo/version")? + .as_str() + .map(str::to_string) + }) +} + +/// The warning to surface when the daemon behind an `initialize` response is +/// running a different binary version than this client. +#[cfg(unix)] +fn daemon_version_skew_warning( + request_line: &str, + responses: &[String], + client_version: &str, +) -> Option { + let daemon_version = daemon_version_from_initialize_response(request_line, responses)?; + if daemon_version == client_version { + return None; + } + Some(format!( + "TraceDecay daemon is version {daemon_version} but this client is {client_version} — \ + run `tracedecay daemon restart` to reload the daemon binary" + )) +} + #[cfg(unix)] fn daemon_proxy_error_response(line: &str, err: &TraceDecayError) -> Option { let request = serde_json::from_str::(line).ok()?; @@ -909,6 +1028,9 @@ struct DaemonEngine { project_servers: Arc>>>, /// Background automation loops, partitioned with the same client/project identity as MCP state. automation_schedulers: Arc>>>, + /// Client versions whose skew was already logged. Proxy clients reconnect + /// per request, so without this the mismatch would flood the daemon log. + logged_client_version_skews: Arc>>, } #[cfg(unix)] @@ -932,6 +1054,35 @@ impl ProjectServerKey { #[cfg(unix)] impl DaemonEngine { + /// Returns the client version to log for this handshake, once per distinct + /// skewed version; repeat connections from the same client return `None`. + async fn client_version_skew_to_log(&self, handshake: &DaemonHandshake) -> Option { + let skew = client_version_skew(&handshake.client_version, binary_version())?; + let mut logged = self.logged_client_version_skews.lock().await; + logged.insert(skew.clone()).then_some(skew) + } + + /// Logs a `daemon_version_skew` event when this handshake's client runs a + /// different binary version, deduped per distinct client version. + async fn log_client_version_skew(&self, handshake: &DaemonHandshake) { + let Some(client_version) = self.client_version_skew_to_log(handshake).await else { + return; + }; + log_daemon_event( + "daemon_version_skew", + &[ + ("daemon_version", binary_version().to_string()), + ("client_version", client_version), + ( + "hint", + "daemon binary differs from the connecting client; \ + run `tracedecay daemon restart` to reload it" + .to_string(), + ), + ], + ); + } + async fn project_server( &self, handshake: &DaemonHandshake, @@ -1291,6 +1442,7 @@ async fn serve_socket_client(stream: tokio::net::UnixStream, engine: DaemonEngin return Ok(()); }; let handshake = DaemonHandshake::from_line(&line)?; + engine.log_client_version_skew(&handshake).await; if handshake.project_path.is_some() { let server = match Box::pin(engine.project_server(&handshake)).await { Ok(server) => server, @@ -1584,6 +1736,17 @@ mod tests { } } + fn test_handshake_defaults() -> DaemonHandshake { + DaemonHandshake { + project_path: None, + scope_prefix: None, + timings: false, + allow_init: false, + client_identity: test_client_identity(), + client_version: super::binary_version().to_string(), + } + } + fn run_git(cwd: &std::path::Path, args: &[&str]) { let output = std::process::Command::new("git") .args(args) @@ -1705,6 +1868,107 @@ mod tests { ); } + #[cfg(unix)] + #[tokio::test] + async fn serve_proxies_when_socket_already_exists() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + let _listener = tokio::net::UnixListener::bind(&socket).expect("bind daemon socket"); + + assert!( + super::should_proxy_serve_to_daemon_with( + &socket, + None, + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ) + .await + ); + } + + #[cfg(unix)] + #[tokio::test] + async fn serve_stays_in_process_without_socket_or_installed_service() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + let other_socket = dir.path().join("other.sock"); + + // No socket and no service claiming it: fall back immediately, even + // with a long grace configured — startup must not stall. + let decision = tokio::time::timeout( + std::time::Duration::from_secs(1), + super::should_proxy_serve_to_daemon_with( + &socket, + None, + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ), + ) + .await + .expect("decision without daemon evidence should be immediate"); + assert!(!decision); + + // A service installed for a different socket is not evidence either. + let decision = tokio::time::timeout( + std::time::Duration::from_secs(1), + super::should_proxy_serve_to_daemon_with( + &socket, + Some(&other_socket), + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ), + ) + .await + .expect("mismatched service socket should not delay the decision"); + assert!(!decision); + } + + #[cfg(unix)] + #[tokio::test] + async fn serve_waits_out_restart_window_when_service_owns_socket() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + + // Simulate the `tracedecay update` restart window: the service is + // installed but the old daemon already unlinked the socket; the new + // daemon binds it shortly after serve starts. + let bind_path = socket.clone(); + let daemon = tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + tokio::net::UnixListener::bind(&bind_path).expect("bind restarted daemon socket") + }); + + assert!( + super::should_proxy_serve_to_daemon_with( + &socket, + Some(&socket), + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ) + .await, + "serve started during a daemon restart should still pick the daemon transport" + ); + daemon.await.expect("daemon bind task"); + } + + #[cfg(unix)] + #[tokio::test] + async fn serve_falls_back_when_installed_service_never_rebinds() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + + assert!( + !super::should_proxy_serve_to_daemon_with( + &socket, + Some(&socket), + std::time::Duration::from_millis(200), + std::time::Duration::from_millis(50), + ) + .await, + "a stopped service should fall back to in-process after the grace expires" + ); + } + #[cfg(unix)] #[tokio::test] async fn proxied_request_survives_daemon_restart_window() { @@ -1747,13 +2011,7 @@ mod tests { writer.write_all(b"\n").await.expect("write newline"); }); - let handshake = DaemonHandshake { - project_path: None, - scope_prefix: None, - timings: false, - allow_init: false, - client_identity: test_client_identity(), - }; + let handshake = test_handshake_defaults(); let request = serde_json::to_string(&json!({ "jsonrpc": "2.0", "id": 42, @@ -1844,7 +2102,7 @@ mod tests { scope_prefix: Some("src/mcp".to_string()), timings: true, allow_init: true, - client_identity: test_client_identity(), + ..test_handshake_defaults() }; let encoded = handshake.to_line().expect("handshake should encode"); @@ -1866,6 +2124,145 @@ mod tests { assert!(DaemonHandshake::from_line(&encoded).is_err()); } + /// Old client → new daemon: handshakes without `client_version` (sent by + /// binaries predating the field) must still parse, with an empty version. + #[test] + fn daemon_handshake_accepts_old_client_without_version() { + let encoded = serde_json::json!({ + "project_path": "/work/repo", + "scope_prefix": null, + "timings": false, + "allow_init": false, + "client_identity": { + "profile_root": "/profiles/client", + "global_db_path": "/profiles/client/global.db" + } + }) + .to_string(); + + let decoded = DaemonHandshake::from_line(&encoded).expect("old handshake should decode"); + + assert_eq!(decoded.client_version, ""); + } + + /// New client → old daemon: the serde derive ignores unknown fields, so a + /// daemon predating `client_version` (same derive) parses new handshakes. + /// Adding another unknown field to a current handshake proves the + /// tolerance the old daemon relies on. + #[test] + fn daemon_handshake_ignores_unknown_fields_for_old_daemons() { + let handshake = test_handshake_defaults(); + let mut value: serde_json::Value = + serde_json::from_str(&handshake.to_line().expect("handshake should encode")) + .expect("handshake json"); + value["field_from_a_future_version"] = serde_json::json!("ignored"); + + let decoded = DaemonHandshake::from_line(&value.to_string()) + .expect("handshake with unknown fields should decode"); + + assert_eq!(decoded, handshake); + } + + #[test] + fn daemon_handshake_advertises_binary_version() { + let handshake = test_handshake_defaults(); + + let encoded = handshake.to_line().expect("handshake should encode"); + let value: serde_json::Value = serde_json::from_str(&encoded).expect("handshake json"); + + assert_eq!( + value["client_version"], + serde_json::json!(env!("CARGO_PKG_VERSION")) + ); + } + + #[cfg(unix)] + #[test] + fn client_version_skew_flags_only_real_mismatches() { + assert_eq!(super::client_version_skew("1.2.3", "1.2.3"), None); + assert_eq!(super::client_version_skew("", "1.2.3"), None); + assert_eq!( + super::client_version_skew("1.3.0", "1.2.3"), + Some("1.3.0".to_string()) + ); + } + + #[cfg(unix)] + #[tokio::test] + async fn daemon_engine_logs_version_skew_once_per_client_version() { + let engine = super::DaemonEngine::default(); + let mut handshake = test_handshake_defaults(); + handshake.client_version = "0.0.0-skewed".to_string(); + + assert_eq!( + engine.client_version_skew_to_log(&handshake).await, + Some("0.0.0-skewed".to_string()), + "first connection from a skewed client should be logged" + ); + assert_eq!( + engine.client_version_skew_to_log(&handshake).await, + None, + "repeat connections from the same client version must not spam the log" + ); + + let matching = test_handshake_defaults(); + assert_eq!( + engine.client_version_skew_to_log(&matching).await, + None, + "matching client versions are not skew" + ); + } + + #[cfg(unix)] + #[test] + fn daemon_version_skew_warning_reads_initialize_server_info() { + let initialize = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": {} + }) + .to_string(); + let response = |version: &str| { + vec![serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { "serverInfo": { "name": "tracedecay", "version": version } } + }) + .to_string()] + }; + + let warning = super::daemon_version_skew_warning(&initialize, &response("9.9.9"), "1.0.0") + .expect("mismatched daemon version should warn"); + assert!( + warning.contains("9.9.9") && warning.contains("1.0.0"), + "warning should name both versions, got: {warning}" + ); + assert!( + warning.contains("tracedecay daemon restart"), + "warning should point at the restart command, got: {warning}" + ); + + assert_eq!( + super::daemon_version_skew_warning(&initialize, &response("1.0.0"), "1.0.0"), + None, + "matching versions must not warn" + ); + + let tools_call = serde_json::json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": {} + }) + .to_string(); + assert_eq!( + super::daemon_version_skew_warning(&tools_call, &response("9.9.9"), "1.0.0"), + None, + "only initialize responses advertise the daemon version" + ); + } + #[cfg(unix)] #[test] fn automation_scheduler_starts_when_any_task_has_interval() { @@ -1983,10 +2380,8 @@ mod tests { .expect("save automation config"); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; let tick_secs = Box::pin(super::automation_scheduler_tick_secs_for_project( @@ -2015,10 +2410,9 @@ mod tests { let (reader, mut writer) = client.into_split(); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, allow_init: true, client_identity, + ..test_handshake_defaults() }; writer .write_all(handshake.to_line().expect("handshake").as_bytes()) @@ -2087,10 +2481,8 @@ mod tests { let (_reader, mut writer) = client.into_split(); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; writer .write_all(handshake.to_line().expect("handshake").as_bytes()) @@ -2156,10 +2548,8 @@ mod tests { let dashboard_root = cg.store_layout().dashboard_root.clone(); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; let key = super::ProjectServerKey::from_handshake(project.clone(), &handshake); let engine = super::DaemonEngine::default(); @@ -2281,10 +2671,8 @@ mod tests { .expect("save paused scheduler control"); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; Box::pin(super::run_automation_scheduler_tick(&project, &handshake)) @@ -2319,11 +2707,8 @@ mod tests { let (reader, mut writer) = client.into_split(); let handshake = DaemonHandshake { - project_path: None, - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; writer .write_all(handshake.to_line().expect("handshake").as_bytes()) @@ -2400,10 +2785,8 @@ mod tests { let handshake_a = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity: client_identity.clone(), + ..test_handshake_defaults() }; writer_a .write_all(handshake_a.to_line().expect("handshake").as_bytes()) @@ -2438,10 +2821,9 @@ mod tests { let mut lines_b = tokio::io::BufReader::new(reader_b).lines(); let handshake_b = DaemonHandshake { project_path: Some(project), - scope_prefix: None, timings: true, - allow_init: false, client_identity, + ..test_handshake_defaults() }; writer_b .write_all(handshake_b.to_line().expect("handshake").as_bytes()) diff --git a/src/main.rs b/src/main.rs index 2b30e007..ddf30ec2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -410,19 +410,32 @@ fn refresh_generated_plugins() -> tracedecay::errors::Result<()> { Ok(()) } -fn refresh_daemon_service() -> tracedecay::errors::Result<()> { +/// Rewrites and restarts the installed daemon service, returning the service +/// path and its socket, or `None` when no service is installed. +fn refresh_daemon_service() -> tracedecay::errors::Result> { let tracedecay_bin = tracedecay_bin_on_path()?; let spec = tracedecay::daemon::service_spec(tracedecay_bin, None)?; let socket_path = tracedecay::daemon::installed_service_socket_path()? .unwrap_or_else(|| spec.socket_path.clone()); - match tracedecay::daemon::refresh_installed_service(&spec)? { - Some(service_path) => { + Ok(tracedecay::daemon::refresh_installed_service(&spec)? + .map(|service_path| (service_path, socket_path))) +} + +fn refresh_daemon_service_after_update() -> tracedecay::errors::Result<()> { + match refresh_daemon_service()? { + Some((service_path, socket_path)) => { eprintln!( "\x1b[32m✔\x1b[0m Daemon service refreshed at {}", service_path.display() ); eprintln!("Daemon socket: {}", socket_path.display()); } + None if tracedecay::daemon::daemon_reachable() => { + eprintln!( + " \x1b[33mwarning:\x1b[0m a TraceDecay daemon is running without an installed service; \ + it keeps serving the previous version until its `tracedecay daemon run` process is restarted." + ); + } None => { eprintln!("TraceDecay daemon service is not installed; skipping daemon restart."); } @@ -430,6 +443,24 @@ fn refresh_daemon_service() -> tracedecay::errors::Result<()> { Ok(()) } +fn restart_daemon_service() -> tracedecay::errors::Result<()> { + match refresh_daemon_service()? { + Some((service_path, socket_path)) => { + eprintln!( + "\x1b[32m✔\x1b[0m Daemon service restarted at {}", + service_path.display() + ); + eprintln!("Daemon socket: {}", socket_path.display()); + Ok(()) + } + None => Err(tracedecay::errors::TraceDecayError::Config { + message: "no TraceDecay daemon service is installed — restart your `tracedecay daemon run` \ + process manually, or run `tracedecay daemon install-service` to manage it as a service" + .to_string(), + }), + } +} + fn tracedecay_home_dir() -> tracedecay::errors::Result { tracedecay::agents::home_dir().ok_or_else(|| tracedecay::errors::TraceDecayError::Config { message: "could not determine home directory".to_string(), @@ -479,7 +510,7 @@ fn run_post_update_subcommand() -> tracedecay::errors::Result<()> { fn run_post_update_tasks() -> tracedecay::errors::Result<()> { refresh_generated_plugins()?; - if let Err(error) = refresh_daemon_service() { + if let Err(error) = refresh_daemon_service_after_update() { eprintln!(" \x1b[33mwarning:\x1b[0m daemon service refresh failed: {error}"); } Ok(()) @@ -730,7 +761,7 @@ async fn dispatch_command(command: Commands) -> tracedecay::errors::Result<()> { false, )?; let socket_path = tracedecay::daemon::default_socket_path()?; - if socket_path.exists() { + if tracedecay::daemon::should_proxy_serve_to_daemon(&socket_path).await { tracedecay::daemon::proxy_stdio_to_daemon(&socket_path, &handshake, peeked_line) .await?; } else { @@ -766,6 +797,9 @@ async fn dispatch_command(command: Commands) -> tracedecay::errors::Result<()> { service_path.display() ); } + DaemonAction::Restart => { + restart_daemon_service()?; + } DaemonAction::Status => { let socket_path = tracedecay::daemon::socket_path_or_default(None)?; print!("{}", tracedecay::daemon::service_status(&socket_path)); diff --git a/tests/mcp_cli_serve_test.rs b/tests/mcp_cli_serve_test.rs index edff0017..a646d8b5 100644 --- a/tests/mcp_cli_serve_test.rs +++ b/tests/mcp_cli_serve_test.rs @@ -606,6 +606,151 @@ async fn serve_stdio_smokes_automation_run_artifact_view() { ); } +/// Regression test for the serve/daemon-restart race: a serve process started +/// while `tracedecay update` is restarting the daemon sees no socket file, but +/// must not silently commit to in-process mode when an installed service is +/// about to rebind the socket. +#[cfg(target_os = "linux")] +#[tokio::test] +async fn serve_started_during_daemon_restart_window_proxies_to_restarted_daemon() { + use std::io::{BufRead, BufReader, Read}; + use std::os::unix::net::UnixListener; + use std::time::{Duration, Instant}; + + let home = TempDir::new().unwrap(); + let project = init_project_with_file(home.path(), "pub fn restart_window_marker() {}\n").await; + let socket_path = common::daemon_socket_path(home.path()); + fs::create_dir_all(socket_path.parent().unwrap()).unwrap(); + + // An installed service unit claims the socket, but the socket file is + // missing — exactly the window between daemon shutdown and rebind. + let unit_dir = canonical_existing_path(home.path()).join(".config/systemd/user"); + fs::create_dir_all(&unit_dir).unwrap(); + fs::write( + unit_dir.join("tracedecay.service"), + format!( + "[Service]\nExecStart=/opt/tracedecay/bin/tracedecay daemon run --socket {}\n", + socket_path.display() + ), + ) + .unwrap(); + + // The "restarted daemon" binds the socket only after serve has started. + // It answers `initialize` with a sentinel server name and a skewed + // version, so a proxied response is distinguishable from the in-process + // fallback and exercises the client-side version-skew warning. + let listener_path = socket_path.clone(); + let fake_daemon = std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(400)); + let listener = UnixListener::bind(&listener_path).expect("bind restarted daemon socket"); + listener + .set_nonblocking(true) + .expect("nonblocking fake daemon listener"); + let deadline = Instant::now() + Duration::from_secs(10); + loop { + assert!( + Instant::now() < deadline, + "timed out waiting for serve to proxy a request to the restarted daemon" + ); + let mut stream = match listener.accept() { + Ok((stream, _addr)) => stream, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(10)); + continue; + } + Err(e) => panic!("accept serve connection: {e}"), + }; + stream + .set_nonblocking(false) + .expect("blocking fake daemon stream"); + let mut reader = BufReader::new(stream.try_clone().expect("clone fake daemon stream")); + let mut handshake_line = String::new(); + if reader + .read_line(&mut handshake_line) + .expect("read handshake") + == 0 + { + // The transport probe connects and hangs up without a handshake. + continue; + } + let mut request_line = String::new(); + if reader.read_line(&mut request_line).expect("read request") == 0 { + continue; + } + let request: Value = serde_json::from_str(request_line.trim()).expect("request json"); + let response = json!({ + "jsonrpc": "2.0", + "id": request["id"], + "result": { + "protocolVersion": "2024-11-05", + "capabilities": { "tools": {} }, + "serverInfo": { + "name": "sentinel-restarted-daemon", + "version": "0.0.1-sentinel" + } + } + }); + writeln!(stream, "{response}").expect("write fake daemon response"); + // Drain until the proxy hangs up so the response is not lost to a + // connection reset. + let mut scratch = [0_u8; 64]; + while matches!(reader.read(&mut scratch), Ok(n) if n > 0) {} + return; + } + }); + + let mut child = tracedecay_command_with_home(home.path()) + .arg("serve") + .arg("--path") + .arg(project.path()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("tracedecay serve should run"); + { + let stdin = child.stdin.as_mut().expect("stdin should be piped"); + writeln!( + stdin, + "{}", + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": {} + }) + ) + .unwrap(); + } + drop(child.stdin.take()); + + let output = child + .wait_with_output() + .expect("tracedecay serve should exit after stdin closes"); + fake_daemon.join().expect("fake daemon thread should exit"); + + assert!( + output.status.success(), + "serve should ride out the daemon restart window\nstdout:\n{}\nstderr:\n{}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + let stdout = String::from_utf8_lossy(&output.stdout); + let response: Value = serde_json::from_str(stdout.trim()).unwrap_or_else(|err| { + panic!("stdout should contain one JSON-RPC response: {err}\n{stdout}") + }); + assert_eq!( + response["result"]["serverInfo"]["name"], + json!("sentinel-restarted-daemon"), + "initialize must be answered by the restarted daemon, not an in-process fallback:\n{response}" + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("0.0.1-sentinel") && stderr.contains("tracedecay daemon restart"), + "proxy should warn about the daemon/client version skew\nstderr:\n{stderr}" + ); +} + #[cfg(unix)] #[tokio::test] async fn serve_daemon_proxy_reports_daemon_disconnect_as_json_rpc_error() {