diff --git a/src/ServiceControl.Audit/Infrastructure/WebApi/Cors.cs b/src/ServiceControl.Audit/Infrastructure/WebApi/Cors.cs index 200a33f45e..d206459ad2 100644 --- a/src/ServiceControl.Audit/Infrastructure/WebApi/Cors.cs +++ b/src/ServiceControl.Audit/Infrastructure/WebApi/Cors.cs @@ -28,7 +28,7 @@ public static CorsPolicy GetDefaultPolicy(CorsSettings settings) } // Headers exposed to the client in the response (accessible via JavaScript) - builder.WithExposedHeaders(["ETag", "Last-Modified", "Link", "Total-Count", "X-Particular-Version"]); + builder.WithExposedHeaders(["ETag", "Last-Modified", "Link", "Total-Count", "X-Particular-Version", "Request-Id"]); // Headers allowed in the request from the client builder.WithHeaders(["Origin", "X-Requested-With", "Content-Type", "Accept", "Authorization"]); // HTTP methods allowed for cross-origin requests diff --git a/src/ServiceControl.Audit/WebApplicationExtensions.cs b/src/ServiceControl.Audit/WebApplicationExtensions.cs index 76785dd77d..deab24c365 100644 --- a/src/ServiceControl.Audit/WebApplicationExtensions.cs +++ b/src/ServiceControl.Audit/WebApplicationExtensions.cs @@ -1,7 +1,9 @@ namespace ServiceControl.Audit; +using System.Threading.Tasks; using Infrastructure.WebApi; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; using ServiceControl.Hosting.ForwardedHeaders; using ServiceControl.Hosting.Https; using ServiceControl.Infrastructure; @@ -10,6 +12,20 @@ public static class WebApplicationExtensions { public static void UseServiceControlAudit(this WebApplication app, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings) { + // Surface the per-request id so callers can correlate and quote it. TraceIdentifier is stable + // for the request; OnStarting sets it before the response flushes. + app.Use((context, next) => + { + context.Response.OnStarting(static state => + { + var httpContext = (HttpContext)state; + httpContext.Response.Headers["Request-Id"] = httpContext.TraceIdentifier; + return Task.CompletedTask; + }, context); + + return next(context); + }); + app.UseServiceControlForwardedHeaders(forwardedHeadersSettings); app.UseServiceControlHttps(httpsSettings); app.UseResponseCompression(); diff --git a/src/ServiceControl.Hosting/Auth/PermissionAuthorizationExtensions.cs b/src/ServiceControl.Hosting/Auth/PermissionAuthorizationExtensions.cs index 534ac5d95b..f19f16744d 100644 --- a/src/ServiceControl.Hosting/Auth/PermissionAuthorizationExtensions.cs +++ b/src/ServiceControl.Hosting/Auth/PermissionAuthorizationExtensions.cs @@ -50,6 +50,11 @@ public static void AddServiceControlAuthorization(this IHostApplicationBuilder h services.AddSingleton(); services.AddSingleton(); + // Message-action audit trail. Registered unconditionally (independent of OIDC being enabled) so + // the action trail is recorded even without authentication, attributed to AuditUser.Anonymous. + services.AddSingleton(); + services.AddSingleton(); + // Backs the my/routes manifest: a singleton table projected from the wired endpoints. Reuses // the EndpointDataSource the framework registers, so it sees exactly the routes that are served. services.AddSingleton(); diff --git a/src/ServiceControl.Infrastructure.Tests/Auth/AuditHeadersTests.cs b/src/ServiceControl.Infrastructure.Tests/Auth/AuditHeadersTests.cs new file mode 100644 index 0000000000..171e1ce318 --- /dev/null +++ b/src/ServiceControl.Infrastructure.Tests/Auth/AuditHeadersTests.cs @@ -0,0 +1,47 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Tests.Auth; + +using System.Collections.Generic; +using NServiceBus; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; + +[TestFixture] +public class AuditHeadersTests +{ + [Test] + public void Stamp_writes_id_name_and_operation_headers() + { + var options = new SendOptions(); + AuditHeaders.Stamp(options, new AuditUser("alice-sub", "Alice"), "op-123"); + + var headers = options.GetHeaders(); + Assert.That(headers[AuditHeaders.SubjectId], Is.EqualTo("alice-sub")); + Assert.That(headers[AuditHeaders.SubjectName], Is.EqualTo("Alice")); + Assert.That(headers[AuditHeaders.OperationId], Is.EqualTo("op-123")); + } + + [Test] + public void Read_round_trips_stamped_identity_and_operation() + { + var headers = new Dictionary + { + [AuditHeaders.SubjectId] = "alice-sub", + [AuditHeaders.SubjectName] = "Alice", + [AuditHeaders.OperationId] = "op-123" + }; + + var (user, operationId) = AuditHeaders.Read(headers); + Assert.That(user, Is.EqualTo(new AuditUser("alice-sub", "Alice"))); + Assert.That(operationId, Is.EqualTo("op-123")); + } + + [Test] + public void Read_returns_anonymous_when_headers_absent() + { + var (user, operationId) = AuditHeaders.Read(new Dictionary()); + Assert.That(user, Is.EqualTo(AuditUser.Anonymous)); + Assert.That(operationId, Is.Null); + } +} diff --git a/src/ServiceControl.Infrastructure.Tests/Auth/AuditServiceRegistrationTests.cs b/src/ServiceControl.Infrastructure.Tests/Auth/AuditServiceRegistrationTests.cs new file mode 100644 index 0000000000..d3acbf8145 --- /dev/null +++ b/src/ServiceControl.Infrastructure.Tests/Auth/AuditServiceRegistrationTests.cs @@ -0,0 +1,29 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Tests.Auth; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using NUnit.Framework; +using ServiceControl.Configuration; +using ServiceControl.Hosting.Auth; +using ServiceControl.Infrastructure; +using ServiceControl.Infrastructure.Auth; + +[TestFixture] +public class AuditServiceRegistrationTests +{ + [Test] + public void Registers_message_action_audit_and_user_accessor() + { + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddSingleton(LoggerFactory.Create(_ => { })); + var settings = new OpenIdConnectSettings(new SettingsRootNamespace("ServiceControl"), validateConfiguration: false, requireServicePulseSettings: false); + + builder.AddServiceControlAuthorization(settings); + + using var provider = builder.Services.BuildServiceProvider(); + Assert.That(provider.GetService(), Is.TypeOf()); + Assert.That(provider.GetService(), Is.TypeOf()); + } +} diff --git a/src/ServiceControl.Infrastructure.Tests/Auth/AuditUserTests.cs b/src/ServiceControl.Infrastructure.Tests/Auth/AuditUserTests.cs new file mode 100644 index 0000000000..ff188e0f7b --- /dev/null +++ b/src/ServiceControl.Infrastructure.Tests/Auth/AuditUserTests.cs @@ -0,0 +1,17 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Tests.Auth; + +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; + +[TestFixture] +public class AuditUserTests +{ + [Test] + public void Anonymous_has_sentinel_id_and_name() + { + Assert.That(AuditUser.Anonymous.Id, Is.EqualTo("anonymous")); + Assert.That(AuditUser.Anonymous.Name, Is.EqualTo("anonymous")); + Assert.That(AuditUser.AnonymousValue, Is.EqualTo("anonymous")); + } +} diff --git a/src/ServiceControl.Infrastructure.Tests/Auth/AuthorizationAuditLogTests.cs b/src/ServiceControl.Infrastructure.Tests/Auth/AuthorizationAuditLogTests.cs index 69213d9887..1db40a1b6c 100644 --- a/src/ServiceControl.Infrastructure.Tests/Auth/AuthorizationAuditLogTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/Auth/AuthorizationAuditLogTests.cs @@ -45,7 +45,7 @@ public void Decision_deny_emits_one_entry_on_audit_category() Assert.That(ecs.GetProperty("event").GetProperty("type")[0].GetString(), Is.EqualTo("denied")); Assert.That(ecs.GetProperty("event").GetProperty("outcome").GetString(), Is.EqualTo("failure")); Assert.That(ecs.GetProperty("user").GetProperty("id").GetString(), Is.EqualTo("bob-sub-002")); - Assert.That(ecs.GetProperty("servicecontrol").GetProperty("resource").ValueKind, Is.EqualTo(JsonValueKind.Null)); + Assert.That(ecs.GetProperty("servicecontrol").TryGetProperty("resource", out _), Is.False, "null resource should be omitted"); Assert.That(entries[0].Level, Is.EqualTo(LogLevel.Warning)); } diff --git a/src/ServiceControl.Infrastructure.Tests/Auth/CurrentUserAccessorTests.cs b/src/ServiceControl.Infrastructure.Tests/Auth/CurrentUserAccessorTests.cs new file mode 100644 index 0000000000..0c4ee3badf --- /dev/null +++ b/src/ServiceControl.Infrastructure.Tests/Auth/CurrentUserAccessorTests.cs @@ -0,0 +1,55 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Tests.Auth; + +using System.Security.Claims; +using NUnit.Framework; +using ServiceControl.Configuration; +using ServiceControl.Infrastructure; +using ServiceControl.Infrastructure.Auth; + +[TestFixture] +public class CurrentUserAccessorTests +{ + static CurrentUserAccessor Create() + { + // Default claim keys: SubjectIdClaim = "sub", SubjectNameClaim = "preferred_username". + var settings = new OpenIdConnectSettings(new SettingsRootNamespace("ServiceControl"), validateConfiguration: false, requireServicePulseSettings: false); + return new CurrentUserAccessor(settings); + } + + static ClaimsPrincipal Authenticated(params Claim[] claims) => + new(new ClaimsIdentity(claims, authenticationType: "test")); + + [Test] + public void Resolves_id_and_name_from_configured_claims() + { + var user = Create().Resolve(Authenticated(new Claim("sub", "alice-sub"), new Claim("preferred_username", "Alice"))); + Assert.That(user.Id, Is.EqualTo("alice-sub")); + Assert.That(user.Name, Is.EqualTo("Alice")); + } + + [Test] + public void Falls_back_to_id_when_name_claim_missing() + { + var user = Create().Resolve(Authenticated(new Claim("sub", "alice-sub"))); + Assert.That(user.Name, Is.EqualTo("alice-sub")); + } + + [Test] + public void Anonymous_when_principal_is_null() + { + Assert.That(Create().Resolve(null), Is.EqualTo(AuditUser.Anonymous)); + } + + [Test] + public void Anonymous_when_not_authenticated() + { + Assert.That(Create().Resolve(new ClaimsPrincipal(new ClaimsIdentity())), Is.EqualTo(AuditUser.Anonymous)); + } + + [Test] + public void Anonymous_when_subject_claim_absent() + { + Assert.That(Create().Resolve(Authenticated(new Claim("preferred_username", "Alice"))), Is.EqualTo(AuditUser.Anonymous)); + } +} diff --git a/src/ServiceControl.Infrastructure.Tests/Auth/MessageActionAuditLogTests.cs b/src/ServiceControl.Infrastructure.Tests/Auth/MessageActionAuditLogTests.cs new file mode 100644 index 0000000000..384b32814f --- /dev/null +++ b/src/ServiceControl.Infrastructure.Tests/Auth/MessageActionAuditLogTests.cs @@ -0,0 +1,116 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Tests.Auth; + +using System.Text.Json; +using Microsoft.Extensions.Logging; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; + +[TestFixture] +public class MessageActionAuditLogTests +{ + static (RecordingLoggerProvider provider, MessageActionAuditLog log) Create() + { + var provider = new RecordingLoggerProvider(); + var factory = LoggerFactory.Create(b => b.AddProvider(provider)); + return (provider, new MessageActionAuditLog(factory)); + } + + [Test] + public void Operation_emits_one_entry_on_operation_category() + { + var (provider, log) = Create(); + + log.Operation(new AuditUser("alice-sub", "Alice"), MessageActionKind.Retry, + "error:recoverabilitygroups:retry", MessageActionScope.Group, resource: "group-1", count: 42, operationId: "op-1"); + + var entries = provider.EntriesFor("ServiceControl.Audit"); + Assert.That(entries, Has.Count.EqualTo(1)); + Assert.That(entries[0].Level, Is.EqualTo(LogLevel.Information)); + var ecs = JsonDocument.Parse(entries[0].Message).RootElement; + Assert.That(ecs.GetProperty("event").GetProperty("category")[0].GetString(), Is.EqualTo("configuration")); + Assert.That(ecs.GetProperty("event").GetProperty("type")[0].GetString(), Is.EqualTo("change")); + Assert.That(ecs.GetProperty("event").GetProperty("action").GetString(), Is.EqualTo("error:recoverabilitygroups:retry")); + Assert.That(ecs.GetProperty("event").GetProperty("outcome").GetString(), Is.EqualTo("success")); + Assert.That(ecs.GetProperty("user").GetProperty("id").GetString(), Is.EqualTo("alice-sub")); + Assert.That(ecs.GetProperty("servicecontrol").GetProperty("scope").GetString(), Is.EqualTo("group")); + Assert.That(ecs.GetProperty("servicecontrol").GetProperty("resource").GetString(), Is.EqualTo("group-1")); + Assert.That(ecs.GetProperty("servicecontrol").GetProperty("count").GetInt32(), Is.EqualTo(42)); + Assert.That(ecs.GetProperty("servicecontrol").GetProperty("operation").GetProperty("id").GetString(), Is.EqualTo("op-1")); + } + + [Test] + public void Archive_maps_to_deletion_event_type() + { + var (provider, log) = Create(); + + log.Operation(AuditUser.Anonymous, MessageActionKind.Archive, + "error:messages:archive", MessageActionScope.Single, resource: "m-1", count: 1, operationId: "op-2"); + + var ecs = JsonDocument.Parse(provider.EntriesFor("ServiceControl.Audit")[0].Message).RootElement; + Assert.That(ecs.GetProperty("event").GetProperty("type")[0].GetString(), Is.EqualTo("deletion")); + Assert.That(ecs.GetProperty("user").GetProperty("id").GetString(), Is.EqualTo("anonymous")); + } + + [Test] + public void MessageAction_emits_on_messages_subcategory_with_event_id_2002() + { + var (provider, log) = Create(); + + log.MessageAction(new AuditUser("bob-sub", "Bob"), MessageActionKind.Unarchive, + "error:messages:unarchive", MessageActionScope.Batch, messageId: "m-9", operationId: "op-3"); + + Assert.That(provider.EntriesFor("ServiceControl.Audit"), Is.Empty); + var entries = provider.EntriesFor("ServiceControl.Audit.Messages"); + Assert.That(entries, Has.Count.EqualTo(1)); + Assert.That(entries[0].EventId.Id, Is.EqualTo(2002)); + var ecs = JsonDocument.Parse(entries[0].Message).RootElement; + Assert.That(ecs.GetProperty("servicecontrol").GetProperty("message").GetProperty("id").GetString(), Is.EqualTo("m-9")); + Assert.That(ecs.GetProperty("event").GetProperty("type")[0].GetString(), Is.EqualTo("change")); + } + + [Test] + public void Operation_failure_logs_as_warning() + { + var (provider, log) = Create(); + + log.Operation(new AuditUser("a", "a"), MessageActionKind.Retry, "error:messages:retry", + MessageActionScope.All, resource: null, count: null, operationId: "op-4", success: false); + + var entry = provider.EntriesFor("ServiceControl.Audit")[0]; + Assert.That(entry.Level, Is.EqualTo(LogLevel.Warning)); + Assert.That(entry.EventId.Id, Is.EqualTo(2001)); + var ecs = JsonDocument.Parse(entry.Message).RootElement; + Assert.That(ecs.GetProperty("event").GetProperty("outcome").GetString(), Is.EqualTo("failure")); + } + + [Test] + public void Null_valued_fields_are_omitted() + { + var (provider, log) = Create(); + + log.Operation(new AuditUser("a", "a"), MessageActionKind.Retry, "error:messages:retry", + MessageActionScope.All, resource: null, count: null, operationId: "op-5"); + + var sc = JsonDocument.Parse(provider.EntriesFor("ServiceControl.Audit")[0].Message).RootElement.GetProperty("servicecontrol"); + using (Assert.EnterMultipleScope()) + { + Assert.That(sc.TryGetProperty("resource", out _), Is.False); + Assert.That(sc.TryGetProperty("count", out _), Is.False); + Assert.That(sc.TryGetProperty("message", out _), Is.False); + Assert.That(sc.GetProperty("operation").GetProperty("id").GetString(), Is.EqualTo("op-5")); + } + } + + [TestCase(null, "op")] + [TestCase("", "op")] + [TestCase("error:messages:retry", null)] + [TestCase("error:messages:retry", "")] + public void Operation_throws_when_permission_or_operationId_missing(string? permission, string? operationId) + { + var (_, log) = Create(); + Assert.That( + () => log.Operation(AuditUser.Anonymous, MessageActionKind.Retry, permission!, MessageActionScope.All, null, null, operationId!), + Throws.InstanceOf()); + } +} diff --git a/src/ServiceControl.Infrastructure.Tests/LoggingConfiguratorTests.cs b/src/ServiceControl.Infrastructure.Tests/LoggingConfiguratorTests.cs index cbc790d9de..f500c518a6 100644 --- a/src/ServiceControl.Infrastructure.Tests/LoggingConfiguratorTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/LoggingConfiguratorTests.cs @@ -105,4 +105,14 @@ public void Audit_decisions_render_as_valid_structured_json() Assert.That(deny.GetProperty("servicecontrol").GetProperty("resource").ValueKind, Is.EqualTo(JsonValueKind.Null), "absent resource should be JSON null"); }); } + + [Test] + public void Message_action_subcategory_is_captured_by_the_audit_rule() + { + var config = BuildConfig(); + var auditRule = config.LoggingRules.Single(r => r.LoggerNamePattern == AuditPattern); + + Assert.That(auditRule.NameMatches(ServiceControl.Infrastructure.Auth.MessageActionAuditLog.MessageCategory), Is.True); + Assert.That(auditRule.NameMatches(ServiceControl.Infrastructure.Auth.MessageActionAuditLog.OperationCategory), Is.True); + } } diff --git a/src/ServiceControl.Infrastructure.Tests/ServiceControl.Infrastructure.Tests.csproj b/src/ServiceControl.Infrastructure.Tests/ServiceControl.Infrastructure.Tests.csproj index 4cddac2b26..e8d3871d15 100644 --- a/src/ServiceControl.Infrastructure.Tests/ServiceControl.Infrastructure.Tests.csproj +++ b/src/ServiceControl.Infrastructure.Tests/ServiceControl.Infrastructure.Tests.csproj @@ -7,11 +7,13 @@ + + diff --git a/src/ServiceControl.Infrastructure/Auth/AuditHeaders.cs b/src/ServiceControl.Infrastructure/Auth/AuditHeaders.cs new file mode 100644 index 0000000000..1469f950d9 --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/AuditHeaders.cs @@ -0,0 +1,41 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +using System.Collections.Generic; +using NServiceBus; + +/// +/// Carries the initiating principal on ServiceControl's own internal command messages so asynchronous +/// handlers can attribute per-message actions. Trusted as-is (trusted-subsystem model): the integrity +/// rests on transport access control, consistent with how the command itself is already trusted. This +/// type is the single stamp/read choke point — cryptographic signing would be added here. +/// +public static class AuditHeaders +{ + public const string SubjectId = "ServiceControl.Audit.InitiatedBy.Id"; + public const string SubjectName = "ServiceControl.Audit.InitiatedBy.Name"; + public const string OperationId = "ServiceControl.Audit.OperationId"; + + public static void Stamp(SendOptions options, AuditUser user, string operationId) + { + options.SetHeader(SubjectId, user.Id); + options.SetHeader(SubjectName, user.Name); + if (!string.IsNullOrEmpty(operationId)) + { + options.SetHeader(OperationId, operationId); + } + } + + public static (AuditUser User, string? OperationId) Read(IReadOnlyDictionary headers) + { + headers.TryGetValue(OperationId, out var operationId); + + if (headers.TryGetValue(SubjectId, out var id) && !string.IsNullOrEmpty(id)) + { + headers.TryGetValue(SubjectName, out var name); + return (new AuditUser(id, string.IsNullOrEmpty(name) ? id : name), operationId); + } + + return (AuditUser.Anonymous, operationId); + } +} diff --git a/src/ServiceControl.Infrastructure/Auth/AuditUser.cs b/src/ServiceControl.Infrastructure/Auth/AuditUser.cs new file mode 100644 index 0000000000..a78685e3b0 --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/AuditUser.cs @@ -0,0 +1,13 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +/// +/// The principal an audited action is attributed to. is recorded when +/// authentication is disabled or no identified principal is present. +/// +public readonly record struct AuditUser(string Id, string Name) +{ + public const string AnonymousValue = "anonymous"; + + public static readonly AuditUser Anonymous = new(AnonymousValue, AnonymousValue); +} diff --git a/src/ServiceControl.Infrastructure/Auth/AuthorizationAuditLog.cs b/src/ServiceControl.Infrastructure/Auth/AuthorizationAuditLog.cs index 75a15fb980..38718c0fa4 100644 --- a/src/ServiceControl.Infrastructure/Auth/AuthorizationAuditLog.cs +++ b/src/ServiceControl.Infrastructure/Auth/AuthorizationAuditLog.cs @@ -5,6 +5,7 @@ namespace ServiceControl.Infrastructure.Auth; using System.Collections.Generic; using System.Text.Encodings.Web; using System.Text.Json; +using System.Text.Json.Serialization; using Microsoft.Extensions.Logging; /// @@ -19,7 +20,7 @@ public sealed partial class AuthorizationAuditLog(ILoggerFactory loggerFactory) // Relaxed escaping keeps the JSON readable for log sinks (no \uXXXX for '+', '<', accented names, …); // the HTML-safe default only matters in a browser context, which an audit log is not. - static readonly JsonSerializerOptions EcsJsonOptions = new() { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping }; + static readonly JsonSerializerOptions EcsJsonOptions = new() { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }; public void Decision(string subjectId, string subjectName, string permission, string? resource, bool allowed, string reason) { diff --git a/src/ServiceControl.Infrastructure/Auth/CurrentUserAccessor.cs b/src/ServiceControl.Infrastructure/Auth/CurrentUserAccessor.cs new file mode 100644 index 0000000000..4b5d8208d1 --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/CurrentUserAccessor.cs @@ -0,0 +1,29 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +using System.Security.Claims; + +/// +/// Reads the subject id/name from the configured OIDC claim keys (the same keys +/// PermissionVerbHandler uses). Falls back to rather than +/// throwing, so the action trail is still recorded when authentication is disabled. +/// +public sealed class CurrentUserAccessor(OpenIdConnectSettings oidcSettings) : ICurrentUserAccessor +{ + public AuditUser Resolve(ClaimsPrincipal? principal) + { + if (principal?.Identity?.IsAuthenticated != true) + { + return AuditUser.Anonymous; + } + + var id = principal.FindFirst(oidcSettings.SubjectIdClaim)?.Value; + if (string.IsNullOrEmpty(id)) + { + return AuditUser.Anonymous; + } + + var name = principal.FindFirst(oidcSettings.SubjectNameClaim)?.Value; + return new AuditUser(id, string.IsNullOrEmpty(name) ? id : name); + } +} diff --git a/src/ServiceControl.Infrastructure/Auth/ICurrentUserAccessor.cs b/src/ServiceControl.Infrastructure/Auth/ICurrentUserAccessor.cs new file mode 100644 index 0000000000..a6093fa4ed --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/ICurrentUserAccessor.cs @@ -0,0 +1,11 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +using System.Security.Claims; + +/// Resolves the audited from the current request principal. +public interface ICurrentUserAccessor +{ + /// Returns the principal's subject id/name, or when there is no identified principal. + AuditUser Resolve(ClaimsPrincipal? principal); +} diff --git a/src/ServiceControl.Infrastructure/Auth/IMessageActionAuditLog.cs b/src/ServiceControl.Infrastructure/Auth/IMessageActionAuditLog.cs new file mode 100644 index 0000000000..62364020f4 --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/IMessageActionAuditLog.cs @@ -0,0 +1,17 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +/// +/// Records user-initiated recoverability message actions (retry / archive / unarchive) as structured +/// audit entries. Operation-level entries answer "who did what to which resource"; per-message entries +/// record each affected message. Both are emitted on the stable ServiceControl.Audit category +/// family so SIEM sinks can collect them without coupling to the concrete type name. +/// +public interface IMessageActionAuditLog +{ + /// Records one user operation (a single click / API call), whatever its fan-out. + void Operation(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string? resource, int? count, string operationId, bool success = true); + + /// Records one affected message, correlated to its operation via . + void MessageAction(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string messageId, string operationId, bool success = true); +} diff --git a/src/ServiceControl.Infrastructure/Auth/MessageAction.cs b/src/ServiceControl.Infrastructure/Auth/MessageAction.cs new file mode 100644 index 0000000000..c620a147af --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/MessageAction.cs @@ -0,0 +1,22 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +/// The kind of recoverability action being audited. Determines the ECS event.type. +public enum MessageActionKind +{ + Retry, + Archive, + Unarchive +} + +/// How the action selected the messages it acts on. +public enum MessageActionScope +{ + Single, + Batch, + Group, + Queue, + Endpoint, + All, + Range +} diff --git a/src/ServiceControl.Infrastructure/Auth/MessageActionAuditLog.cs b/src/ServiceControl.Infrastructure/Auth/MessageActionAuditLog.cs new file mode 100644 index 0000000000..862082a3ca --- /dev/null +++ b/src/ServiceControl.Infrastructure/Auth/MessageActionAuditLog.cs @@ -0,0 +1,112 @@ +#nullable enable +namespace ServiceControl.Infrastructure.Auth; + +using System; +using System.Collections.Generic; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Text.Json.Serialization; +using Microsoft.Extensions.Logging; + +/// +/// Emits message-action audit entries as Elastic Common Schema (ECS) documents. Operation-level entries +/// go on (shared audit umbrella); per-message entries go on the +/// sub-category so operators can filter the high-volume per-message stream +/// independently through standard logging configuration. +/// +public sealed partial class MessageActionAuditLog : IMessageActionAuditLog +{ + public const string OperationCategory = AuthorizationAuditLog.AuditCategory; // "ServiceControl.Audit" + public const string MessageCategory = AuthorizationAuditLog.AuditCategory + ".Messages"; // "ServiceControl.Audit.Messages" + + // Relaxed escaping keeps the JSON readable for log sinks, matching AuthorizationAuditLog. + static readonly JsonSerializerOptions EcsJsonOptions = new() { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }; + + readonly ILogger operationLogger; + readonly ILogger messageLogger; + + public MessageActionAuditLog(ILoggerFactory loggerFactory) + { + operationLogger = loggerFactory.CreateLogger(OperationCategory); + messageLogger = loggerFactory.CreateLogger(MessageCategory); + } + + public void Operation(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string? resource, int? count, string operationId, bool success = true) + { + ArgumentException.ThrowIfNullOrEmpty(permission); + ArgumentException.ThrowIfNullOrEmpty(operationId); + + var ecs = BuildEcsEvent(user, kind, permission, scope, resource, messageId: null, count, operationId, success); + + if (success) + { + LogOperation(operationLogger, ecs); + } + else + { + LogOperationFailure(operationLogger, ecs); + } + } + + public void MessageAction(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string messageId, string operationId, bool success = true) + { + ArgumentException.ThrowIfNullOrEmpty(permission); + ArgumentException.ThrowIfNullOrEmpty(messageId); + ArgumentException.ThrowIfNullOrEmpty(operationId); + + var ecs = BuildEcsEvent(user, kind, permission, scope, resource: null, messageId, count: null, operationId, success); + + if (success) + { + LogMessage(messageLogger, ecs); + } + else + { + LogMessageFailure(messageLogger, ecs); + } + } + + static string BuildEcsEvent(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string? resource, string? messageId, int? count, string operationId, bool success) + { + var ecs = new Dictionary + { + ["@timestamp"] = DateTimeOffset.UtcNow.ToString("O"), + ["event"] = new + { + kind = "event", + category = new[] { "configuration" }, + type = new[] { kind == MessageActionKind.Archive ? "deletion" : "change" }, + action = permission, + outcome = success ? "success" : "failure" + }, + ["user"] = new + { + id = user.Id, + name = user.Name + }, + ["servicecontrol"] = new + { + permission, + scope = scope.ToString().ToLowerInvariant(), + resource, + message = messageId is null ? null : new { id = messageId }, + count, + operation = new { id = operationId } + } + }; + + return JsonSerializer.Serialize(ecs, EcsJsonOptions); + } + + [LoggerMessage(EventId = 2001, Level = LogLevel.Information, Message = "{AuditEvent}")] + static partial void LogOperation(ILogger logger, string auditEvent); + + [LoggerMessage(EventId = 2001, Level = LogLevel.Warning, Message = "{AuditEvent}")] + static partial void LogOperationFailure(ILogger logger, string auditEvent); + + [LoggerMessage(EventId = 2002, Level = LogLevel.Information, Message = "{AuditEvent}")] + static partial void LogMessage(ILogger logger, string auditEvent); + + [LoggerMessage(EventId = 2002, Level = LogLevel.Warning, Message = "{AuditEvent}")] + static partial void LogMessageFailure(ILogger logger, string auditEvent); +} diff --git a/src/ServiceControl.Monitoring/WebApplicationExtensions.cs b/src/ServiceControl.Monitoring/WebApplicationExtensions.cs index fad91eef55..58d7a7c58b 100644 --- a/src/ServiceControl.Monitoring/WebApplicationExtensions.cs +++ b/src/ServiceControl.Monitoring/WebApplicationExtensions.cs @@ -1,6 +1,8 @@ namespace ServiceControl.Monitoring.Infrastructure; +using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; using ServiceControl.Hosting.ForwardedHeaders; using ServiceControl.Hosting.Https; using ServiceControl.Infrastructure; @@ -9,6 +11,20 @@ public static class WebApplicationExtensions { public static void UseServiceControlMonitoring(this WebApplication appBuilder, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings, CorsSettings corsSettings) { + // Surface the per-request id so callers can correlate and quote it. TraceIdentifier is stable + // for the request; OnStarting sets it before the response flushes. + appBuilder.Use((context, next) => + { + context.Response.OnStarting(static state => + { + var httpContext = (HttpContext)state; + httpContext.Response.Headers["Request-Id"] = httpContext.TraceIdentifier; + return Task.CompletedTask; + }, context); + + return next(context); + }); + appBuilder.UseServiceControlForwardedHeaders(forwardedHeadersSettings); appBuilder.UseServiceControlHttps(httpsSettings); @@ -30,7 +46,7 @@ public static void UseServiceControlMonitoring(this WebApplication appBuilder, F } // Headers exposed to the client in the response (accessible via JavaScript) - policyBuilder.WithExposedHeaders(["ETag", "Last-Modified", "Link", "Total-Count", "X-Particular-Version"]); + policyBuilder.WithExposedHeaders(["ETag", "Last-Modified", "Link", "Total-Count", "X-Particular-Version", "Request-Id"]); // Headers allowed in the request from the client policyBuilder.WithHeaders(["Origin", "X-Requested-With", "Content-Type", "Accept", "Authorization"]); // HTTP methods allowed for cross-origin requests diff --git a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveDocumentManager.cs b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveDocumentManager.cs index c83e693141..81f822c078 100644 --- a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveDocumentManager.cs +++ b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveDocumentManager.cs @@ -16,7 +16,7 @@ class ArchiveDocumentManager(ExpirationManager expirationManager, ILogger logger { public Task LoadArchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType) => session.LoadAsync(ArchiveOperation.MakeId(groupId, archiveType)); - public async Task CreateArchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType, int numberOfMessages, string groupName, int batchSize) + public async Task CreateArchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType, int numberOfMessages, string groupName, int batchSize, string initiatedById = null, string initiatedByName = null, string operationId = null) { var operation = new ArchiveOperation { @@ -28,7 +28,10 @@ public async Task CreateArchiveOperation(IAsyncDocumentSession Started = DateTime.UtcNow, GroupName = groupName, NumberOfBatches = (int)Math.Ceiling(numberOfMessages / (float)batchSize), - CurrentBatch = 0 + CurrentBatch = 0, + InitiatedById = initiatedById, + InitiatedByName = initiatedByName, + OperationId = operationId }; await session.StoreAsync(operation); diff --git a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveOperation.cs b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveOperation.cs index 8fedc64233..d57e90d86a 100644 --- a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveOperation.cs +++ b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/ArchiveOperation.cs @@ -13,6 +13,13 @@ class ArchiveOperation // raven public DateTime Started { get; set; } public int NumberOfBatches { get; set; } public int CurrentBatch { get; set; } + + // Audit attribution for the initiating operation, carried so per-message audit entries can be + // emitted (and correlated to the operation) as each batch is archived, including after a restart. + public string InitiatedById { get; set; } + public string InitiatedByName { get; set; } + public string OperationId { get; set; } + public static string MakeId(string requestId, ArchiveType archiveType) { return $"ArchiveOperations/{(int)archiveType}/{requestId}"; diff --git a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/MessageArchiver.cs b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/MessageArchiver.cs index b2957ba9a8..56f63c0ced 100644 --- a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/MessageArchiver.cs +++ b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/MessageArchiver.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.Logging; using RavenDB; + using ServiceControl.Infrastructure.Auth; using ServiceControl.Infrastructure.DomainEvents; using ServiceControl.Persistence.Recoverability; using ServiceControl.Recoverability; @@ -17,12 +18,14 @@ public MessageArchiver( OperationsManager operationsManager, IDomainEvents domainEvents, ExpirationManager expirationManager, + IMessageActionAuditLog auditLog, ILogger logger ) { this.sessionProvider = sessionProvider; this.domainEvents = domainEvents; this.expirationManager = expirationManager; + this.auditLog = auditLog; this.logger = logger; this.operationsManager = operationsManager; @@ -33,7 +36,7 @@ ILogger logger unarchivingManager = new UnarchivingManager(domainEvents, operationsManager); } - public async Task ArchiveAllInGroup(string groupId) + public async Task ArchiveAllInGroup(string groupId, AuditUser? initiatedBy = null, string operationId = null) { logger.LogInformation("Archiving of {GroupId} started", groupId); ArchiveOperation archiveOperation; @@ -54,13 +57,17 @@ public async Task ArchiveAllInGroup(string groupId) } logger.LogInformation("Splitting group {GroupId} into batches", groupId); - archiveOperation = await archiveDocumentManager.CreateArchiveOperation(session, groupId, ArchiveType.FailureGroup, groupDetails.NumberOfMessagesInGroup, groupDetails.GroupName, batchSize); + archiveOperation = await archiveDocumentManager.CreateArchiveOperation(session, groupId, ArchiveType.FailureGroup, groupDetails.NumberOfMessagesInGroup, groupDetails.GroupName, batchSize, initiatedBy?.Id, initiatedBy?.Name, operationId); await session.SaveChangesAsync(); logger.LogInformation("Group {GroupId} has been split into {NumberOfBatches} batches", groupId, archiveOperation.NumberOfBatches); } } + // Captured from the persisted operation so resumed operations remain attributed to the initiator. + var auditUser = new AuditUser(archiveOperation.InitiatedById, archiveOperation.InitiatedByName); + var auditOperationId = archiveOperation.OperationId; + await archivingManager.StartArchiving(archiveOperation); while (archiveOperation.CurrentBatch < archiveOperation.NumberOfBatches) @@ -90,11 +97,15 @@ public async Task ArchiveAllInGroup(string groupId) if (nextBatch != null) { + // Remove `FailedMessages/` prefix and publish pure GUIDs without Raven collection name + var messageIds = nextBatch.DocumentIds.Select(id => id.Replace("FailedMessages/", "")).ToArray(); + await domainEvents.Raise(new FailedMessageGroupBatchArchived { - // Remove `FailedMessages/` prefix and publish pure GUIDs without Raven collection name - FailedMessagesIds = nextBatch.DocumentIds.Select(id => id.Replace("FailedMessages/", "")).ToArray() + FailedMessagesIds = messageIds }); + + AuditArchivedMessages(MessageActionKind.Archive, Permissions.ErrorRecoverabilityGroupsArchive, auditUser, auditOperationId, messageIds); } if (nextBatch != null) @@ -125,7 +136,7 @@ await domainEvents.Raise(new FailedMessageGroupArchived logger.LogInformation("Archiving of group {GroupId} completed", groupId); } - public async Task UnarchiveAllInGroup(string groupId) + public async Task UnarchiveAllInGroup(string groupId, AuditUser? initiatedBy = null, string operationId = null) { logger.LogInformation("Unarchiving of {GroupId} started", groupId); UnarchiveOperation unarchiveOperation; @@ -147,13 +158,17 @@ public async Task UnarchiveAllInGroup(string groupId) } logger.LogInformation("Splitting group {GroupId} into batches", groupId); - unarchiveOperation = await unarchiveDocumentManager.CreateUnarchiveOperation(session, groupId, ArchiveType.FailureGroup, groupDetails.NumberOfMessagesInGroup, groupDetails.GroupName, batchSize); + unarchiveOperation = await unarchiveDocumentManager.CreateUnarchiveOperation(session, groupId, ArchiveType.FailureGroup, groupDetails.NumberOfMessagesInGroup, groupDetails.GroupName, batchSize, initiatedBy?.Id, initiatedBy?.Name, operationId); await session.SaveChangesAsync(); logger.LogInformation("Group {GroupId} has been split into {NumberOfBatches} batches", groupId, unarchiveOperation.NumberOfBatches); } } + // Captured from the persisted operation so resumed operations remain attributed to the initiator. + var auditUser = new AuditUser(unarchiveOperation.InitiatedById, unarchiveOperation.InitiatedByName); + var auditOperationId = unarchiveOperation.OperationId; + await unarchivingManager.StartUnarchiving(unarchiveOperation); while (unarchiveOperation.CurrentBatch < unarchiveOperation.NumberOfBatches) @@ -182,11 +197,15 @@ public async Task UnarchiveAllInGroup(string groupId) if (nextBatch != null) { + // Remove `FailedMessages/` prefix and publish pure GUIDs without Raven collection name + var messageIds = nextBatch.DocumentIds.Select(id => id.Replace("FailedMessages/", "")).ToArray(); + await domainEvents.Raise(new FailedMessageGroupBatchUnarchived { - // Remove `FailedMessages/` prefix and publish pure GUIDs without Raven collection name - FailedMessagesIds = nextBatch.DocumentIds.Select(id => id.Replace("FailedMessages/", "")).ToArray() + FailedMessagesIds = messageIds }); + + AuditArchivedMessages(MessageActionKind.Unarchive, Permissions.ErrorRecoverabilityGroupsUnarchive, auditUser, auditOperationId, messageIds); } if (nextBatch != null) @@ -215,6 +234,21 @@ await domainEvents.Raise(new FailedMessageGroupUnarchived }); } + // Emits one per-message audit entry for each message in a batch, correlated to the initiating + // operation. Skipped when no OperationId was captured (e.g. legacy in-flight operations). + void AuditArchivedMessages(MessageActionKind kind, string permission, AuditUser user, string operationId, string[] messageIds) + { + if (string.IsNullOrEmpty(operationId)) + { + return; + } + + foreach (var messageId in messageIds) + { + auditLog.MessageAction(user, kind, permission, MessageActionScope.Group, messageId, operationId); + } + } + public bool IsOperationInProgressFor(string groupId, ArchiveType archiveType) => operationsManager.IsOperationInProgressFor(groupId, archiveType); public bool IsArchiveInProgressFor(string groupId) @@ -236,6 +270,7 @@ public IEnumerable GetArchivalOperations() readonly OperationsManager operationsManager; readonly IDomainEvents domainEvents; readonly ExpirationManager expirationManager; + readonly IMessageActionAuditLog auditLog; readonly ArchiveDocumentManager archiveDocumentManager; readonly ArchivingManager archivingManager; readonly UnarchiveDocumentManager unarchiveDocumentManager; diff --git a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveDocumentManager.cs b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveDocumentManager.cs index 83d2e315cb..92615e5bf9 100644 --- a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveDocumentManager.cs +++ b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveDocumentManager.cs @@ -15,7 +15,7 @@ class UnarchiveDocumentManager { public Task LoadUnarchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType) => session.LoadAsync(UnarchiveOperation.MakeId(groupId, archiveType)); - public async Task CreateUnarchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType, int numberOfMessages, string groupName, int batchSize) + public async Task CreateUnarchiveOperation(IAsyncDocumentSession session, string groupId, ArchiveType archiveType, int numberOfMessages, string groupName, int batchSize, string initiatedById = null, string initiatedByName = null, string operationId = null) { var operation = new UnarchiveOperation { @@ -27,7 +27,10 @@ public async Task CreateUnarchiveOperation(IAsyncDocumentSes Started = DateTime.UtcNow, GroupName = groupName, NumberOfBatches = (int)Math.Ceiling(numberOfMessages / (float)batchSize), - CurrentBatch = 0 + CurrentBatch = 0, + InitiatedById = initiatedById, + InitiatedByName = initiatedByName, + OperationId = operationId }; await session.StoreAsync(operation); diff --git a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveOperation.cs b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveOperation.cs index 4227ce05f5..93487f3419 100644 --- a/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveOperation.cs +++ b/src/ServiceControl.Persistence.RavenDB/Recoverability/Archiving/UnarchiveOperation.cs @@ -13,6 +13,13 @@ class UnarchiveOperation // raven public DateTime Started { get; set; } public int NumberOfBatches { get; set; } public int CurrentBatch { get; set; } + + // Audit attribution for the initiating operation, carried so per-message audit entries can be + // emitted (and correlated to the operation) as each batch is unarchived, including after a restart. + public string InitiatedById { get; set; } + public string InitiatedByName { get; set; } + public string OperationId { get; set; } + public static string MakeId(string requestId, ArchiveType archiveType) { return $"UnarchiveOperations/{(int)archiveType}/{requestId}"; diff --git a/src/ServiceControl.Persistence.RavenDB/RetryDocumentDataStore.cs b/src/ServiceControl.Persistence.RavenDB/RetryDocumentDataStore.cs index 686b77df9f..999e80ee42 100644 --- a/src/ServiceControl.Persistence.RavenDB/RetryDocumentDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDB/RetryDocumentDataStore.cs @@ -54,7 +54,8 @@ public async Task MoveBatchToStaging(string batchDocumentId) public async Task CreateBatchDocument(string retrySessionId, string requestId, RetryType retryType, string[] failedMessageRetryIds, string originator, - DateTime startTime, DateTime? last = null, string batchName = null, string classifier = null) + DateTime startTime, DateTime? last = null, string batchName = null, string classifier = null, + string initiatedById = null, string initiatedByName = null, string operationId = null) { var batchDocumentId = RetryBatch.MakeDocumentId(Guid.NewGuid().ToString()); using var session = await sessionProvider.OpenSession(); @@ -71,7 +72,10 @@ await session.StoreAsync(new RetryBatch InitialBatchSize = failedMessageRetryIds.Length, RetrySessionId = retrySessionId, FailureRetries = failedMessageRetryIds, - Status = RetryBatchStatus.MarkingDocuments + Status = RetryBatchStatus.MarkingDocuments, + InitiatedById = initiatedById, + InitiatedByName = initiatedByName, + OperationId = operationId }); await session.SaveChangesAsync(); diff --git a/src/ServiceControl.Persistence.Tests.RavenDB/Archiving/ArchiveGroupPerMessageAuditTests.cs b/src/ServiceControl.Persistence.Tests.RavenDB/Archiving/ArchiveGroupPerMessageAuditTests.cs new file mode 100644 index 0000000000..55fcc5f300 --- /dev/null +++ b/src/ServiceControl.Persistence.Tests.RavenDB/Archiving/ArchiveGroupPerMessageAuditTests.cs @@ -0,0 +1,85 @@ +namespace ServiceControl.Persistence.Tests.RavenDB.Archiving; + +using System; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.MessageFailures; +using ServiceControl.Persistence.Tests.Recoverability; +using ServiceControl.Recoverability; + +[TestFixture] +class ArchiveGroupPerMessageAuditTests : RavenPersistenceTestBase +{ + readonly RecordingMessageActionAuditLog audit = new(); + + public ArchiveGroupPerMessageAuditTests() => + RegisterServices = services => + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(audit); + }; + + [Test] + public async Task Each_archived_message_is_audited_with_the_initiating_user() + { + var groupId = "TestGroup"; + var user = new AuditUser("alice-sub", "Alice"); + const string operationId = "op-arch"; + + using (var session = DocumentStore.OpenAsyncSession()) + { + foreach (var id in new[] { "A", "B" }) + { + await session.StoreAsync(new FailedMessage + { + Id = "FailedMessages/" + id, + UniqueMessageId = id, + Status = FailedMessageStatus.Unresolved + }); + } + + await session.StoreAsync(new ArchiveBatch + { + Id = ArchiveBatch.MakeId(groupId, ArchiveType.FailureGroup, 0), + DocumentIds = ["FailedMessages/A", "FailedMessages/B"] + }); + + await session.StoreAsync(new ArchiveOperation + { + Id = ArchiveOperation.MakeId(groupId, ArchiveType.FailureGroup), + RequestId = groupId, + ArchiveType = ArchiveType.FailureGroup, + TotalNumberOfMessages = 2, + NumberOfMessagesArchived = 0, + Started = DateTime.UtcNow, + GroupName = "Test Group", + NumberOfBatches = 1, + CurrentBatch = 0, + InitiatedById = user.Id, + InitiatedByName = user.Name, + OperationId = operationId + }); + + await session.SaveChangesAsync(); + } + + var handler = ServiceProvider.GetRequiredService(); + var context = new TestableMessageHandlerContext(); + + await handler.Handle(new ArchiveAllInGroup { GroupId = groupId }, context); + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(new[] { "A", "B" })); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.User.Equals(user))); + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == operationId)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Archive)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Group)); + } + } +} diff --git a/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs b/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs index fdd41695c7..b0fd60e9ac 100644 --- a/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs +++ b/src/ServiceControl.Persistence.Tests/PersistenceTestBase.cs @@ -10,6 +10,7 @@ using NUnit.Framework; using Particular.LicensingComponent.Persistence; using ServiceControl.Infrastructure; +using ServiceControl.Infrastructure.Auth; using ServiceControl.Infrastructure.DomainEvents; using ServiceControl.Operations.BodyStorage; using ServiceControl.Persistence; @@ -44,6 +45,7 @@ public async Task SetUp() hostBuilder.Services.AddSingleton(new CriticalError((_, __) => Task.CompletedTask)); hostBuilder.Services.AddSingleton(new SettingsHolder()); hostBuilder.Services.AddSingleton(new ReceiveAddresses("fakeReceiveAddress")); + hostBuilder.Services.AddSingleton(); RegisterServices.Invoke(hostBuilder.Services); diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/RecordingMessageActionAuditLog.cs b/src/ServiceControl.Persistence.Tests/Recoverability/RecordingMessageActionAuditLog.cs new file mode 100644 index 0000000000..0506bd0e1a --- /dev/null +++ b/src/ServiceControl.Persistence.Tests/Recoverability/RecordingMessageActionAuditLog.cs @@ -0,0 +1,19 @@ +#nullable enable +namespace ServiceControl.Persistence.Tests.Recoverability; + +using System.Collections.Generic; +using ServiceControl.Infrastructure.Auth; + +sealed class RecordingMessageActionAuditLog : IMessageActionAuditLog +{ + public List Messages { get; } = []; + + public void Operation(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string? resource, int? count, string operationId, bool success = true) + { + } + + public void MessageAction(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string messageId, string operationId, bool success = true) => + Messages.Add(new MessageEntry(user, kind, permission, scope, messageId, operationId, success)); + + public sealed record MessageEntry(AuditUser User, MessageActionKind Kind, string Permission, MessageActionScope Scope, string MessageId, string OperationId, bool Success); +} diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs index 7ba2c8d239..8f22784e67 100644 --- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs +++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs @@ -12,10 +12,12 @@ using NUnit.Framework; using ServiceBus.Management.Infrastructure.Settings; using ServiceControl.Contracts.Operations; + using ServiceControl.Infrastructure.Auth; using ServiceControl.Infrastructure.BackgroundTasks; using ServiceControl.Infrastructure.DomainEvents; using ServiceControl.MessageFailures; using ServiceControl.Persistence; + using ServiceControl.Persistence.Tests.Recoverability; using ServiceControl.Recoverability; using ServiceControl.Transports; using static ServiceControl.Recoverability.RecoverabilityComponent; @@ -98,6 +100,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte new TestTransportCustomization()), retryManager, new Lazy(() => sender), + new RecordingMessageActionAuditLog(), NullLogger.Instance); // Needs index RetryBatches_ByStatus_ReduceInitialBatchSize @@ -124,6 +127,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte new TestTransportCustomization()), retryManager, new Lazy(() => sender), + new RecordingMessageActionAuditLog(), NullLogger.Instance); await processor.ProcessBatches(); @@ -143,7 +147,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed() var sender = new TestSender(); var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore, NullLogger.Instance), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender), NullLogger.Instance); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender), new RecordingMessageActionAuditLog(), NullLogger.Instance); await processor.ProcessBatches(); // mark ready await processor.ProcessBatches(); @@ -173,7 +177,7 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_ }; var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore, NullLogger.Instance), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender), NullLogger.Instance); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender), new RecordingMessageActionAuditLog(), NullLogger.Instance); bool c; do @@ -213,7 +217,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_ var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy(() => sender), NullLogger.Instance); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy(() => sender), new RecordingMessageActionAuditLog(), NullLogger.Instance); CompleteDatabaseOperation(); @@ -224,12 +228,93 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_ Assert.That(status.RetryState, Is.EqualTo(RetryState.Forwarding)); } + [Test] + public async Task When_a_selection_is_staged_each_message_is_audited_as_a_batch() + { + var domainEvents = new FakeDomainEvents(); + var retryManager = new RetryingManager(domainEvents, NullLogger.Instance); + var user = new AuditUser("alice-sub", "Alice"); + const string operationId = "op-sel"; + var ids = new[] { "A", "B" }; + + var messages = ids.Select(id => new FailedMessage + { + Id = FailedMessageIdGenerator.MakeDocumentId(id), + UniqueMessageId = id, + Status = FailedMessageStatus.Unresolved, + ProcessingAttempts = + [ + new FailedMessage.ProcessingAttempt + { + AttemptedAt = DateTime.UtcNow, + MessageMetadata = [], + FailureDetails = new FailureDetails(), + Headers = [] + } + ] + }).ToArray(); + + await ErrorStore.StoreFailedMessagesForTestsOnly(messages); + CompleteDatabaseOperation(); + + var gateway = new CustomRetriesGateway(true, RetryStore, retryManager); + await gateway.StartRetryForMessageSelection(ids, user, operationId); + CompleteDatabaseOperation(); + + var audit = new RecordingMessageActionAuditLog(); + var sender = new TestSender(); + var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore, NullLogger.Instance), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender), audit, NullLogger.Instance); + + await processor.ProcessBatches(); // stage + await processor.ProcessBatches(); // forward + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(ids)); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == operationId)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Retry)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Batch)); + } + } + + [Test] + public async Task When_a_group_is_staged_each_message_is_audited_with_the_initiating_user() + { + var domainEvents = new FakeDomainEvents(); + var retryManager = new RetryingManager(domainEvents, NullLogger.Instance); + var user = new AuditUser("alice-sub", "Alice"); + const string operationId = "op-abc"; + + await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, user, operationId, "A", "B"); + + var audit = new RecordingMessageActionAuditLog(); + var sender = new TestSender(); + var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore, NullLogger.Instance), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender), audit, NullLogger.Instance); + + await processor.ProcessBatches(); // stage (emits per-message audit) + await processor.ProcessBatches(); // forward + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(new[] { "A", "B" })); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.User.Equals(user))); + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == operationId)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Retry)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Group)); + } + } + Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(RetryingManager retryManager, string groupId, bool progressToStaged, int numberOfMessages) { return CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, groupId, progressToStaged, Enumerable.Range(0, numberOfMessages).Select(i => Guid.NewGuid().ToString()).ToArray()); } - async Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(RetryingManager retryManager, string groupId, bool progressToStaged, params string[] messageIds) + Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(RetryingManager retryManager, string groupId, bool progressToStaged, params string[] messageIds) => + CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, groupId, progressToStaged, null, null, messageIds); + + async Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(RetryingManager retryManager, string groupId, bool progressToStaged, AuditUser? initiatedBy, string operationId, params string[] messageIds) { var messages = messageIds.Select(id => new FailedMessage { @@ -266,7 +351,7 @@ async Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(RetryingManager retryMa var documentManager = new CustomRetryDocumentManager(progressToStaged, RetryStore, retryManager); var gateway = new CustomRetriesGateway(progressToStaged, RetryStore, retryManager); - gateway.EnqueueRetryForFailureGroup(new RetriesGateway.RetryForFailureGroup(groupId, "Test-Context", groupType: null, DateTime.UtcNow)); + gateway.EnqueueRetryForFailureGroup(new RetriesGateway.RetryForFailureGroup(groupId, "Test-Context", groupType: null, DateTime.UtcNow, initiatedBy, operationId)); CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs b/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs index 0071812cd7..f28b4f640f 100644 --- a/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs +++ b/src/ServiceControl.Persistence/IRetryDocumentDataStore.cs @@ -15,7 +15,8 @@ public interface IRetryDocumentDataStore Task CreateBatchDocument(string retrySessionId, string requestId, RetryType retryType, string[] failedMessageRetryIds, string originator, DateTime startTime, DateTime? last = null, - string batchName = null, string classifier = null); + string batchName = null, string classifier = null, + string initiatedById = null, string initiatedByName = null, string operationId = null); Task>> QueryOrphanedBatches(string retrySessionId); Task> QueryAvailableBatches(); diff --git a/src/ServiceControl.Persistence/Recoverability/Archiving/IArchiveMessages.cs b/src/ServiceControl.Persistence/Recoverability/Archiving/IArchiveMessages.cs index a95f03faf1..763893e9bb 100644 --- a/src/ServiceControl.Persistence/Recoverability/Archiving/IArchiveMessages.cs +++ b/src/ServiceControl.Persistence/Recoverability/Archiving/IArchiveMessages.cs @@ -2,6 +2,7 @@ { using System.Collections.Generic; using System.Threading.Tasks; + using ServiceControl.Infrastructure.Auth; using ServiceControl.Recoverability; /// @@ -9,8 +10,8 @@ /// public interface IArchiveMessages { - Task ArchiveAllInGroup(string groupId); - Task UnarchiveAllInGroup(string groupId); + Task ArchiveAllInGroup(string groupId, AuditUser? initiatedBy = null, string operationId = null); + Task UnarchiveAllInGroup(string groupId, AuditUser? initiatedBy = null, string operationId = null); bool IsOperationInProgressFor(string groupId, ArchiveType archiveType); diff --git a/src/ServiceControl.Persistence/RetryBatch.cs b/src/ServiceControl.Persistence/RetryBatch.cs index 15c8d5b43c..b4aa4806f2 100644 --- a/src/ServiceControl.Persistence/RetryBatch.cs +++ b/src/ServiceControl.Persistence/RetryBatch.cs @@ -19,6 +19,13 @@ public class RetryBatch public RetryBatchStatus Status { get; set; } public IList FailureRetries { get; set; } = []; + // Audit attribution for the initiating operation. Populated only for operations whose messages + // are resolved asynchronously (retry all/endpoint/queue/group), so the per-message audit entry + // can be emitted at the point the batch is actually staged. Null for paths audited at the API. + public string InitiatedById { get; set; } + public string InitiatedByName { get; set; } + public string OperationId { get; set; } + public static string MakeDocumentId(string messageUniqueId) => "RetryBatches/" + messageUniqueId; } } \ No newline at end of file diff --git a/src/ServiceControl.UnitTests/ExternalIntegrations/MessageFailedConverterTests.cs b/src/ServiceControl.UnitTests/ExternalIntegrations/MessageFailedConverterTests.cs index e5f8988c8e..70742c55b1 100644 --- a/src/ServiceControl.UnitTests/ExternalIntegrations/MessageFailedConverterTests.cs +++ b/src/ServiceControl.UnitTests/ExternalIntegrations/MessageFailedConverterTests.cs @@ -5,7 +5,7 @@ using System.Linq; using Contracts; using Contracts.Operations; - using MessageFailures; + using ServiceControl.MessageFailures; using NUnit.Framework; using ServiceControl.Operations; using ServiceControl.Recoverability.ExternalIntegration; diff --git a/src/ServiceControl.UnitTests/MessageFailures/ArchiveUnarchivePendingAuditTests.cs b/src/ServiceControl.UnitTests/MessageFailures/ArchiveUnarchivePendingAuditTests.cs new file mode 100644 index 0000000000..34706994d4 --- /dev/null +++ b/src/ServiceControl.UnitTests/MessageFailures/ArchiveUnarchivePendingAuditTests.cs @@ -0,0 +1,107 @@ +#nullable enable +namespace ServiceControl.UnitTests.MessageFailures; + +using System.Linq; +using System.Threading.Tasks; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.MessageFailures.Api; +using ServiceControl.UnitTests.Recoverability; + +[TestFixture] +public class ArchiveUnarchivePendingAuditTests +{ + static readonly AuditUser User = new("alice-sub", "Alice"); + static StubCurrentUserAccessor Accessor => new(User); + + [Test] + public async Task ArchiveBatch_emits_batch_archive_operation() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new ArchiveMessagesController(new TestableMessageSession(), null, Accessor, audit); + + await controller.ArchiveBatch(new[] { "m-1", "m-2", "m-3" }); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Archive)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Batch)); + Assert.That(op.Count, Is.EqualTo(3)); + Assert.That(op.Resource, Is.Null); + } + + [Test] + public async Task Archive_single_emits_single_archive_operation() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new ArchiveMessagesController(new TestableMessageSession(), null, Accessor, audit); + + await controller.Archive("m-1"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Archive)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Single)); + Assert.That(op.Resource, Is.EqualTo("m-1")); + Assert.That(op.Count, Is.EqualTo(1)); + } + + [Test] + public async Task Unarchive_ids_emits_batch_unarchive_operation() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new UnArchiveMessagesController(new TestableMessageSession(), Accessor, audit); + + await controller.Unarchive(new[] { "m-1", "m-2" }); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Unarchive)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Batch)); + Assert.That(op.Count, Is.EqualTo(2)); + Assert.That(op.Resource, Is.Null); + } + + [Test] + public async Task Unarchive_range_emits_range_unarchive_operation() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new UnArchiveMessagesController(new TestableMessageSession(), Accessor, audit); + + await controller.Unarchive("2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Unarchive)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Range)); + Assert.That(op.Resource, Is.EqualTo("2024-01-01T00:00:00Z...2024-01-02T00:00:00Z")); + Assert.That(op.Count, Is.Null); + } + + [Test] + public async Task RetryBy_ids_emits_batch_retry_operation() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new PendingRetryMessagesController(new TestableMessageSession(), Accessor, audit); + + await controller.RetryBy(new[] { "m-1", "m-2" }); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Batch)); + Assert.That(op.Count, Is.EqualTo(2)); + Assert.That(op.Resource, Is.Null); + } + + [Test] + public async Task RetryBy_request_emits_queue_retry_operation() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new PendingRetryMessagesController(new TestableMessageSession(), Accessor, audit); + + await controller.RetryBy(new PendingRetryMessagesController.PendingRetryRequest { QueueAddress = "queue-a" }); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Queue)); + Assert.That(op.Resource, Is.EqualTo("queue-a")); + Assert.That(op.Count, Is.Null); + } +} diff --git a/src/ServiceControl.UnitTests/MessageFailures/AsyncRangeAndQueueAuditTests.cs b/src/ServiceControl.UnitTests/MessageFailures/AsyncRangeAndQueueAuditTests.cs new file mode 100644 index 0000000000..ffc925a526 --- /dev/null +++ b/src/ServiceControl.UnitTests/MessageFailures/AsyncRangeAndQueueAuditTests.cs @@ -0,0 +1,190 @@ +#nullable enable +namespace ServiceControl.UnitTests.MessageFailures; + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using CompositeViews.Messages; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.EventLog; +using ServiceControl.Infrastructure; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.MessageFailures; +using ServiceControl.MessageFailures.Api; +using ServiceControl.MessageFailures.Handlers; +using ServiceControl.MessageFailures.InternalMessages; +using ServiceControl.Operations; +using ServiceControl.Persistence; +using ServiceControl.Persistence.Infrastructure; +using ServiceControl.Recoverability; +using ServiceControl.UnitTests.Operations; +using ServiceControl.UnitTests.Recoverability; + +[TestFixture] +public class AsyncRangeAndQueueAuditTests +{ + static readonly AuditUser User = new("alice-sub", "Alice"); + + static Dictionary StampedHeaders(string operationId) => new() + { + [AuditHeaders.SubjectId] = User.Id, + [AuditHeaders.SubjectName] = User.Name, + [AuditHeaders.OperationId] = operationId + }; + + [Test] + public async Task PendingRetries_by_queue_audits_each_resolved_message() + { + var audit = new RecordingMessageActionAuditLog(); + var store = new StubErrorMessageDataStore { RetryPendingMessagesResult = ["m-1", "m-2"] }; + var handler = new PendingRetriesHandler(store, audit); + + var context = new TestableMessageHandlerContext { MessageHeaders = StampedHeaders("op-q") }; + await handler.Handle(new RetryPendingMessages { QueueAddress = "q", PeriodFrom = DateTime.UtcNow, PeriodTo = DateTime.UtcNow }, context); + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(new[] { "m-1", "m-2" })); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.User.Equals(User))); + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == "op-q")); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Retry)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Queue)); + } + } + + [Test] + public async Task PendingRetries_by_ids_audits_each_message() + { + var audit = new RecordingMessageActionAuditLog(); + var handler = new PendingRetriesHandler(new StubErrorMessageDataStore(), audit); + + var context = new TestableMessageHandlerContext { MessageHeaders = StampedHeaders("op-pi") }; + await handler.Handle(new RetryPendingMessagesById { MessageUniqueIds = ["m-1", "m-2"] }, context); + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(new[] { "m-1", "m-2" })); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == "op-pi")); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Retry)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Batch)); + } + } + + [Test] + public async Task ArchiveMessage_audits_the_archived_message() + { + var audit = new RecordingMessageActionAuditLog(); + var store = new StubErrorMessageDataStore { ErrorByResult = new FailedMessage { Status = FailedMessageStatus.Unresolved } }; + var handler = new ArchiveMessageHandler(store, new FakeDomainEvents(), audit); + + var context = new TestableMessageHandlerContext { MessageHeaders = StampedHeaders("op-a") }; + await handler.Handle(new ArchiveMessage { FailedMessageId = "m-1" }, context); + + var msg = audit.Messages.Single(); + using (Assert.EnterMultipleScope()) + { + Assert.That(msg.MessageId, Is.EqualTo("m-1")); + Assert.That(msg.OperationId, Is.EqualTo("op-a")); + Assert.That(msg.Kind, Is.EqualTo(MessageActionKind.Archive)); + Assert.That(msg.Scope, Is.EqualTo(MessageActionScope.Single)); + } + } + + [Test] + public async Task ArchiveMessage_already_archived_is_not_audited() + { + var audit = new RecordingMessageActionAuditLog(); + var store = new StubErrorMessageDataStore { ErrorByResult = new FailedMessage { Status = FailedMessageStatus.Archived } }; + var handler = new ArchiveMessageHandler(store, new FakeDomainEvents(), audit); + + var context = new TestableMessageHandlerContext { MessageHeaders = StampedHeaders("op-a") }; + await handler.Handle(new ArchiveMessage { FailedMessageId = "m-1" }, context); + + Assert.That(audit.Messages, Is.Empty); + } + + [Test] + public async Task UnArchiveMessages_audits_each_message_with_bare_id() + { + var audit = new RecordingMessageActionAuditLog(); + var store = new StubErrorMessageDataStore { UnArchiveMessagesResult = ["FailedMessages/m-1", "FailedMessages/m-2"] }; + var handler = new UnArchiveMessagesHandler(store, new FakeDomainEvents(), audit); + + var context = new TestableMessageHandlerContext { MessageHeaders = StampedHeaders("op-u") }; + await handler.Handle(new UnArchiveMessages { FailedMessageIds = ["m-1", "m-2"] }, context); + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(new[] { "m-1", "m-2" })); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == "op-u")); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Unarchive)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Batch)); + } + } + + [Test] + public async Task Unarchive_by_range_audits_each_message_with_bare_id() + { + var audit = new RecordingMessageActionAuditLog(); + var store = new StubErrorMessageDataStore { UnArchiveByRangeResult = ["FailedMessages/m-1", "FailedMessages/m-2"] }; + var handler = new UnArchiveMessagesByRangeHandler(store, new FakeDomainEvents(), audit); + + var context = new TestableMessageHandlerContext { MessageHeaders = StampedHeaders("op-r") }; + await handler.Handle(new UnArchiveMessagesByRange { From = DateTime.UtcNow, To = DateTime.UtcNow }, context); + + Assert.That(audit.Messages.Select(m => m.MessageId), Is.EquivalentTo(new[] { "m-1", "m-2" })); + using (Assert.EnterMultipleScope()) + { + Assert.That(audit.Messages, Has.All.Matches(m => m.User.Equals(User))); + Assert.That(audit.Messages, Has.All.Matches(m => m.OperationId == "op-r")); + Assert.That(audit.Messages, Has.All.Matches(m => m.Kind == MessageActionKind.Unarchive)); + Assert.That(audit.Messages, Has.All.Matches(m => m.Scope == MessageActionScope.Range)); + } + } + + sealed class StubErrorMessageDataStore : IErrorMessageDataStore + { + public string[] RetryPendingMessagesResult { get; set; } = []; + public string[] UnArchiveByRangeResult { get; set; } = []; + public string[] UnArchiveMessagesResult { get; set; } = []; + public FailedMessage ErrorByResult { get; set; } = new(); + + public Task GetRetryPendingMessages(DateTime from, DateTime to, string queueAddress) => Task.FromResult(RetryPendingMessagesResult); + public Task RemoveFailedMessageRetryDocument(string uniqueMessageId) => Task.CompletedTask; + public Task UnArchiveMessagesByRange(DateTime from, DateTime to) => Task.FromResult(UnArchiveByRangeResult); + public Task UnArchiveMessages(IEnumerable failedMessageIds) => Task.FromResult(UnArchiveMessagesResult); + public Task ErrorBy(string failedMessageId) => Task.FromResult(ErrorByResult); + public Task FailedMessageMarkAsArchived(string failedMessageId) => Task.CompletedTask; + + public Task>> GetAllMessages(PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages, DateTimeRange? timeSentRange = null) => throw new NotImplementedException(); + public Task>> GetAllMessagesForEndpoint(string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages, DateTimeRange? timeSentRange = null) => throw new NotImplementedException(); + public Task>> GetAllMessagesByConversation(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages) => throw new NotImplementedException(); + public Task>> GetAllMessagesForSearch(string searchTerms, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null) => throw new NotImplementedException(); + public Task>> SearchEndpointMessages(string endpointName, string searchKeyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null) => throw new NotImplementedException(); + public Task FailedMessagesFetch(Guid[] ids) => throw new NotImplementedException(); + public Task StoreFailedErrorImport(FailedErrorImport failure) => throw new NotImplementedException(); + public Task CreateEditFailedMessageManager() => throw new NotImplementedException(); + public Task> GetFailureGroupView(string groupId, string status, string modified) => throw new NotImplementedException(); + public Task> GetFailureGroupsByClassifier(string classifier) => throw new NotImplementedException(); + public Task>> ErrorGet(string status, string modified, string queueAddress, PagingInfo pagingInfo, SortInfo sortInfo) => throw new NotImplementedException(); + public Task ErrorsHead(string status, string modified, string queueAddress) => throw new NotImplementedException(); + public Task>> ErrorsByEndpointName(string status, string endpointName, string modified, PagingInfo pagingInfo, SortInfo sortInfo) => throw new NotImplementedException(); + public Task> ErrorsSummary() => throw new NotImplementedException(); + public Task ErrorLastBy(string failedMessageId) => throw new NotImplementedException(); + public Task CreateNotificationsManager() => throw new NotImplementedException(); + public Task EditComment(string groupId, string comment) => throw new NotImplementedException(); + public Task DeleteComment(string groupId) => throw new NotImplementedException(); + public Task>> GetGroupErrors(string groupId, string status, string modified, SortInfo sortInfo, PagingInfo pagingInfo) => throw new NotImplementedException(); + public Task GetGroupErrorsCount(string groupId, string status, string modified) => throw new NotImplementedException(); + public Task>> GetGroup(string groupId, string status, string modified) => throw new NotImplementedException(); + public Task MarkMessageAsResolved(string failedMessageId) => throw new NotImplementedException(); + public Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func processCallback) => throw new NotImplementedException(); + public Task RevertRetry(string messageUniqueId) => throw new NotImplementedException(); + public Task FetchFromFailedMessage(string uniqueMessageId) => throw new NotImplementedException(); + public Task StoreEventLogItem(EventLogItem logItem) => throw new NotImplementedException(); + public Task StoreFailedMessagesForTestsOnly(params FailedMessage[] failedMessages) => throw new NotImplementedException(); + } +} diff --git a/src/ServiceControl.UnitTests/MessageFailures/RetryMessagesControllerAuditTests.cs b/src/ServiceControl.UnitTests/MessageFailures/RetryMessagesControllerAuditTests.cs new file mode 100644 index 0000000000..942d94927d --- /dev/null +++ b/src/ServiceControl.UnitTests/MessageFailures/RetryMessagesControllerAuditTests.cs @@ -0,0 +1,84 @@ +#nullable enable +namespace ServiceControl.UnitTests.MessageFailures; + +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.MessageFailures.Api; +using ServiceControl.UnitTests.Recoverability; +using ServiceBus.Management.Infrastructure.Settings; + +[TestFixture] +public class RetryMessagesControllerAuditTests +{ + static RetryMessagesController Create(TestableMessageSession session, RecordingMessageActionAuditLog audit) => + new(new Settings(), null, null, session, NullLogger.Instance, + new StubCurrentUserAccessor(new AuditUser("alice-sub", "Alice")), audit); + + [Test] + public async Task RetryMessageBy_local_emits_single_operation() + { + var audit = new RecordingMessageActionAuditLog(); + await Create(new TestableMessageSession(), audit).RetryMessageBy(null, "msg-1"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Single)); + Assert.That(op.Resource, Is.EqualTo("msg-1")); + Assert.That(op.Count, Is.EqualTo(1)); + } + + [Test] + public async Task RetryAllBy_ids_emits_batch_operation() + { + var audit = new RecordingMessageActionAuditLog(); + await Create(new TestableMessageSession(), audit).RetryAllBy(["m-1", "m-2"]); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Batch)); + Assert.That(op.Count, Is.EqualTo(2)); + } + + [Test] + public async Task RetryAllBy_queueAddress_emits_queue_operation() + { + var audit = new RecordingMessageActionAuditLog(); + await Create(new TestableMessageSession(), audit).RetryAllBy("queue-a"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Queue)); + Assert.That(op.Resource, Is.EqualTo("queue-a")); + Assert.That(op.Count, Is.Null); + } + + [Test] + public async Task RetryAll_emits_all_scope_operation() + { + var audit = new RecordingMessageActionAuditLog(); + await Create(new TestableMessageSession(), audit).RetryAll(); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.All)); + Assert.That(op.Resource, Is.Null); + Assert.That(op.Count, Is.Null); + } + + [Test] + public async Task RetryAllByEndpoint_emits_endpoint_operation() + { + var audit = new RecordingMessageActionAuditLog(); + await Create(new TestableMessageSession(), audit).RetryAllByEndpoint("endpoint-a"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Endpoint)); + Assert.That(op.Resource, Is.EqualTo("endpoint-a")); + Assert.That(op.Count, Is.Null); + } +} diff --git a/src/ServiceControl.UnitTests/Recoverability/EndpointInstanceIdClassifierTests.cs b/src/ServiceControl.UnitTests/Recoverability/EndpointInstanceIdClassifierTests.cs index bfca53b205..68f779b8c4 100644 --- a/src/ServiceControl.UnitTests/Recoverability/EndpointInstanceIdClassifierTests.cs +++ b/src/ServiceControl.UnitTests/Recoverability/EndpointInstanceIdClassifierTests.cs @@ -5,7 +5,7 @@ using NServiceBus; using NUnit.Framework; using ServiceControl.Recoverability; - using static MessageFailures.FailedMessage; + using static ServiceControl.MessageFailures.FailedMessage; [TestFixture] public class EndpointInstanceIdClassifierTests diff --git a/src/ServiceControl.UnitTests/Recoverability/FailureGroupsArchiveUnarchiveAuditTests.cs b/src/ServiceControl.UnitTests/Recoverability/FailureGroupsArchiveUnarchiveAuditTests.cs new file mode 100644 index 0000000000..c9601cab8b --- /dev/null +++ b/src/ServiceControl.UnitTests/Recoverability/FailureGroupsArchiveUnarchiveAuditTests.cs @@ -0,0 +1,43 @@ +#nullable enable +namespace ServiceControl.UnitTests.Recoverability; + +using System.Linq; +using System.Threading.Tasks; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.Recoverability.API; + +[TestFixture] +public class FailureGroupsArchiveUnarchiveAuditTests +{ + static readonly AuditUser User = new("alice-sub", "Alice"); + + [Test] + public async Task Archive_group_emits_operation_entry() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new FailureGroupsArchiveController(new TestableMessageSession(), new NoopArchiveMessages(), new StubCurrentUserAccessor(User), audit); + + await controller.ArchiveGroupErrors("group-7"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Archive)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Group)); + Assert.That(op.Resource, Is.EqualTo("group-7")); + } + + [Test] + public async Task Unarchive_group_emits_operation_entry() + { + var audit = new RecordingMessageActionAuditLog(); + var controller = new FailureGroupsUnarchiveController(new TestableMessageSession(), new NoopArchiveMessages(), new StubCurrentUserAccessor(User), audit); + + await controller.UnarchiveGroupErrors("group-8"); + + var op = audit.Operations.Single(); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Unarchive)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Group)); + Assert.That(op.Resource, Is.EqualTo("group-8")); + } +} diff --git a/src/ServiceControl.UnitTests/Recoverability/FailureGroupsRetryControllerAuditTests.cs b/src/ServiceControl.UnitTests/Recoverability/FailureGroupsRetryControllerAuditTests.cs new file mode 100644 index 0000000000..c1c26dda00 --- /dev/null +++ b/src/ServiceControl.UnitTests/Recoverability/FailureGroupsRetryControllerAuditTests.cs @@ -0,0 +1,36 @@ +#nullable enable +namespace ServiceControl.UnitTests.Recoverability; + +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using NServiceBus.Testing; +using NUnit.Framework; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.Recoverability; +using ServiceControl.Recoverability.API; +using ServiceControl.UnitTests.Operations; + +[TestFixture] +public class FailureGroupsRetryControllerAuditTests +{ + [Test] + public async Task Emits_group_retry_operation_entry() + { + var session = new TestableMessageSession(); + var audit = new RecordingMessageActionAuditLog(); + var user = new AuditUser("alice-sub", "Alice"); + var retryingManager = new RetryingManager(new FakeDomainEvents(), NullLogger.Instance); + var controller = new FailureGroupsRetryController(session, retryingManager, new StubCurrentUserAccessor(user), audit); + + await controller.ArchiveGroupErrors("group-42"); + + Assert.That(audit.Operations, Has.Count.EqualTo(1)); + var op = audit.Operations.Single(); + Assert.That(op.User, Is.EqualTo(user)); + Assert.That(op.Kind, Is.EqualTo(MessageActionKind.Retry)); + Assert.That(op.Scope, Is.EqualTo(MessageActionScope.Group)); + Assert.That(op.Resource, Is.EqualTo("group-42")); + Assert.That(op.Permission, Is.EqualTo(Permissions.ErrorRecoverabilityGroupsRetry)); + } +} diff --git a/src/ServiceControl.UnitTests/Recoverability/LockedHeaderModificationValidatorTests.cs b/src/ServiceControl.UnitTests/Recoverability/LockedHeaderModificationValidatorTests.cs index 82c0646ccc..a0b78e70e5 100644 --- a/src/ServiceControl.UnitTests/Recoverability/LockedHeaderModificationValidatorTests.cs +++ b/src/ServiceControl.UnitTests/Recoverability/LockedHeaderModificationValidatorTests.cs @@ -1,7 +1,7 @@ namespace ServiceControl.UnitTests.Recoverability { using System.Collections.Generic; - using MessageFailures.Api; + using ServiceControl.MessageFailures.Api; using NUnit.Framework; [TestFixture] diff --git a/src/ServiceControl.UnitTests/Recoverability/NoopArchiveMessages.cs b/src/ServiceControl.UnitTests/Recoverability/NoopArchiveMessages.cs new file mode 100644 index 0000000000..fe19844613 --- /dev/null +++ b/src/ServiceControl.UnitTests/Recoverability/NoopArchiveMessages.cs @@ -0,0 +1,29 @@ +#nullable enable +namespace ServiceControl.UnitTests.Recoverability; + +using System.Collections.Generic; +using System.Threading.Tasks; +using ServiceControl.Infrastructure.Auth; +using ServiceControl.Persistence.Recoverability; +using ServiceControl.Recoverability; + +sealed class NoopArchiveMessages : IArchiveMessages +{ + public Task ArchiveAllInGroup(string groupId, AuditUser? initiatedBy = null, string? operationId = null) => Task.CompletedTask; + + public Task UnarchiveAllInGroup(string groupId, AuditUser? initiatedBy = null, string? operationId = null) => Task.CompletedTask; + + public bool IsOperationInProgressFor(string groupId, ArchiveType archiveType) => false; + + public bool IsArchiveInProgressFor(string groupId) => false; + + public void DismissArchiveOperation(string groupId, ArchiveType archiveType) + { + } + + public Task StartArchiving(string groupId, ArchiveType archiveType) => Task.CompletedTask; + + public Task StartUnarchiving(string groupId, ArchiveType archiveType) => Task.CompletedTask; + + public IEnumerable GetArchivalOperations() => []; +} diff --git a/src/ServiceControl.UnitTests/Recoverability/RecordingMessageActionAuditLog.cs b/src/ServiceControl.UnitTests/Recoverability/RecordingMessageActionAuditLog.cs new file mode 100644 index 0000000000..7073b15ed3 --- /dev/null +++ b/src/ServiceControl.UnitTests/Recoverability/RecordingMessageActionAuditLog.cs @@ -0,0 +1,20 @@ +#nullable enable +namespace ServiceControl.UnitTests.Recoverability; + +using System.Collections.Generic; +using ServiceControl.Infrastructure.Auth; + +sealed class RecordingMessageActionAuditLog : IMessageActionAuditLog +{ + public List Operations { get; } = []; + public List Messages { get; } = []; + + public void Operation(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string? resource, int? count, string operationId, bool success = true) => + Operations.Add(new OperationEntry(user, kind, permission, scope, resource, count, operationId, success)); + + public void MessageAction(AuditUser user, MessageActionKind kind, string permission, MessageActionScope scope, string messageId, string operationId, bool success = true) => + Messages.Add(new MessageEntry(user, kind, permission, scope, messageId, operationId, success)); + + public sealed record OperationEntry(AuditUser User, MessageActionKind Kind, string Permission, MessageActionScope Scope, string? Resource, int? Count, string OperationId, bool Success); + public sealed record MessageEntry(AuditUser User, MessageActionKind Kind, string Permission, MessageActionScope Scope, string MessageId, string OperationId, bool Success); +} diff --git a/src/ServiceControl.UnitTests/Recoverability/StubCurrentUserAccessor.cs b/src/ServiceControl.UnitTests/Recoverability/StubCurrentUserAccessor.cs new file mode 100644 index 0000000000..94ea9df4eb --- /dev/null +++ b/src/ServiceControl.UnitTests/Recoverability/StubCurrentUserAccessor.cs @@ -0,0 +1,10 @@ +#nullable enable +namespace ServiceControl.UnitTests.Recoverability; + +using System.Security.Claims; +using ServiceControl.Infrastructure.Auth; + +sealed class StubCurrentUserAccessor(AuditUser user) : ICurrentUserAccessor +{ + public AuditUser Resolve(ClaimsPrincipal? principal) => user; +} diff --git a/src/ServiceControl/Infrastructure/WebApi/AuditOperationIdExtensions.cs b/src/ServiceControl/Infrastructure/WebApi/AuditOperationIdExtensions.cs new file mode 100644 index 0000000000..ff412d28fe --- /dev/null +++ b/src/ServiceControl/Infrastructure/WebApi/AuditOperationIdExtensions.cs @@ -0,0 +1,20 @@ +namespace ServiceControl.Infrastructure.WebApi; + +using System; +using Microsoft.AspNetCore.Mvc; + +static class AuditOperationIdExtensions +{ + /// + /// The audit operation id that ties the synchronous operation audit entry to the asynchronous + /// per-message entries emitted while the operation is carried out. Reuses ASP.NET Core's + /// per-request TraceIdentifier so the id also equals the RequestId already attached + /// to every other log line of the request. Falls back to a GUID when there is no HttpContext + /// (e.g. unit tests invoking the controller directly). + /// + public static string AuditOperationId(this ControllerBase controller) + { + var traceIdentifier = controller.HttpContext?.TraceIdentifier; + return string.IsNullOrEmpty(traceIdentifier) ? Guid.NewGuid().ToString("N") : traceIdentifier; + } +} diff --git a/src/ServiceControl/Infrastructure/WebApi/Cors.cs b/src/ServiceControl/Infrastructure/WebApi/Cors.cs index bb05073086..ce2c9929db 100644 --- a/src/ServiceControl/Infrastructure/WebApi/Cors.cs +++ b/src/ServiceControl/Infrastructure/WebApi/Cors.cs @@ -25,7 +25,7 @@ public static CorsPolicy GetDefaultPolicy(CorsSettings settings) } // Expose custom headers that clients need to read from responses - builder.WithExposedHeaders(["ETag", "Last-Modified", "Link", "Total-Count", "X-Particular-Version", "Content-Disposition"]); + builder.WithExposedHeaders(["ETag", "Last-Modified", "Link", "Total-Count", "X-Particular-Version", "Content-Disposition", "Request-Id"]); // Allow standard headers required for API requests builder.WithHeaders(["Origin", "X-Requested-With", "Content-Type", "Accept", "Authorization"]); // Allow all HTTP methods used by the ServiceControl API diff --git a/src/ServiceControl/MessageFailures/Api/ArchiveMessagesController.cs b/src/ServiceControl/MessageFailures/Api/ArchiveMessagesController.cs index 84384bec6f..c1f732f2fa 100644 --- a/src/ServiceControl/MessageFailures/Api/ArchiveMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/ArchiveMessagesController.cs @@ -1,5 +1,6 @@ namespace ServiceControl.MessageFailures.Api { + using System; using System.Linq; using System.Threading.Tasks; using Infrastructure.Auth; @@ -13,7 +14,7 @@ namespace ServiceControl.MessageFailures.Api [ApiController] [Route("api")] - public class ArchiveMessagesController(IMessageSession messageSession, IErrorMessageDataStore dataStore) : ControllerBase + public class ArchiveMessagesController(IMessageSession messageSession, IErrorMessageDataStore dataStore, ICurrentUserAccessor userAccessor, IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorMessagesArchive)] [Route("errors/archive")] @@ -27,11 +28,18 @@ public async Task ArchiveBatch(string[] messageIds) return UnprocessableEntity(ModelState); } + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Archive, Permissions.ErrorMessagesArchive, MessageActionScope.Batch, + resource: null, count: messageIds.Length, operationId: operationId); + foreach (var id in messageIds) { - var request = new ArchiveMessage { FailedMessageId = id }; + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); - await messageSession.SendLocal(request); + await messageSession.Send(new ArchiveMessage { FailedMessageId = id }, sendOptions); } return Accepted(); @@ -55,7 +63,16 @@ public async Task GetArchiveMessageGroups(string classifier = "Ex [HttpPatch] public async Task Archive(string messageId) { - await messageSession.SendLocal(m => m.FailedMessageId = messageId); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Archive, Permissions.ErrorMessagesArchive, MessageActionScope.Single, + resource: messageId, count: 1, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await messageSession.Send(m => m.FailedMessageId = messageId, sendOptions); return Accepted(); } diff --git a/src/ServiceControl/MessageFailures/Api/PendingRetryMessagesController.cs b/src/ServiceControl/MessageFailures/Api/PendingRetryMessagesController.cs index 1ad75d3bee..4c5db0c9fd 100644 --- a/src/ServiceControl/MessageFailures/Api/PendingRetryMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/PendingRetryMessagesController.cs @@ -6,6 +6,7 @@ using System.Text.Json.Serialization; using System.Threading.Tasks; using Infrastructure.Auth; + using Infrastructure.WebApi; using InternalMessages; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -13,7 +14,7 @@ [ApiController] [Route("api")] - public class PendingRetryMessagesController(IMessageSession session) : ControllerBase + public class PendingRetryMessagesController(IMessageSession session, ICurrentUserAccessor userAccessor, IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorMessagesRetry)] [Route("pendingretries/retry")] @@ -26,7 +27,16 @@ public async Task RetryBy(string[] ids) return UnprocessableEntity(ModelState); } - await session.SendLocal(m => m.MessageUniqueIds = ids); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Batch, + resource: null, count: ids.Length, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await session.Send(m => m.MessageUniqueIds = ids, sendOptions); return Accepted(); } @@ -36,12 +46,21 @@ public async Task RetryBy(string[] ids) [HttpPost] public async Task RetryBy(PendingRetryRequest request) { - await session.SendLocal(m => + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Queue, + resource: request.QueueAddress, count: null, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await session.Send(m => { m.QueueAddress = request.QueueAddress; m.PeriodFrom = request.From; m.PeriodTo = request.To; - }); + }, sendOptions); return Accepted(); } diff --git a/src/ServiceControl/MessageFailures/Api/RetryMessagesController.cs b/src/ServiceControl/MessageFailures/Api/RetryMessagesController.cs index 29fb12966c..86d5109305 100644 --- a/src/ServiceControl/MessageFailures/Api/RetryMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/RetryMessagesController.cs @@ -1,10 +1,12 @@ namespace ServiceControl.MessageFailures.Api { + using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading.Tasks; using Infrastructure.Auth; + using Infrastructure.WebApi; using InternalMessages; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; @@ -23,7 +25,9 @@ public class RetryMessagesController( HttpMessageInvoker httpMessageInvoker, IHttpForwarder forwarder, IMessageSession messageSession, - ILogger logger) : ControllerBase + ILogger logger, + ICurrentUserAccessor userAccessor, + IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorMessagesRetry)] [Route("errors/{failedMessageId:required:minlength(1)}/retry")] @@ -32,7 +36,16 @@ public async Task RetryMessageBy([FromQuery(Name = "instance_id") { if (string.IsNullOrWhiteSpace(instanceId) || instanceId == settings.InstanceId) { - await messageSession.SendLocal(m => m.FailedMessageId = failedMessageId); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Single, + resource: failedMessageId, count: 1, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await messageSession.Send(m => m.FailedMessageId = failedMessageId, sendOptions); return Accepted(); } @@ -62,7 +75,16 @@ public async Task RetryAllBy(List messageIds) return BadRequest(); } - await messageSession.SendLocal(m => m.MessageUniqueIds = messageIds.ToArray()); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Batch, + resource: null, count: messageIds.Count, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await messageSession.Send(m => m.MessageUniqueIds = messageIds.ToArray(), sendOptions); return Accepted(); } @@ -72,11 +94,20 @@ public async Task RetryAllBy(List messageIds) [HttpPost] public async Task RetryAllBy(string queueAddress) { - await messageSession.SendLocal(m => + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Queue, + resource: queueAddress, count: null, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await messageSession.Send(m => { m.QueueAddress = queueAddress; m.Status = FailedMessageStatus.Unresolved; - }); + }, sendOptions); return Accepted(); } @@ -86,7 +117,16 @@ await messageSession.SendLocal(m => [HttpPost] public async Task RetryAll() { - await messageSession.SendLocal(new RequestRetryAll()); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.All, + resource: null, count: null, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await messageSession.Send(new RequestRetryAll(), sendOptions); return Accepted(); } @@ -96,7 +136,16 @@ public async Task RetryAll() [HttpPost] public async Task RetryAllByEndpoint(string endpointName) { - await messageSession.SendLocal(new RequestRetryAll { Endpoint = endpointName }); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Endpoint, + resource: endpointName, count: null, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await messageSession.Send(new RequestRetryAll { Endpoint = endpointName }, sendOptions); return Accepted(); } diff --git a/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs b/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs index d36940bc99..dd756c2ede 100644 --- a/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/UnArchiveMessagesController.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Threading.Tasks; using Infrastructure.Auth; + using Infrastructure.WebApi; using InternalMessages; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -12,7 +13,7 @@ [ApiController] [Route("api")] - public class UnArchiveMessagesController(IMessageSession session) : ControllerBase + public class UnArchiveMessagesController(IMessageSession session, ICurrentUserAccessor userAccessor, IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorMessagesUnarchive)] [Route("errors/unarchive")] @@ -24,9 +25,16 @@ public async Task Unarchive(string[] ids) return BadRequest(); } - var request = new UnArchiveMessages { FailedMessageIds = ids }; + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Unarchive, Permissions.ErrorMessagesUnarchive, MessageActionScope.Batch, + resource: null, count: ids.Length, operationId: operationId); - await session.SendLocal(request); + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await session.Send(new UnArchiveMessages { FailedMessageIds = ids }, sendOptions); return Accepted(); } @@ -48,7 +56,16 @@ public async Task Unarchive(string from, string to) return BadRequest(); } - await session.SendLocal(new UnArchiveMessagesByRange { From = fromDateTime, To = toDateTime }); + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Unarchive, Permissions.ErrorMessagesUnarchive, MessageActionScope.Range, + resource: $"{from}...{to}", count: null, operationId: operationId); + + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await session.Send(new UnArchiveMessagesByRange { From = fromDateTime, To = toDateTime }, sendOptions); return Accepted(); } diff --git a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs index 6bbdd411d3..2713abb4cf 100644 --- a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs @@ -2,13 +2,14 @@ { using System.Threading.Tasks; using Contracts.MessageFailures; + using Infrastructure.Auth; using Infrastructure.DomainEvents; using InternalMessages; using NServiceBus; using ServiceControl.Persistence; [Handler] - class ArchiveMessageHandler(IErrorMessageDataStore dataStore, IDomainEvents domainEvents) : IHandleMessages + class ArchiveMessageHandler(IErrorMessageDataStore dataStore, IDomainEvents domainEvents, IMessageActionAuditLog auditLog) : IHandleMessages { public async Task Handle(ArchiveMessage message, IMessageHandlerContext context) { @@ -24,6 +25,12 @@ await domainEvents.Raise(new FailedMessageArchived }, context.CancellationToken); await dataStore.FailedMessageMarkAsArchived(failedMessageId); + + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + if (!string.IsNullOrEmpty(operationId)) + { + auditLog.MessageAction(user, MessageActionKind.Archive, Permissions.ErrorMessagesArchive, MessageActionScope.Single, failedMessageId, operationId); + } } } } diff --git a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs index 83eef58e5c..c7d2b6dbc6 100644 --- a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs @@ -1,19 +1,31 @@ namespace ServiceControl.MessageFailures.Handlers { + using System.Linq; using System.Threading.Tasks; using Contracts.MessageFailures; + using Infrastructure.Auth; using Infrastructure.DomainEvents; using InternalMessages; using NServiceBus; using Persistence; [Handler] - class UnArchiveMessagesByRangeHandler(IErrorMessageDataStore dataStore, IDomainEvents domainEvents) : IHandleMessages + class UnArchiveMessagesByRangeHandler(IErrorMessageDataStore dataStore, IDomainEvents domainEvents, IMessageActionAuditLog auditLog) : IHandleMessages { public async Task Handle(UnArchiveMessagesByRange message, IMessageHandlerContext context) { var ids = await dataStore.UnArchiveMessagesByRange(message.From, message.To); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + if (!string.IsNullOrEmpty(operationId)) + { + foreach (var id in ids) + { + // ids are Raven document ids (FailedMessages/{uniqueId}); audit records the bare unique id + auditLog.MessageAction(user, MessageActionKind.Unarchive, Permissions.ErrorMessagesUnarchive, MessageActionScope.Range, id.Replace("FailedMessages/", ""), operationId); + } + } + await domainEvents.Raise(new FailedMessagesUnArchived { DocumentIds = ids, diff --git a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs index b85f1a00e3..894cf17730 100644 --- a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs @@ -1,20 +1,32 @@ namespace ServiceControl.MessageFailures.Handlers { + using System.Linq; using System.Threading.Tasks; using Contracts.MessageFailures; + using Infrastructure.Auth; using Infrastructure.DomainEvents; using InternalMessages; using NServiceBus; using Persistence; [Handler] - class UnArchiveMessagesHandler(IErrorMessageDataStore store, IDomainEvents domainEvents) + class UnArchiveMessagesHandler(IErrorMessageDataStore store, IDomainEvents domainEvents, IMessageActionAuditLog auditLog) : IHandleMessages { public async Task Handle(UnArchiveMessages messages, IMessageHandlerContext context) { var ids = await store.UnArchiveMessages(messages.FailedMessageIds); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + if (!string.IsNullOrEmpty(operationId)) + { + foreach (var id in ids) + { + // ids are Raven document ids (FailedMessages/{uniqueId}); audit records the bare unique id + auditLog.MessageAction(user, MessageActionKind.Unarchive, Permissions.ErrorMessagesUnarchive, MessageActionScope.Batch, id.Replace("FailedMessages/", ""), operationId); + } + } + await domainEvents.Raise(new FailedMessagesUnArchived { DocumentIds = ids, diff --git a/src/ServiceControl/Recoverability/API/FailureGroupsArchiveController.cs b/src/ServiceControl/Recoverability/API/FailureGroupsArchiveController.cs index 69c805f199..5ca138b40f 100644 --- a/src/ServiceControl/Recoverability/API/FailureGroupsArchiveController.cs +++ b/src/ServiceControl/Recoverability/API/FailureGroupsArchiveController.cs @@ -1,7 +1,9 @@ namespace ServiceControl.Recoverability.API { + using System; using System.Threading.Tasks; using Infrastructure.Auth; + using Infrastructure.WebApi; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using NServiceBus; @@ -9,18 +11,32 @@ [ApiController] [Route("api")] - public class FailureGroupsArchiveController(IMessageSession bus, IArchiveMessages archiver) : ControllerBase + public class FailureGroupsArchiveController( + IMessageSession bus, + IArchiveMessages archiver, + ICurrentUserAccessor userAccessor, + IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorRecoverabilityGroupsArchive)] [Route("recoverability/groups/{groupId:required:minlength(1)}/errors/archive")] [HttpPost] public async Task ArchiveGroupErrors(string groupId) { + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Archive, + Permissions.ErrorRecoverabilityGroupsArchive, MessageActionScope.Group, + resource: groupId, count: null, operationId: operationId); + if (!archiver.IsOperationInProgressFor(groupId, ArchiveType.FailureGroup)) { await archiver.StartArchiving(groupId, ArchiveType.FailureGroup); - await bus.SendLocal(m => { m.GroupId = groupId; }); + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await bus.Send(m => { m.GroupId = groupId; }, sendOptions); } return Accepted(); diff --git a/src/ServiceControl/Recoverability/API/FailureGroupsRetryController.cs b/src/ServiceControl/Recoverability/API/FailureGroupsRetryController.cs index 8ef2ec86d2..9cde9774ef 100644 --- a/src/ServiceControl/Recoverability/API/FailureGroupsRetryController.cs +++ b/src/ServiceControl/Recoverability/API/FailureGroupsRetryController.cs @@ -3,6 +3,7 @@ namespace ServiceControl.Recoverability.API using System; using System.Threading.Tasks; using Infrastructure.Auth; + using Infrastructure.WebApi; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using NServiceBus; @@ -10,7 +11,11 @@ namespace ServiceControl.Recoverability.API [ApiController] [Route("api")] - public class FailureGroupsRetryController(IMessageSession bus, RetryingManager retryingManager) : ControllerBase + public class FailureGroupsRetryController( + IMessageSession bus, + RetryingManager retryingManager, + ICurrentUserAccessor userAccessor, + IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorRecoverabilityGroupsRetry)] [Route("recoverability/groups/{groupId:required:minlength(1)}/errors/retry")] @@ -19,15 +24,25 @@ public async Task ArchiveGroupErrors(string groupId) { var started = DateTime.UtcNow; + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Retry, + Permissions.ErrorRecoverabilityGroupsRetry, MessageActionScope.Group, + resource: groupId, count: null, operationId: operationId); + if (!retryingManager.IsOperationInProgressFor(groupId, RetryType.FailureGroup)) { await retryingManager.Wait(groupId, RetryType.FailureGroup, started); - await bus.SendLocal(new RetryAllInGroup + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await bus.Send(new RetryAllInGroup { GroupId = groupId, Started = started - }); + }, sendOptions); } return Accepted(); diff --git a/src/ServiceControl/Recoverability/API/FailureGroupsUnarchiveController.cs b/src/ServiceControl/Recoverability/API/FailureGroupsUnarchiveController.cs index 37e2f0b6d9..11382db8e8 100644 --- a/src/ServiceControl/Recoverability/API/FailureGroupsUnarchiveController.cs +++ b/src/ServiceControl/Recoverability/API/FailureGroupsUnarchiveController.cs @@ -1,7 +1,9 @@ namespace ServiceControl.Recoverability.API { + using System; using System.Threading.Tasks; using Infrastructure.Auth; + using Infrastructure.WebApi; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using NServiceBus; @@ -9,18 +11,32 @@ [ApiController] [Route("api")] - public class FailureGroupsUnarchiveController(IMessageSession bus, IArchiveMessages archiver) : ControllerBase + public class FailureGroupsUnarchiveController( + IMessageSession bus, + IArchiveMessages archiver, + ICurrentUserAccessor userAccessor, + IMessageActionAuditLog auditLog) : ControllerBase { [Authorize(Policy = Permissions.ErrorRecoverabilityGroupsUnarchive)] [Route("recoverability/groups/{groupId:required:minlength(1)}/errors/unarchive")] [HttpPost] public async Task UnarchiveGroupErrors(string groupId) { + var user = userAccessor.Resolve(User); + var operationId = this.AuditOperationId(); + auditLog.Operation(user, MessageActionKind.Unarchive, + Permissions.ErrorRecoverabilityGroupsUnarchive, MessageActionScope.Group, + resource: groupId, count: null, operationId: operationId); + if (!archiver.IsOperationInProgressFor(groupId, ArchiveType.FailureGroup)) { await archiver.StartUnarchiving(groupId, ArchiveType.FailureGroup); - await bus.SendLocal(m => { m.GroupId = groupId; }); + var sendOptions = new SendOptions(); + sendOptions.RouteToThisEndpoint(); + AuditHeaders.Stamp(sendOptions, user, operationId); + + await bus.Send(m => { m.GroupId = groupId; }, sendOptions); } return Accepted(); diff --git a/src/ServiceControl/Recoverability/Archiving/ArchiveAllInGroupHandler.cs b/src/ServiceControl/Recoverability/Archiving/ArchiveAllInGroupHandler.cs index bf70107845..69e0dd1ae2 100644 --- a/src/ServiceControl/Recoverability/Archiving/ArchiveAllInGroupHandler.cs +++ b/src/ServiceControl/Recoverability/Archiving/ArchiveAllInGroupHandler.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Recoverability { using System.Threading.Tasks; + using Infrastructure.Auth; using Microsoft.Extensions.Logging; using NServiceBus; using ServiceControl.Persistence.Recoverability; @@ -16,7 +17,9 @@ public async Task Handle(ArchiveAllInGroup message, IMessageHandlerContext conte return; } - await archiver.ArchiveAllInGroup(message.GroupId); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + + await archiver.ArchiveAllInGroup(message.GroupId, user, operationId); } } } diff --git a/src/ServiceControl/Recoverability/Archiving/UnArchiveAllInGroupHandler.cs b/src/ServiceControl/Recoverability/Archiving/UnArchiveAllInGroupHandler.cs index c84c486be1..a1c6af97a2 100644 --- a/src/ServiceControl/Recoverability/Archiving/UnArchiveAllInGroupHandler.cs +++ b/src/ServiceControl/Recoverability/Archiving/UnArchiveAllInGroupHandler.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Recoverability { using System.Threading.Tasks; + using Infrastructure.Auth; using Microsoft.Extensions.Logging; using NServiceBus; using ServiceControl.Persistence.Recoverability; @@ -16,7 +17,9 @@ public async Task Handle(UnarchiveAllInGroup message, IMessageHandlerContext con return; } - await archiver.UnarchiveAllInGroup(message.GroupId); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + + await archiver.UnarchiveAllInGroup(message.GroupId, user, operationId); } } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retrying/Handlers/PendingRetriesHandler.cs b/src/ServiceControl/Recoverability/Retrying/Handlers/PendingRetriesHandler.cs index 62cde7fe7f..6e1a8649f9 100644 --- a/src/ServiceControl/Recoverability/Retrying/Handlers/PendingRetriesHandler.cs +++ b/src/ServiceControl/Recoverability/Retrying/Handlers/PendingRetriesHandler.cs @@ -2,6 +2,7 @@ namespace ServiceControl.Recoverability { using System.Collections.Generic; using System.Threading.Tasks; + using Infrastructure.Auth; using MessageFailures.InternalMessages; using NServiceBus; using Persistence; @@ -10,9 +11,10 @@ namespace ServiceControl.Recoverability class PendingRetriesHandler : IHandleMessages, IHandleMessages { - public PendingRetriesHandler(IErrorMessageDataStore dataStore) + public PendingRetriesHandler(IErrorMessageDataStore dataStore, IMessageActionAuditLog auditLog) { this.dataStore = dataStore; + this.auditLog = auditLog; } public async Task Handle(RetryPendingMessages message, IMessageHandlerContext context) @@ -21,10 +23,17 @@ public async Task Handle(RetryPendingMessages message, IMessageHandlerContext co var ids = await dataStore.GetRetryPendingMessages(message.PeriodFrom, message.PeriodTo, message.QueueAddress); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + foreach (var id in ids) { await dataStore.RemoveFailedMessageRetryDocument(id); messageIds.Add(id); + + if (!string.IsNullOrEmpty(operationId)) + { + auditLog.MessageAction(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Queue, id, operationId); + } } await context.SendLocal(new RetryMessagesById { MessageUniqueIds = messageIds.ToArray() }); @@ -32,14 +41,22 @@ public async Task Handle(RetryPendingMessages message, IMessageHandlerContext co public async Task Handle(RetryPendingMessagesById message, IMessageHandlerContext context) { + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + foreach (var messageUniqueId in message.MessageUniqueIds) { await dataStore.RemoveFailedMessageRetryDocument(messageUniqueId); + + if (!string.IsNullOrEmpty(operationId)) + { + auditLog.MessageAction(user, MessageActionKind.Retry, Permissions.ErrorMessagesRetry, MessageActionScope.Batch, messageUniqueId, operationId); + } } await context.SendLocal(m => m.MessageUniqueIds = message.MessageUniqueIds); } readonly IErrorMessageDataStore dataStore; + readonly IMessageActionAuditLog auditLog; } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retrying/Handlers/RetriesHandler.cs b/src/ServiceControl/Recoverability/Retrying/Handlers/RetriesHandler.cs index 4a2e7266c5..09d7ef11c3 100644 --- a/src/ServiceControl/Recoverability/Retrying/Handlers/RetriesHandler.cs +++ b/src/ServiceControl/Recoverability/Retrying/Handlers/RetriesHandler.cs @@ -2,6 +2,7 @@ namespace ServiceControl.Recoverability { using System.Threading.Tasks; using Contracts.MessageFailures; + using Infrastructure.Auth; using MessageFailures.InternalMessages; using NServiceBus; using ServiceControl.Persistence; @@ -29,27 +30,38 @@ public Task Handle(MessageFailed message, IMessageHandlerContext context) public Task Handle(RequestRetryAll message, IMessageHandlerContext context) { + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + if (!string.IsNullOrWhiteSpace(message.Endpoint)) { - retries.StartRetryForEndpoint(message.Endpoint); + retries.StartRetryForEndpoint(message.Endpoint, user, operationId); } else { - retries.StartRetryForAllMessages(); + retries.StartRetryForAllMessages(user, operationId); } return Task.CompletedTask; } - public Task Handle(RetryMessage message, IMessageHandlerContext context) => retries.StartRetryForSingleMessage(message.FailedMessageId); + public Task Handle(RetryMessage message, IMessageHandlerContext context) + { + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + return retries.StartRetryForSingleMessage(message.FailedMessageId, user, operationId); + } - public Task Handle(RetryMessagesById message, IMessageHandlerContext context) => retries.StartRetryForMessageSelection(message.MessageUniqueIds); + public Task Handle(RetryMessagesById message, IMessageHandlerContext context) + { + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + return retries.StartRetryForMessageSelection(message.MessageUniqueIds, user, operationId); + } public Task Handle(RetryMessagesByQueueAddress message, IMessageHandlerContext context) { var failedQueueAddress = message.QueueAddress; - retries.StartRetryForFailedQueueAddress(failedQueueAddress, message.Status); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + retries.StartRetryForFailedQueueAddress(failedQueueAddress, message.Status, user, operationId); return Task.CompletedTask; } diff --git a/src/ServiceControl/Recoverability/Retrying/Handlers/RetryAllInGroupHandler.cs b/src/ServiceControl/Recoverability/Retrying/Handlers/RetryAllInGroupHandler.cs index 97271902fd..38da31b01f 100644 --- a/src/ServiceControl/Recoverability/Retrying/Handlers/RetryAllInGroupHandler.cs +++ b/src/ServiceControl/Recoverability/Retrying/Handlers/RetryAllInGroupHandler.cs @@ -2,6 +2,7 @@ namespace ServiceControl.Recoverability { using System; using System.Threading.Tasks; + using Infrastructure.Auth; using Microsoft.Extensions.Logging; using NServiceBus; using ServiceControl.Persistence; @@ -37,11 +38,15 @@ public async Task Handle(RetryAllInGroup message, IMessageHandlerContext context var started = message.Started ?? DateTime.UtcNow; await retryingManager.Wait(message.GroupId, RetryType.FailureGroup, started, originator, group?.Type, group?.Last); + var (user, operationId) = AuditHeaders.Read(context.MessageHeaders); + retries.EnqueueRetryForFailureGroup(new RetriesGateway.RetryForFailureGroup( message.GroupId, originator, group?.Type, - started + started, + user, + operationId )); } } diff --git a/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs b/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs index 7c1f8941a7..793310a1c0 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetriesGateway.cs @@ -6,6 +6,7 @@ namespace ServiceControl.Recoverability using System.Linq; using System.Threading.Tasks; using Infrastructure; + using Infrastructure.Auth; using MessageFailures; using Microsoft.Extensions.Logging; using ServiceControl.Persistence; @@ -19,7 +20,7 @@ public RetriesGateway(IRetryDocumentDataStore store, RetryingManager operationMa this.logger = logger; } - public async Task StartRetryForSingleMessage(string uniqueMessageId) + public async Task StartRetryForSingleMessage(string uniqueMessageId, AuditUser? initiatedBy = null, string operationId = null) { logger.LogInformation("Retrying a single message {UniqueMessageId}", uniqueMessageId); @@ -28,11 +29,11 @@ public async Task StartRetryForSingleMessage(string uniqueMessageId) var numberOfMessages = 1; await operationManager.Preparing(requestId, retryType, numberOfMessages); - await StageRetryByUniqueMessageIds(requestId, retryType, new[] { uniqueMessageId }, DateTime.UtcNow); + await StageRetryByUniqueMessageIds(requestId, retryType, new[] { uniqueMessageId }, DateTime.UtcNow, initiatedBy: initiatedBy, operationId: operationId); await operationManager.PreparedBatch(requestId, retryType, numberOfMessages); } - public async Task StartRetryForMessageSelection(string[] uniqueMessageIds) + public async Task StartRetryForMessageSelection(string[] uniqueMessageIds, AuditUser? initiatedBy = null, string operationId = null) { logger.LogInformation("Retrying a selection of {MessageCount} messages", uniqueMessageIds.Length); @@ -41,11 +42,11 @@ public async Task StartRetryForMessageSelection(string[] uniqueMessageIds) var numberOfMessages = uniqueMessageIds.Length; await operationManager.Preparing(requestId, retryType, numberOfMessages); - await StageRetryByUniqueMessageIds(requestId, retryType, uniqueMessageIds, DateTime.UtcNow); + await StageRetryByUniqueMessageIds(requestId, retryType, uniqueMessageIds, DateTime.UtcNow, initiatedBy: initiatedBy, operationId: operationId); await operationManager.PreparedBatch(requestId, retryType, numberOfMessages); } - async Task StageRetryByUniqueMessageIds(string requestId, RetryType retryType, string[] messageIds, DateTime startTime, DateTime? last = null, string originator = null, string batchName = null, string classifier = null) + async Task StageRetryByUniqueMessageIds(string requestId, RetryType retryType, string[] messageIds, DateTime startTime, DateTime? last = null, string originator = null, string batchName = null, string classifier = null, AuditUser? initiatedBy = null, string operationId = null) { if (messageIds == null || !messageIds.Any()) { @@ -55,7 +56,7 @@ async Task StageRetryByUniqueMessageIds(string requestId, RetryType retryType, s var failedMessageRetryIds = messageIds.Select(FailedMessageRetry.MakeDocumentId).ToArray(); - var batchDocumentId = await store.CreateBatchDocument(RetryDocumentManager.RetrySessionId, requestId, retryType, failedMessageRetryIds, originator, startTime, last, batchName, classifier); + var batchDocumentId = await store.CreateBatchDocument(RetryDocumentManager.RetrySessionId, requestId, retryType, failedMessageRetryIds, originator, startTime, last, batchName, classifier, initiatedBy?.Id, initiatedBy?.Name, operationId); logger.LogInformation("Created Batch '{BatchDocumentId}' with {BatchMessageCount} messages for '{BatchName}'", batchDocumentId, messageIds.Length, batchName); @@ -94,7 +95,7 @@ async Task ProcessRequest(BulkRetryRequest request) for (var i = 0; i < batches.Count; i++) { - await StageRetryByUniqueMessageIds(request.RequestId, request.RetryType, batches[i], request.StartTime, latestAttempt, request.Originator, GetBatchName(i + 1, batches.Count, request.Originator), request.Classifier); + await StageRetryByUniqueMessageIds(request.RequestId, request.RetryType, batches[i], request.StartTime, latestAttempt, request.Originator, GetBatchName(i + 1, batches.Count, request.Originator), request.Classifier, request.InitiatedBy, request.OperationId); numberOfMessagesAdded += batches[i].Length; await operationManager.PreparedBatch(request.RequestId, request.RetryType, numberOfMessagesAdded); @@ -112,23 +113,23 @@ static string GetBatchName(int pageNum, int totalPages, string context) return $"'{context}' batch {pageNum} of {totalPages}"; } - public void StartRetryForAllMessages() + public void StartRetryForAllMessages(AuditUser? initiatedBy = null, string operationId = null) { - var item = new RetryForAllMessages(); + var item = new RetryForAllMessages(initiatedBy, operationId); logger.LogInformation("Enqueuing index based bulk retry '{Item}'", item); bulkRequests.Enqueue(item); } - public void StartRetryForEndpoint(string endpoint) + public void StartRetryForEndpoint(string endpoint, AuditUser? initiatedBy = null, string operationId = null) { - var item = new RetryForEndpoint(endpoint); + var item = new RetryForEndpoint(endpoint, initiatedBy, operationId); logger.LogInformation("Enqueuing index based bulk retry '{Item}'", item); bulkRequests.Enqueue(item); } - public void StartRetryForFailedQueueAddress(string failedQueueAddress, FailedMessageStatus status) + public void StartRetryForFailedQueueAddress(string failedQueueAddress, FailedMessageStatus status, AuditUser? initiatedBy = null, string operationId = null) { - var item = new RetryForFailedQueueAddress(failedQueueAddress, status); + var item = new RetryForFailedQueueAddress(failedQueueAddress, status, initiatedBy, operationId); logger.LogInformation("Enqueuing index based bulk retry '{Item}'", item); bulkRequests.Enqueue(item); } @@ -153,18 +154,24 @@ public abstract class BulkRetryRequest public string Originator { get; } public string Classifier { get; } public DateTime StartTime { get; } + public AuditUser? InitiatedBy { get; } + public string OperationId { get; } public BulkRetryRequest( string requestId, RetryType retryType, DateTime startTime, - string originator + string originator, + AuditUser? initiatedBy = null, + string operationId = null ) { RequestId = requestId; RetryType = retryType; Originator = originator; StartTime = startTime; + InitiatedBy = initiatedBy; + OperationId = operationId; } protected abstract Task Invoke(IRetryDocumentDataStore store, Func callback); @@ -209,7 +216,7 @@ Task Process(string uniqueMessageId, DateTime latestTimeOfFailure) class RetryForAllMessages : BulkRetryRequest { - public RetryForAllMessages() : base(requestId: "All", RetryType.All, DateTime.UtcNow, "all messages") + public RetryForAllMessages(AuditUser? initiatedBy = null, string operationId = null) : base(requestId: "All", RetryType.All, DateTime.UtcNow, "all messages", initiatedBy, operationId) { } @@ -223,7 +230,7 @@ class RetryForEndpoint : BulkRetryRequest { public string Endpoint { get; } - public RetryForEndpoint(string endpoint) : base(requestId: endpoint, RetryType.AllForEndpoint, DateTime.UtcNow, originator: $"all messages for endpoint {endpoint}") + public RetryForEndpoint(string endpoint, AuditUser? initiatedBy = null, string operationId = null) : base(requestId: endpoint, RetryType.AllForEndpoint, DateTime.UtcNow, originator: $"all messages for endpoint {endpoint}", initiatedBy, operationId) { Endpoint = endpoint; } @@ -240,7 +247,7 @@ public sealed class RetryForFailureGroup : BulkRetryRequest public string GroupTitle { get; } public string GroupType { get; } - public RetryForFailureGroup(string groupId, string groupTitle, string groupType, DateTime started) : base(requestId: groupId, RetryType.FailureGroup, started, originator: groupTitle) + public RetryForFailureGroup(string groupId, string groupTitle, string groupType, DateTime started, AuditUser? initiatedBy = null, string operationId = null) : base(requestId: groupId, RetryType.FailureGroup, started, originator: groupTitle, initiatedBy, operationId) { GroupId = groupId; GroupType = groupType; @@ -267,8 +274,10 @@ class RetryForFailedQueueAddress : BulkRetryRequest public RetryForFailedQueueAddress( string failedQueueAddress, - FailedMessageStatus status - ) : base(requestId: failedQueueAddress, RetryType.ByQueueAddress, DateTime.UtcNow, originator: $"all messages for failed queue address '{failedQueueAddress}'") + FailedMessageStatus status, + AuditUser? initiatedBy = null, + string operationId = null + ) : base(requestId: failedQueueAddress, RetryType.ByQueueAddress, DateTime.UtcNow, originator: $"all messages for failed queue address '{failedQueueAddress}'", initiatedBy, operationId) { FailedQueueAddress = failedQueueAddress; Status = status; diff --git a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs index dbdeab6477..d22c433fb9 100644 --- a/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs +++ b/src/ServiceControl/Recoverability/Retrying/RetryProcessor.cs @@ -5,6 +5,7 @@ namespace ServiceControl.Recoverability using System.Linq; using System.Threading; using System.Threading.Tasks; + using Infrastructure.Auth; using Infrastructure.DomainEvents; using MessageFailures; using Microsoft.Extensions.Logging; @@ -22,6 +23,7 @@ public RetryProcessor( ReturnToSenderDequeuer returnToSender, RetryingManager retryingManager, Lazy messageDispatcher, + IMessageActionAuditLog auditLog, ILogger logger) { this.store = store; @@ -29,6 +31,7 @@ public RetryProcessor( this.retryingManager = retryingManager; this.domainEvents = domainEvents; this.messageDispatcher = messageDispatcher; + this.auditLog = auditLog; this.logger = logger; corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName, logger); } @@ -215,6 +218,8 @@ async Task Stage(RetryBatch stagingBatch, IRetryBatchesManager manager) await TryDispatch(transportOperations, messages, failedMessageRetriesById, stagingId, previousAttemptFailed); + AuditStagedMessages(stagingBatch, messages); + if (stagingBatch.RetryType != RetryType.FailureGroup) //FailureGroup published on completion of entire group { var failedIds = messages.Select(x => x.UniqueMessageId).ToArray(); @@ -235,6 +240,38 @@ await domainEvents.Raise(new MessagesSubmittedForRetry return messages.Length; } + // Emits one per-message audit entry for each message actually staged for retry. Only bulk/group + // operations (retry all/endpoint/queue/group) carry an OperationId here; the explicit-id and + // single paths are audited synchronously at the API and leave OperationId null so we don't double-log. + void AuditStagedMessages(RetryBatch stagingBatch, IReadOnlyCollection messages) + { + if (string.IsNullOrEmpty(stagingBatch.OperationId)) + { + return; + } + + var user = new AuditUser(stagingBatch.InitiatedById, stagingBatch.InitiatedByName); + var scope = stagingBatch.RetryType switch + { + RetryType.All => MessageActionScope.All, + RetryType.AllForEndpoint => MessageActionScope.Endpoint, + RetryType.ByQueueAddress => MessageActionScope.Queue, + RetryType.FailureGroup => MessageActionScope.Group, + RetryType.MultipleMessages => MessageActionScope.Batch, + RetryType.SingleMessage => MessageActionScope.Single, + RetryType.Unknown => MessageActionScope.Single, + _ => MessageActionScope.Single + }; + var permission = stagingBatch.RetryType == RetryType.FailureGroup + ? Permissions.ErrorRecoverabilityGroupsRetry + : Permissions.ErrorMessagesRetry; + + foreach (var failedMessage in messages) + { + auditLog.MessageAction(user, MessageActionKind.Retry, permission, scope, failedMessage.UniqueMessageId, stagingBatch.OperationId); + } + } + Task TryDispatch(TransportOperation[] transportOperations, IReadOnlyCollection messages, IReadOnlyDictionary failedMessageRetriesById, string stagingId, bool previousAttemptFailed) @@ -332,6 +369,7 @@ TransportOperation ToTransportOperation(FailedMessage message, string stagingId) readonly ReturnToSenderDequeuer returnToSender; readonly RetryingManager retryingManager; readonly Lazy messageDispatcher; + readonly IMessageActionAuditLog auditLog; MessageRedirectsCollection redirects; bool isRecoveringFromPrematureShutdown = true; CorruptedReplyToHeaderStrategy corruptedReplyToHeaderStrategy; diff --git a/src/ServiceControl/WebApplicationExtensions.cs b/src/ServiceControl/WebApplicationExtensions.cs index 685bc7dc16..ddd083db9e 100644 --- a/src/ServiceControl/WebApplicationExtensions.cs +++ b/src/ServiceControl/WebApplicationExtensions.cs @@ -1,8 +1,10 @@ namespace ServiceControl; +using System.Threading.Tasks; using Infrastructure.SignalR; using Infrastructure.WebApi; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; using ServiceControl.Hosting.ForwardedHeaders; using ServiceControl.Hosting.Https; using ServiceControl.Infrastructure; @@ -11,6 +13,20 @@ public static class WebApplicationExtensions { public static void UseServiceControl(this WebApplication app, ForwardedHeadersSettings forwardedHeadersSettings, HttpsSettings httpsSettings) { + // Surface the per-request id (same value used as the audit operation id) so callers can correlate + // and quote it. TraceIdentifier is stable for the request; OnStarting sets it before the response flushes. + app.Use((context, next) => + { + context.Response.OnStarting(static state => + { + var httpContext = (HttpContext)state; + httpContext.Response.Headers["Request-Id"] = httpContext.TraceIdentifier; + return Task.CompletedTask; + }, context); + + return next(context); + }); + app.UseServiceControlForwardedHeaders(forwardedHeadersSettings); app.UseServiceControlHttps(httpsSettings); app.UseResponseCompression();