From a89c71a070cba81ca9da83f6a907a941957ce9e9 Mon Sep 17 00:00:00 2001 From: ScriptedAlchemy Date: Thu, 2 Jul 2026 01:31:06 +0000 Subject: [PATCH 1/2] feat(daemon): version handshake, skew warnings, restart-aware serve Follow-ups deferred from #179 (connect-retry grace for daemon restarts): - DaemonHandshake now advertises the client binary version with #[serde(default)] so old daemons/clients still interoperate. The daemon logs a deduped daemon_version_skew event when a client's version differs. - The serve stdio proxy reads the daemon's serverInfo.version from proxied initialize responses (sent by daemons of every version) and warns on stderr when it differs from the client, pointing at the new `tracedecay daemon restart` command. `tracedecay post-update` now warns loudly when an unmanaged daemon keeps serving the previous version. - `tracedecay serve` no longer commits to in-process mode just because the socket file is missing: when an installed service claims the socket, it waits out the daemon-restart window with the same connect grace used for per-request reconnects before falling back. --- src/cli.rs | 2 + src/daemon.rs | 439 +++++++++++++++++++++++++++++++++--- src/main.rs | 44 +++- tests/mcp_cli_serve_test.rs | 145 ++++++++++++ 4 files changed, 593 insertions(+), 37 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 36b2a899..9e85ee5d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -472,6 +472,8 @@ pub enum DaemonAction { #[arg(long)] no_stop: bool, }, + /// Restart the installed daemon service (e.g. after a version mismatch) + Restart, /// Print daemon service/socket status Status, } diff --git a/src/daemon.rs b/src/daemon.rs index 6bd98001..15574ddb 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,5 +1,5 @@ #[cfg(unix)] -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Write; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; @@ -115,6 +115,15 @@ pub struct DaemonHandshake { pub timings: bool, pub allow_init: bool, pub client_identity: DaemonClientIdentity, + /// Version of the tracedecay binary that opened this connection. + /// + /// `#[serde(default)]` keeps mixed-version pairs interoperable: a new + /// daemon reads handshakes from old clients (missing field → empty), and + /// old daemons ignore the extra field. The daemon uses it to detect and + /// log version skew, e.g. a stale daemon still serving after + /// `tracedecay update` replaced the binary. + #[serde(default)] + pub client_version: String, } impl DaemonHandshake { @@ -130,6 +139,7 @@ impl DaemonHandshake { timings, allow_init, client_identity: DaemonClientIdentity::current()?, + client_version: binary_version().to_string(), }) } @@ -149,6 +159,24 @@ impl DaemonHandshake { } } +/// Version of this tracedecay binary, advertised in daemon handshakes and +/// compared against peers to detect stale daemons after `tracedecay update`. +fn binary_version() -> &'static str { + env!("CARGO_PKG_VERSION") +} + +/// The client version to report as skewed, or `None` when the versions match. +/// +/// Old clients send no version (empty string); that is indistinguishable from +/// "same version before this field existed", so it never counts as skew. +#[cfg(unix)] +fn client_version_skew(client_version: &str, daemon_version: &str) -> Option { + if client_version.is_empty() || client_version == daemon_version { + return None; + } + Some(client_version.to_string()) +} + #[cfg(unix)] pub async fn notify_hook_event(project_path: &Path, event: DaemonHookEvent) { let _ = timeout( @@ -588,6 +616,52 @@ async fn connect_with_restart_grace( } } +/// Decides at `tracedecay serve` startup whether to proxy to the daemon. +/// +/// A missing socket usually means "no daemon", but `tracedecay update` +/// restarts the daemon service and shutdown unlinks the socket before the new +/// daemon rebinds it; a serve process starting inside that window would +/// otherwise silently commit to in-process mode for its whole lifetime. When +/// a daemon service is installed for this socket, wait out that window with +/// the same grace used for per-request connects before falling back. +#[cfg(unix)] +pub async fn should_proxy_serve_to_daemon(socket_path: &Path) -> bool { + let installed_socket = installed_service_socket_path().ok().flatten(); + should_proxy_serve_to_daemon_with( + socket_path, + installed_socket.as_deref(), + DAEMON_RESTART_GRACE, + DAEMON_RESTART_POLL_INTERVAL, + ) + .await +} + +#[cfg(unix)] +async fn should_proxy_serve_to_daemon_with( + socket_path: &Path, + installed_service_socket: Option<&Path>, + grace: Duration, + poll_interval: Duration, +) -> bool { + if socket_path.exists() { + return true; + } + // Only wait when an installed service is expected to rebind this exact + // socket; otherwise in-process startup must stay instant. + if installed_service_socket != Some(socket_path) { + return false; + } + connect_with_restart_grace(socket_path, grace, poll_interval) + .await + .is_ok() +} + +/// Non-unix builds have no daemon; `proxy_stdio_to_daemon` would error anyway. +#[cfg(not(unix))] +pub async fn should_proxy_serve_to_daemon(socket_path: &Path) -> bool { + socket_path.exists() +} + #[cfg(unix)] pub async fn run_foreground(socket_path: PathBuf) -> Result<()> { run_foreground_unix(socket_path).await @@ -638,6 +712,9 @@ async fn proxy_request_line_to_daemon( match send_daemon_request_line(socket_path, handshake, line).await { Ok(responses) => { + if let Some(warning) = daemon_version_skew_warning(line, &responses, binary_version()) { + eprintln!("[tracedecay] warning: {warning}"); + } for response in responses { transport.write_line(&response).await?; if !response.ends_with('\n') { @@ -718,6 +795,48 @@ async fn send_daemon_request_line( Ok(responses) } +/// Extracts the daemon's advertised version from a proxied `initialize` +/// response (`result.serverInfo.version`, which daemons have always sent). +/// +/// This works against daemons older than the handshake version field, so a +/// freshly-updated client can still detect a stale daemon left running by a +/// non-systemd setup or a plain `tracedecay upgrade`. +#[cfg(unix)] +fn daemon_version_from_initialize_response( + request_line: &str, + responses: &[String], +) -> Option { + let request = serde_json::from_str::(request_line).ok()?; + if request.method != "initialize" { + return None; + } + responses.iter().find_map(|line| { + serde_json::from_str::(line) + .ok()? + .pointer("/result/serverInfo/version")? + .as_str() + .map(str::to_string) + }) +} + +/// The warning to surface when the daemon behind an `initialize` response is +/// running a different binary version than this client. +#[cfg(unix)] +fn daemon_version_skew_warning( + request_line: &str, + responses: &[String], + client_version: &str, +) -> Option { + let daemon_version = daemon_version_from_initialize_response(request_line, responses)?; + if daemon_version == client_version { + return None; + } + Some(format!( + "TraceDecay daemon is version {daemon_version} but this client is {client_version} — \ + run `tracedecay daemon restart` to reload the daemon binary" + )) +} + #[cfg(unix)] fn daemon_proxy_error_response(line: &str, err: &TraceDecayError) -> Option { let request = serde_json::from_str::(line).ok()?; @@ -909,6 +1028,9 @@ struct DaemonEngine { project_servers: Arc>>>, /// Background automation loops, partitioned with the same client/project identity as MCP state. automation_schedulers: Arc>>>, + /// Client versions whose skew was already logged. Proxy clients reconnect + /// per request, so without this the mismatch would flood the daemon log. + logged_client_version_skews: Arc>>, } #[cfg(unix)] @@ -932,6 +1054,14 @@ impl ProjectServerKey { #[cfg(unix)] impl DaemonEngine { + /// Returns the client version to log for this handshake, once per distinct + /// skewed version; repeat connections from the same client return `None`. + async fn client_version_skew_to_log(&self, handshake: &DaemonHandshake) -> Option { + let skew = client_version_skew(&handshake.client_version, binary_version())?; + let mut logged = self.logged_client_version_skews.lock().await; + logged.insert(skew.clone()).then_some(skew) + } + async fn project_server( &self, handshake: &DaemonHandshake, @@ -1291,6 +1421,21 @@ async fn serve_socket_client(stream: tokio::net::UnixStream, engine: DaemonEngin return Ok(()); }; let handshake = DaemonHandshake::from_line(&line)?; + if let Some(client_version) = engine.client_version_skew_to_log(&handshake).await { + log_daemon_event( + "daemon_version_skew", + &[ + ("daemon_version", binary_version().to_string()), + ("client_version", client_version), + ( + "hint", + "daemon binary differs from the connecting client; \ + run `tracedecay daemon restart` to reload it" + .to_string(), + ), + ], + ); + } if handshake.project_path.is_some() { let server = match Box::pin(engine.project_server(&handshake)).await { Ok(server) => server, @@ -1584,6 +1729,17 @@ mod tests { } } + fn test_handshake_defaults() -> DaemonHandshake { + DaemonHandshake { + project_path: None, + scope_prefix: None, + timings: false, + allow_init: false, + client_identity: test_client_identity(), + client_version: super::binary_version().to_string(), + } + } + fn run_git(cwd: &std::path::Path, args: &[&str]) { let output = std::process::Command::new("git") .args(args) @@ -1705,6 +1861,107 @@ mod tests { ); } + #[cfg(unix)] + #[tokio::test] + async fn serve_proxies_when_socket_already_exists() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + let _listener = tokio::net::UnixListener::bind(&socket).expect("bind daemon socket"); + + assert!( + super::should_proxy_serve_to_daemon_with( + &socket, + None, + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ) + .await + ); + } + + #[cfg(unix)] + #[tokio::test] + async fn serve_stays_in_process_without_socket_or_installed_service() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + let other_socket = dir.path().join("other.sock"); + + // No socket and no service claiming it: fall back immediately, even + // with a long grace configured — startup must not stall. + let decision = tokio::time::timeout( + std::time::Duration::from_secs(1), + super::should_proxy_serve_to_daemon_with( + &socket, + None, + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ), + ) + .await + .expect("decision without daemon evidence should be immediate"); + assert!(!decision); + + // A service installed for a different socket is not evidence either. + let decision = tokio::time::timeout( + std::time::Duration::from_secs(1), + super::should_proxy_serve_to_daemon_with( + &socket, + Some(&other_socket), + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ), + ) + .await + .expect("mismatched service socket should not delay the decision"); + assert!(!decision); + } + + #[cfg(unix)] + #[tokio::test] + async fn serve_waits_out_restart_window_when_service_owns_socket() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + + // Simulate the `tracedecay update` restart window: the service is + // installed but the old daemon already unlinked the socket; the new + // daemon binds it shortly after serve starts. + let bind_path = socket.clone(); + let daemon = tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + tokio::net::UnixListener::bind(&bind_path).expect("bind restarted daemon socket") + }); + + assert!( + super::should_proxy_serve_to_daemon_with( + &socket, + Some(&socket), + std::time::Duration::from_secs(8), + std::time::Duration::from_millis(50), + ) + .await, + "serve started during a daemon restart should still pick the daemon transport" + ); + daemon.await.expect("daemon bind task"); + } + + #[cfg(unix)] + #[tokio::test] + async fn serve_falls_back_when_installed_service_never_rebinds() { + let dir = TempDir::new().expect("temp dir"); + let socket = dir.path().join("daemon.sock"); + + assert!( + !super::should_proxy_serve_to_daemon_with( + &socket, + Some(&socket), + std::time::Duration::from_millis(200), + std::time::Duration::from_millis(50), + ) + .await, + "a stopped service should fall back to in-process after the grace expires" + ); + } + #[cfg(unix)] #[tokio::test] async fn proxied_request_survives_daemon_restart_window() { @@ -1747,13 +2004,7 @@ mod tests { writer.write_all(b"\n").await.expect("write newline"); }); - let handshake = DaemonHandshake { - project_path: None, - scope_prefix: None, - timings: false, - allow_init: false, - client_identity: test_client_identity(), - }; + let handshake = test_handshake_defaults(); let request = serde_json::to_string(&json!({ "jsonrpc": "2.0", "id": 42, @@ -1844,7 +2095,7 @@ mod tests { scope_prefix: Some("src/mcp".to_string()), timings: true, allow_init: true, - client_identity: test_client_identity(), + ..test_handshake_defaults() }; let encoded = handshake.to_line().expect("handshake should encode"); @@ -1866,6 +2117,145 @@ mod tests { assert!(DaemonHandshake::from_line(&encoded).is_err()); } + /// Old client → new daemon: handshakes without `client_version` (sent by + /// binaries predating the field) must still parse, with an empty version. + #[test] + fn daemon_handshake_accepts_old_client_without_version() { + let encoded = serde_json::json!({ + "project_path": "/work/repo", + "scope_prefix": null, + "timings": false, + "allow_init": false, + "client_identity": { + "profile_root": "/profiles/client", + "global_db_path": "/profiles/client/global.db" + } + }) + .to_string(); + + let decoded = DaemonHandshake::from_line(&encoded).expect("old handshake should decode"); + + assert_eq!(decoded.client_version, ""); + } + + /// New client → old daemon: the serde derive ignores unknown fields, so a + /// daemon predating `client_version` (same derive) parses new handshakes. + /// Adding another unknown field to a current handshake proves the + /// tolerance the old daemon relies on. + #[test] + fn daemon_handshake_ignores_unknown_fields_for_old_daemons() { + let handshake = test_handshake_defaults(); + let mut value: serde_json::Value = + serde_json::from_str(&handshake.to_line().expect("handshake should encode")) + .expect("handshake json"); + value["field_from_a_future_version"] = serde_json::json!("ignored"); + + let decoded = DaemonHandshake::from_line(&value.to_string()) + .expect("handshake with unknown fields should decode"); + + assert_eq!(decoded, handshake); + } + + #[test] + fn daemon_handshake_advertises_binary_version() { + let handshake = test_handshake_defaults(); + + let encoded = handshake.to_line().expect("handshake should encode"); + let value: serde_json::Value = serde_json::from_str(&encoded).expect("handshake json"); + + assert_eq!( + value["client_version"], + serde_json::json!(env!("CARGO_PKG_VERSION")) + ); + } + + #[cfg(unix)] + #[test] + fn client_version_skew_flags_only_real_mismatches() { + assert_eq!(super::client_version_skew("1.2.3", "1.2.3"), None); + assert_eq!(super::client_version_skew("", "1.2.3"), None); + assert_eq!( + super::client_version_skew("1.3.0", "1.2.3"), + Some("1.3.0".to_string()) + ); + } + + #[cfg(unix)] + #[tokio::test] + async fn daemon_engine_logs_version_skew_once_per_client_version() { + let engine = super::DaemonEngine::default(); + let mut handshake = test_handshake_defaults(); + handshake.client_version = "0.0.0-skewed".to_string(); + + assert_eq!( + engine.client_version_skew_to_log(&handshake).await, + Some("0.0.0-skewed".to_string()), + "first connection from a skewed client should be logged" + ); + assert_eq!( + engine.client_version_skew_to_log(&handshake).await, + None, + "repeat connections from the same client version must not spam the log" + ); + + let matching = test_handshake_defaults(); + assert_eq!( + engine.client_version_skew_to_log(&matching).await, + None, + "matching client versions are not skew" + ); + } + + #[cfg(unix)] + #[test] + fn daemon_version_skew_warning_reads_initialize_server_info() { + let initialize = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": {} + }) + .to_string(); + let response = |version: &str| { + vec![serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { "serverInfo": { "name": "tracedecay", "version": version } } + }) + .to_string()] + }; + + let warning = super::daemon_version_skew_warning(&initialize, &response("9.9.9"), "1.0.0") + .expect("mismatched daemon version should warn"); + assert!( + warning.contains("9.9.9") && warning.contains("1.0.0"), + "warning should name both versions, got: {warning}" + ); + assert!( + warning.contains("tracedecay daemon restart"), + "warning should point at the restart command, got: {warning}" + ); + + assert_eq!( + super::daemon_version_skew_warning(&initialize, &response("1.0.0"), "1.0.0"), + None, + "matching versions must not warn" + ); + + let tools_call = serde_json::json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": {} + }) + .to_string(); + assert_eq!( + super::daemon_version_skew_warning(&tools_call, &response("9.9.9"), "1.0.0"), + None, + "only initialize responses advertise the daemon version" + ); + } + #[cfg(unix)] #[test] fn automation_scheduler_starts_when_any_task_has_interval() { @@ -1983,10 +2373,8 @@ mod tests { .expect("save automation config"); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; let tick_secs = Box::pin(super::automation_scheduler_tick_secs_for_project( @@ -2015,10 +2403,9 @@ mod tests { let (reader, mut writer) = client.into_split(); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, allow_init: true, client_identity, + ..test_handshake_defaults() }; writer .write_all(handshake.to_line().expect("handshake").as_bytes()) @@ -2087,10 +2474,8 @@ mod tests { let (_reader, mut writer) = client.into_split(); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; writer .write_all(handshake.to_line().expect("handshake").as_bytes()) @@ -2156,10 +2541,8 @@ mod tests { let dashboard_root = cg.store_layout().dashboard_root.clone(); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; let key = super::ProjectServerKey::from_handshake(project.clone(), &handshake); let engine = super::DaemonEngine::default(); @@ -2281,10 +2664,8 @@ mod tests { .expect("save paused scheduler control"); let handshake = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; Box::pin(super::run_automation_scheduler_tick(&project, &handshake)) @@ -2319,11 +2700,8 @@ mod tests { let (reader, mut writer) = client.into_split(); let handshake = DaemonHandshake { - project_path: None, - scope_prefix: None, - timings: false, - allow_init: false, client_identity, + ..test_handshake_defaults() }; writer .write_all(handshake.to_line().expect("handshake").as_bytes()) @@ -2400,10 +2778,8 @@ mod tests { let handshake_a = DaemonHandshake { project_path: Some(project.clone()), - scope_prefix: None, - timings: false, - allow_init: false, client_identity: client_identity.clone(), + ..test_handshake_defaults() }; writer_a .write_all(handshake_a.to_line().expect("handshake").as_bytes()) @@ -2438,10 +2814,9 @@ mod tests { let mut lines_b = tokio::io::BufReader::new(reader_b).lines(); let handshake_b = DaemonHandshake { project_path: Some(project), - scope_prefix: None, timings: true, - allow_init: false, client_identity, + ..test_handshake_defaults() }; writer_b .write_all(handshake_b.to_line().expect("handshake").as_bytes()) diff --git a/src/main.rs b/src/main.rs index 2b30e007..ddf30ec2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -410,19 +410,32 @@ fn refresh_generated_plugins() -> tracedecay::errors::Result<()> { Ok(()) } -fn refresh_daemon_service() -> tracedecay::errors::Result<()> { +/// Rewrites and restarts the installed daemon service, returning the service +/// path and its socket, or `None` when no service is installed. +fn refresh_daemon_service() -> tracedecay::errors::Result> { let tracedecay_bin = tracedecay_bin_on_path()?; let spec = tracedecay::daemon::service_spec(tracedecay_bin, None)?; let socket_path = tracedecay::daemon::installed_service_socket_path()? .unwrap_or_else(|| spec.socket_path.clone()); - match tracedecay::daemon::refresh_installed_service(&spec)? { - Some(service_path) => { + Ok(tracedecay::daemon::refresh_installed_service(&spec)? + .map(|service_path| (service_path, socket_path))) +} + +fn refresh_daemon_service_after_update() -> tracedecay::errors::Result<()> { + match refresh_daemon_service()? { + Some((service_path, socket_path)) => { eprintln!( "\x1b[32m✔\x1b[0m Daemon service refreshed at {}", service_path.display() ); eprintln!("Daemon socket: {}", socket_path.display()); } + None if tracedecay::daemon::daemon_reachable() => { + eprintln!( + " \x1b[33mwarning:\x1b[0m a TraceDecay daemon is running without an installed service; \ + it keeps serving the previous version until its `tracedecay daemon run` process is restarted." + ); + } None => { eprintln!("TraceDecay daemon service is not installed; skipping daemon restart."); } @@ -430,6 +443,24 @@ fn refresh_daemon_service() -> tracedecay::errors::Result<()> { Ok(()) } +fn restart_daemon_service() -> tracedecay::errors::Result<()> { + match refresh_daemon_service()? { + Some((service_path, socket_path)) => { + eprintln!( + "\x1b[32m✔\x1b[0m Daemon service restarted at {}", + service_path.display() + ); + eprintln!("Daemon socket: {}", socket_path.display()); + Ok(()) + } + None => Err(tracedecay::errors::TraceDecayError::Config { + message: "no TraceDecay daemon service is installed — restart your `tracedecay daemon run` \ + process manually, or run `tracedecay daemon install-service` to manage it as a service" + .to_string(), + }), + } +} + fn tracedecay_home_dir() -> tracedecay::errors::Result { tracedecay::agents::home_dir().ok_or_else(|| tracedecay::errors::TraceDecayError::Config { message: "could not determine home directory".to_string(), @@ -479,7 +510,7 @@ fn run_post_update_subcommand() -> tracedecay::errors::Result<()> { fn run_post_update_tasks() -> tracedecay::errors::Result<()> { refresh_generated_plugins()?; - if let Err(error) = refresh_daemon_service() { + if let Err(error) = refresh_daemon_service_after_update() { eprintln!(" \x1b[33mwarning:\x1b[0m daemon service refresh failed: {error}"); } Ok(()) @@ -730,7 +761,7 @@ async fn dispatch_command(command: Commands) -> tracedecay::errors::Result<()> { false, )?; let socket_path = tracedecay::daemon::default_socket_path()?; - if socket_path.exists() { + if tracedecay::daemon::should_proxy_serve_to_daemon(&socket_path).await { tracedecay::daemon::proxy_stdio_to_daemon(&socket_path, &handshake, peeked_line) .await?; } else { @@ -766,6 +797,9 @@ async fn dispatch_command(command: Commands) -> tracedecay::errors::Result<()> { service_path.display() ); } + DaemonAction::Restart => { + restart_daemon_service()?; + } DaemonAction::Status => { let socket_path = tracedecay::daemon::socket_path_or_default(None)?; print!("{}", tracedecay::daemon::service_status(&socket_path)); diff --git a/tests/mcp_cli_serve_test.rs b/tests/mcp_cli_serve_test.rs index edff0017..a646d8b5 100644 --- a/tests/mcp_cli_serve_test.rs +++ b/tests/mcp_cli_serve_test.rs @@ -606,6 +606,151 @@ async fn serve_stdio_smokes_automation_run_artifact_view() { ); } +/// Regression test for the serve/daemon-restart race: a serve process started +/// while `tracedecay update` is restarting the daemon sees no socket file, but +/// must not silently commit to in-process mode when an installed service is +/// about to rebind the socket. +#[cfg(target_os = "linux")] +#[tokio::test] +async fn serve_started_during_daemon_restart_window_proxies_to_restarted_daemon() { + use std::io::{BufRead, BufReader, Read}; + use std::os::unix::net::UnixListener; + use std::time::{Duration, Instant}; + + let home = TempDir::new().unwrap(); + let project = init_project_with_file(home.path(), "pub fn restart_window_marker() {}\n").await; + let socket_path = common::daemon_socket_path(home.path()); + fs::create_dir_all(socket_path.parent().unwrap()).unwrap(); + + // An installed service unit claims the socket, but the socket file is + // missing — exactly the window between daemon shutdown and rebind. + let unit_dir = canonical_existing_path(home.path()).join(".config/systemd/user"); + fs::create_dir_all(&unit_dir).unwrap(); + fs::write( + unit_dir.join("tracedecay.service"), + format!( + "[Service]\nExecStart=/opt/tracedecay/bin/tracedecay daemon run --socket {}\n", + socket_path.display() + ), + ) + .unwrap(); + + // The "restarted daemon" binds the socket only after serve has started. + // It answers `initialize` with a sentinel server name and a skewed + // version, so a proxied response is distinguishable from the in-process + // fallback and exercises the client-side version-skew warning. + let listener_path = socket_path.clone(); + let fake_daemon = std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(400)); + let listener = UnixListener::bind(&listener_path).expect("bind restarted daemon socket"); + listener + .set_nonblocking(true) + .expect("nonblocking fake daemon listener"); + let deadline = Instant::now() + Duration::from_secs(10); + loop { + assert!( + Instant::now() < deadline, + "timed out waiting for serve to proxy a request to the restarted daemon" + ); + let mut stream = match listener.accept() { + Ok((stream, _addr)) => stream, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(10)); + continue; + } + Err(e) => panic!("accept serve connection: {e}"), + }; + stream + .set_nonblocking(false) + .expect("blocking fake daemon stream"); + let mut reader = BufReader::new(stream.try_clone().expect("clone fake daemon stream")); + let mut handshake_line = String::new(); + if reader + .read_line(&mut handshake_line) + .expect("read handshake") + == 0 + { + // The transport probe connects and hangs up without a handshake. + continue; + } + let mut request_line = String::new(); + if reader.read_line(&mut request_line).expect("read request") == 0 { + continue; + } + let request: Value = serde_json::from_str(request_line.trim()).expect("request json"); + let response = json!({ + "jsonrpc": "2.0", + "id": request["id"], + "result": { + "protocolVersion": "2024-11-05", + "capabilities": { "tools": {} }, + "serverInfo": { + "name": "sentinel-restarted-daemon", + "version": "0.0.1-sentinel" + } + } + }); + writeln!(stream, "{response}").expect("write fake daemon response"); + // Drain until the proxy hangs up so the response is not lost to a + // connection reset. + let mut scratch = [0_u8; 64]; + while matches!(reader.read(&mut scratch), Ok(n) if n > 0) {} + return; + } + }); + + let mut child = tracedecay_command_with_home(home.path()) + .arg("serve") + .arg("--path") + .arg(project.path()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("tracedecay serve should run"); + { + let stdin = child.stdin.as_mut().expect("stdin should be piped"); + writeln!( + stdin, + "{}", + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": {} + }) + ) + .unwrap(); + } + drop(child.stdin.take()); + + let output = child + .wait_with_output() + .expect("tracedecay serve should exit after stdin closes"); + fake_daemon.join().expect("fake daemon thread should exit"); + + assert!( + output.status.success(), + "serve should ride out the daemon restart window\nstdout:\n{}\nstderr:\n{}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + let stdout = String::from_utf8_lossy(&output.stdout); + let response: Value = serde_json::from_str(stdout.trim()).unwrap_or_else(|err| { + panic!("stdout should contain one JSON-RPC response: {err}\n{stdout}") + }); + assert_eq!( + response["result"]["serverInfo"]["name"], + json!("sentinel-restarted-daemon"), + "initialize must be answered by the restarted daemon, not an in-process fallback:\n{response}" + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("0.0.1-sentinel") && stderr.contains("tracedecay daemon restart"), + "proxy should warn about the daemon/client version skew\nstderr:\n{stderr}" + ); +} + #[cfg(unix)] #[tokio::test] async fn serve_daemon_proxy_reports_daemon_disconnect_as_json_rpc_error() { From 2b5dc1744ae09f9f9b5cfb1e5136e400da54a8b4 Mon Sep 17 00:00:00 2001 From: ScriptedAlchemy Date: Thu, 2 Jul 2026 01:37:42 +0000 Subject: [PATCH 2/2] refactor(daemon): move version-skew logging behind engine helper --- src/daemon.rs | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 15574ddb..5f3627a9 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1062,6 +1062,27 @@ impl DaemonEngine { logged.insert(skew.clone()).then_some(skew) } + /// Logs a `daemon_version_skew` event when this handshake's client runs a + /// different binary version, deduped per distinct client version. + async fn log_client_version_skew(&self, handshake: &DaemonHandshake) { + let Some(client_version) = self.client_version_skew_to_log(handshake).await else { + return; + }; + log_daemon_event( + "daemon_version_skew", + &[ + ("daemon_version", binary_version().to_string()), + ("client_version", client_version), + ( + "hint", + "daemon binary differs from the connecting client; \ + run `tracedecay daemon restart` to reload it" + .to_string(), + ), + ], + ); + } + async fn project_server( &self, handshake: &DaemonHandshake, @@ -1421,21 +1442,7 @@ async fn serve_socket_client(stream: tokio::net::UnixStream, engine: DaemonEngin return Ok(()); }; let handshake = DaemonHandshake::from_line(&line)?; - if let Some(client_version) = engine.client_version_skew_to_log(&handshake).await { - log_daemon_event( - "daemon_version_skew", - &[ - ("daemon_version", binary_version().to_string()), - ("client_version", client_version), - ( - "hint", - "daemon binary differs from the connecting client; \ - run `tracedecay daemon restart` to reload it" - .to_string(), - ), - ], - ); - } + engine.log_client_version_skew(&handshake).await; if handshake.project_path.is_some() { let server = match Box::pin(engine.project_server(&handshake)).await { Ok(server) => server,