-
Notifications
You must be signed in to change notification settings - Fork 5
fix: harden batch group cleanup #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -368,19 +368,42 @@ public async Task AddToGroupsAsync(string connectionId, IReadOnlyList<string> gr | |||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| var subscriptionReference = subscription.Reference; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (_orleansSignalOptions.Value.GroupPartitionCount > 1) | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient); | ||||||||||||||||||||||||||||||||||||||
| foreach (var groupName in uniqueGroupNames) | ||||||||||||||||||||||||||||||||||||||
| var partitionIds = await Task.Run( | ||||||||||||||||||||||||||||||||||||||
| () => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscriptionReference), | ||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+376
to
+378
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This now awaits Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (IsConnectionDisconnected(connectionId)) | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| await CleanupDisconnectedBatchPartitionMembershipAsync( | ||||||||||||||||||||||||||||||||||||||
| coordinatorGrain, | ||||||||||||||||||||||||||||||||||||||
| uniqueGroupNames, | ||||||||||||||||||||||||||||||||||||||
| connectionId, | ||||||||||||||||||||||||||||||||||||||
| subscriptionReference, | ||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| foreach (var partitionId in partitionIds) | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken); | ||||||||||||||||||||||||||||||||||||||
| var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId); | ||||||||||||||||||||||||||||||||||||||
| subscription.AddGrain(partitionGrain); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| await Task.Run( | ||||||||||||||||||||||||||||||||||||||
| () => coordinatorGrain.AddConnectionToGroups(uniqueGroupNames, connectionId, subscription.Reference), | ||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||
| if (IsConnectionDisconnected(connectionId)) | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| await CleanupDisconnectedBatchPartitionMembershipAsync( | ||||||||||||||||||||||||||||||||||||||
| coordinatorGrain, | ||||||||||||||||||||||||||||||||||||||
| uniqueGroupNames, | ||||||||||||||||||||||||||||||||||||||
| connectionId, | ||||||||||||||||||||||||||||||||||||||
| subscriptionReference, | ||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| else | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -389,19 +412,47 @@ await Task.Run( | |||||||||||||||||||||||||||||||||||||
| .Distinct() | ||||||||||||||||||||||||||||||||||||||
| .ToArray(); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| foreach (var groupGrain in groupGrains) | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| subscription.AddGrain(groupGrain); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| var tasks = groupGrains | ||||||||||||||||||||||||||||||||||||||
| .Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken)) | ||||||||||||||||||||||||||||||||||||||
| .Select(groupGrain => Task.Run(() => groupGrain.AddConnection(connectionId, subscriptionReference), cancellationToken)) | ||||||||||||||||||||||||||||||||||||||
| .ToArray(); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (tasks.Length > 0) | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| await Task.WhenAll(tasks); | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
| await Task.WhenAll(tasks); | |
| try | |
| { | |
| await Task.WhenAll(tasks); | |
| } | |
| catch | |
| { | |
| var cleanupTasks = groupGrains | |
| .Select(groupGrain => Task.Run(() => groupGrain.RemoveConnection(connectionId, subscriptionReference), cancellationToken)) | |
| .ToArray(); | |
| if (cleanupTasks.Length > 0) | |
| { | |
| await Task.WhenAll(cleanupTasks); | |
| } | |
| throw; | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Track group grains before awaiting AddConnection fan-out
In the non-partitioned path, Task.WhenAll(tasks) is awaited before any subscription.AddGrain(groupGrain) calls. If one AddConnection task fails while others succeed, this throws and skips the later tracking step, so successful group memberships are never added to subscription.Grains and therefore are not removed during OnDisconnectedAsync. The result is stale memberships that can outlive the connection.
Useful? React with 👍 / 👎.
Copilot
AI
Apr 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The disconnect-cleanup logic for the non-partitioned path is duplicated (two IsConnectionDisconnected checks that each build and WhenAll the same RemoveConnection task set). This makes the flow harder to reason about and increases the chance of divergence in future edits. Consider extracting a small helper (e.g., CleanupDisconnectedGroupMembershipAsync) and calling it from both sites, or restructuring to a single post-add check with a single cleanup path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grainscurrently materializes a new array on every access ([.. _grains.Keys]). SinceConcurrentDictionary<TKey,TValue>.Keysis already a thread-safe collection view that implementsIReadOnlyCollection<TKey>, you can likely return_grains.Keysdirectly to avoid repeated allocations and key enumeration.