-
Notifications
You must be signed in to change notification settings - Fork 184
Track compressed resource state sizes in deploy telemetry (direct engine) #5608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
shreyas-goenka
wants to merge
1
commit into
main
Choose a base branch
from
shreyas-goenka/telemetry-compressed-resource-sizes
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| package dstate | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "compress/flate" | ||
| ) | ||
|
|
||
| // compressedStateSize returns the DEFLATE-compressed size in bytes of a | ||
| // resource's serialized state. It is a rough proxy, used purely for deploy | ||
| // telemetry, for what the state sizes look like on the server side (which | ||
| // compresses with zstd): we deliberately use the standard library's | ||
| // compress/flate rather than pull in a dedicated zstd dependency, keeping the | ||
| // supply chain small while still getting useful signal on compressibility. | ||
| // Returns 0 for empty state. | ||
| // | ||
| // This always terminates: DEFLATE is a single linear pass over a finite, | ||
| // in-memory buffer (no input loop can diverge — non-termination is a | ||
| // decompression concern, e.g. zip bombs, not compression), and the writer | ||
| // targets a bytes.Buffer that never blocks. Cost is O(len(state)) — a few | ||
| // milliseconds even for the largest (~1 MB) resource states — so a background | ||
| // goroutine is not warranted. | ||
| func compressedStateSize(state []byte) int { | ||
| if len(state) == 0 { | ||
| return 0 | ||
| } | ||
| var buf bytes.Buffer | ||
| w, err := flate.NewWriter(&buf, flate.DefaultCompression) | ||
| if err != nil { | ||
| return 0 | ||
| } | ||
| if _, err := w.Write(state); err != nil { | ||
| return 0 | ||
| } | ||
| if err := w.Close(); err != nil { | ||
| return 0 | ||
| } | ||
| return buf.Len() | ||
| } | ||
|
|
||
| // compressStateSizes returns each resource's compressed state size, keyed by | ||
| // resource key. Compression is independent per resource and dominates the cost, | ||
| // so each resource is compressed in its own goroutine (the runtime schedules | ||
| // them across cores; no worker pool needed) and the result is sent on a | ||
| // buffered channel that the caller drains into the map. | ||
| // | ||
| // This always terminates: the channel is buffered to the resource count, so | ||
| // every goroutine sends exactly once and exits without blocking, and the drain | ||
| // loop reads exactly that many results — no goroutine leaks, no deadlock. | ||
| func compressStateSizes(data Database) map[string]int { | ||
| type result struct { | ||
| key string | ||
| size int | ||
| } | ||
|
|
||
| ch := make(chan result, len(data.State)) | ||
| for key, entry := range data.State { | ||
| go func() { | ||
| ch <- result{key: key, size: compressedStateSize(entry.State)} | ||
| }() | ||
| } | ||
|
|
||
| sizes := make(map[string]int, len(data.State)) | ||
| for range data.State { | ||
| r := <-ch | ||
| sizes[r.key] = r.size | ||
| } | ||
| return sizes | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| package dstate | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "encoding/json" | ||
| "fmt" | ||
| "math/rand/v2" | ||
| "strconv" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| ) | ||
|
|
||
| func TestCompressedStateSize(t *testing.T) { | ||
| // Empty state has no compressed size. | ||
| assert.Equal(t, 0, compressedStateSize(nil)) | ||
| assert.Equal(t, 0, compressedStateSize([]byte{})) | ||
|
|
||
| // A highly compressible blob shrinks: positive size, smaller than raw. | ||
| blob := bytes.Repeat([]byte(`{"key":"value"}`), 1000) | ||
| got := compressedStateSize(blob) | ||
| assert.Positive(t, got) | ||
| assert.Less(t, got, len(blob)) | ||
| } | ||
|
|
||
| // jsonState builds a JSON-ish resource-state blob of about `size` bytes: | ||
| // repeated keys with varied values, representative of real resource state | ||
| // (compressible, but not trivially so). `seed` varies the content so different | ||
| // resources don't compress identically. The exact byte count is approximate. | ||
| func jsonState(size, seed int) json.RawMessage { | ||
| r := rand.New(rand.NewPCG(uint64(seed), 0)) | ||
| b := make([]byte, 0, size+128) | ||
| b = append(b, '{') | ||
| for i := 0; len(b) < size; i++ { | ||
| if i > 0 { | ||
| b = append(b, ',') | ||
| } | ||
| b = append(b, fmt.Sprintf( | ||
| `"key_%d":{"node_type":"i3.xlarge","num_workers":%d,"path":"/Workspace/Users/u%d/resource_%d","tag":"%x"}`, | ||
| i, r.IntN(64), r.IntN(100000), i, r.Uint64(), | ||
| )...) | ||
| } | ||
| b = append(b, '}') | ||
| return json.RawMessage(b) | ||
| } | ||
|
|
||
| // Representative results (go1.26.4, linux/amd64, Intel Xeon Platinum 8375C @ | ||
| // 2.90GHz; realistic varied-JSON state — absolute numbers are hardware- and | ||
| // data-dependent): | ||
| // | ||
| // BenchmarkCompressedStateSize/4KB 211 µs 19 MB/s | ||
| // BenchmarkCompressedStateSize/64KB 1.3 ms 49 MB/s | ||
| // BenchmarkCompressedStateSize/256KB 4.9 ms 54 MB/s | ||
| // BenchmarkCompressedStateSize/1024KB 19.0 ms 55 MB/s (largest per-resource state ~1 MB) | ||
| // BenchmarkExportStateFromData/200x8KB 22.2 ms 74 MB/s | ||
| // BenchmarkExportStateFromData/50x64KB 9.5 ms 345 MB/s | ||
| // BenchmarkExportStateFromData/8x1MB 26.0 ms 323 MB/s (~6x vs sequential via per-resource fan-out) | ||
| // BenchmarkExportStateFromData/1x1MB 19.4 ms 54 MB/s (single blob) | ||
|
|
||
| // BenchmarkCompressedStateSize measures per-resource compression cost. The | ||
| // largest per-resource state files are around 1 MB, so 1024 KB is the top size. | ||
| func BenchmarkCompressedStateSize(b *testing.B) { | ||
| for _, kb := range []int{4, 64, 256, 1024} { | ||
| data := jsonState(kb<<10, 1) | ||
| b.Run(fmt.Sprintf("%dKB", kb), func(b *testing.B) { | ||
| b.SetBytes(int64(len(data))) | ||
| for b.Loop() { | ||
| _ = compressedStateSize(data) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // BenchmarkExportStateFromData measures the full export (including the | ||
| // per-resource compression fan-out) for a few representative bundle shapes, | ||
| // from many small resources to a few ~1 MB ones. | ||
| func BenchmarkExportStateFromData(b *testing.B) { | ||
| cases := []struct { | ||
| name string | ||
| count, size int | ||
| }{ | ||
| {"200x8KB", 200, 8 << 10}, | ||
| {"50x64KB", 50, 64 << 10}, | ||
| {"8x1MB", 8, 1 << 20}, | ||
| {"1x1MB", 1, 1 << 20}, | ||
| } | ||
| for _, c := range cases { | ||
| data := Database{State: make(map[string]ResourceEntry, c.count)} | ||
| var total int64 | ||
| for i := range c.count { | ||
| st := jsonState(c.size, i) | ||
| total += int64(len(st)) | ||
| data.State[fmt.Sprintf("resources.jobs.job_%d", i)] = ResourceEntry{ | ||
| ID: strconv.Itoa(i), | ||
| State: st, | ||
| } | ||
| } | ||
| b.Run(c.name, func(b *testing.B) { | ||
| b.SetBytes(total) | ||
| for b.Loop() { | ||
| _ = ExportStateFromData(data) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't have anything to do with state, only with compression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done — the compression test and benchmarks now live in compress_test.go alongside the compression code.