Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@
"count": 3,
"state_size_max_bytes": 256,
"state_size_mean_bytes": 254,
"state_size_median_bytes": 254
"state_size_median_bytes": 254,
"state_compressed_size_max_bytes": 203,
"state_compressed_size_mean_bytes": 202,
"state_compressed_size_median_bytes": 202
},
{
"resource_type": "pipelines",
"count": 2,
"state_size_max_bytes": 205,
"state_size_mean_bytes": 205,
"state_size_median_bytes": 205
"state_size_median_bytes": 205,
"state_compressed_size_max_bytes": 168,
"state_compressed_size_mean_bytes": 167,
"state_compressed_size_median_bytes": 167
}
]
}
68 changes: 68 additions & 0 deletions bundle/direct/dstate/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package dstate

import (
"bytes"
"compress/flate"
)

// compressedStateSize returns the DEFLATE-compressed size in bytes of a
// resource's serialized state. It is a rough proxy, used purely for deploy
// telemetry, for what the state sizes look like on the server side (which
// compresses with zstd): we deliberately use the standard library's
// compress/flate rather than pull in a dedicated zstd dependency, keeping the
// supply chain small while still getting useful signal on compressibility.
// Returns 0 for empty state.
//
// This always terminates: DEFLATE is a single linear pass over a finite,
// in-memory buffer (no input loop can diverge — non-termination is a
// decompression concern, e.g. zip bombs, not compression), and the writer
// targets a bytes.Buffer that never blocks. Cost is O(len(state)) — a few
// milliseconds even for the largest (~1 MB) resource states — so a background
// goroutine is not warranted.
func compressedStateSize(state []byte) int {
if len(state) == 0 {
return 0
}
var buf bytes.Buffer
w, err := flate.NewWriter(&buf, flate.DefaultCompression)
if err != nil {
return 0
}
if _, err := w.Write(state); err != nil {
return 0
}
if err := w.Close(); err != nil {
return 0
}
return buf.Len()
}

// compressStateSizes returns each resource's compressed state size, keyed by
// resource key. Compression is independent per resource and dominates the cost,
// so each resource is compressed in its own goroutine (the runtime schedules
// them across cores; no worker pool needed) and the result is sent on a
// buffered channel that the caller drains into the map.
//
// This always terminates: the channel is buffered to the resource count, so
// every goroutine sends exactly once and exits without blocking, and the drain
// loop reads exactly that many results — no goroutine leaks, no deadlock.
func compressStateSizes(data Database) map[string]int {
type result struct {
key string
size int
}

ch := make(chan result, len(data.State))
for key, entry := range data.State {
go func() {
ch <- result{key: key, size: compressedStateSize(entry.State)}
}()
}

sizes := make(map[string]int, len(data.State))
for range data.State {
r := <-ch
sizes[r.key] = r.size
}
return sizes
}
105 changes: 105 additions & 0 deletions bundle/direct/dstate/compress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package dstate

