Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/Exceptionless.Core/Jobs/CleanupDataJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ ILoggerFactory loggerFactory

protected override Task<ILock?> GetLockAsync(CancellationToken cancellationToken = default)
{
return _lockProvider.TryAcquireAsync(nameof(CleanupDataJob), TimeSpan.FromMinutes(15), cancellationToken);
return _lockProvider.TryAcquireAsync(nameof(CleanupDataJob), TimeSpan.FromHours(2), cancellationToken);
}

protected override async Task<JobResult> RunInternalAsync(JobContext context)
Expand Down Expand Up @@ -114,8 +114,13 @@ private async Task CleanupSoftDeletedOrganizationsAsync(JobContext context)

while (organizationResults.Documents.Count > 0 && !context.CancellationToken.IsCancellationRequested)
{
await RenewLockAsync(context);

foreach (var organization in organizationResults.Documents)
{
if (context.CancellationToken.IsCancellationRequested)
break;

using var _ = _logger.BeginScope(new ExceptionlessState().Organization(organization.Id));
try
{
Expand All @@ -142,8 +147,13 @@ private async Task CleanupSoftDeletedProjectsAsync(JobContext context)

while (projectResults.Documents.Count > 0 && !context.CancellationToken.IsCancellationRequested)
{
await RenewLockAsync(context);

foreach (var project in projectResults.Documents)
{
if (context.CancellationToken.IsCancellationRequested)
break;

using var _ = _logger.BeginScope(new ExceptionlessState().Organization(project.OrganizationId).Project(project.Id));
try
{
Expand Down Expand Up @@ -278,8 +288,13 @@ private async Task EnforceRetentionAsync(JobContext context)
var results = await _organizationRepository.FindAsync(q => q.Include(o => o.Id, o => o.Name, o => o.RetentionDays), o => o.SearchAfterPaging().PageLimit(100));
while (results.Documents.Count > 0 && !context.CancellationToken.IsCancellationRequested)
{
await RenewLockAsync(context);

foreach (var organization in results.Documents)
{
if (context.CancellationToken.IsCancellationRequested)
break;

using var _ = _logger.BeginScope(new ExceptionlessState().Organization(organization.Id));

int retentionDays = _billingManager.GetBillingPlanByUpsellingRetentionPeriod(organization.RetentionDays)?.RetentionDays ?? _appOptions.MaximumRetentionDays;
Expand Down Expand Up @@ -349,6 +364,8 @@ private async Task EnforceEventRetentionDaysAsync(Organization organization, int

private Task RenewLockAsync(JobContext context)
{
// Called at each page boundary to prevent the distributed lock from expiring
// during long-running bulk cleanup operations that span multiple pages.
_lastRun = _timeProvider.GetUtcNow().UtcDateTime;
return context.RenewLockAsync();
}
Expand Down
359 changes: 129 additions & 230 deletions src/Exceptionless.Core/Jobs/CleanupOrphanedDataJob.cs

Large diffs are not rendered by default.

185 changes: 181 additions & 4 deletions src/Exceptionless.Core/Repositories/EventRepository.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
using Elastic.Clients.Elasticsearch.QueryDsl;
using System.Linq.Expressions;
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.Aggregations;
using Elastic.Clients.Elasticsearch.QueryDsl;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using Exceptionless.Core.Models;
using Exceptionless.Core.Repositories.Configuration;
using Exceptionless.Core.Repositories.Queries;
using Exceptionless.Core.Validation;
using Exceptionless.DateTimeExtensions;
using Foundatio.Repositories;
using Foundatio.Repositories.Elasticsearch.Extensions;
using Foundatio.Repositories.Exceptions;
using Foundatio.Repositories.Models;

namespace Exceptionless.Core.Repositories;

public class EventRepository : RepositoryOwnedByOrganizationAndProject<PersistentEvent>, IEventRepository
{
private readonly ExceptionlessElasticConfiguration _configuration;
private readonly TimeProvider _timeProvider;

public EventRepository(ExceptionlessElasticConfiguration configuration, AppOptions options, MiniValidationValidator validator)
: base(configuration.Events, validator, options)
{
_configuration = configuration;
_timeProvider = configuration.TimeProvider;

DisableCache(); // NOTE: If cache is ever enabled, then fast paths for patching/deleting with scripts will be super slow!
Expand Down Expand Up @@ -74,7 +83,7 @@ public Task<long> RemoveAllAsync(string organizationId, string? clientIpAddress,
if (!String.IsNullOrEmpty(clientIpAddress))
query = query.FieldEquals(EventIndex.Alias.IpAddress, clientIpAddress);

return RemoveAllAsync(q => query, options);
return RemoveAllIgnoringMissingEventIndexesAsync(q => query, options);
}

public Task<FindResults<PersistentEvent>> GetByReferenceIdAsync(string projectId, string referenceId)
Expand Down Expand Up @@ -188,12 +197,180 @@ public override Task<FindResults<PersistentEvent>> GetByProjectIdAsync(string pr
return FindAsync(q => q.Project(projectId).SortDescending(e => e.Date).SortDescending(e => e.Id), options);
}

public override Task<long> RemoveAllByOrganizationIdAsync(string organizationId)
{
ArgumentException.ThrowIfNullOrEmpty(organizationId);

return RemoveAllIgnoringMissingEventIndexesAsync(q => q.Organization(organizationId));
}

public override Task<long> RemoveAllByProjectIdAsync(string organizationId, string projectId)
{
ArgumentException.ThrowIfNullOrEmpty(organizationId);
ArgumentException.ThrowIfNullOrEmpty(projectId);

return RemoveAllIgnoringMissingEventIndexesAsync(q => q.Organization(organizationId).Project(projectId));
}

public Task<long> RemoveAllByStackIdsAsync(string[] stackIds)
{
ArgumentNullException.ThrowIfNull(stackIds);
if (stackIds.Length == 0)
if (stackIds is [])
throw new ArgumentOutOfRangeException(nameof(stackIds));

return RemoveAllAsync(q => q.Stack(stackIds));
return RemoveAllIgnoringMissingEventIndexesAsync(q => q.Stack(stackIds));
}

public Task<long> RemoveAllByProjectIdsAsync(string[] projectIds)
{
ArgumentNullException.ThrowIfNull(projectIds);
if (projectIds is [])
throw new ArgumentOutOfRangeException(nameof(projectIds));

return RemoveAllIgnoringMissingEventIndexesAsync(q => q.Project(projectIds));
}

public Task<long> RemoveAllByOrganizationIdsAsync(string[] organizationIds)
{
ArgumentNullException.ThrowIfNull(organizationIds);
if (organizationIds is [])
throw new ArgumentOutOfRangeException(nameof(organizationIds));

return RemoveAllIgnoringMissingEventIndexesAsync(q => q.Organization(organizationIds));
}

private async Task<long> RemoveAllIgnoringMissingEventIndexesAsync(
RepositoryQueryDescriptor<PersistentEvent> query, CommandOptionsDescriptor<PersistentEvent>? options = null)
{
try
{
return await RemoveAllAsync(query, options);
}
catch (RepositoryException ex) when (IsIndexNotFound(ex.InnerException as TransportException))
{
return 0;
}
catch (TransportException ex) when (IsIndexNotFound(ex))
{
return 0;
}
}

private static bool IsIndexNotFound(TransportException? ex)
{
if (ex?.ApiCallDetails?.HttpStatusCode != 404)
return false;

return ex.ApiCallDetails.ProductError is ElasticsearchServerError serverError
? IsIndexNotFound(serverError)
: ex.DebugInformation.Contains("index_not_found_exception", StringComparison.Ordinal);
}

private static bool IsIndexNotFound(ElasticsearchServerError serverError)
{
if (serverError.Status != 404 || serverError.Error is null)
return false;

return String.Equals(serverError.Error.Type, "index_not_found_exception", StringComparison.Ordinal)
|| serverError.Error.RootCause?.Any(IsIndexNotFound) == true;
}

private static bool IsIndexNotFound(Elastic.Transport.Products.Elasticsearch.ErrorCause? cause)
{
return cause is not null
&& (String.Equals(cause.Type, "index_not_found_exception", StringComparison.Ordinal)
|| cause.CausedBy is not null && IsIndexNotFound(cause.CausedBy));
}

/// <summary>
/// Reassigns all events from the source stacks to the target stack using a parameterized
/// Painless script (no string interpolation) to prevent script injection.
/// </summary>
public Task<long> ReassignStackAsync(IEnumerable<string> sourceStackIds, string targetStackId)
{
ArgumentNullException.ThrowIfNull(sourceStackIds);
ArgumentException.ThrowIfNullOrEmpty(targetStackId);

// Materialize to avoid multiple enumeration and guard against empty; an empty
// .Stack() filter would match ALL events and reassign them to the target stack.
var sourceIds = sourceStackIds.ToList();
if (sourceIds.Count == 0)
return Task.FromResult(0L);

return PatchAllAsync(
q => q.Stack(sourceIds),
new ScriptPatch("ctx._source.stack_id = params.targetStackId")
{
Params = new Dictionary<string, object> { ["targetStackId"] = targetStackId }
});
}

public Task<IReadOnlyCollection<string>> GetDistinctStackIdsAsync(int batchSize, CompositeKeyResult? afterKey = null)
{
return GetDistinctFieldValuesAsync("stack_id", e => e.StackId, batchSize, afterKey);
}

public Task<IReadOnlyCollection<string>> GetDistinctProjectIdsAsync(int batchSize, CompositeKeyResult? afterKey = null)
{
return GetDistinctFieldValuesAsync("project_id", e => e.ProjectId, batchSize, afterKey);
}

public Task<IReadOnlyCollection<string>> GetDistinctOrganizationIdsAsync(int batchSize, CompositeKeyResult? afterKey = null)
{
return GetDistinctFieldValuesAsync("organization_id", e => e.OrganizationId, batchSize, afterKey);
}

/// <summary>
/// Uses a composite aggregation to paginate through all distinct values of a field.
/// Composite aggregations are preferred over terms aggregations for high-cardinality fields
/// because terms aggregations can silently miss values when the unique count exceeds the
/// configured size parameter. Composite aggregations guarantee correct iteration via an
/// after_key cursor, at the cost of requiring sequential page fetches.
/// </summary>
private async Task<IReadOnlyCollection<string>> GetDistinctFieldValuesAsync(
string fieldName, Expression<Func<PersistentEvent, object>> fieldExpression, int batchSize, CompositeKeyResult? afterKey)
{
var afterKeyValues = afterKey?.AfterKey;
string aggregationName = $"composite_{fieldName}";
var sources = new List<KeyValuePair<string, CompositeAggregationSource>>
{
new(fieldName, new CompositeAggregationSource
{
Terms = new CompositeTermsAggregation { Field = fieldExpression }
})
};

var search = await _configuration.Client.SearchAsync<PersistentEvent>(s =>
{
s.Size(0)
.AddAggregation(aggregationName, a => a.Composite(c =>
{
c.Size(batchSize)
.Sources(sources);

if (afterKeyValues is { Count: > 0 })
c.After(afterKeyValues);
}));
});

var composite = search.Aggregations?.GetComposite(aggregationName);

// Always clear the cursor first; repopulate only when a next page exists.
// This ensures callers that check afterKey.AfterKey.Count > 0 correctly
// detect end-of-pagination without requiring a final empty-result fetch.
if (afterKey is not null)
{
afterKey.AfterKey.Clear();
if (composite?.AfterKey is not null)
{
foreach (var kvp in composite.AfterKey)
afterKey.AfterKey[kvp.Key] = kvp.Value;
}
}

if (composite?.Buckets is not { Count: > 0 })
return [];

return composite.Buckets.Select(b => b.Key[fieldName].ToString()!).ToArray();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Exceptionless.Core.Models;
using Elastic.Clients.Elasticsearch;
using Exceptionless.Core.Models;
using Exceptionless.Core.Repositories.Queries;
using Foundatio.Repositories;
using Foundatio.Repositories.Models;
Expand All @@ -13,6 +14,17 @@ public interface IEventRepository : IRepositoryOwnedByOrganizationAndProject<Per
Task<bool> UpdateSessionStartLastActivityAsync(string id, DateTime lastActivityUtc, bool isSessionEnd = false, bool hasError = false, bool sendNotifications = true);
Task<long> RemoveAllAsync(string organizationId, string? clientIpAddress, DateTime? utcStart, DateTime? utcEnd, CommandOptionsDescriptor<PersistentEvent>? options = null);
Task<long> RemoveAllByStackIdsAsync(string[] stackIds);
Task<long> RemoveAllByProjectIdsAsync(string[] projectIds);
Task<long> RemoveAllByOrganizationIdsAsync(string[] organizationIds);
Task<long> ReassignStackAsync(IEnumerable<string> sourceStackIds, string targetStackId);
Task<IReadOnlyCollection<string>> GetDistinctStackIdsAsync(int batchSize, CompositeKeyResult? afterKey = null);
Task<IReadOnlyCollection<string>> GetDistinctProjectIdsAsync(int batchSize, CompositeKeyResult? afterKey = null);
Task<IReadOnlyCollection<string>> GetDistinctOrganizationIdsAsync(int batchSize, CompositeKeyResult? afterKey = null);
}

public record CompositeKeyResult
{
public Dictionary<Field, FieldValue> AfterKey { get; init; } = [];
}

public static class EventRepositoryExtensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ public interface IStackRepository : IRepositoryOwnedByOrganizationAndProject<Sta
Task<FindResults<Stack>> GetStacksForCleanupAsync(string organizationId, DateTime cutoff);
Task<FindResults<Stack>> GetSoftDeleted();
Task<long> SoftDeleteByProjectIdAsync(string organizationId, string projectId);
Task<IReadOnlyCollection<string>> GetDuplicateSignaturesAsync(int maxResults = 10000);
}
16 changes: 16 additions & 0 deletions src/Exceptionless.Core/Repositories/StackRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,22 @@ public Task<long> SoftDeleteByProjectIdAsync(string organizationId, string proje
);
}

public async Task<IReadOnlyCollection<string>> GetDuplicateSignaturesAsync(int maxResults = 10000)
{
// ImmediateConsistency forces a segment refresh before the aggregation so that
// any stacks soft-deleted in a previous batch are excluded here. Cost: one refresh
// per batch (not per item), equivalent to the original explicit index refresh.
var result = await CountAsync(
q => q.AggregationsExpression($"terms:(duplicate_signature~{maxResults} @min:2)"),
o => o.ImmediateConsistency());

var buckets = result.Aggregations.Terms("terms_duplicate_signature")?.Buckets;
if (buckets is not { Count: > 0 })
return [];

return buckets.Select(b => b.Key).ToArray();
}

protected override async Task AddDocumentsToCacheAsync(ICollection<FindHit<Stack>> findHits, ICommandOptions options, bool isDirtyRead)
{
await base.AddDocumentsToCacheAsync(findHits, options, isDirtyRead);
Expand Down
Loading