Merged
3 changes: 3 additions & 0 deletions AGENTS.md
@@ -186,7 +186,10 @@ If no new rule is detected -> do not update the file.
- Avoid `this.` qualification; prefer predefined types (`int` over `Int32`)
- Never use `ConfigureAwait(false)`
- No magic literals - extract to constants, enums, config
- Prefer the simplest clear implementation over incidental complexity, especially on hot paths; if two fixes are correct, choose the one that is easier to reason about and keeps group operations fast
- In concurrency-sensitive paths like `OrleansHubLifetimeManager`, prefer the smallest behavior-preserving fix; avoid widening the async/concurrency shape unless tests prove it is necessary, because this code is easy to make unsafe
- Never introduce explicit `lock`/`Monitor` synchronization in Orleans-related paths; fix races via ordering, idempotent cleanup, or Orleans/concurrent primitives instead of `_syncRoot`-style locking
- For Orleans-facing changes, follow the request scheduling model explicitly: prefer grain-aligned ordering/idempotency fixes over host-side synchronization, because group operations must stay fast and consistent with Orleans execution semantics

### Diagnostics

21 changes: 12 additions & 9 deletions ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using ManagedCode.Orleans.SignalR.Core.Interfaces;
@@ -8,8 +9,9 @@ namespace ManagedCode.Orleans.SignalR.Core.SignalR.Observers;

public sealed class Subscription(SignalRObserver observer) : IDisposable
{
private readonly HashSet<IObserverConnectionManager> _grains = new();
private readonly HashSet<GrainId> _heartbeatGrainIds = new();
// Use ConcurrentDictionary as a concurrent hash-set because batch group mutations can overlap disconnect cleanup.
private readonly ConcurrentDictionary<IObserverConnectionManager, bool> _grains = new();
private readonly ConcurrentDictionary<GrainId, bool> _heartbeatGrainIds = new();
private bool _disposed;

public ISignalRObserver Reference { get; private set; } = default!;
@@ -20,7 +22,7 @@ public sealed class Subscription(SignalRObserver observer) : IDisposable

public int PartitionId { get; private set; }

public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains;
public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains.IsEmpty ? [] : [.. _grains.Keys];
Copilot AI (Apr 2, 2026):

Grains currently materializes a new array on every access ([.. _grains.Keys]). Since ConcurrentDictionary<TKey,TValue>.Keys is already a thread-safe collection view that implements IReadOnlyCollection<TKey>, you can likely return _grains.Keys directly to avoid repeated allocations and key enumeration.

Suggested change:
-    public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains.IsEmpty ? [] : [.. _grains.Keys];
+    public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains.Keys;
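A standalone sketch of the concurrent-set pattern under discussion (names invented, not the project's code). One caveat worth weighing before applying the suggestion: on .NET, ConcurrentDictionary<TKey,TValue>.Keys takes the dictionary's internal locks and returns a fresh ReadOnlyCollection<TKey> snapshot rather than a live view, so it also allocates on each access.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Illustration only: ConcurrentDictionary<TKey, bool> used as a concurrent set.
public static class ConcurrentSetDemo
{
    private static readonly ConcurrentDictionary<string, bool> Set = new();

    public static void Main()
    {
        Set.TryAdd("grain-a", true); // idempotent: a second TryAdd returns false
        Set.TryAdd("grain-a", true);
        Set.TryAdd("grain-b", true);

        // Pattern in the diff: collection expression copies keys into a new array.
        IReadOnlyCollection<string> copy = Set.IsEmpty ? [] : [.. Set.Keys];

        // Pattern in the suggestion: Keys still allocates -- it acquires the
        // dictionary's locks and returns a ReadOnlyCollection<string> snapshot,
        // not a live view of the underlying set.
        ICollection<string> snapshot = Set.Keys;

        Console.WriteLine($"{copy.Count} {snapshot.Count}"); // both see 2 entries
    }
}
```

Either way the caller gets a point-in-time snapshot; the difference is one copy versus two, not allocation versus none.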

public void Dispose()
{
@@ -30,6 +32,7 @@ public void Dispose()
}

_disposed = true;

observer?.Dispose();
_grains.Clear();
_heartbeatGrainIds.Clear();
@@ -41,14 +44,14 @@ public void Dispose()

public void AddGrain(IObserverConnectionManager grain)
{
_grains.Add(grain);
_heartbeatGrainIds.Add(((GrainReference)grain).GrainId);
_grains.TryAdd(grain, true);
_heartbeatGrainIds.TryAdd(((GrainReference)grain).GrainId, true);
}

public void RemoveGrain(IObserverConnectionManager grain)
{
_grains.Remove(grain);
_heartbeatGrainIds.Remove(((GrainReference)grain).GrainId);
_grains.TryRemove(grain, out _);
_heartbeatGrainIds.TryRemove(((GrainReference)grain).GrainId, out _);
}

public void ClearGrains()
@@ -76,13 +79,13 @@ public SignalRObserver GetObserver()

public ImmutableArray<GrainId> GetHeartbeatGrainIds()
{
if (_heartbeatGrainIds.Count == 0)
if (_heartbeatGrainIds.IsEmpty)
{
return ImmutableArray<GrainId>.Empty;
}

var builder = ImmutableArray.CreateBuilder<GrainId>(_heartbeatGrainIds.Count);
foreach (var grainId in _heartbeatGrainIds)
foreach (var grainId in _heartbeatGrainIds.Keys)
{
builder.Add(grainId);
}
101 changes: 87 additions & 14 deletions ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs
@@ -368,19 +368,42 @@ public async Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> gr
return;
}

var subscriptionReference = subscription.Reference;

if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
{
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
foreach (var groupName in uniqueGroupNames)
var partitionIds = await Task.Run(
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscriptionReference),
cancellationToken);
Comment on lines +376 to +378 (P1): Register partition grains before awaiting coordinator writes

This now awaits coordinatorGrain.AddConnectionToGroups(...) before recording touched partition grains in subscription. If the coordinator call partially succeeds and then faults (it fans out partition writes in parallel and Task.WhenAll can fail after some partitions succeeded), the method exits before subscription.AddGrain(...) runs, so disconnect cleanup no longer knows which partition grains to remove from. That leaves stale group membership/observer entries for connections that later disconnect.
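A minimal, self-contained sketch of the ordering this comment asks for (all names hypothetical, not the project's API): record the intended targets before awaiting the fan-out, so compensation can still reach every partition that may hold partial state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Illustration of "track first, then await": Task.WhenAll can fault after
// some writes already landed, so cleanup must not depend on the await finishing.
public static class TrackBeforeAwaitDemo
{
    private static readonly HashSet<int> TouchedPartitions = new();

    public static async Task Main()
    {
        int[] partitionIds = { 0, 1, 2 };

        // 1. Track every partition we are about to touch, before any await.
        foreach (var id in partitionIds)
            TouchedPartitions.Add(id);

        // 2. Fan out; this may throw after a subset of writes succeeded.
        try
        {
            await Task.WhenAll(partitionIds.Select(WritePartitionAsync));
        }
        catch (InvalidOperationException)
        {
            // 3. Compensate from the tracked set, not from the unknown success set.
            foreach (var id in TouchedPartitions)
                Console.WriteLine($"cleanup partition {id}");
        }
    }

    private static async Task WritePartitionAsync(int id)
    {
        await Task.Yield();
        if (id == 1)
            throw new InvalidOperationException("partition 1 write failed");
    }
}
```

The design trade-off: tracking first may record partitions that were never actually written, but cleanup against them is idempotent, whereas tracking after the await can silently miss partitions that were written.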


if (IsConnectionDisconnected(connectionId))
{
await CleanupDisconnectedBatchPartitionMembershipAsync(
coordinatorGrain,
uniqueGroupNames,
connectionId,
subscriptionReference,
cancellationToken);
return;
}

foreach (var partitionId in partitionIds)
{
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
subscription.AddGrain(partitionGrain);
}

await Task.Run(
() => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference),
cancellationToken);
if (IsConnectionDisconnected(connectionId))
{
await CleanupDisconnectedBatchPartitionMembershipAsync(
coordinatorGrain,
uniqueGroupNames,
connectionId,
subscriptionReference,
cancellationToken);
return;
}
}
else
{
@@ -389,19 +412,47 @@ await Task.Run(
.Distinct()
.ToArray();

foreach (var groupGrain in groupGrains)
{
subscription.AddGrain(groupGrain);
}

var tasks = groupGrains
.Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken))
.Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscriptionReference), cancellationToken))
.ToArray();

if (tasks.Length > 0)
{
await Task.WhenAll(tasks);
Copilot AI (Apr 2, 2026):

In the non-partitioned path, subscription.AddGrain(groupGrain) now happens after AddConnection completes. If Task.WhenAll(tasks) throws (e.g., one group add fails while others succeed), the subscription will never track the successfully-added grains, so OnDisconnectedAsync won't remove those memberships and the connection can leak in group state. Consider adding grains to the subscription before issuing the adds, or wrapping WhenAll in try/catch and running a compensating RemoveConnection for any/all groupGrains before returning/throwing.

Suggested change:
-    await Task.WhenAll(tasks);
+    try
+    {
+        await Task.WhenAll(tasks);
+    }
+    catch
+    {
+        var cleanupTasks = groupGrains
+            .Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
+            .ToArray();
+        if (cleanupTasks.Length > 0)
+        {
+            await Task.WhenAll(cleanupTasks);
+        }
+        throw;
+    }
}
Comment on lines 419 to 422 (P1): Track group grains before awaiting AddConnection fan-out

In the non-partitioned path, Task.WhenAll(tasks) is awaited before any subscription.AddGrain(groupGrain) calls. If one AddConnection task fails while others succeed, this throws and skips the later tracking step, so successful group memberships are never added to subscription.Grains and therefore are not removed during OnDisconnectedAsync. The result is stale memberships that can outlive the connection.

if (IsConnectionDisconnected(connectionId))
{
var cleanupTasks = groupGrains
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
.ToArray();

if (cleanupTasks.Length > 0)
{
await Task.WhenAll(cleanupTasks);
}

return;
}

foreach (var groupGrain in groupGrains)
{
subscription.AddGrain(groupGrain);
}

if (IsConnectionDisconnected(connectionId))
{
var cleanupTasks = groupGrains
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
.ToArray();

if (cleanupTasks.Length > 0)
{
await Task.WhenAll(cleanupTasks);
}

return;
}
Comment on lines +424 to +455, Copilot AI (Apr 2, 2026):

The disconnect-cleanup logic for the non-partitioned path is duplicated (two IsConnectionDisconnected checks that each build and WhenAll the same RemoveConnection task set). This makes the flow harder to reason about and increases the chance of divergence in future edits. Consider extracting a small helper (e.g., CleanupDisconnectedGroupMembershipAsync) and calling it from both sites, or restructuring to a single post-add check with a single cleanup path.
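One possible shape for the extracted helper the comment suggests (the helper name, stub interface, and signature are invented for illustration; the real method would take the project's grain and observer types):

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stub standing in for the group grain's remove operation.
public interface IGroupGrainStub
{
    Task RemoveConnection(string connectionId);
}

public static class CleanupSketch
{
    // Single cleanup path, callable from both post-add disconnect checks, so the
    // two call sites cannot drift apart in future edits.
    public static async Task CleanupDisconnectedGroupMembershipAsync(
        IGroupGrainStub[] groupGrains,
        string connectionId,
        CancellationToken cancellationToken)
    {
        var cleanupTasks = groupGrains
            .Select(g => Task.Run(() => g.RemoveConnection(connectionId), cancellationToken))
            .ToArray();

        if (cleanupTasks.Length > 0)
        {
            await Task.WhenAll(cleanupTasks);
        }
    }
}
```

Each disconnect check then collapses to `await CleanupDisconnectedGroupMembershipAsync(...); return;`, mirroring the partitioned path's existing CleanupDisconnectedBatchPartitionMembershipAsync helper.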
}

await UpdateConnectionHeartbeatAsync(connectionId, subscription);
@@ -428,11 +479,13 @@ public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<strin
return;
}

var subscriptionReference = subscription.Reference;

if (_orleansSignalOptions.Value.GroupPartitionCount > 1)
{
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
var partitionIds = await Task.Run(
() => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscription.Reference),
() => coordinatorGrain.RemoveConnectionFromGroups(uniqueGroupNames, connectionId, subscriptionReference),
cancellationToken);

foreach (var partitionId in partitionIds)
@@ -455,7 +508,7 @@ public async Task RemoveFromGroupsAsync(string connectionId, IReadOnlyList<strin
.ToArray();

var tasks = groupGrains
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscription.Reference), cancellationToken))
.Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken))
.ToArray();

if (tasks.Length > 0)
@@ -723,9 +776,29 @@ private static string[] GetUniqueGroupNames(IReadOnlyList<string> groupNames)
return ordered.ToArray();
}

private static async Task CleanupDisconnectedBatchPartitionMembershipAsync(
ISignalRGroupCoordinatorGrain coordinatorGrain,
string[] groupNames,
string connectionId,
ISignalRObserver subscriptionReference,
CancellationToken cancellationToken)
{
await Task.Run(
() => coordinatorGrain.RemoveConnectionFromGroups(groupNames, connectionId, subscriptionReference),
cancellationToken);
}

private bool IsConnectionDisconnected(string connectionId)
{
var connection = _connections[connectionId];
return connection is null || connection.ConnectionAborted.IsCancellationRequested;
}

private Task UpdateConnectionHeartbeatAsync(string connectionId, Subscription subscription)
{
if (!_orleansSignalOptions.Value.KeepEachConnectionAlive || string.IsNullOrEmpty(subscription.HubKey))
if (!_orleansSignalOptions.Value.KeepEachConnectionAlive ||
string.IsNullOrEmpty(subscription.HubKey) ||
IsConnectionDisconnected(connectionId))
{
return Task.CompletedTask;
}
54 changes: 33 additions & 21 deletions ManagedCode.Orleans.SignalR.Server/SignalRObserverGrainBase.cs
@@ -121,8 +121,7 @@ protected void TrackConnection(string connectionId, ISignalRObserver observer)

if (_gracePeriodEnabled && HealthTracker.IsInGracePeriod(connectionId))
{
// Critical: do NOT replay buffered SignalR messages on the Orleans scheduler.
_ = Task.Run(() => RestoreObserverFromGracePeriodAsync(connectionId, observer));
_ = RestoreObserverFromGracePeriodAsync(connectionId, observer);
}
}

@@ -149,8 +148,7 @@ protected void TouchObserver(ISignalRObserver observer)
if (_gracePeriodEnabled && _observerToConnectionId.TryGetValue(observer, out var connectionId) &&
HealthTracker.IsInGracePeriod(connectionId))
{
// Critical: do NOT replay buffered SignalR messages on the Orleans scheduler.
_ = Task.Run(() => RestoreObserverFromGracePeriodAsync(connectionId, observer));
_ = RestoreObserverFromGracePeriodAsync(connectionId, observer);
}
}

@@ -462,23 +460,7 @@ protected async Task<int> RestoreObserverFromGracePeriodAsync(string connectionI
connectionId,
bufferedMessages.Count);

var replayedCount = 0;
foreach (var message in bufferedMessages)
{
try
{
await observer.OnNextAsync(message);
replayedCount++;
}
catch (Exception ex)
{
Logger.LogWarning(
ex,
"Failed to replay buffered message to connection {ConnectionId}. Stopping replay.",
connectionId);
break;
}
}
var replayedCount = await ReplayBufferedMessagesAsync(connectionId, observer, bufferedMessages);

if (replayedCount > 0)
{
Expand All @@ -497,6 +479,36 @@ protected async Task<int> RestoreObserverFromGracePeriodAsync(string connectionI
return replayedCount;
}

private async Task<int> ReplayBufferedMessagesAsync(
string connectionId,
ISignalRObserver observer,
IReadOnlyList<HubMessage> bufferedMessages)
{
// Critical: do NOT replay buffered SignalR messages on the Orleans scheduler.
return await Task.Run(async () =>
{
var replayedCount = 0;
foreach (var message in bufferedMessages)
{
try
{
await observer.OnNextAsync(message);
replayedCount++;
}
catch (Exception ex)
{
Logger.LogWarning(
ex,
"Failed to replay buffered message to connection {ConnectionId}. Stopping replay.",
connectionId);
break;
}
}

return replayedCount;
});
}

/// <summary>
/// Called when grace periods expire for observers.
/// Override to implement custom cleanup logic.
5 changes: 3 additions & 2 deletions docs/Features/Group-Partitioning.md
@@ -23,9 +23,10 @@ Group operations are routed through a coordinator grain that assigns group names
- [x] Add package-level batch group APIs for hub and host-service callers.
- [x] Batch coordinator and partition updates so one request does not force sequential writes per group.
- [x] Cover batch add/remove with integration tests and direct coordinator verification.
- [x] Restore disconnect-safe subscription tracking for partitioned batch adds.
- [x] Restore disconnect-safe cleanup for partitioned batch adds.
- [x] Restore the hashed cleanup fallback when batch removals see missing coordinator assignments.
- [x] Add regression tests for disconnect cleanup and degraded coordinator state.
- [x] Re-run partition cleanup when a disconnected batch join finishes late so late partition adds cannot outlive the connection.

## Main flow

@@ -47,7 +48,7 @@
- Partition grains hold `group -> connection -> observer` mappings and emit fan-out to observers.
- Empty groups trigger cleanup so partitions can shed state.
- Batch membership operations collapse repeated coordinator writes into one persistence step per request and one partition write per touched partition.
- Partitioned batch adds pre-register touched partitions in the connection subscription before the coordinator write finishes so disconnect cleanup still reaches every touched partition.
- If that coordinator write finishes after the connection has already disconnected, the lifetime manager immediately issues a compensating batch remove through the coordinator so late partition adds do not recreate stale membership.
- Batch removals fall back to the hashed partition when coordinator assignment metadata is missing, which preserves cleanup in degraded-state recovery scenarios.

## Configuration knobs
2 changes: 2 additions & 0 deletions docs/Features/Hub-Lifetime-Manager-Integration.md
@@ -21,6 +21,7 @@ The ASP.NET Core host swaps the default SignalR hub lifetime manager with `Orlea
- [x] Route package-specific batch group membership calls through the Orleans lifetime manager.
- [x] Keep batch group helper calls usable when the host is running plain `AddSignalR()`.
- [x] Add regression coverage for the batch helper path without Orleans registration.
- [x] Re-run batch partition cleanup after disconnect when a late coordinator write finishes.

## Main flow

@@ -41,6 +42,7 @@
- The lifetime manager creates a per-connection `Subscription` and registers observers with connection/group/user grains.
- Package-specific batch group operations (`AddToGroupsAsync` / `RemoveFromGroupsAsync`) also route through the lifetime manager instead of looping over sequential single-group writes.
- Hub batch helpers fall back to the registered `HubLifetimeManager<THub>` when `IOrleansGroupManager<THub>` is not explicitly registered, so the API still works on plain `AddSignalR()` hosts.
- If a partitioned batch join finishes after the connection has already disconnected, the lifetime manager immediately routes a compensating batch remove through the coordinator before returning.
- Detailed batching behavior and partition persistence rules live in `docs/Features/Group-Partitioning.md`.

## Configuration knobs
2 changes: 2 additions & 0 deletions docs/Features/Observer-Health-and-Circuit-Breaker.md
@@ -20,6 +20,7 @@ Observer delivery is protected by health tracking, circuit breaker logic, and op
- [x] Route grace-period expiration to observer cleanup and health-state removal.
- [x] Offload observer notifications from the Orleans scheduler and document why it is critical.
- [x] Add tests for circuit-breaker threshold behavior (enabled vs disabled).
- [x] Keep grace-period state transitions on the Orleans scheduler and offload only observer replay I/O.

## Main flow

@@ -41,6 +42,7 @@
- Failed deliveries are recorded in a rolling failure window.
- When thresholds are exceeded, the circuit opens and delivery is skipped.
- If a grace period is configured, messages are buffered and replayed on recovery; expired grace periods remove observers.
- Grace-period recovery mutates `ObserverHealthTracker` on the grain scheduler first, then replays buffered observer callbacks off-scheduler so Orleans-owned state is never touched from a thread-pool turn.
- Without a grace period, reaching the failure threshold removes the observer immediately.

## Configuration knobs