import (
"bytes"
"encoding/json"
"fmt"
"math/rand/v2"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCompressedStateSize(t *testing.T) {
// Empty state has no compressed size.
assert.Equal(t, 0, compressedStateSize(nil))
assert.Equal(t, 0, compressedStateSize([]byte{}))

// A highly compressible blob shrinks: positive size, smaller than raw.
blob := bytes.Repeat([]byte(`{"key":"value"}`), 1000)
got := compressedStateSize(blob)
assert.Positive(t, got)
assert.Less(t, got, len(blob))
}

// jsonState builds a JSON-ish resource-state blob of about `size` bytes:
// repeated keys with varied values, representative of real resource state
// (compressible, but not trivially so). `seed` varies the content so different
// resources don't compress identically. The exact byte count is approximate.
func jsonState(size, seed int) json.RawMessage {
r := rand.New(rand.NewPCG(uint64(seed), 0))
b := make([]byte, 0, size+128)
b = append(b, '{')
for i := 0; len(b) < size; i++ {
if i > 0 {
b = append(b, ',')
}
b = append(b, fmt.Sprintf(
`"key_%d":{"node_type":"i3.xlarge","num_workers":%d,"path":"/Workspace/Users/u%d/resource_%d","tag":"%x"}`,
i, r.IntN(64), r.IntN(100000), i, r.Uint64(),
)...)
}
b = append(b, '}')
return json.RawMessage(b)
}

// Representative results (go1.26.4, linux/amd64, Intel Xeon Platinum 8375C @
// 2.90GHz; realistic varied-JSON state — absolute numbers are hardware- and
// data-dependent):
//
// BenchmarkCompressedStateSize/4KB 211 µs 19 MB/s
// BenchmarkCompressedStateSize/64KB 1.3 ms 49 MB/s
// BenchmarkCompressedStateSize/256KB 4.9 ms 54 MB/s
// BenchmarkCompressedStateSize/1024KB 19.0 ms 55 MB/s (largest per-resource state ~1 MB)
// BenchmarkExportStateFromData/200x8KB 22.2 ms 74 MB/s
// BenchmarkExportStateFromData/50x64KB 9.5 ms 345 MB/s
// BenchmarkExportStateFromData/8x1MB 26.0 ms 323 MB/s (~6x vs sequential via per-resource fan-out)
// BenchmarkExportStateFromData/1x1MB 19.4 ms 54 MB/s (single blob)

// BenchmarkCompressedStateSize measures per-resource compression cost. The
// largest per-resource state files are around 1 MB, so 1024 KB is the top size.
func BenchmarkCompressedStateSize(b *testing.B) {
for _, kb := range []int{4, 64, 256, 1024} {
data := jsonState(kb<<10, 1)
b.Run(fmt.Sprintf("%dKB", kb), func(b *testing.B) {
b.SetBytes(int64(len(data)))
for b.Loop() {
_ = compressedStateSize(data)
}
})
}
}

// BenchmarkExportStateFromData measures the full export (including the
// per-resource compression fan-out) for a few representative bundle shapes,
// from many small resources to a few ~1 MB ones.
func BenchmarkExportStateFromData(b *testing.B) {
cases := []struct {
name string
count, size int
}{
{"200x8KB", 200, 8 << 10},
{"50x64KB", 50, 64 << 10},
{"8x1MB", 8, 1 << 20},
{"1x1MB", 1, 1 << 20},
}
for _, c := range cases {
data := Database{State: make(map[string]ResourceEntry, c.count)}
var total int64
for i := range c.count {
st := jsonState(c.size, i)
total += int64(len(st))
data.State[fmt.Sprintf("resources.jobs.job_%d", i)] = ResourceEntry{
ID: strconv.Itoa(i),
State: st,
}
}
b.Run(c.name, func(b *testing.B) {
b.SetBytes(total)
for b.Loop() {
_ = ExportStateFromData(data)
}
})
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't have anything to do with state, only with compression.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — the compression test and benchmarks now live in compress_test.go alongside the compression code.

11 changes: 8 additions & 3 deletions bundle/direct/dstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ func (db *DeploymentState) AssertOpenedForWrite() {

// ExportStateFromData extracts resource IDs and ETags from a database snapshot.
func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap {
// Compressing each resource's state to measure its compressed size is the
// one non-trivial cost here, so compute those sizes in parallel up front.
compressedSizes := compressStateSizes(data)

result := make(resourcestate.ExportedResourcesMap)
for key, entry := range data.State {
var etag string
Expand All @@ -469,9 +473,10 @@ func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap {
}

result[key] = resourcestate.ResourceState{
ID: entry.ID,
ETag: etag,
StateSizeBytes: len(entry.State),
ID: entry.ID,
ETag: etag,
StateSizeBytes: len(entry.State),
StateCompressedSizeBytes: compressedSizes[key],
}
}
return result
Expand Down
24 changes: 16 additions & 8 deletions bundle/phases/resources_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const directEngine = "direct"
// Only direct deploys are measured. b.Metrics.ResourceState is the direct
// engine's finalized state, populated in deployCore from the WAL replay the
// deploy already performs; each entry carries StateSizeBytes (len of the JSON
// blob stored in resources.json). So no marshalling, file read, or JSON parsing
// blob stored in resources.json) and StateCompressedSizeBytes (its compressed
// length, computed during export). So no marshalling, file read, or JSON parsing
// happens here — sizes are read straight off the in-memory map. The whole-file
// size comes from a single os.Stat (no parse). Returns nil for terraform
// deploys (ResourceState is nil) and when no resources are in state.
Expand All @@ -44,16 +45,18 @@ func collectResourcesMetadata(ctx context.Context, b *bundle.Bundle) *protos.Bun
}

// resourceMetadataFromState groups the deployment state by resource type and
// computes per-type count and max/mean/median state size. Sizes are sorted per
// type (needed for the median).
// computes per-type count and max/mean/median state size, both raw and
// compressed. Sizes are sorted per type (needed for the median).
func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []protos.ResourceMetadata {
sizesByType := make(map[string][]int64)
compressedByType := make(map[string][]int64)
for key, rs := range state {
t := config.GetResourceTypeFromKey(key)
if t == "" {
continue
}
sizesByType[t] = append(sizesByType[t], int64(rs.StateSizeBytes))
compressedByType[t] = append(compressedByType[t], int64(rs.StateCompressedSizeBytes))
}

types := make([]string, 0, len(sizesByType))
Expand All @@ -66,12 +69,17 @@ func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []proto
for _, t := range types {
sizes := sizesByType[t]
slices.Sort(sizes)
compressed := compressedByType[t]
slices.Sort(compressed)
resources = append(resources, protos.ResourceMetadata{
ResourceType: t,
Count: int64(len(sizes)),
StateSizeMaxBytes: statMax(sizes),
StateSizeMeanBytes: statMean(sizes),
StateSizeMedianBytes: statMedian(sizes),
ResourceType: t,
Count: int64(len(sizes)),
StateSizeMaxBytes: statMax(sizes),
StateSizeMeanBytes: statMean(sizes),
StateSizeMedianBytes: statMedian(sizes),
StateCompressedSizeMaxBytes: statMax(compressed),
StateCompressedSizeMeanBytes: statMean(compressed),
StateCompressedSizeMedianBytes: statMedian(compressed),
})
}
return resources
Expand Down
40 changes: 29 additions & 11 deletions bundle/phases/resources_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,35 @@ import (

func TestResourceMetadataFromState_GroupsByType(t *testing.T) {
state := resourcestate.ExportedResourcesMap{
"resources.jobs.foo": {StateSizeBytes: 20},
"resources.jobs.bar": {StateSizeBytes: 10},
"resources.jobs.foo.permissions": {StateSizeBytes: 2},
"resources.pipelines.qux": {StateSizeBytes: 14},
"resources.jobs.foo": {StateSizeBytes: 20, StateCompressedSizeBytes: 12},
"resources.jobs.bar": {StateSizeBytes: 10, StateCompressedSizeBytes: 8},
"resources.jobs.foo.permissions": {StateSizeBytes: 2, StateCompressedSizeBytes: 3},
"resources.pipelines.qux": {StateSizeBytes: 14, StateCompressedSizeBytes: 9},
}

got := resourceMetadataFromState(state)

// Sorted by resource type. Sub-resources (permissions) group under
// "<parent>.permissions" per config.GetResourceTypeFromKey. jobs median is
// the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10.
// the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10. Raw and
// compressed stats are computed independently (each slice sorted on its own),
// so a resource's raw and compressed values need not share a rank.
assert.Equal(t, []protos.ResourceMetadata{
{ResourceType: "jobs", Count: 2, StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10},
{ResourceType: "jobs.permissions", Count: 1, StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2},
{ResourceType: "pipelines", Count: 1, StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14},
{
ResourceType: "jobs", Count: 2,
StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10,
StateCompressedSizeMaxBytes: 12, StateCompressedSizeMeanBytes: 10, StateCompressedSizeMedianBytes: 8,
},
{
ResourceType: "jobs.permissions", Count: 1,
StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2,
StateCompressedSizeMaxBytes: 3, StateCompressedSizeMeanBytes: 3, StateCompressedSizeMedianBytes: 3,
},
{
ResourceType: "pipelines", Count: 1,
StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14,
StateCompressedSizeMaxBytes: 9, StateCompressedSizeMeanBytes: 9, StateCompressedSizeMedianBytes: 9,
},
}, got)
}

Expand All @@ -39,12 +53,16 @@ func TestStatHelpers(t *testing.T) {

func TestResourceMetadataFromState_SkipsNonResourceKeys(t *testing.T) {
state := resourcestate.ExportedResourcesMap{
"resources.jobs.foo": {StateSizeBytes: 5},
"bogus": {StateSizeBytes: 99},
"resources.jobs.foo": {StateSizeBytes: 5, StateCompressedSizeBytes: 4},
"bogus": {StateSizeBytes: 99, StateCompressedSizeBytes: 50},
}
got := resourceMetadataFromState(state)
assert.Equal(t, []protos.ResourceMetadata{
{ResourceType: "jobs", Count: 1, StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5},
{
ResourceType: "jobs", Count: 1,
StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5,
StateCompressedSizeMaxBytes: 4, StateCompressedSizeMeanBytes: 4, StateCompressedSizeMedianBytes: 4,
},
}, got)
}

Expand Down
7 changes: 7 additions & 0 deletions bundle/statemgmt/resourcestate/resourcestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ type ResourceState struct {
// direct engine (len of the JSON stored in resources.json) for deploy
// telemetry; left zero by the terraform path.
StateSizeBytes int

// Size in bytes of the resource's serialized state blob after compression.
// Populated by the direct engine alongside StateSizeBytes for deploy
// telemetry; left zero by the terraform path. A rough proxy for how much
// resource state shrinks under compression on the server side; see
// dstate.compressedStateSize for how it is computed.
StateCompressedSizeBytes int
}

// ExportedResourcesMap stores relevant attributes from terraform/direct state for all resources
Expand Down
10 changes: 10 additions & 0 deletions libs/telemetry/protos/bundle_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ type ResourceMetadata struct {
StateSizeMaxBytes int64 `json:"state_size_max_bytes,omitempty"`
StateSizeMeanBytes int64 `json:"state_size_mean_bytes,omitempty"`
StateSizeMedianBytes int64 `json:"state_size_median_bytes,omitempty"`

// Compressed state-size statistics across resources of this type, each
// measured as the DEFLATE-compressed length of the same per-resource state
// blob. This is a rough proxy for what the state sizes look like on the
// server side (which compresses with zstd); the standard library's flate is
// used to avoid an extra dependency. Captures how much resource state
// shrinks under compression rather than just the raw sizes above.
StateCompressedSizeMaxBytes int64 `json:"state_compressed_size_max_bytes,omitempty"`
StateCompressedSizeMeanBytes int64 `json:"state_compressed_size_mean_bytes,omitempty"`
StateCompressedSizeMedianBytes int64 `json:"state_compressed_size_median_bytes,omitempty"`
}

type BoolMapEntry struct {
Expand Down
Loading