diff --git a/src/async.go b/src/async.go index 7d4871f..73ece7b 100644 --- a/src/async.go +++ b/src/async.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -70,8 +69,8 @@ func newAsyncEventProcessor(cfg asyncProcessorConfig, logger *zap.Logger) *async logger: logger, } - asyncWorkerCountGauge.Set(float64(cfg.WorkerCount)) - asyncQueueCapacityGauge.Set(float64(cfg.QueueSize)) + defaultMetricRecorder.SetAsyncWorkerCount(cfg.WorkerCount) + defaultMetricRecorder.SetAsyncQueueCapacity(cfg.QueueSize) return processor } @@ -99,11 +98,11 @@ func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, bod select { case p.queue <- event: - asyncQueueDepthGauge.Set(float64(len(p.queue))) + defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue)) return nil default: - asyncQueueDroppedCounter.WithLabelValues(eventType).Inc() - asyncQueueDepthGauge.Set(float64(len(p.queue))) + defaultMetricRecorder.RecordAsyncQueueDropped(eventType) + defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue)) return fmt.Errorf("event queue is full") } } @@ -112,19 +111,19 @@ func (p *asyncEventProcessor) runWorker(workerID int) { defer p.wg.Done() for event := range p.queue { - asyncQueueDepthGauge.Set(float64(len(p.queue))) + defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue)) start := time.Now() processor, ok := p.processFn[event.eventType] if !ok { - asyncUnsupportedEventsCounter.WithLabelValues(event.eventType).Inc() + defaultMetricRecorder.RecordAsyncUnsupportedEvent(event.eventType) continue } func() { defer func() { if recovered := recover(); recovered != nil { - asyncProcessingFailuresCounter.WithLabelValues(event.eventType).Inc() + defaultMetricRecorder.RecordAsyncProcessingFailure(event.eventType) p.logger.Error("Recovered from async event processor panic", zap.Int("workerID", workerID), zap.String("eventType", event.eventType), @@ -134,8 +133,8 @@ func (p *asyncEventProcessor) runWorker(workerID int) { }() processor(event.ctx, event.body) - asyncProcessedEventsCounter.WithLabelValues(event.eventType).Inc() - asyncProcessingDurationHistogram.With(prometheus.Labels{"event_type": event.eventType}).Observe(time.Since(start).Seconds()) + defaultMetricRecorder.RecordAsyncProcessedEvent(event.eventType) + defaultMetricRecorder.ObserveAsyncProcessingDuration(event.eventType, time.Since(start).Seconds()) }() } } diff --git a/src/github.go b/src/github.go index 872ddcf..7b1c5b2 100644 --- a/src/github.go +++ b/src/github.go @@ -184,7 +184,7 @@ func updateTrackedRunMetrics( details runMetricDetails, store runStoreMethods, entityName string, - metrics runMetricSet, + metricKind runMetricKind, ) { var storeAdapter runTransitionStore if stateStore != nil { @@ -193,7 +193,7 @@ func updateTrackedRunMetrics( processor := &runTransitionProcessor{ store: storeAdapter, - recorder: prometheusRunTransitionRecorder{metrics: metrics}, + recorder: metricRunTransitionRecorder{kind: metricKind, recorder: defaultMetricRecorder}, logger: logger, entityName: entityName, } @@ -243,14 +243,8 @@ func updateWorkflowMetrics(ctx context.Context, body []byte) { endedAt: payload.Workflow.UpdatedAt, }, workflowRunStoreMethods(), - "workflow_run", - runMetricSet{ - statusCounter: workflowStatusCounter, - queuedGauge: workflowQueuedGauge, - inProgressGauge: workflowInProgressGauge, - completedGauge: workflowCompletedGauge, - durationHistogram: workflowDurationHistogram, - }, + githubEventWorkflowRun, + runMetricKindWorkflow, ) } @@ -275,14 +269,8 @@ func updateJobMetrics(ctx context.Context, body []byte) { endedAt: payload.Job.CompletedAt, }, workflowJobStoreMethods(), - "workflow_job", - runMetricSet{ - statusCounter: jobStatusCounter, - queuedGauge: jobQueuedGauge, - inProgressGauge: jobInProgressGauge, - completedGauge: jobCompletedGauge, - durationHistogram: jobDurationHistogram, - }, + githubEventWorkflowJob, + runMetricKindJob, ) } @@ -295,7 +283,7 @@ func updateCommitMetrics(body []byte) { } for range payload.Commits { - commitPushedCounter.WithLabelValues(payload.Repository.FullName).Inc() + defaultMetricRecorder.RecordCommitPushed(payload.Repository.FullName) } } @@ -307,9 +295,9 @@ func updatePullRequestMetrics(body []byte) { return } - pullRequestCounter.WithLabelValues( + defaultMetricRecorder.RecordPullRequest( payload.Repository.FullName, payload.PullRequest.Base.Ref, payload.Action, - ).Inc() + ) } diff --git a/src/metric_recorder.go b/src/metric_recorder.go new file mode 100644 index 0000000..5ee51bf --- /dev/null +++ b/src/metric_recorder.go @@ -0,0 +1,130 @@ +package main + +type runMetricKind string + +const ( + runMetricKindWorkflow runMetricKind = "workflow" + runMetricKindJob runMetricKind = "job" +) + +var defaultMetricRecorder = prometheusMetricRecorder{} + +type prometheusMetricRecorder struct{} + +func (prometheusMetricRecorder) RecordDuplicateDelivery(eventType string) { + duplicateDeliveriesSeenCounter.WithLabelValues(eventType).Inc() + duplicateDeliveriesDroppedCounter.WithLabelValues(eventType).Inc() +} + +func (prometheusMetricRecorder) RecordCommitPushed(repository string) { + commitPushedCounter.WithLabelValues(repository).Inc() +} + +func (prometheusMetricRecorder) RecordPullRequest(repository, baseBranch, status string) { + pullRequestCounter.WithLabelValues(repository, baseBranch, status).Inc() +} + +func (prometheusMetricRecorder) SetAsyncWorkerCount(workers int) { + asyncWorkerCountGauge.Set(float64(workers)) +} + +func (prometheusMetricRecorder) SetAsyncQueueCapacity(capacity int) { + asyncQueueCapacityGauge.Set(float64(capacity)) +} + +func (prometheusMetricRecorder) SetAsyncQueueDepth(depth int) { + asyncQueueDepthGauge.Set(float64(depth)) +} + +func (prometheusMetricRecorder) RecordAsyncQueueDropped(eventType string) { + asyncQueueDroppedCounter.WithLabelValues(eventType).Inc() +} + +func (prometheusMetricRecorder) RecordAsyncUnsupportedEvent(eventType string) { + asyncUnsupportedEventsCounter.WithLabelValues(eventType).Inc() +} + +func (prometheusMetricRecorder) RecordAsyncProcessingFailure(eventType string) { + asyncProcessingFailuresCounter.WithLabelValues(eventType).Inc() +} + +func (prometheusMetricRecorder) RecordAsyncProcessedEvent(eventType string) { + asyncProcessedEventsCounter.WithLabelValues(eventType).Inc() +} + +func (prometheusMetricRecorder) ObserveAsyncProcessingDuration(eventType string, durationSeconds float64) { + asyncProcessingDurationHistogram.WithLabelValues(eventType).Observe(durationSeconds) +} + +func (prometheusMetricRecorder) RecordRunStatus(kind runMetricKind, state RunState) { + switch kind { + case runMetricKindWorkflow: + workflowStatusCounter.WithLabelValues( + state.Repository, + state.Branch, + state.Name, + state.Status, + state.Conclusion, + ).Inc() + case runMetricKindJob: + jobStatusCounter.WithLabelValues( + state.Repository, + state.Branch, + state.Name, + state.Status, + state.Conclusion, + ).Inc() + } +} + +func (prometheusMetricRecorder) AddRunGauge(kind runMetricKind, state RunState, delta float64) { + switch kind { + case runMetricKindWorkflow: + addWorkflowRunGauge(state, delta) + case runMetricKindJob: + addWorkflowJobGauge(state, delta) + } +} + +func (prometheusMetricRecorder) ObserveRunDuration(kind runMetricKind, state RunState, durationSeconds float64) { + switch kind { + case runMetricKindWorkflow: + workflowDurationHistogram.WithLabelValues( + state.Repository, + state.Branch, + state.Name, + state.Status, + state.Conclusion, + ).Observe(durationSeconds) + case runMetricKindJob: + jobDurationHistogram.WithLabelValues( + state.Repository, + state.Branch, + state.Name, + state.Status, + state.Conclusion, + ).Observe(durationSeconds) + } +} + +func addWorkflowRunGauge(state RunState, delta float64) { + switch normalizeStatus(state.Status) { + case statusQueued: + workflowQueuedGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) + case statusInProgress: + workflowInProgressGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) + case statusCompleted: + workflowCompletedGauge.WithLabelValues(state.Repository, state.Branch, state.Conclusion, state.Name).Add(delta) + } +} + +func addWorkflowJobGauge(state RunState, delta float64) { + switch normalizeStatus(state.Status) { + case statusQueued: + jobQueuedGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) + case statusInProgress: + jobInProgressGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) + case statusCompleted: + jobCompletedGauge.WithLabelValues(state.Repository, state.Branch, state.Conclusion, state.Name).Add(delta) + } +} diff --git a/src/metric_recorder_test.go b/src/metric_recorder_test.go new file mode 100644 index 0000000..bd370ca --- /dev/null +++ b/src/metric_recorder_test.go @@ -0,0 +1,176 @@ +//go:build !integration + +package main + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus/testutil" +) + +func TestMetricRecorderPreservesDuplicateDeliveryCardinality(t *testing.T) { + duplicateDeliveriesSeenCounter.Reset() + duplicateDeliveriesDroppedCounter.Reset() + + recorder := prometheusMetricRecorder{} + recorder.RecordDuplicateDelivery(githubEventWorkflowRun) + + if err := testutil.CollectAndCompare(duplicateDeliveriesSeenCounter, strings.NewReader(` + # HELP promgithub_duplicate_deliveries_seen_total Total number of duplicate GitHub webhook deliveries observed + # TYPE promgithub_duplicate_deliveries_seen_total counter + promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1 + `)); err != nil { + t.Fatalf("unexpected duplicate seen metric: %v", err) + } + + if err := testutil.CollectAndCompare(duplicateDeliveriesDroppedCounter, strings.NewReader(` + # HELP promgithub_duplicate_deliveries_dropped_total Total number of duplicate GitHub webhook deliveries dropped + # TYPE promgithub_duplicate_deliveries_dropped_total counter + promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1 + `)); err != nil { + t.Fatalf("unexpected duplicate dropped metric: %v", err) + } +} + +func TestMetricRecorderPreservesRepositoryEventCardinality(t *testing.T) { + commitPushedCounter.Reset() + pullRequestCounter.Reset() + + recorder := prometheusMetricRecorder{} + recorder.RecordCommitPushed("user/repo") + recorder.RecordPullRequest("user/repo", "main", "opened") + + if err := testutil.CollectAndCompare(commitPushedCounter, strings.NewReader(` + # HELP promgithub_commit_pushed Total number of commits pushed + # TYPE promgithub_commit_pushed counter + promgithub_commit_pushed{repository="user/repo"} 1 + `)); err != nil { + t.Fatalf("unexpected commit metric: %v", err) + } + + if err := testutil.CollectAndCompare(pullRequestCounter, strings.NewReader(` + # HELP promgithub_pull_request Total number of pull requests + # TYPE promgithub_pull_request counter + promgithub_pull_request{base_branch="main",pull_request_status="opened",repository="user/repo"} 1 + `)); err != nil { + t.Fatalf("unexpected pull request metric: %v", err) + } +} + +func TestMetricRecorderPreservesRunMetricCardinality(t *testing.T) { + workflowStatusCounter.Reset() + workflowCompletedGauge.Reset() + workflowDurationHistogram.Reset() + + recorder := prometheusMetricRecorder{} + state := RunState{ + Repository: runTransitionTestRepository, + Branch: runTransitionTestBranch, + Name: runTransitionTestName, + Status: statusCompleted, + Conclusion: testConclusionSuccess, + StartedAt: runTransitionStartedAt, + EndedAt: runTransitionCompletedAt, + } + + recorder.RecordRunStatus(runMetricKindWorkflow, state) + recorder.AddRunGauge(runMetricKindWorkflow, state, 1) + recorder.ObserveRunDuration(runMetricKindWorkflow, state, 3600) + + if err := testutil.CollectAndCompare(workflowStatusCounter, strings.NewReader(` + # HELP promgithub_workflow_status Total number of workflow runs with status + # TYPE promgithub_workflow_status counter + promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1 + `)); err != nil { + t.Fatalf("unexpected workflow status metric: %v", err) + } + + if got := testutil.ToFloat64(workflowCompletedGauge.WithLabelValues(runTransitionTestRepository, runTransitionTestBranch, testConclusionSuccess, runTransitionTestName)); got != 1 { + t.Fatalf("expected completed gauge to be 1, got %v", got) + } + + if err := testutil.CollectAndCompare(workflowDurationHistogram, strings.NewReader(` + # HELP promgithub_workflow_duration Duration of workflow runs + # TYPE promgithub_workflow_duration histogram + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.005"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.01"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.025"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.05"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.1"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.25"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="0.5"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="1"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="2.5"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="5"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="10"} 0 + promgithub_workflow_duration_bucket{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed",le="+Inf"} 1 + promgithub_workflow_duration_sum{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 3600 + promgithub_workflow_duration_count{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1 + `)); err != nil { + t.Fatalf("unexpected workflow duration metric: %v", err) + } +} + +func TestMetricRecorderPreservesAsyncMetricCardinality(t *testing.T) { + asyncQueueDepthGauge.Set(0) + asyncQueueCapacityGauge.Set(0) + asyncWorkerCountGauge.Set(0) + asyncQueueDroppedCounter.Reset() + asyncUnsupportedEventsCounter.Reset() + asyncProcessingFailuresCounter.Reset() + asyncProcessedEventsCounter.Reset() + asyncProcessingDurationHistogram.Reset() + + recorder := prometheusMetricRecorder{} + recorder.SetAsyncQueueDepth(2) + recorder.SetAsyncQueueCapacity(8) + recorder.SetAsyncWorkerCount(4) + recorder.RecordAsyncQueueDropped(githubEventWorkflowRun) + recorder.RecordAsyncUnsupportedEvent("unknown_event") + recorder.RecordAsyncProcessingFailure(githubEventWorkflowRun) + recorder.RecordAsyncProcessedEvent(githubEventWorkflowRun) + recorder.ObserveAsyncProcessingDuration(githubEventWorkflowRun, 1.5) + + if got := testutil.ToFloat64(asyncQueueDepthGauge); got != 2 { + t.Fatalf("expected queue depth 2, got %v", got) + } + if got := testutil.ToFloat64(asyncQueueCapacityGauge); got != 8 { + t.Fatalf("expected queue capacity 8, got %v", got) + } + if got := testutil.ToFloat64(asyncWorkerCountGauge); got != 4 { + t.Fatalf("expected worker count 4, got %v", got) + } + if got := testutil.ToFloat64(asyncQueueDroppedCounter.WithLabelValues(githubEventWorkflowRun)); got != 1 { + t.Fatalf("expected queue dropped counter 1, got %v", got) + } + if got := testutil.ToFloat64(asyncUnsupportedEventsCounter.WithLabelValues("unknown_event")); got != 1 { + t.Fatalf("expected unsupported counter 1, got %v", got) + } + if got := testutil.ToFloat64(asyncProcessingFailuresCounter.WithLabelValues(githubEventWorkflowRun)); got != 1 { + t.Fatalf("expected processing failures counter 1, got %v", got) + } + if got := testutil.ToFloat64(asyncProcessedEventsCounter.WithLabelValues(githubEventWorkflowRun)); got != 1 { + t.Fatalf("expected processed counter 1, got %v", got) + } + if err := testutil.CollectAndCompare(asyncProcessingDurationHistogram, strings.NewReader(` + # HELP promgithub_event_processing_duration_seconds Duration of async webhook event processing + # TYPE promgithub_event_processing_duration_seconds histogram + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.005"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.01"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.025"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.05"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.1"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.25"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="0.5"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="1"} 0 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="2.5"} 1 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="5"} 1 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="10"} 1 + promgithub_event_processing_duration_seconds_bucket{event_type="workflow_run",le="+Inf"} 1 + promgithub_event_processing_duration_seconds_sum{event_type="workflow_run"} 1.5 + promgithub_event_processing_duration_seconds_count{event_type="workflow_run"} 1 + `)); err != nil { + t.Fatalf("unexpected async duration metric: %v", err) + } +} diff --git a/src/run_transition.go b/src/run_transition.go index 6f14820..0dcd22d 100644 --- a/src/run_transition.go +++ b/src/run_transition.go @@ -3,7 +3,6 @@ package main import ( "context" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -17,14 +16,6 @@ type runMetricDetails struct { endedAt string } -type runMetricSet struct { - statusCounter *prometheus.CounterVec - queuedGauge *prometheus.GaugeVec - inProgressGauge *prometheus.GaugeVec - completedGauge *prometheus.GaugeVec - durationHistogram *prometheus.HistogramVec -} - type runStoreMethods struct { get func(context.Context, int) (RunState, bool, error) update func(context.Context, int, RunState) error @@ -41,6 +32,12 @@ type runTransitionRecorder interface { ObserveDuration(RunState, float64) } +type runMetricRecorder interface { + RecordRunStatus(runMetricKind, RunState) + AddRunGauge(runMetricKind, RunState, float64) + ObserveRunDuration(runMetricKind, RunState, float64) +} + type runTransitionResult struct { Applied bool Skipped bool @@ -151,37 +148,19 @@ func (a runStoreAdapter) UpdateRunState(ctx context.Context, id int, state RunSt return a.methods.update(ctx, id, state) } -type prometheusRunTransitionRecorder struct { - metrics runMetricSet +type metricRunTransitionRecorder struct { + kind runMetricKind + recorder runMetricRecorder } -func (r prometheusRunTransitionRecorder) RecordStatus(state RunState) { - r.metrics.statusCounter.WithLabelValues( - state.Repository, - state.Branch, - state.Name, - state.Status, - state.Conclusion, - ).Inc() +func (r metricRunTransitionRecorder) RecordStatus(state RunState) { + r.recorder.RecordRunStatus(r.kind, state) } -func (r prometheusRunTransitionRecorder) AddGauge(state RunState, delta float64) { - switch normalizeStatus(state.Status) { - case statusQueued: - r.metrics.queuedGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) - case statusInProgress: - r.metrics.inProgressGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) - case statusCompleted: - r.metrics.completedGauge.WithLabelValues(state.Repository, state.Branch, state.Conclusion, state.Name).Add(delta) - } +func (r metricRunTransitionRecorder) AddGauge(state RunState, delta float64) { + r.recorder.AddRunGauge(r.kind, state, delta) } -func (r prometheusRunTransitionRecorder) ObserveDuration(state RunState, durationSeconds float64) { - r.metrics.durationHistogram.WithLabelValues( - state.Repository, - state.Branch, - state.Name, - state.Status, - state.Conclusion, - ).Observe(durationSeconds) +func (r metricRunTransitionRecorder) ObserveDuration(state RunState, durationSeconds float64) { + r.recorder.ObserveRunDuration(r.kind, state, durationSeconds) } diff --git a/src/webhook_ingestion.go b/src/webhook_ingestion.go index a3679b5..c17129b 100644 --- a/src/webhook_ingestion.go +++ b/src/webhook_ingestion.go @@ -67,7 +67,7 @@ func newDefaultWebhookIngestion() *webhookIngestion { deliveryStore: stateStore, localDeduper: deliveryDeduperCache, dispatcher: defaultWebhookEventDispatcher{}, - metrics: prometheusWebhookIngestionMetrics{}, + metrics: defaultMetricRecorder, now: time.Now, } if eventProcessor != nil { @@ -190,13 +190,6 @@ func (defaultWebhookEventDispatcher) Dispatch(ctx context.Context, eventType str return true } -type prometheusWebhookIngestionMetrics struct{} - -func (prometheusWebhookIngestionMetrics) RecordDuplicateDelivery(eventType string) { - duplicateDeliveriesSeenCounter.WithLabelValues(eventType).Inc() - duplicateDeliveriesDroppedCounter.WithLabelValues(eventType).Inc() -} - func webhookHTTPHandler(acceptor webhookAcceptor, logger *zap.Logger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body)