diff --git a/internal/collector/postgres_test.go b/internal/collector/postgres_test.go index e0e76c1896..3c6d44a63e 100644 --- a/internal/collector/postgres_test.go +++ b/internal/collector/postgres_test.go @@ -12,6 +12,7 @@ import ( "github.com/crunchydata/postgres-operator/internal/feature" "github.com/crunchydata/postgres-operator/internal/postgres" + "github.com/crunchydata/postgres-operator/internal/testing/cmp" "github.com/crunchydata/postgres-operator/internal/testing/require" "github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1" ) @@ -811,6 +812,85 @@ service: - filelog/postgres_jsonlog `) }) + + t.Run("LogsBatchesConfigured", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(context.Background(), gate) + + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.PostgresVersion = 99 + require.UnmarshalInto(t, &cluster.Spec, `{ + instrumentation: { + logs: { + batches: { + maxDelay: 5min 12sec, + maxRecords: 123, + minRecords: 45, + }, + }, + }, + }`) + + config := NewConfig(cluster.Spec.Instrumentation) + params := postgres.NewParameters() + + EnablePostgresLogging(ctx, cluster, params.Default, config) + + result, err := config.ToYAML() + assert.NilError(t, err) + assert.Assert(t, cmp.Contains(result, ` + batch/logs: + send_batch_max_size: 123 + send_batch_size: 45 + timeout: 5m12s +`)) + }) + + t.Run("DetectorsWithAttributes", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(context.Background(), gate) + + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.PostgresVersion = 99 + cluster.Spec.Instrumentation = testInstrumentationSpec() + cluster.Spec.Instrumentation.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{ + {Name: "gcp"}, + {Name: "aks", Attributes: map[string]bool{ + "k8s.cluster.name": true, + }}, + } + + config := NewConfig(cluster.Spec.Instrumentation) + params := postgres.NewParameters() + + EnablePostgresLogging(ctx, cluster, params.Default, config) + + result, err := config.ToYAML() + assert.NilError(t, err) + assert.Assert(t, cmp.Contains(result, ` + resourcedetection: + aks: + resource_attributes: + k8s.cluster.name: + enabled: true + detectors: + - gcp + - aks + override: false + timeout: 30s +`)) + // Verify resourcedetection is in the pipeline + assert.Assert(t, cmp.Contains(result, ` + - resourcedetection + - batch/logs +`)) + }) } func TestEnablePostgresMetrics(t *testing.T) { diff --git a/internal/crd/validation/postgrescluster_test.go b/internal/crd/validation/postgrescluster_test.go index 01749fc46a..6e62607b69 100644 --- a/internal/crd/validation/postgrescluster_test.go +++ b/internal/crd/validation/postgrescluster_test.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" + "github.com/crunchydata/postgres-operator/internal/controller/runtime" "github.com/crunchydata/postgres-operator/internal/testing/cmp" "github.com/crunchydata/postgres-operator/internal/testing/require" v1 "github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1" @@ -292,3 +293,207 @@ func TestAdditionalVolumes(t *testing.T) { assert.NilError(t, dryrun.Create(ctx, tmp.DeepCopy())) }) } + +func TestPostgresClusterInstrumentation(t *testing.T) { + ctx := context.Background() + cc := require.Kubernetes(t) + t.Parallel() + + namespace := require.Namespace(t, cc) + base := v1beta1.NewPostgresCluster() + base.Namespace = namespace.Name + base.Name = "postgres-instrumentation" + require.UnmarshalInto(t, &base.Spec, `{ + postgresVersion: 16, + instances: [{ + dataVolumeClaimSpec: { + accessModes: [ReadWriteOnce], + resources: { requests: { storage: 1Mi } }, + }, + }], + }`) + + assert.NilError(t, cc.Create(ctx, base.DeepCopy(), client.DryRunAll), + "expected this base to be valid") + + t.Run("LogsBatches", func(t *testing.T) { + t.Run("Disable", func(t *testing.T) { + for _, tt := range []struct { + batches string + valid bool + }{ + {valid: true, batches: ``}, // both null + {valid: true, batches: `minRecords: 1`}, // one null + {valid: true, batches: `maxDelay: 1s`}, // other null + + {valid: false, batches: `minRecords: 0`}, // one zero + {valid: false, batches: `maxDelay: 0m`}, // other zero + + {valid: true, batches: `minRecords: 0, maxDelay: 0m`}, // both zero + {valid: true, batches: `minRecords: 1, maxDelay: 1s`}, // both non-zero + } { + cluster := base.DeepCopy() + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{ + logs: { batches: { `+tt.batches+` } } + }`) + + err := cc.Create(ctx, cluster, client.DryRunAll) + if tt.valid { + assert.NilError(t, err) + } else { + assert.Assert(t, apierrors.IsInvalid(err)) + assert.ErrorContains(t, err, "disable") + assert.ErrorContains(t, err, "minRecords") + assert.ErrorContains(t, err, "maxDelay") + + details := require.StatusErrorDetails(t, err) + assert.Assert(t, cmp.Len(details.Causes, 1)) + + for _, cause := range details.Causes { + assert.Equal(t, cause.Field, "spec.instrumentation.logs.batches") + assert.Assert(t, cmp.Contains(cause.Message, "disable batching")) + assert.Assert(t, cmp.Contains(cause.Message, "minRecords and maxDelay must be zero")) + } + } + } + }) + + t.Run("MaxDelay", func(t *testing.T) { + cluster := base.DeepCopy() + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{ + logs: { + batches: { maxDelay: 100min }, + }, + }`) + + err := cc.Create(ctx, cluster, client.DryRunAll) + assert.Assert(t, apierrors.IsInvalid(err)) + assert.ErrorContains(t, err, "maxDelay") + assert.ErrorContains(t, err, "5m") + + details := require.StatusErrorDetails(t, err) + assert.Assert(t, cmp.Len(details.Causes, 1)) + + for _, cause := range details.Causes { + assert.Equal(t, cause.Field, "spec.instrumentation.logs.batches.maxDelay") + } + }) + + t.Run("MinMaxRecords", func(t *testing.T) { + cluster := base.DeepCopy() + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{ + logs: { + batches: { minRecords: -11, maxRecords: 0 }, + }, + }`) + + err := cc.Create(ctx, cluster, client.DryRunAll) + assert.Assert(t, apierrors.IsInvalid(err)) + assert.ErrorContains(t, err, "minRecords") + assert.ErrorContains(t, err, "greater than or equal to 0") + assert.ErrorContains(t, err, "maxRecords") + assert.ErrorContains(t, err, "greater than or equal to 1") + + details := require.StatusErrorDetails(t, err) + assert.Assert(t, cmp.Len(details.Causes, 2)) + + for _, cause := range details.Causes { + switch cause.Field { + case "spec.instrumentation.logs.batches.maxRecords": + assert.Assert(t, cmp.Contains(cause.Message, "0")) + assert.Assert(t, cmp.Contains(cause.Message, "greater than or equal to 1")) + + case "spec.instrumentation.logs.batches.minRecords": + assert.Assert(t, cmp.Contains(cause.Message, "-11")) + assert.Assert(t, cmp.Contains(cause.Message, "greater than or equal to 0")) + } + } + + t.Run("Reversed", func(t *testing.T) { + for _, batches := range []string{ + `maxRecords: 99`, // default minRecords + `minRecords: 99, maxRecords: 21`, // + } { + cluster := base.DeepCopy() + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{ + logs: { + batches: { `+batches+` }, + }, + }`) + + err := cc.Create(ctx, cluster, client.DryRunAll) + assert.Assert(t, apierrors.IsInvalid(err)) + assert.ErrorContains(t, err, "minRecords") + assert.ErrorContains(t, err, "maxRecords") + + details := require.StatusErrorDetails(t, err) + assert.Assert(t, cmp.Len(details.Causes, 1)) + + for _, cause := range details.Causes { + assert.Equal(t, cause.Field, "spec.instrumentation.logs.batches") + assert.Assert(t, cmp.Contains(cause.Message, "minRecords cannot be larger than maxRecords")) + } + } + }) + }) + }) + + t.Run("LogsRetentionPeriod", func(t *testing.T) { + cluster := base.DeepCopy() + require.UnmarshalInto(t, &cluster.Spec, `{ + instrumentation: { + logs: { retentionPeriod: 5m }, + }, + }`) + + err := cc.Create(ctx, cluster, client.DryRunAll) + assert.Assert(t, apierrors.IsInvalid(err)) + assert.ErrorContains(t, err, "retentionPeriod") + assert.ErrorContains(t, err, "hour|day|week") + assert.ErrorContains(t, err, "one hour") + + details := require.StatusErrorDetails(t, err) + assert.Assert(t, cmp.Len(details.Causes, 2)) + + for _, cause := range details.Causes { + assert.Equal(t, cause.Field, "spec.instrumentation.logs.retentionPeriod") + } + + t.Run("Valid", func(t *testing.T) { + for _, tt := range []string{ + "28 weeks", + "90 DAY", + "1 hr", + "PT1D2H", + "1 week 2 days", + } { + u, err := runtime.ToUnstructuredObject(cluster) + assert.NilError(t, err) + assert.NilError(t, unstructured.SetNestedField(u.Object, + tt, "spec", "instrumentation", "logs", "retentionPeriod")) + + assert.NilError(t, cc.Create(ctx, u, client.DryRunAll), tt) + } + }) + + t.Run("Invalid", func(t *testing.T) { + for _, tt := range []string{ + // Amount too small + "0 days", + "0", + + // Text too long + "2 weeks 3 days 4 hours", + } { + u, err := runtime.ToUnstructuredObject(cluster) + assert.NilError(t, err) + assert.NilError(t, unstructured.SetNestedField(u.Object, + tt, "spec", "instrumentation", "logs", "retentionPeriod")) + + err = cc.Create(ctx, u, client.DryRunAll) + assert.Assert(t, apierrors.IsInvalid(err), tt) + assert.ErrorContains(t, err, "retentionPeriod") + } + }) + }) +}