From 6ec3df10ca15aabe0d9c0088cf30ebb9eab3cdc5 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Tue, 10 Mar 2026 14:59:09 -0700 Subject: [PATCH] Fix disconnects breaking view updates for other connections --- crates/client-api/src/routes/database.rs | 3 +- crates/core/src/host/host_controller.rs | 2 +- crates/core/src/host/module_host.rs | 23 ++----- crates/core/src/host/v8/mod.rs | 12 +--- .../src/host/wasm_common/module_host_actor.rs | 2 - .../subscription/module_subscription_actor.rs | 20 +++++- .../module_subscription_manager.rs | 16 ++++- .../smoketests/modules/views-sql/src/lib.rs | 10 +++ crates/smoketests/tests/smoketests/views.rs | 69 +++++++++++++++++++ 9 files changed, 120 insertions(+), 37 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 20e6196c78b..aeade589927 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -184,8 +184,7 @@ pub async fn call( }; module - // We don't clear views or procedures after reducer calls - .call_identity_disconnected(caller_identity, connection_id, false) + .call_identity_disconnected(caller_identity, connection_id) .await .map_err(client_disconnected_error_to_response)?; diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 82eb265c393..2fc41070ca4 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1012,7 +1012,7 @@ impl Host { // No need to clear view tables here since we do it in `clear_all_clients`. for (identity, connection_id) in connected_clients { module_host - .call_identity_disconnected(identity, connection_id, false) + .call_identity_disconnected(identity, connection_id) .await .with_context(|| { format!( diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 75d76f24273..1b46b12af08 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1251,7 +1251,6 @@ impl ModuleHost { client_id.identity, client_id.connection_id, info, - true, call_reducer, trapped_slot, ) @@ -1293,7 +1292,6 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, info: &ModuleInfo, - drop_view_subscribers: bool, call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), trapped_slot: &mut bool, ) -> Result<(), ReducerCallError> { @@ -1309,20 +1307,10 @@ impl ModuleHost { let workload = || Workload::reducer_no_args(reducer_name.clone(), caller_identity, caller_connection_id); - // Decrement the number of subscribers for each view this caller is subscribed to - let dec_view_subscribers = |tx: &mut MutTxId| { - if drop_view_subscribers && let Err(err) = tx.unsubscribe_views(caller_identity) { - log::error!("`call_identity_disconnected`: failed to delete client view data: {err}"); - } - }; - // A fallback transaction that deletes the client from `st_client`. let database_identity = stdb.database_identity(); let fallback = || { stdb.with_auto_commit(workload(), |mut_tx| { - - dec_view_subscribers(mut_tx); - if !is_client_exist(mut_tx) { // The client is already gone. Nothing to do. log::debug!( @@ -1348,9 +1336,7 @@ impl ModuleHost { }; if let Some((reducer_id, reducer_def)) = reducer_lookup { - let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); - - dec_view_subscribers(&mut mut_tx); + let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); if !is_client_exist(&mut_tx) { // The client is already gone. Nothing to do. @@ -1430,13 +1416,12 @@ impl ModuleHost { &self, caller_identity: Identity, caller_connection_id: ConnectionId, - drop_view_subscribers: bool, ) -> Result<(), ReducerCallError> { self.call( "call_identity_disconnected", - (caller_identity, caller_connection_id, drop_view_subscribers), - async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c), - async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c).await, + (caller_identity, caller_connection_id), + async |(a, b), inst| inst.call_identity_disconnected(a, b), + async |(a, b), inst| inst.call_identity_disconnected(a, b).await, ) .await? } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 057f8a802ca..b611fcaef26 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -410,11 +410,10 @@ impl JsInstance { &mut self, caller_identity: Identity, caller_connection_id: ConnectionId, - drop_view_subscribers: bool, ) -> Result<(), ReducerCallError> { self.send_recv( JsWorkerReply::into_call_identity_disconnected, - JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id, drop_view_subscribers), + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id), ) .await } @@ -505,7 +504,7 @@ enum JsWorkerRequest { /// See [`JsInstance::call_identity_connected`]. CallIdentityConnected(ConnectionAuthCtx, ConnectionId), /// See [`JsInstance::call_identity_disconnected`]. - CallIdentityDisconnected(Identity, ConnectionId, bool), + CallIdentityDisconnected(Identity, ConnectionId), /// See [`JsInstance::disconnect_client`]. DisconnectClient(ClientActorId), /// See [`JsInstance::init_database`]. @@ -718,17 +717,12 @@ async fn spawn_instance_worker( call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); reply("call_identity_connected", CallIdentityConnected(res), trapped); } - JsWorkerRequest::CallIdentityDisconnected( - caller_identity, - caller_connection_id, - drop_view_subcribers, - ) => { + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => { let mut trapped = false; let res = ModuleHost::call_identity_disconnected_inner( caller_identity, caller_connection_id, info, - drop_view_subcribers, call_reducer, &mut trapped, ); diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 310401a42b3..654c081c4c6 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -488,7 +488,6 @@ impl WasmModuleInstance { &mut self, caller_identity: Identity, caller_connection_id: ConnectionId, - drop_view_subscribers: bool, ) -> Result<(), ReducerCallError> { let module = &self.common.info.clone(); let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params); @@ -497,7 +496,6 @@ impl WasmModuleInstance { caller_identity, caller_connection_id, module, - drop_view_subscribers, call_reducer, &mut trapped, ); diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 6d28d338978..d117a60aa33 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1560,8 +1560,24 @@ impl ModuleSubscriptions { } pub fn remove_subscriber(&self, client_id: ClientActorId) { - let mut subscriptions = self.subscriptions.write(); - subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id)); + let removed_queries = { + let mut subscriptions = self.subscriptions.write(); + subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id)) + }; + + if removed_queries.is_empty() { + return; + } + + // TODO(perf): Removing a subscriber is currently O(subscribed_queries). + // Instead we should maintain an index to make this O(subscribed_views). + if let Err(err) = self.unsubscribe_views(&removed_queries, client_id.identity) { + log::error!( + "failed to unsubscribe views for disconnected client ({}, {}): {err}", + client_id.identity, + client_id.connection_id + ); + } } /// Rolls back `tx` and returns the offset as it was before `tx`. diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 7d6223bf8ce..5220711c48f 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -1203,10 +1203,20 @@ impl SubscriptionManager { /// If a query no longer has any subscribers, /// it is removed from the index along with its table ids. #[tracing::instrument(level = "trace", skip_all)] - pub fn remove_all_subscriptions(&mut self, client: &ClientId) { + pub fn remove_all_subscriptions(&mut self, client: &ClientId) -> Vec { + let mut removed_query_hashes = HashSet::new(); + if let Some(client_info) = self.clients.get(client) { + removed_query_hashes.extend(client_info.legacy_subscriptions.iter().copied()); + removed_query_hashes.extend(client_info.subscription_ref_count.keys().copied()); + } + let removed_queries = removed_query_hashes + .into_iter() + .filter_map(|hash| self.queries.get(&hash).map(|q| q.query.clone())) + .collect::>(); + self.remove_legacy_subscriptions(client); let Some(client_info) = self.remove_client_and_inform_send_worker(*client) else { - return; + return removed_queries; }; debug_assert!(client_info.legacy_subscriptions.is_empty()); @@ -1252,6 +1262,8 @@ impl SubscriptionManager { for query_hash in queries_to_remove { self.queries.remove(&query_hash); } + + removed_queries } /// Find the queries that need to be evaluated for this table update. diff --git a/crates/smoketests/modules/views-sql/src/lib.rs b/crates/smoketests/modules/views-sql/src/lib.rs index 57e9a758553..333b12100e1 100644 --- a/crates/smoketests/modules/views-sql/src/lib.rs +++ b/crates/smoketests/modules/views-sql/src/lib.rs @@ -24,6 +24,16 @@ pub fn add_player_level(ctx: &ReducerContext, id: u64, level: u64) { ctx.db.player_level().insert(PlayerState { id, level }); } +#[spacetimedb::reducer] +pub fn set_player_state(ctx: &ReducerContext, id: u64, level: u64) { + if let Some(mut row) = ctx.db.player_state().id().find(id) { + row.level = level; + ctx.db.player_state().id().update(row); + } else { + ctx.db.player_state().insert(PlayerState { id, level }); + } +} + #[spacetimedb::view(accessor = my_player_and_level, public)] pub fn my_player_and_level(ctx: &AnonymousViewContext) -> Option { ctx.db.player_level().id().find(0) diff --git a/crates/smoketests/tests/smoketests/views.rs b/crates/smoketests/tests/smoketests/views.rs index 654c551625b..473a4086353 100644 --- a/crates/smoketests/tests/smoketests/views.rs +++ b/crates/smoketests/tests/smoketests/views.rs @@ -570,6 +570,75 @@ fn test_typescript_procedure_triggers_subscription_updates() { ); } +#[test] +fn test_disconnect_does_not_break_sender_view() { + let test = Smoketest::builder().precompiled_module("views-sql").build(); + + test.call("set_player_state", &["42", "1"]).unwrap(); + + // Two connections subscribe to the same view. + let sub_keep = test.subscribe_background(&["SELECT * FROM player"], 2).unwrap(); + let sub_drop = test.subscribe_background(&["SELECT * FROM player"], 1).unwrap(); + + // Both connections should receive the first update. + // After one connection disconnects, the other should still receive updates. + test.call("set_player_state", &["42", "2"]).unwrap(); + let _ = sub_drop.collect().unwrap(); + test.call("set_player_state", &["42", "3"]).unwrap(); + + let events = sub_keep.collect().unwrap(); + + assert_eq!(events.len(), 2, "Expected two updates for player, got: {events:?}"); + let inserts = events[1]["player"]["inserts"] + .as_array() + .expect("Expected inserts array on player update"); + assert!( + inserts + .iter() + .any(|row| row["id"] == json!(42) && row["level"] == json!(3)), + "Expected player id=42 level=3 insert after disconnect, got: {events:?}" + ); +} + +#[test] +fn test_disconnect_does_not_break_anonymous_view() { + let test = Smoketest::builder().precompiled_module("views-sql").build(); + + // Seed a row in the anonymous-view source table. + test.call("add_player_level", &["0", "2"]).unwrap(); + + // Two connections subscribe to the same anonymous view. + let sub_keep = test + .subscribe_background(&["SELECT * FROM player_and_level"], 2) + .unwrap(); + let sub_drop = test + .subscribe_background(&["SELECT * FROM player_and_level"], 1) + .unwrap(); + + // Both connections should receive the first update. + // After one connection disconnects, the other should still receive updates. + test.call("add_player_level", &["1", "2"]).unwrap(); + let _ = sub_drop.collect().unwrap(); + test.call("add_player_level", &["2", "2"]).unwrap(); + + let events = sub_keep.collect().unwrap(); + + assert_eq!( + events.len(), + 2, + "Expected two updates for player_and_level, got: {events:?}" + ); + let inserts = events[1]["player_and_level"]["inserts"] + .as_array() + .expect("Expected inserts array on player_and_level update"); + assert!( + inserts + .iter() + .any(|row| row["id"] == json!(2) && row["level"] == json!(2)), + "Expected player id=2 level=2 insert after disconnect, got: {events:?}" + ); +} + #[test] fn test_typescript_query_builder_view_query() { require_pnpm!();