From c9ac30f54b5a25bab5824789e3bb1bc6bc5a0c2d Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 15 Jun 2026 12:10:58 +0000 Subject: [PATCH] Track compressed resource state sizes in deploy telemetry (direct engine) Deploy telemetry already reports per-resource-type raw state-size statistics (state_size_{max,mean,median}_bytes). The deployment metadata service stores that same per-resource state compressed, so this adds compressed-size counterparts to gauge how much resource state shrinks under compression rather than just the raw sizes: - state_compressed_size_max_bytes - state_compressed_size_mean_bytes - state_compressed_size_median_bytes The compressed length is computed per resource at state-export time (alongside the existing raw length) using the standard library's compress/flate -- a deliberately rough proxy for the server side (which uses zstd) that keeps the dependency/supply-chain surface small while still giving useful signal on compressibility. Since the largest resource states (~1 MB, ~20 ms to compress) dominate the cost, the per-resource compression is fanned out across goroutines (one per resource), keeping multi-resource bundles cheap. Only the direct engine is measured, matching the existing raw-size behavior. 
Co-authored-by: Isaac --- .../deploy/out.resources_metadata.direct.txt | 10 +- bundle/direct/dstate/compress.go | 68 ++++++++++++ bundle/direct/dstate/compress_test.go | 105 ++++++++++++++++++ bundle/direct/dstate/state.go | 11 +- bundle/phases/resources_metadata.go | 24 ++-- bundle/phases/resources_metadata_test.go | 40 +++++-- .../statemgmt/resourcestate/resourcestate.go | 7 ++ libs/telemetry/protos/bundle_deploy.go | 10 ++ 8 files changed, 251 insertions(+), 24 deletions(-) create mode 100644 bundle/direct/dstate/compress.go create mode 100644 bundle/direct/dstate/compress_test.go diff --git a/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt b/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt index 551a48fa89b..905d2b34e59 100644 --- a/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt +++ b/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt @@ -6,14 +6,20 @@ "count": 3, "state_size_max_bytes": 256, "state_size_mean_bytes": 254, - "state_size_median_bytes": 254 + "state_size_median_bytes": 254, + "state_compressed_size_max_bytes": 203, + "state_compressed_size_mean_bytes": 202, + "state_compressed_size_median_bytes": 202 }, { "resource_type": "pipelines", "count": 2, "state_size_max_bytes": 205, "state_size_mean_bytes": 205, - "state_size_median_bytes": 205 + "state_size_median_bytes": 205, + "state_compressed_size_max_bytes": 168, + "state_compressed_size_mean_bytes": 167, + "state_compressed_size_median_bytes": 167 } ] } diff --git a/bundle/direct/dstate/compress.go b/bundle/direct/dstate/compress.go new file mode 100644 index 00000000000..74fe9752867 --- /dev/null +++ b/bundle/direct/dstate/compress.go @@ -0,0 +1,68 @@ +package dstate + +import ( + "bytes" + "compress/flate" +) + +// compressedStateSize returns the DEFLATE-compressed size in bytes of a +// resource's serialized state. 
It is a rough proxy, used purely for deploy +// telemetry, for what the state sizes look like on the server side (which +// compresses with zstd): we deliberately use the standard library's +// compress/flate rather than pull in a dedicated zstd dependency, keeping the +// supply chain small while still getting useful signal on compressibility. +// Returns 0 for empty state or on a writer error. +// +// This always terminates: DEFLATE is a single linear pass over a finite, +// in-memory buffer (no input loop can diverge — non-termination is a +// decompression concern, e.g. zip bombs, not compression), and the writer +// targets a bytes.Buffer that never blocks. Cost is O(len(state)) — roughly +// 20 ms for the largest (~1 MB) resource states — so callers with many +// resources run it concurrently (see compressStateSizes). +func compressedStateSize(state []byte) int { + if len(state) == 0 { + return 0 + } + var buf bytes.Buffer + w, err := flate.NewWriter(&buf, flate.DefaultCompression) + if err != nil { + return 0 + } + if _, err := w.Write(state); err != nil { + return 0 + } + if err := w.Close(); err != nil { + return 0 + } + return buf.Len() +} + +// compressStateSizes returns each resource's compressed state size, keyed by +// resource key. Compression is independent per resource and dominates the cost, +// so each resource is compressed in its own goroutine (the runtime schedules +// them across cores; no worker pool needed) and the result is sent on a +// buffered channel that the caller drains into the map. +// +// This always terminates: the channel is buffered to the resource count, so +// every goroutine sends exactly once and exits without blocking, and the drain +// loop reads exactly that many results — no goroutine leaks, no deadlock. 
+func compressStateSizes(data Database) map[string]int { + type result struct { + key string + size int + } + + ch := make(chan result, len(data.State)) + for key, entry := range data.State { + go func() { + ch <- result{key: key, size: compressedStateSize(entry.State)} + }() + } + + sizes := make(map[string]int, len(data.State)) + for range data.State { + r := <-ch + sizes[r.key] = r.size + } + return sizes +} diff --git a/bundle/direct/dstate/compress_test.go b/bundle/direct/dstate/compress_test.go new file mode 100644 index 00000000000..207ef6850df --- /dev/null +++ b/bundle/direct/dstate/compress_test.go @@ -0,0 +1,105 @@ +package dstate + +import ( + "bytes" + "encoding/json" + "fmt" + "math/rand/v2" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompressedStateSize(t *testing.T) { + // Empty state has no compressed size. + assert.Equal(t, 0, compressedStateSize(nil)) + assert.Equal(t, 0, compressedStateSize([]byte{})) + + // A highly compressible blob shrinks: positive size, smaller than raw. + blob := bytes.Repeat([]byte(`{"key":"value"}`), 1000) + got := compressedStateSize(blob) + assert.Positive(t, got) + assert.Less(t, got, len(blob)) +} + +// jsonState builds a JSON-ish resource-state blob of about `size` bytes: +// repeated keys with varied values, representative of real resource state +// (compressible, but not trivially so). `seed` varies the content so different +// resources don't compress identically. The exact byte count is approximate. +func jsonState(size, seed int) json.RawMessage { + r := rand.New(rand.NewPCG(uint64(seed), 0)) + b := make([]byte, 0, size+128) + b = append(b, '{') + for i := 0; len(b) < size; i++ { + if i > 0 { + b = append(b, ',') + } + b = append(b, fmt.Sprintf( + `"key_%d":{"node_type":"i3.xlarge","num_workers":%d,"path":"/Workspace/Users/u%d/resource_%d","tag":"%x"}`, + i, r.IntN(64), r.IntN(100000), i, r.Uint64(), + )...) 
+ } + b = append(b, '}') + return json.RawMessage(b) +} + +// Representative results (go1.26.4, linux/amd64, Intel Xeon Platinum 8375C @ +// 2.90GHz; realistic varied-JSON state — absolute numbers are hardware- and +// data-dependent): +// +// BenchmarkCompressedStateSize/4KB 211 µs 19 MB/s +// BenchmarkCompressedStateSize/64KB 1.3 ms 49 MB/s +// BenchmarkCompressedStateSize/256KB 4.9 ms 54 MB/s +// BenchmarkCompressedStateSize/1024KB 19.0 ms 55 MB/s (largest per-resource state ~1 MB) +// BenchmarkExportStateFromData/200x8KB 22.2 ms 74 MB/s +// BenchmarkExportStateFromData/50x64KB 9.5 ms 345 MB/s +// BenchmarkExportStateFromData/8x1MB 26.0 ms 323 MB/s (~6x vs sequential via per-resource fan-out) +// BenchmarkExportStateFromData/1x1MB 19.4 ms 54 MB/s (single blob) + +// BenchmarkCompressedStateSize measures per-resource compression cost. The +// largest per-resource state files are around 1 MB, so 1024 KB is the top size. +func BenchmarkCompressedStateSize(b *testing.B) { + for _, kb := range []int{4, 64, 256, 1024} { + data := jsonState(kb<<10, 1) + b.Run(fmt.Sprintf("%dKB", kb), func(b *testing.B) { + b.SetBytes(int64(len(data))) + for b.Loop() { + _ = compressedStateSize(data) + } + }) + } +} + +// BenchmarkExportStateFromData measures the full export (including the +// per-resource compression fan-out) for a few representative bundle shapes, +// from many small resources to a few ~1 MB ones. 
+func BenchmarkExportStateFromData(b *testing.B) { + cases := []struct { + name string + count, size int + }{ + {"200x8KB", 200, 8 << 10}, + {"50x64KB", 50, 64 << 10}, + {"8x1MB", 8, 1 << 20}, + {"1x1MB", 1, 1 << 20}, + } + for _, c := range cases { + data := Database{State: make(map[string]ResourceEntry, c.count)} + var total int64 + for i := range c.count { + st := jsonState(c.size, i) + total += int64(len(st)) + data.State[fmt.Sprintf("resources.jobs.job_%d", i)] = ResourceEntry{ + ID: strconv.Itoa(i), + State: st, + } + } + b.Run(c.name, func(b *testing.B) { + b.SetBytes(total) + for b.Loop() { + _ = ExportStateFromData(data) + } + }) + } +} diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 0bf50809e16..4d42acc66e6 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -449,6 +449,10 @@ func (db *DeploymentState) AssertOpenedForWrite() { // ExportStateFromData extracts resource IDs and ETags from a database snapshot. func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap { + // Compressing each resource's state to measure its compressed size is the + // one non-trivial cost here, so compute those sizes in parallel up front. 
+ compressedSizes := compressStateSizes(data) + result := make(resourcestate.ExportedResourcesMap) for key, entry := range data.State { var etag string @@ -469,9 +473,10 @@ func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap { } result[key] = resourcestate.ResourceState{ - ID: entry.ID, - ETag: etag, - StateSizeBytes: len(entry.State), + ID: entry.ID, + ETag: etag, + StateSizeBytes: len(entry.State), + StateCompressedSizeBytes: compressedSizes[key], } } return result diff --git a/bundle/phases/resources_metadata.go b/bundle/phases/resources_metadata.go index ae45f6ad46a..2a8dcc83afe 100644 --- a/bundle/phases/resources_metadata.go +++ b/bundle/phases/resources_metadata.go @@ -21,7 +21,8 @@ const directEngine = "direct" // Only direct deploys are measured. b.Metrics.ResourceState is the direct // engine's finalized state, populated in deployCore from the WAL replay the // deploy already performs; each entry carries StateSizeBytes (len of the JSON -// blob stored in resources.json). So no marshalling, file read, or JSON parsing +// blob stored in resources.json) and StateCompressedSizeBytes (its compressed +// length, computed during export). So no marshalling, file read, or JSON parsing // happens here — sizes are read straight off the in-memory map. The whole-file // size comes from a single os.Stat (no parse). Returns nil for terraform // deploys (ResourceState is nil) and when no resources are in state. @@ -44,16 +45,18 @@ func collectResourcesMetadata(ctx context.Context, b *bundle.Bundle) *protos.Bun } // resourceMetadataFromState groups the deployment state by resource type and -// computes per-type count and max/mean/median state size. Sizes are sorted per -// type (needed for the median). +// computes per-type count and max/mean/median state size, both raw and +// compressed. Sizes are sorted per type (needed for the median). 
func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []protos.ResourceMetadata { sizesByType := make(map[string][]int64) + compressedByType := make(map[string][]int64) for key, rs := range state { t := config.GetResourceTypeFromKey(key) if t == "" { continue } sizesByType[t] = append(sizesByType[t], int64(rs.StateSizeBytes)) + compressedByType[t] = append(compressedByType[t], int64(rs.StateCompressedSizeBytes)) } types := make([]string, 0, len(sizesByType)) @@ -66,12 +69,17 @@ func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []proto for _, t := range types { sizes := sizesByType[t] slices.Sort(sizes) + compressed := compressedByType[t] + slices.Sort(compressed) resources = append(resources, protos.ResourceMetadata{ - ResourceType: t, - Count: int64(len(sizes)), - StateSizeMaxBytes: statMax(sizes), - StateSizeMeanBytes: statMean(sizes), - StateSizeMedianBytes: statMedian(sizes), + ResourceType: t, + Count: int64(len(sizes)), + StateSizeMaxBytes: statMax(sizes), + StateSizeMeanBytes: statMean(sizes), + StateSizeMedianBytes: statMedian(sizes), + StateCompressedSizeMaxBytes: statMax(compressed), + StateCompressedSizeMeanBytes: statMean(compressed), + StateCompressedSizeMedianBytes: statMedian(compressed), }) } return resources diff --git a/bundle/phases/resources_metadata_test.go b/bundle/phases/resources_metadata_test.go index bb34e43519a..c5fac1d2c43 100644 --- a/bundle/phases/resources_metadata_test.go +++ b/bundle/phases/resources_metadata_test.go @@ -11,21 +11,35 @@ import ( func TestResourceMetadataFromState_GroupsByType(t *testing.T) { state := resourcestate.ExportedResourcesMap{ - "resources.jobs.foo": {StateSizeBytes: 20}, - "resources.jobs.bar": {StateSizeBytes: 10}, - "resources.jobs.foo.permissions": {StateSizeBytes: 2}, - "resources.pipelines.qux": {StateSizeBytes: 14}, + "resources.jobs.foo": {StateSizeBytes: 20, StateCompressedSizeBytes: 12}, + "resources.jobs.bar": {StateSizeBytes: 10, 
StateCompressedSizeBytes: 8}, + "resources.jobs.foo.permissions": {StateSizeBytes: 2, StateCompressedSizeBytes: 3}, + "resources.pipelines.qux": {StateSizeBytes: 14, StateCompressedSizeBytes: 9}, } got := resourceMetadataFromState(state) // Sorted by resource type. Sub-resources (permissions) group under // ".permissions" per config.GetResourceTypeFromKey. jobs median is - // the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10. + // the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10. Raw and + // compressed stats are computed independently (each slice sorted on its own), + // so a resource's raw and compressed values need not share a rank. assert.Equal(t, []protos.ResourceMetadata{ - {ResourceType: "jobs", Count: 2, StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10}, - {ResourceType: "jobs.permissions", Count: 1, StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2}, - {ResourceType: "pipelines", Count: 1, StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14}, + { + ResourceType: "jobs", Count: 2, + StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10, + StateCompressedSizeMaxBytes: 12, StateCompressedSizeMeanBytes: 10, StateCompressedSizeMedianBytes: 8, + }, + { + ResourceType: "jobs.permissions", Count: 1, + StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2, + StateCompressedSizeMaxBytes: 3, StateCompressedSizeMeanBytes: 3, StateCompressedSizeMedianBytes: 3, + }, + { + ResourceType: "pipelines", Count: 1, + StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14, + StateCompressedSizeMaxBytes: 9, StateCompressedSizeMeanBytes: 9, StateCompressedSizeMedianBytes: 9, + }, }, got) } @@ -39,12 +53,16 @@ func TestStatHelpers(t *testing.T) { func TestResourceMetadataFromState_SkipsNonResourceKeys(t *testing.T) { state := resourcestate.ExportedResourcesMap{ - "resources.jobs.foo": {StateSizeBytes: 5}, - "bogus": {StateSizeBytes: 99}, + 
"resources.jobs.foo": {StateSizeBytes: 5, StateCompressedSizeBytes: 4}, + "bogus": {StateSizeBytes: 99, StateCompressedSizeBytes: 50}, } got := resourceMetadataFromState(state) assert.Equal(t, []protos.ResourceMetadata{ - {ResourceType: "jobs", Count: 1, StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5}, + { + ResourceType: "jobs", Count: 1, + StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5, + StateCompressedSizeMaxBytes: 4, StateCompressedSizeMeanBytes: 4, StateCompressedSizeMedianBytes: 4, + }, }, got) } diff --git a/bundle/statemgmt/resourcestate/resourcestate.go b/bundle/statemgmt/resourcestate/resourcestate.go index ec98f1bc827..834458bb05f 100644 --- a/bundle/statemgmt/resourcestate/resourcestate.go +++ b/bundle/statemgmt/resourcestate/resourcestate.go @@ -12,6 +12,13 @@ type ResourceState struct { // direct engine (len of the JSON stored in resources.json) for deploy // telemetry; left zero by the terraform path. StateSizeBytes int + + // Size in bytes of the resource's serialized state blob after compression. + // Populated by the direct engine alongside StateSizeBytes for deploy + // telemetry; left zero by the terraform path. A rough proxy for how much + // resource state shrinks under compression on the server side; see + // dstate.compressedStateSize for how it is computed. 
+ StateCompressedSizeBytes int } // ExportedResourcesMap stores relevant attributes from terraform/direct state for all resources diff --git a/libs/telemetry/protos/bundle_deploy.go b/libs/telemetry/protos/bundle_deploy.go index 06521160c41..0c7b294a829 100644 --- a/libs/telemetry/protos/bundle_deploy.go +++ b/libs/telemetry/protos/bundle_deploy.go @@ -144,6 +144,16 @@ type ResourceMetadata struct { StateSizeMaxBytes int64 `json:"state_size_max_bytes,omitempty"` StateSizeMeanBytes int64 `json:"state_size_mean_bytes,omitempty"` StateSizeMedianBytes int64 `json:"state_size_median_bytes,omitempty"` + + // Compressed state-size statistics across resources of this type, each + // measured as the DEFLATE-compressed length of the same per-resource state + // blob. This is a rough proxy for what the state sizes look like on the + // server side (which compresses with zstd); the standard library's flate is + // used to avoid an extra dependency. Captures how much resource state + // shrinks under compression rather than just the raw sizes above. + StateCompressedSizeMaxBytes int64 `json:"state_compressed_size_max_bytes,omitempty"` + StateCompressedSizeMeanBytes int64 `json:"state_compressed_size_mean_bytes,omitempty"` + StateCompressedSizeMedianBytes int64 `json:"state_compressed_size_median_bytes,omitempty"` } type BoolMapEntry struct {