From cf150d6dcb8a6d9aa2b2bbececbf6c08dc5ce090 Mon Sep 17 00:00:00 2001 From: ksemenenko Date: Thu, 2 Apr 2026 03:02:38 +0200 Subject: [PATCH 1/2] fix: harden batch group cleanup --- AGENTS.md | 3 + .../SignalR/Observers/Subscription.cs | 21 ++-- .../SignalR/OrleansHubLifetimeManager.cs | 101 +++++++++++++++--- docs/Features/Group-Partitioning.md | 5 +- .../Hub-Lifetime-Manager-Integration.md | 2 + 5 files changed, 107 insertions(+), 25 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index da62712..1cf233f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -186,7 +186,10 @@ If no new rule is detected -> do not update the file. - Avoid `this.` qualification; prefer predefined types (`int` over `Int32`) - Never use `ConfigureAwait(false)` - No magic literals - extract to constants, enums, config +- Prefer the simplest clear implementation over incidental complexity, especially on hot paths; if two fixes are correct, choose the one that is easier to reason about and keeps group operations fast - In concurrency-sensitive paths like `OrleansHubLifetimeManager`, prefer the smallest behavior-preserving fix; avoid widening the async/concurrency shape unless tests prove it is necessary, because this code is easy to make unsafe +- Never introduce explicit `lock`/`Monitor` synchronization in Orleans-related paths; fix races via ordering, idempotent cleanup, or Orleans/concurrent primitives instead of `_syncRoot`-style locking +- For Orleans-facing changes, follow the request scheduling model explicitly: prefer grain-aligned ordering/idempotency fixes over host-side synchronization, because group operations must stay fast and consistent with Orleans execution semantics ### Diagnostics diff --git a/ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs b/ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs index a092f29..d739275 100644 --- a/ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs +++ 
b/ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using ManagedCode.Orleans.SignalR.Core.Interfaces; @@ -8,8 +9,9 @@ namespace ManagedCode.Orleans.SignalR.Core.SignalR.Observers; public sealed class Subscription(SignalRObserver observer) : IDisposable { - private readonly HashSet _grains = new(); - private readonly HashSet _heartbeatGrainIds = new(); + // Use ConcurrentDictionary as a concurrent hash-set because batch group mutations can overlap disconnect cleanup. + private readonly ConcurrentDictionary _grains = new(); + private readonly ConcurrentDictionary _heartbeatGrainIds = new(); private bool _disposed; public ISignalRObserver Reference { get; private set; } = default!; @@ -20,7 +22,7 @@ public sealed class Subscription(SignalRObserver observer) : IDisposable public int PartitionId { get; private set; } - public IReadOnlyCollection Grains => _grains; + public IReadOnlyCollection Grains => _grains.IsEmpty ? [] : [.. 
_grains.Keys]; public void Dispose() { @@ -30,6 +32,7 @@ public void Dispose() } _disposed = true; + observer?.Dispose(); _grains.Clear(); _heartbeatGrainIds.Clear(); @@ -41,14 +44,14 @@ public void Dispose() public void AddGrain(IObserverConnectionManager grain) { - _grains.Add(grain); - _heartbeatGrainIds.Add(((GrainReference)grain).GrainId); + _grains.TryAdd(grain, true); + _heartbeatGrainIds.TryAdd(((GrainReference)grain).GrainId, true); } public void RemoveGrain(IObserverConnectionManager grain) { - _grains.Remove(grain); - _heartbeatGrainIds.Remove(((GrainReference)grain).GrainId); + _grains.TryRemove(grain, out _); + _heartbeatGrainIds.TryRemove(((GrainReference)grain).GrainId, out _); } public void ClearGrains() @@ -76,13 +79,13 @@ public SignalRObserver GetObserver() public ImmutableArray GetHeartbeatGrainIds() { - if (_heartbeatGrainIds.Count == 0) + if (_heartbeatGrainIds.IsEmpty) { return ImmutableArray.Empty; } var builder = ImmutableArray.CreateBuilder(_heartbeatGrainIds.Count); - foreach (var grainId in _heartbeatGrainIds) + foreach (var grainId in _heartbeatGrainIds.Keys) { builder.Add(grainId); } diff --git a/ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs b/ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs index 28355f1..a3b5824 100644 --- a/ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs +++ b/ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs @@ -368,19 +368,42 @@ public async Task AddToGroupsAsync(string connectionId, IReadOnlyList gr return; } + var subscriptionReference = subscription.Reference; + if (_orleansSignalOptions.Value.GroupPartitionCount > 1) { var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain(_clusterClient); - foreach (var groupName in uniqueGroupNames) + var partitionIds = await Task.Run( + () => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscriptionReference), + cancellationToken); + + if 
(IsConnectionDisconnected(connectionId)) + { + await CleanupDisconnectedBatchPartitionMembershipAsync( + coordinatorGrain, + uniqueGroupNames, + connectionId, + subscriptionReference, + cancellationToken); + return; + } + + foreach (var partitionId in partitionIds) { - var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken); var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain(_clusterClient, partitionId); subscription.AddGrain(partitionGrain); } - await Task.Run( - () => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference), - cancellationToken); + if (IsConnectionDisconnected(connectionId)) + { + await CleanupDisconnectedBatchPartitionMembershipAsync( + coordinatorGrain, + uniqueGroupNames, + connectionId, + subscriptionReference, + cancellationToken); + return; + } } else { @@ -389,19 +412,47 @@ await Task.Run( .Distinct() .ToArray(); - foreach (var groupGrain in groupGrains) - { - subscription.AddGrain(groupGrain); - } - var tasks = groupGrains - .Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken)) + .Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscriptionReference), cancellationToken)) .ToArray(); if (tasks.Length > 0) { await Task.WhenAll(tasks); } + + if (IsConnectionDisconnected(connectionId)) + { + var cleanupTasks = groupGrains + .Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken)) + .ToArray(); + + if (cleanupTasks.Length > 0) + { + await Task.WhenAll(cleanupTasks); + } + + return; + } + + foreach (var groupGrain in groupGrains) + { + subscription.AddGrain(groupGrain); + } + + if (IsConnectionDisconnected(connectionId)) + { + var cleanupTasks = groupGrains + .Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken)) + 
.ToArray(); + + if (cleanupTasks.Length > 0) + { + await Task.WhenAll(cleanupTasks); + } + + return; + } } await UpdateConnectionHeartbeatAsync(connectionId, subscription); @@ -428,11 +479,13 @@ public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList 1) { var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain(_clusterClient); var partitionIds = await Task.Run( - () => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscription.Reference), + () => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscriptionReference), cancellationToken); foreach (var partitionId in partitionIds) @@ -455,7 +508,7 @@ public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList Task.Run(() => groupGrain.RemoveConnection(connectionId, subscription.Reference), cancellationToken)) + .Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken)) .ToArray(); if (tasks.Length > 0) @@ -723,9 +776,29 @@ private static string[] GetUniqueGroupNames(IReadOnlyList groupNames) return ordered.ToArray(); } + private static async Task CleanupDisconnectedBatchPartitionMembershipAsync( + ISignalRGroupCoordinatorGrain coordinatorGrain, + string[] groupNames, + string connectionId, + ISignalRObserver subscriptionReference, + CancellationToken cancellationToken) + { + await Task.Run( + () => coordinatorGrain.RemoveConnectionFromGroups(groupNames, connectionId, subscriptionReference), + cancellationToken); + } + + private bool IsConnectionDisconnected(string connectionId) + { + var connection = _connections[connectionId]; + return connection is null || connection.ConnectionAborted.IsCancellationRequested; + } + private Task UpdateConnectionHeartbeatAsync(string connectionId, Subscription subscription) { - if (!_orleansSignalOptions.Value.KeepEachConnectionAlive || string.IsNullOrEmpty(subscription.HubKey)) + if 
(!_orleansSignalOptions.Value.KeepEachConnectionAlive || + string.IsNullOrEmpty(subscription.HubKey) || + IsConnectionDisconnected(connectionId)) { return Task.CompletedTask; } diff --git a/docs/Features/Group-Partitioning.md b/docs/Features/Group-Partitioning.md index 1c6dbe1..4fd5c73 100644 --- a/docs/Features/Group-Partitioning.md +++ b/docs/Features/Group-Partitioning.md @@ -23,9 +23,10 @@ Group operations are routed through a coordinator grain that assigns group names - [x] Add package-level batch group APIs for hub and host-service callers. - [x] Batch coordinator and partition updates so one request does not force sequential writes per group. - [x] Cover batch add/remove with integration tests and direct coordinator verification. -- [x] Restore disconnect-safe subscription tracking for partitioned batch adds. +- [x] Restore disconnect-safe cleanup for partitioned batch adds. - [x] Restore the hashed cleanup fallback when batch removals see missing coordinator assignments. - [x] Add regression tests for disconnect cleanup and degraded coordinator state. +- [x] Re-run partition cleanup when a disconnected batch join finishes late so late partition adds cannot outlive the connection. ## Main flow @@ -47,7 +48,7 @@ flowchart TD - Partition grains hold `group -> connection -> observer` mappings and emit fan-out to observers. - Empty groups trigger cleanup so partitions can shed state. - Batch membership operations collapse repeated coordinator writes into one persistence step per request and one partition write per touched partition. -- Partitioned batch adds pre-register touched partitions in the connection subscription before the coordinator write finishes so disconnect cleanup still reaches every touched partition. +- If that coordinator write finishes after the connection has already disconnected, the lifetime manager immediately issues a compensating batch remove through the coordinator so late partition adds do not recreate stale membership. 
- Batch removals fall back to the hashed partition when coordinator assignment metadata is missing, which preserves cleanup in degraded-state recovery scenarios. ## Configuration knobs diff --git a/docs/Features/Hub-Lifetime-Manager-Integration.md b/docs/Features/Hub-Lifetime-Manager-Integration.md index 7741fe0..4054011 100644 --- a/docs/Features/Hub-Lifetime-Manager-Integration.md +++ b/docs/Features/Hub-Lifetime-Manager-Integration.md @@ -21,6 +21,7 @@ The ASP.NET Core host swaps the default SignalR hub lifetime manager with `Orlea - [x] Route package-specific batch group membership calls through the Orleans lifetime manager. - [x] Keep batch group helper calls usable when the host is running plain `AddSignalR()`. - [x] Add regression coverage for the batch helper path without Orleans registration. +- [x] Re-run batch partition cleanup after disconnect when a late coordinator write finishes. ## Main flow @@ -41,6 +42,7 @@ flowchart TD - The lifetime manager creates a per-connection `Subscription` and registers observers with connection/group/user grains. - Package-specific batch group operations (`AddToGroupsAsync` / `RemoveFromGroupsAsync`) also route through the lifetime manager instead of looping over sequential single-group writes. - Hub batch helpers fall back to the registered `HubLifetimeManager` when `IOrleansGroupManager` is not explicitly registered, so the API still works on plain `AddSignalR()` hosts. +- If a partitioned batch join finishes after the connection has already disconnected, the lifetime manager immediately routes a compensating batch remove through the coordinator before returning. - Detailed batching behavior and partition persistence rules live in `docs/Features/Group-Partitioning.md`. 
## Configuration knobs From f2e440b8abdf54c277f71395977b1e25266620ca Mon Sep 17 00:00:00 2001 From: ksemenenko Date: Thu, 2 Apr 2026 03:47:47 +0200 Subject: [PATCH 2/2] fix: keep grace replay on grain turn --- .../SignalRObserverGrainBase.cs | 54 +++++++++++-------- .../Observer-Health-and-Circuit-Breaker.md | 2 + 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/ManagedCode.Orleans.SignalR.Server/SignalRObserverGrainBase.cs b/ManagedCode.Orleans.SignalR.Server/SignalRObserverGrainBase.cs index cde064e..41256ca 100644 --- a/ManagedCode.Orleans.SignalR.Server/SignalRObserverGrainBase.cs +++ b/ManagedCode.Orleans.SignalR.Server/SignalRObserverGrainBase.cs @@ -121,8 +121,7 @@ protected void TrackConnection(string connectionId, ISignalRObserver observer) if (_gracePeriodEnabled && HealthTracker.IsInGracePeriod(connectionId)) { - // Critical: do NOT replay buffered SignalR messages on the Orleans scheduler. - _ = Task.Run(() => RestoreObserverFromGracePeriodAsync(connectionId, observer)); + _ = RestoreObserverFromGracePeriodAsync(connectionId, observer); } } @@ -149,8 +148,7 @@ protected void TouchObserver(ISignalRObserver observer) if (_gracePeriodEnabled && _observerToConnectionId.TryGetValue(observer, out var connectionId) && HealthTracker.IsInGracePeriod(connectionId)) { - // Critical: do NOT replay buffered SignalR messages on the Orleans scheduler. - _ = Task.Run(() => RestoreObserverFromGracePeriodAsync(connectionId, observer)); + _ = RestoreObserverFromGracePeriodAsync(connectionId, observer); } } @@ -462,23 +460,7 @@ protected async Task RestoreObserverFromGracePeriodAsync(string connectionI connectionId, bufferedMessages.Count); - var replayedCount = 0; - foreach (var message in bufferedMessages) - { - try - { - await observer.OnNextAsync(message); - replayedCount++; - } - catch (Exception ex) - { - Logger.LogWarning( - ex, - "Failed to replay buffered message to connection {ConnectionId}. 
Stopping replay.", - connectionId); - break; - } - } + var replayedCount = await ReplayBufferedMessagesAsync(connectionId, observer, bufferedMessages); if (replayedCount > 0) { @@ -497,6 +479,36 @@ protected async Task RestoreObserverFromGracePeriodAsync(string connectionI return replayedCount; } + private async Task ReplayBufferedMessagesAsync( + string connectionId, + ISignalRObserver observer, + IReadOnlyList bufferedMessages) + { + // Critical: do NOT replay buffered SignalR messages on the Orleans scheduler. + return await Task.Run(async () => + { + var replayedCount = 0; + foreach (var message in bufferedMessages) + { + try + { + await observer.OnNextAsync(message); + replayedCount++; + } + catch (Exception ex) + { + Logger.LogWarning( + ex, + "Failed to replay buffered message to connection {ConnectionId}. Stopping replay.", + connectionId); + break; + } + } + + return replayedCount; + }); + } + /// /// Called when grace periods expire for observers. /// Override to implement custom cleanup logic. diff --git a/docs/Features/Observer-Health-and-Circuit-Breaker.md b/docs/Features/Observer-Health-and-Circuit-Breaker.md index 49f2d36..39d7cbb 100644 --- a/docs/Features/Observer-Health-and-Circuit-Breaker.md +++ b/docs/Features/Observer-Health-and-Circuit-Breaker.md @@ -20,6 +20,7 @@ Observer delivery is protected by health tracking, circuit breaker logic, and op - [x] Route grace-period expiration to observer cleanup and health-state removal. - [x] Offload observer notifications from the Orleans scheduler and document why it is critical. - [x] Add tests for circuit-breaker threshold behavior (enabled vs disabled). +- [x] Keep grace-period state transitions on the Orleans scheduler and offload only observer replay I/O. ## Main flow @@ -41,6 +42,7 @@ flowchart TD - Failed deliveries are recorded in a rolling failure window. - When thresholds are exceeded, the circuit opens and delivery is skipped. 
- If a grace period is configured, messages are buffered and replayed on recovery; expired grace periods remove observers. +- Grace-period recovery mutates `ObserverHealthTracker` on the grain scheduler first, then replays buffered observer callbacks off-scheduler so Orleans-owned state is never touched from a thread-pool thread. - Without a grace period, reaching the failure threshold removes the observer immediately. ## Configuration knobs