Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
};

module
// We don't clear views or procedures after reducer calls
.call_identity_disconnected(caller_identity, connection_id, false)
.call_identity_disconnected(caller_identity, connection_id)
.await
.map_err(client_disconnected_error_to_response)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ impl Host {
// No need to clear view tables here since we do it in `clear_all_clients`.
for (identity, connection_id) in connected_clients {
module_host
.call_identity_disconnected(identity, connection_id, false)
.call_identity_disconnected(identity, connection_id)
.await
.with_context(|| {
format!(
Expand Down
23 changes: 4 additions & 19 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,6 @@ impl ModuleHost {
client_id.identity,
client_id.connection_id,
info,
true,
call_reducer,
trapped_slot,
)
Expand Down Expand Up @@ -1293,7 +1292,6 @@ impl ModuleHost {
caller_identity: Identity,
caller_connection_id: ConnectionId,
info: &ModuleInfo,
drop_view_subscribers: bool,
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
trapped_slot: &mut bool,
) -> Result<(), ReducerCallError> {
Expand All @@ -1309,20 +1307,10 @@ impl ModuleHost {

let workload = || Workload::reducer_no_args(reducer_name.clone(), caller_identity, caller_connection_id);

// Decrement the number of subscribers for each view this caller is subscribed to
let dec_view_subscribers = |tx: &mut MutTxId| {
if drop_view_subscribers && let Err(err) = tx.unsubscribe_views(caller_identity) {
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
}
};

// A fallback transaction that deletes the client from `st_client`.
let database_identity = stdb.database_identity();
let fallback = || {
stdb.with_auto_commit(workload(), |mut_tx| {

dec_view_subscribers(mut_tx);

if !is_client_exist(mut_tx) {
// The client is already gone. Nothing to do.
log::debug!(
Expand All @@ -1348,9 +1336,7 @@ impl ModuleHost {
};

if let Some((reducer_id, reducer_def)) = reducer_lookup {
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());

dec_view_subscribers(&mut mut_tx);
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());

if !is_client_exist(&mut_tx) {
// The client is already gone. Nothing to do.
Expand Down Expand Up @@ -1430,13 +1416,12 @@ impl ModuleHost {
&self,
caller_identity: Identity,
caller_connection_id: ConnectionId,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
self.call(
"call_identity_disconnected",
(caller_identity, caller_connection_id, drop_view_subscribers),
async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c),
async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c).await,
(caller_identity, caller_connection_id),
async |(a, b), inst| inst.call_identity_disconnected(a, b),
async |(a, b), inst| inst.call_identity_disconnected(a, b).await,
)
.await?
}
Expand Down
12 changes: 3 additions & 9 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,10 @@ impl JsInstance {
&mut self,
caller_identity: Identity,
caller_connection_id: ConnectionId,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
self.send_recv(
JsWorkerReply::into_call_identity_disconnected,
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id, drop_view_subscribers),
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id),
)
.await
}
Expand Down Expand Up @@ -505,7 +504,7 @@ enum JsWorkerRequest {
/// See [`JsInstance::call_identity_connected`].
CallIdentityConnected(ConnectionAuthCtx, ConnectionId),
/// See [`JsInstance::call_identity_disconnected`].
CallIdentityDisconnected(Identity, ConnectionId, bool),
CallIdentityDisconnected(Identity, ConnectionId),
/// See [`JsInstance::disconnect_client`].
DisconnectClient(ClientActorId),
/// See [`JsInstance::init_database`].
Expand Down Expand Up @@ -718,17 +717,12 @@ async fn spawn_instance_worker(
call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
reply("call_identity_connected", CallIdentityConnected(res), trapped);
}
JsWorkerRequest::CallIdentityDisconnected(
caller_identity,
caller_connection_id,
drop_view_subcribers,
) => {
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => {
let mut trapped = false;
let res = ModuleHost::call_identity_disconnected_inner(
caller_identity,
caller_connection_id,
info,
drop_view_subcribers,
call_reducer,
&mut trapped,
);
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
&mut self,
caller_identity: Identity,
caller_connection_id: ConnectionId,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
let module = &self.common.info.clone();
let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params);
Expand All @@ -497,7 +496,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
caller_identity,
caller_connection_id,
module,
drop_view_subscribers,
call_reducer,
&mut trapped,
);
Expand Down
20 changes: 18 additions & 2 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1560,8 +1560,24 @@ impl ModuleSubscriptions {
}

pub fn remove_subscriber(&self, client_id: ClientActorId) {
let mut subscriptions = self.subscriptions.write();
subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id));
let removed_queries = {
let mut subscriptions = self.subscriptions.write();
subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id))
};

if removed_queries.is_empty() {
return;
}

// TODO(perf): Removing a subscriber is currently O(subscribed_queries).
// Instead we should maintain an index to make this O(subscribed_views).
if let Err(err) = self.unsubscribe_views(&removed_queries, client_id.identity) {
log::error!(
"failed to unsubscribe views for disconnected client ({}, {}): {err}",
client_id.identity,
client_id.connection_id
);
}
}

/// Rolls back `tx` and returns the offset as it was before `tx`.
Expand Down
16 changes: 14 additions & 2 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,10 +1203,20 @@ impl SubscriptionManager {
/// If a query no longer has any subscribers,
/// it is removed from the index along with its table ids.
#[tracing::instrument(level = "trace", skip_all)]
pub fn remove_all_subscriptions(&mut self, client: &ClientId) {
pub fn remove_all_subscriptions(&mut self, client: &ClientId) -> Vec<Query> {
let mut removed_query_hashes = HashSet::new();
if let Some(client_info) = self.clients.get(client) {
removed_query_hashes.extend(client_info.legacy_subscriptions.iter().copied());
removed_query_hashes.extend(client_info.subscription_ref_count.keys().copied());
}
let removed_queries = removed_query_hashes
.into_iter()
.filter_map(|hash| self.queries.get(&hash).map(|q| q.query.clone()))
.collect::<Vec<_>>();

self.remove_legacy_subscriptions(client);
let Some(client_info) = self.remove_client_and_inform_send_worker(*client) else {
return;
return removed_queries;
};

debug_assert!(client_info.legacy_subscriptions.is_empty());
Expand Down Expand Up @@ -1252,6 +1262,8 @@ impl SubscriptionManager {
for query_hash in queries_to_remove {
self.queries.remove(&query_hash);
}

removed_queries
}

/// Find the queries that need to be evaluated for this table update.
Expand Down
10 changes: 10 additions & 0 deletions crates/smoketests/modules/views-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ pub fn add_player_level(ctx: &ReducerContext, id: u64, level: u64) {
ctx.db.player_level().insert(PlayerState { id, level });
}

#[spacetimedb::reducer]
pub fn set_player_state(ctx: &ReducerContext, id: u64, level: u64) {
if let Some(mut row) = ctx.db.player_state().id().find(id) {
row.level = level;
ctx.db.player_state().id().update(row);
} else {
ctx.db.player_state().insert(PlayerState { id, level });
}
}

#[spacetimedb::view(accessor = my_player_and_level, public)]
pub fn my_player_and_level(ctx: &AnonymousViewContext) -> Option<PlayerState> {
ctx.db.player_level().id().find(0)
Expand Down
69 changes: 69 additions & 0 deletions crates/smoketests/tests/smoketests/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,75 @@ fn test_typescript_procedure_triggers_subscription_updates() {
);
}

#[test]
fn test_disconnect_does_not_break_sender_view() {
let test = Smoketest::builder().precompiled_module("views-sql").build();

test.call("set_player_state", &["42", "1"]).unwrap();

// Two connections subscribe to the same view.
let sub_keep = test.subscribe_background(&["SELECT * FROM player"], 2).unwrap();
let sub_drop = test.subscribe_background(&["SELECT * FROM player"], 1).unwrap();

// Both connections should receive the first update.
// After one connection disconnects, the other should still receive updates.
test.call("set_player_state", &["42", "2"]).unwrap();
let _ = sub_drop.collect().unwrap();
test.call("set_player_state", &["42", "3"]).unwrap();

let events = sub_keep.collect().unwrap();

assert_eq!(events.len(), 2, "Expected two updates for player, got: {events:?}");
let inserts = events[1]["player"]["inserts"]
.as_array()
.expect("Expected inserts array on player update");
assert!(
inserts
.iter()
.any(|row| row["id"] == json!(42) && row["level"] == json!(3)),
"Expected player id=42 level=3 insert after disconnect, got: {events:?}"
);
}

#[test]
fn test_disconnect_does_not_break_anonymous_view() {
let test = Smoketest::builder().precompiled_module("views-sql").build();

// Seed a row in the anonymous-view source table.
test.call("add_player_level", &["0", "2"]).unwrap();

// Two connections subscribe to the same anonymous view.
let sub_keep = test
.subscribe_background(&["SELECT * FROM player_and_level"], 2)
.unwrap();
let sub_drop = test
.subscribe_background(&["SELECT * FROM player_and_level"], 1)
.unwrap();

// Both connections should receive the first update.
// After one connection disconnects, the other should still receive updates.
test.call("add_player_level", &["1", "2"]).unwrap();
let _ = sub_drop.collect().unwrap();
test.call("add_player_level", &["2", "2"]).unwrap();

let events = sub_keep.collect().unwrap();

assert_eq!(
events.len(),
2,
"Expected two updates for player_and_level, got: {events:?}"
);
let inserts = events[1]["player_and_level"]["inserts"]
.as_array()
.expect("Expected inserts array on player_and_level update");
assert!(
inserts
.iter()
.any(|row| row["id"] == json!(2) && row["level"] == json!(2)),
"Expected player id=2 level=2 insert after disconnect, got: {events:?}"
);
}

#[test]
fn test_typescript_query_builder_view_query() {
require_pnpm!();
Expand Down
Loading