diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index 84fc799a..a39e801d 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -131,9 +131,17 @@ type ImagesAutoDeleteConfig struct { Allowed []string `koanf:"allowed"` } +// OCICacheGCConfig holds settings for the OCI blob cache garbage collector. +type OCICacheGCConfig struct { + Enabled bool `koanf:"enabled"` + Interval string `koanf:"interval"` + MinBlobAge string `koanf:"min_blob_age"` +} + // ImagesConfig holds image-management settings. type ImagesConfig struct { AutoDelete ImagesAutoDeleteConfig `koanf:"auto_delete"` + OCICacheGC OCICacheGCConfig `koanf:"oci_cache_gc"` } // BuildConfig holds source-to-image build system settings. @@ -346,6 +354,11 @@ func defaultConfig() *Config { UnusedFor: "720h", Allowed: []string{}, }, + OCICacheGC: OCICacheGCConfig{ + Enabled: false, + Interval: "1h", + MinBlobAge: "1h", + }, }, Build: BuildConfig{ @@ -563,6 +576,20 @@ func (c *Config) Validate() error { for i, pattern := range c.Images.AutoDelete.Allowed { c.Images.AutoDelete.Allowed[i] = strings.TrimSpace(pattern) } + ociCacheGCInterval, err := time.ParseDuration(c.Images.OCICacheGC.Interval) + if err != nil { + return fmt.Errorf("images.oci_cache_gc.interval must be a valid duration, got %q: %w", c.Images.OCICacheGC.Interval, err) + } + if ociCacheGCInterval <= 0 { + return fmt.Errorf("images.oci_cache_gc.interval must be positive, got %q", c.Images.OCICacheGC.Interval) + } + ociCacheGCMinBlobAge, err := time.ParseDuration(c.Images.OCICacheGC.MinBlobAge) + if err != nil { + return fmt.Errorf("images.oci_cache_gc.min_blob_age must be a valid duration, got %q: %w", c.Images.OCICacheGC.MinBlobAge, err) + } + if ociCacheGCMinBlobAge < 0 { + return fmt.Errorf("images.oci_cache_gc.min_blob_age cannot be negative, got %q", c.Images.OCICacheGC.MinBlobAge) + } algorithm := strings.ToLower(c.Snapshot.CompressionDefault.Algorithm) c.Snapshot.CompressionDefault.Algorithm = algorithm if c.Snapshot.CompressionDefault.Enabled { diff --git a/cmd/api/config/config_test.go b/cmd/api/config/config_test.go index b63bda18..e7183504 100644 --- a/cmd/api/config/config_test.go +++ b/cmd/api/config/config_test.go @@ -43,6 +43,15 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) { if len(cfg.Images.AutoDelete.Allowed) != 0 { t.Fatalf("expected default images.auto_delete.allowed to be empty, got %v", cfg.Images.AutoDelete.Allowed) } + if cfg.Images.OCICacheGC.Enabled { + t.Fatalf("expected default images.oci_cache_gc.enabled to be false") + } + if cfg.Images.OCICacheGC.Interval != "1h" { + t.Fatalf("expected default images.oci_cache_gc.interval to be 1h, got %q", cfg.Images.OCICacheGC.Interval) + } + if cfg.Images.OCICacheGC.MinBlobAge != "1h" { + t.Fatalf("expected default images.oci_cache_gc.min_blob_age to be 1h, got %q", cfg.Images.OCICacheGC.MinBlobAge) + } if cfg.Instances.LifecycleEventBufferSize != 256 { t.Fatalf("expected default instances.lifecycle_event_buffer_size to be 256, got %d", cfg.Instances.LifecycleEventBufferSize) } @@ -247,6 +256,57 @@ func TestValidateRejectsInvalidImageAutoDeleteUnusedFor(t *testing.T) { } } +func TestLoadUsesDefaultOCICacheGCSettingsWhenEnabledOnly(t *testing.T) { + tmp := t.TempDir() + cfgPath := filepath.Join(tmp, "config.yaml") + if err := os.WriteFile(cfgPath, []byte("images:\n oci_cache_gc:\n enabled: true\n"), 0600); err != nil { + t.Fatalf("write temp config: %v", err) + } + + cfg, err := Load(cfgPath) + if err != nil { + t.Fatalf("load config: %v", err) + } + + if !cfg.Images.OCICacheGC.Enabled { + t.Fatalf("expected images.oci_cache_gc.enabled override to be true") + } + if cfg.Images.OCICacheGC.Interval != "1h" { + t.Fatalf("expected default images.oci_cache_gc.interval to remain 1h, got %q", cfg.Images.OCICacheGC.Interval) + } + if cfg.Images.OCICacheGC.MinBlobAge != "1h" { + t.Fatalf("expected default images.oci_cache_gc.min_blob_age to remain 1h, got %q", cfg.Images.OCICacheGC.MinBlobAge) + } +} + +func TestValidateRejectsInvalidOCICacheGCInterval(t *testing.T) { + cfg := defaultConfig() + cfg.Images.OCICacheGC.Interval = "not-a-duration" + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected validation error for invalid images.oci_cache_gc.interval") + } + + cfg = defaultConfig() + cfg.Images.OCICacheGC.Interval = "0s" + + err = cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "must be positive") { + t.Fatalf("expected positive validation error for zero images.oci_cache_gc.interval, got %v", err) + } +} + +func TestValidateRejectsNegativeOCICacheGCMinBlobAge(t *testing.T) { + cfg := defaultConfig() + cfg.Images.OCICacheGC.MinBlobAge = "-1s" + + err := cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "cannot be negative") { + t.Fatalf("expected non-negative validation error for images.oci_cache_gc.min_blob_age, got %v", err) + } +} + func TestValidateTrimsImageAutoDeleteAllowedPatterns(t *testing.T) { cfg := defaultConfig() cfg.Images.AutoDelete.Allowed = []string{" docker.io/library/* ", " ", "ghcr.io/kernel/*"} diff --git a/cmd/api/main.go b/cmd/api/main.go index 61ee9b0b..99ce7ebc 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -32,6 +32,7 @@ import ( loglib "github.com/kernel/hypeman/lib/logger" mw "github.com/kernel/hypeman/lib/middleware" "github.com/kernel/hypeman/lib/oapi" + "github.com/kernel/hypeman/lib/ocicachegc" "github.com/kernel/hypeman/lib/otel" "github.com/kernel/hypeman/lib/paths" "github.com/kernel/hypeman/lib/registry" @@ -98,6 +99,37 @@ func startImageRetentionController(grp *errgroup.Group, ctx context.Context, con return true } +type ociCacheGCRunner interface { + Run(ctx context.Context) error +} + +func configureOCICacheGC(cfg *config.Config, logger *slog.Logger, meter metric.Meter) (ociCacheGCRunner, error) { + if cfg == nil || !cfg.Images.OCICacheGC.Enabled { + return nil, nil + } + + interval, err := time.ParseDuration(cfg.Images.OCICacheGC.Interval) + if err != nil { + return nil, fmt.Errorf("invalid images.oci_cache_gc.interval %q: %w", cfg.Images.OCICacheGC.Interval, err) + } + minBlobAge, err := time.ParseDuration(cfg.Images.OCICacheGC.MinBlobAge) + if err != nil { + return nil, fmt.Errorf("invalid images.oci_cache_gc.min_blob_age %q: %w", cfg.Images.OCICacheGC.MinBlobAge, err) + } + + return ocicachegc.NewCollector(paths.New(cfg.DataDir), interval, minBlobAge, logger, meter) +} + +func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGCRunner) bool { + if grp == nil || runner == nil { + return false + } + grp.Go(func() error { + return runner.Run(ctx) + }) + return true +} + func run() error { // Load config early for OTel initialization // Config path can be specified via CONFIG_PATH env var or defaults to platform-specific locations @@ -491,6 +523,21 @@ func run() error { logger.Info("image auto-delete enabled", "unused_for", app.Config.Images.AutoDelete.UnusedFor) } + ociGC, err := configureOCICacheGC( + app.Config, + logger, + otelProvider.MeterFor(loglib.SubsystemImages), + ) + if err != nil { + return err + } + if startOCICacheGC(grp, gctx, ociGC) { + logger.Info("oci cache gc enabled", + "interval", app.Config.Images.OCICacheGC.Interval, + "min_blob_age", app.Config.Images.OCICacheGC.MinBlobAge, + ) + } + // Start build manager background services (vsock handler for builder VMs) if err := app.BuildManager.Start(gctx); err != nil { logger.Error("failed to start build manager", "error", err) diff --git a/cmd/api/oci_cache_gc_test.go b/cmd/api/oci_cache_gc_test.go new file mode 100644 index 00000000..9887bf48 --- /dev/null +++ b/cmd/api/oci_cache_gc_test.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "io" + "log/slog" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/kernel/hypeman/cmd/api/config" + "golang.org/x/sync/errgroup" +) + +type stubOCICacheGCRunner struct { + runCount atomic.Int32 +} + +func (s *stubOCICacheGCRunner) Run(ctx context.Context) error { + s.runCount.Add(1) + <-ctx.Done() + return nil +} + +func loadTestConfig(t *testing.T) *config.Config { + t.Helper() + + tmp := t.TempDir() + cfgPath := filepath.Join(tmp, "config.yaml") + if err := os.WriteFile(cfgPath, []byte("{}\n"), 0o600); err != nil { + t.Fatalf("write temp config: %v", err) + } + + cfg, err := config.Load(cfgPath) + if err != nil { + t.Fatalf("load temp config: %v", err) + } + return cfg +} + +func TestConfigureOCICacheGCSkipsDisabledConfig(t *testing.T) { + cfg := loadTestConfig(t) + + runner, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + if err != nil { + t.Fatalf("configure disabled oci cache gc: %v", err) + } + if runner != nil { + t.Fatalf("expected disabled oci cache gc to return nil runner") + } +} + +func TestConfigureOCICacheGCBuildsCollectorWhenEnabled(t *testing.T) { + cfg := loadTestConfig(t) + cfg.Images.OCICacheGC.Enabled = true + cfg.Images.OCICacheGC.Interval = "2m" + cfg.Images.OCICacheGC.MinBlobAge = "30s" + + runner, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil) + if err != nil { + t.Fatalf("configure enabled oci cache gc: %v", err) + } + if runner == nil { + t.Fatalf("expected enabled oci cache gc to return runner") + } +} + +func TestConfigureOCICacheGCRejectsInvalidInterval(t *testing.T) { + cfg := loadTestConfig(t) + cfg.Images.OCICacheGC.Enabled = true + cfg.Images.OCICacheGC.Interval = "0s" + + if _, err := configureOCICacheGC(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)), nil); err == nil { + t.Fatalf("expected invalid oci cache gc interval to fail") + } +} + +func TestStartOCICacheGCSkipsNilRunner(t *testing.T) { + grp, ctx := errgroup.WithContext(context.Background()) + + started := startOCICacheGC(grp, ctx, nil) + if started { + t.Fatalf("expected nil oci cache gc runner not to start") + } +} + +func TestStartOCICacheGCStartsRunner(t *testing.T) { + grp, ctx := errgroup.WithContext(context.Background()) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + runner := &stubOCICacheGCRunner{} + started := startOCICacheGC(grp, ctx, runner) + if !started { + t.Fatalf("expected oci cache gc runner to start") + } + + deadline := time.Now().Add(time.Second) + for runner.runCount.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + if runner.runCount.Load() != 1 { + t.Fatalf("expected runner to be started once, got %d", runner.runCount.Load()) + } + + cancel() + if err := grp.Wait(); err != nil { + t.Fatalf("wait for oci cache gc runner: %v", err) + } +} diff --git a/config.example.darwin.yaml b/config.example.darwin.yaml index 5b16e672..29f2f750 100644 --- a/config.example.darwin.yaml +++ b/config.example.darwin.yaml @@ -78,6 +78,11 @@ logging: # - docker.io/library/* # match normalized repository names # - ghcr.io/kernel/* # use ["*"] to allow deletion for every repository # # only affects data_dir/images, not the shared OCI cache +# oci_cache_gc: +# enabled: false # mark-and-sweep GC for data_dir/system/oci-cache +# interval: 1h # how often to run a sweep +# min_blob_age: 1h # grace period; blobs written more recently are kept +# # to avoid racing with concurrent pulls # ============================================================================= # Caddy / Ingress Configuration diff --git a/config.example.yaml b/config.example.yaml index 34e59d14..fd88ac17 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -71,6 +71,11 @@ data_dir: /var/lib/hypeman # - docker.io/library/* # match normalized repository names # - ghcr.io/kernel/* # use ["*"] to allow deletion for every repository # # only affects data_dir/images, not the shared OCI cache +# oci_cache_gc: +# enabled: false # mark-and-sweep GC for data_dir/system/oci-cache +# interval: 1h # how often to run a sweep +# min_blob_age: 1h # grace period; blobs written more recently are kept +# # to avoid racing with concurrent pulls # ============================================================================= # Caddy / Ingress Configuration diff --git a/lib/imageretention/README.md b/lib/imageretention/README.md index c70db06f..9ced4fc7 100644 --- a/lib/imageretention/README.md +++ b/lib/imageretention/README.md @@ -16,7 +16,7 @@ When auto-delete is enabled: - The server runs a retention sweep on startup and then every minute. - Only converted cached images under `data_dir/images` are eligible for deletion. -- Shared OCI cache data under `data_dir/system/oci-cache` is not modified. +- Shared OCI cache data under `data_dir/system/oci-cache` is not modified by this feature; see `lib/ocicachegc` for a separate mark-and-sweep collector that reclaims orphaned blobs from that directory. - An image repository must also match at least one `allowed` pattern before any retention state is recorded or deletion is attempted. An image is considered in use if any persisted instance metadata or snapshot record still references it. As long as at least one such reference exists, the image is protected from deletion. diff --git a/lib/ocicachegc/README.md b/lib/ocicachegc/README.md new file mode 100644 index 00000000..b209416b --- /dev/null +++ b/lib/ocicachegc/README.md @@ -0,0 +1,56 @@ +# OCI Cache GC + +Mark-and-sweep garbage collector for the shared OCI cache at +`data_dir/system/oci-cache`. + +The cache is populated every time an image is pulled or pushed and was +previously write-only: nothing ever removed layer, config, or manifest +blobs, so the cache grew unbounded. This collector reclaims the space +used by manifests and layers that are no longer referenced from +`index.json`. + +## Configuration + +```yaml +images: + oci_cache_gc: + enabled: false + interval: 1h + min_blob_age: 1h +``` + +When enabled, the server runs one pass immediately and then every +`interval` until shutdown. + +## Algorithm + +1. **Mark.** Read `index.json` and walk every referenced descriptor. For + each manifest or manifest-index blob we descend into its `config`, + `layers`, `manifests`, and `subject` references. The set of visited + digests is the live set. +2. **Sweep.** List `blobs/sha256/`. Delete every file whose name is a + valid 64-char hex digest, is absent from the live set, and whose + `mtime` is older than `min_blob_age`. + +Blobs that are referenced but unparseable are kept as opaque leaves; the +collector never deletes a blob it cannot prove is dead. + +## Concurrency + +Pulls (`layout.AppendImage`) and pushes (`BlobStore.Put`) write blobs +before updating `index.json`. During that window a blob exists on disk +but is not yet in the live set. `min_blob_age` is the grace period that +protects these in-flight writes — it should comfortably exceed the time +it takes to pull the largest image in your environment. + +Temporary files (`.tmp` used by `BlobStore.Put`) are ignored +entirely because they do not match the blob filename pattern. + +## Metrics + +| Metric | Type | Description | +| ------ | ---- | ----------- | +| `hypeman_oci_cache_gc_sweeps_total` | counter | Sweeps, tagged by status | +| `hypeman_oci_cache_gc_sweep_duration_seconds` | histogram | Sweep duration | +| `hypeman_oci_cache_gc_deleted_blobs_total` | counter | Blobs deleted | +| `hypeman_oci_cache_gc_deleted_bytes_total` | counter | Bytes reclaimed | diff --git a/lib/ocicachegc/gc.go b/lib/ocicachegc/gc.go new file mode 100644 index 00000000..1a1a6904 --- /dev/null +++ b/lib/ocicachegc/gc.go @@ -0,0 +1,326 @@ +// Package ocicachegc performs mark-and-sweep garbage collection of the +// shared OCI cache at data_dir/system/oci-cache. +// +// The cache stores content-addressed blobs under blobs/sha256/ and an +// index.json that references one or more manifests. Blobs are written +// whenever an image is pulled or pushed and are never removed by the +// normal code paths, so without a GC the cache grows unbounded. +// +// The collector walks index.json and every manifest (or manifest index) +// reachable from it to build the set of "live" blob digests, then +// deletes any blob that is not live. Blobs whose mtime is within +// MinBlobAge are always kept; this is the grace period used to avoid +// racing with concurrent pulls, which write layer blobs before updating +// index.json. +package ocicachegc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/fs" + "log/slog" + "os" + "path/filepath" + "regexp" + "sync" + "time" + + "github.com/kernel/hypeman/lib/paths" + "go.opentelemetry.io/otel/metric" +) + +// Collector garbage-collects the shared OCI cache. +type Collector struct { + paths *paths.Paths + interval time.Duration + minBlobAge time.Duration + logger *slog.Logger + metrics *Metrics + now func() time.Time + mu sync.Mutex +} + +// Stats summarises the outcome of one Collect pass. +type Stats struct { + LiveBlobs int + ScannedBlobs int + DeletedBlobs int + DeletedBytes int64 + SkippedRecent int +} + +// NewCollector creates a collector. minBlobAge is the minimum age a blob +// must have before it becomes eligible for deletion; it protects blobs +// that are currently being written by a concurrent pull or push. +func NewCollector(p *paths.Paths, interval, minBlobAge time.Duration, logger *slog.Logger, meter metric.Meter) (*Collector, error) { + if p == nil { + return nil, errors.New("paths is required") + } + if interval <= 0 { + return nil, fmt.Errorf("interval must be positive, got %s", interval) + } + if minBlobAge < 0 { + return nil, fmt.Errorf("min_blob_age cannot be negative, got %s", minBlobAge) + } + if logger == nil { + logger = slog.Default() + } + c := &Collector{ + paths: p, + interval: interval, + minBlobAge: minBlobAge, + logger: logger.With("component", "oci_cache_gc"), + now: time.Now, + } + if meter != nil { + m, err := newMetrics(meter) + if err != nil { + return nil, fmt.Errorf("create oci cache gc metrics: %w", err) + } + c.metrics = m + } + return c, nil +} + +// Run performs one Collect pass immediately and then every interval until +// ctx is cancelled. It never returns an error; individual sweep failures +// are logged and metrics are recorded, but the loop keeps running. +func (c *Collector) Run(ctx context.Context) error { + c.logger.InfoContext(ctx, "oci cache gc started", "interval", c.interval, "min_blob_age", c.minBlobAge) + if _, err := c.Collect(ctx); err != nil { + c.logger.ErrorContext(ctx, "oci cache gc sweep failed", "error", err) + } + + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if _, err := c.Collect(ctx); err != nil { + c.logger.ErrorContext(ctx, "oci cache gc sweep failed", "error", err) + } + } + } +} + +// Collect performs one full mark-and-sweep pass over the OCI cache. +func (c *Collector) Collect(ctx context.Context) (Stats, error) { + c.mu.Lock() + defer c.mu.Unlock() + + start := c.now() + stats, err := c.collect(ctx) + status := "success" + if err != nil { + status = "error" + } + if c.metrics != nil { + c.metrics.RecordSweep(ctx, status, c.now().Sub(start), stats) + } + if err != nil { + return stats, err + } + c.logger.DebugContext(ctx, "oci cache gc sweep completed", + "live_blobs", stats.LiveBlobs, + "scanned_blobs", stats.ScannedBlobs, + "deleted_blobs", stats.DeletedBlobs, + "deleted_bytes", stats.DeletedBytes, + "skipped_recent", stats.SkippedRecent, + ) + return stats, nil +} + +func (c *Collector) collect(ctx context.Context) (Stats, error) { + var stats Stats + + blobDir := c.paths.OCICacheBlobDir() + if _, err := os.Stat(blobDir); errors.Is(err, fs.ErrNotExist) { + return stats, nil + } else if err != nil { + return stats, fmt.Errorf("stat blob dir: %w", err) + } + + live, err := liveBlobs(c.paths) + if err != nil { + return stats, fmt.Errorf("compute live set: %w", err) + } + stats.LiveBlobs = len(live) + + cutoff := c.now().Add(-c.minBlobAge) + + entries, err := os.ReadDir(blobDir) + if err != nil { + return stats, fmt.Errorf("read blob dir: %w", err) + } + + for _, entry := range entries { + if err := ctx.Err(); err != nil { + return stats, err + } + if entry.IsDir() { + continue + } + name := entry.Name() + if !isBlobName(name) { + continue + } + stats.ScannedBlobs++ + if _, ok := live[name]; ok { + continue + } + info, err := entry.Info() + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + continue + } + return stats, fmt.Errorf("stat blob %s: %w", name, err) + } + if info.ModTime().After(cutoff) { + stats.SkippedRecent++ + continue + } + path := filepath.Join(blobDir, name) + size := info.Size() + if err := os.Remove(path); err != nil { + if errors.Is(err, fs.ErrNotExist) { + continue + } + return stats, fmt.Errorf("remove blob %s: %w", name, err) + } + stats.DeletedBlobs++ + stats.DeletedBytes += size + c.logger.InfoContext(ctx, "deleted unreferenced oci blob", "digest", name, "size", size) + } + + return stats, nil +} + +// blobNamePattern matches a valid sha256 blob filename (64 hex chars). +// Temporary files (e.g. ".tmp" written by the blob store) are +// intentionally excluded. +var blobNamePattern = regexp.MustCompile(`^[0-9a-f]{64}$`) + +func isBlobName(name string) bool { + return blobNamePattern.MatchString(name) +} + +// descriptor captures the subset of OCI/Docker descriptor fields we care +// about when walking manifests. Fields we do not use (urls, platform, +// size, annotations, mediaType) are omitted. +type descriptor struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` +} + +// manifestDoc is a union of the fields found on OCI image manifests, +// OCI image indexes, Docker v2 manifests, and Docker v2 manifest lists. +// Decoding into one shape lets us traverse either format without +// branching on the mediaType, which is often unreliable in practice. +type manifestDoc struct { + Config descriptor `json:"config"` + Layers []descriptor `json:"layers"` + Manifests []descriptor `json:"manifests"` + Subject *descriptor `json:"subject,omitempty"` +} + +// liveBlobs returns the set of blob hex digests reachable from the OCI +// cache index.json. Keys are bare hex (no "sha256:" prefix), matching +// the filenames under blobs/sha256/. +func liveBlobs(p *paths.Paths) (map[string]struct{}, error) { + live := make(map[string]struct{}) + + indexPath := p.OCICacheIndex() + data, err := os.ReadFile(indexPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return live, nil + } + return nil, fmt.Errorf("read index.json: %w", err) + } + + var index manifestDoc + if err := json.Unmarshal(data, &index); err != nil { + return nil, fmt.Errorf("parse index.json: %w", err) + } + + visited := make(map[string]struct{}) + for _, m := range index.Manifests { + if err := walkDescriptor(p, m, live, visited); err != nil { + return nil, err + } + } + return live, nil +} + +// walkDescriptor records desc.Digest in live, then if the blob is a +// manifest (or manifest index) recurses into its referenced blobs. +// Unparseable blobs are treated as opaque leaves (recorded but not +// descended into) so the sweep errs on the side of keeping data when +// disk contents are malformed. +func walkDescriptor(p *paths.Paths, desc descriptor, live, visited map[string]struct{}) error { + hex, ok := digestHex(desc.Digest) + if !ok { + return nil + } + if _, seen := visited[hex]; seen { + return nil + } + visited[hex] = struct{}{} + live[hex] = struct{}{} + + blobPath := p.OCICacheBlob(hex) + data, err := os.ReadFile(blobPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil + } + return fmt.Errorf("read blob %s: %w", hex, err) + } + + var doc manifestDoc + if err := json.Unmarshal(data, &doc); err != nil { + return nil + } + + if h, ok := digestHex(doc.Config.Digest); ok { + live[h] = struct{}{} + } + for _, layer := range doc.Layers { + if h, ok := digestHex(layer.Digest); ok { + live[h] = struct{}{} + } + } + if doc.Subject != nil { + if err := walkDescriptor(p, *doc.Subject, live, visited); err != nil { + return err + } + } + for _, m := range doc.Manifests { + if err := walkDescriptor(p, m, live, visited); err != nil { + return err + } + } + return nil +} + +// digestHex extracts the hex portion of a "sha256:" digest. It +// returns false for empty strings, unsupported algorithms, or malformed +// hex so callers can skip bad data without erroring the sweep. +func digestHex(d string) (string, bool) { + if len(d) != len("sha256:")+64 { + return "", false + } + if d[:7] != "sha256:" { + return "", false + } + hex := d[7:] + if !blobNamePattern.MatchString(hex) { + return "", false + } + return hex, true +} diff --git a/lib/ocicachegc/gc_test.go b/lib/ocicachegc/gc_test.go new file mode 100644 index 00000000..b385e5fd --- /dev/null +++ b/lib/ocicachegc/gc_test.go @@ -0,0 +1,408 @@ +package ocicachegc + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/kernel/hypeman/lib/paths" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// layoutBuilder incrementally writes OCI layout blobs and assembles an +// index.json so tests can set up realistic cache contents. +type layoutBuilder struct { + t *testing.T + paths *paths.Paths + blobsDir string + cacheDir string + entries []indexEntry +} + +type indexEntry struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` + Size int `json:"size"` + Annotations map[string]string `json:"annotations,omitempty"` +} + +func newLayoutBuilder(t *testing.T, dataDir string) *layoutBuilder { + t.Helper() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + return &layoutBuilder{ + t: t, + paths: p, + blobsDir: blobsDir, + cacheDir: p.SystemOCICache(), + } +} + +// writeBlob stores content at blobs/sha256/ and returns the +// digest string in canonical "sha256:" form. +func (b *layoutBuilder) writeBlob(content []byte) string { + b.t.Helper() + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(b.t, os.WriteFile(filepath.Join(b.blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest +} + +// writeOrphan stores a blob that won't be referenced by any manifest. +// Returns the filename (hex digest) so tests can assert on it. +func (b *layoutBuilder) writeOrphan(content []byte, mtime time.Time) string { + b.t.Helper() + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + path := filepath.Join(b.blobsDir, hexDigest) + require.NoError(b.t, os.WriteFile(path, content, 0o644)) + if !mtime.IsZero() { + require.NoError(b.t, os.Chtimes(path, mtime, mtime)) + } + return hexDigest +} + +// addImage appends an image manifest to the layout. Config and layer +// blobs are written first, then the manifest itself, then an index entry +// is recorded so writeIndex will include it. +func (b *layoutBuilder) addImage(tag string, configContent []byte, layerContents [][]byte) { + b.t.Helper() + + configDigest := b.writeBlob(configContent) + + type desc struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` + Size int `json:"size"` + } + layers := make([]desc, len(layerContents)) + for i, content := range layerContents { + digest := b.writeBlob(content) + layers[i] = desc{ + MediaType: "application/vnd.oci.image.layer.v1.tar+gzip", + Digest: digest, + Size: len(content), + } + } + + manifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": desc{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: configDigest, + Size: len(configContent), + }, + "layers": layers, + } + manifestBytes, err := json.Marshal(manifest) + require.NoError(b.t, err) + manifestDigest := b.writeBlob(manifestBytes) + + b.entries = append(b.entries, indexEntry{ + MediaType: "application/vnd.oci.image.manifest.v1+json", + Digest: manifestDigest, + Size: len(manifestBytes), + Annotations: map[string]string{ + "org.opencontainers.image.ref.name": tag, + }, + }) +} + +func (b *layoutBuilder) writeIndex() { + b.t.Helper() + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": b.entries, + } + data, err := json.Marshal(index) + require.NoError(b.t, err) + require.NoError(b.t, os.WriteFile(b.paths.OCICacheIndex(), data, 0o644)) + + layout := map[string]string{"imageLayoutVersion": "1.0.0"} + layoutBytes, err := json.Marshal(layout) + require.NoError(b.t, err) + require.NoError(b.t, os.WriteFile(b.paths.OCICacheLayout(), layoutBytes, 0o644)) +} + +func newCollectorForTest(t *testing.T, dataDir string, minBlobAge time.Duration, now time.Time) *Collector { + t.Helper() + c, err := NewCollector(paths.New(dataDir), time.Hour, minBlobAge, nil, nil) + require.NoError(t, err) + c.now = func() time.Time { return now } + return c +} + +func TestCollectNoCacheDirIsNoop(t *testing.T) { + dataDir := t.TempDir() + c := newCollectorForTest(t, dataDir, time.Hour, time.Now()) + + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + assert.Equal(t, 0, stats.ScannedBlobs) + assert.Equal(t, 0, stats.DeletedBlobs) +} + +func TestCollectKeepsLiveBlobs(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"config":"a"}`), [][]byte{[]byte("layer-a-1"), []byte("layer-a-2")}) + b.addImage("img-b", []byte(`{"config":"b"}`), [][]byte{[]byte("layer-b-1")}) + b.writeIndex() + + now := time.Now() + c := newCollectorForTest(t, dataDir, time.Minute, now) + + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // 2 manifests + 2 configs + 3 layers = 7 live blobs, all present. + assert.Equal(t, 7, stats.LiveBlobs) + assert.Equal(t, 7, stats.ScannedBlobs) + assert.Equal(t, 0, stats.DeletedBlobs) + + // All blob files should still exist. + entries, err := os.ReadDir(b.blobsDir) + require.NoError(t, err) + assert.Len(t, entries, 7) +} + +func TestCollectDeletesOrphans(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"config":"a"}`), [][]byte{[]byte("layer-a")}) + b.writeIndex() + + now := time.Now() + // Old orphan: well outside the grace period. + orphan := b.writeOrphan([]byte("orphaned-layer"), now.Add(-2*time.Hour)) + + c := newCollectorForTest(t, dataDir, time.Minute, now) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + assert.Equal(t, 3, stats.LiveBlobs) + assert.Equal(t, 4, stats.ScannedBlobs) + assert.Equal(t, 1, stats.DeletedBlobs) + assert.Equal(t, int64(len("orphaned-layer")), stats.DeletedBytes) + + _, err = os.Stat(filepath.Join(b.blobsDir, orphan)) + assert.True(t, os.IsNotExist(err), "orphan should be deleted") +} + +func TestCollectSkipsRecentBlobs(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"config":"a"}`), [][]byte{[]byte("layer-a")}) + b.writeIndex() + + now := time.Now() + // Orphan written recently — within grace period. + orphan := b.writeOrphan([]byte("still-being-pulled"), now.Add(-30*time.Second)) + + c := newCollectorForTest(t, dataDir, time.Minute, now) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + assert.Equal(t, 1, stats.SkippedRecent) + assert.Equal(t, 0, stats.DeletedBlobs) + + _, err = os.Stat(filepath.Join(b.blobsDir, orphan)) + assert.NoError(t, err, "recent orphan should be preserved") +} + +func TestCollectIgnoresTempAndNonBlobFiles(t *testing.T) { + dataDir := t.TempDir() + b := newLayoutBuilder(t, dataDir) + b.addImage("img-a", []byte(`{"c":1}`), [][]byte{[]byte("layer")}) + b.writeIndex() + + // Simulate an in-progress BlobStore.Put: .tmp file. + tmpName := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.tmp" + require.NoError(t, os.WriteFile(filepath.Join(b.blobsDir, tmpName), []byte("partial"), 0o644)) + + // Also something unexpected with a wrong-length name. + require.NoError(t, os.WriteFile(filepath.Join(b.blobsDir, "not-a-blob"), []byte("x"), 0o644)) + + // Make both files "old" so the grace period doesn't protect them. + past := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(filepath.Join(b.blobsDir, tmpName), past, past)) + require.NoError(t, os.Chtimes(filepath.Join(b.blobsDir, "not-a-blob"), past, past)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // Neither the .tmp file nor the non-hex name should be touched. + assert.Equal(t, 0, stats.DeletedBlobs) + _, err = os.Stat(filepath.Join(b.blobsDir, tmpName)) + assert.NoError(t, err) + _, err = os.Stat(filepath.Join(b.blobsDir, "not-a-blob")) + assert.NoError(t, err) +} + +func TestCollectFollowsManifestIndex(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + + // Build an inner image: config + layer + manifest. + writeBlob := func(content []byte) string { + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(t, os.WriteFile(filepath.Join(blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest + } + + configContent := []byte(`{"inner-config":true}`) + layerContent := []byte("inner-layer") + configDigest := writeBlob(configContent) + layerDigest := writeBlob(layerContent) + + innerManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": configDigest, "size": len(configContent)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": layerDigest, "size": len(layerContent)}}, + } + innerBytes, err := json.Marshal(innerManifest) + require.NoError(t, err) + innerDigest := writeBlob(innerBytes) + + // Build an outer manifest index that references the inner manifest. + outerIndex := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []map[string]any{{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": innerDigest, "size": len(innerBytes)}}, + } + outerBytes, err := json.Marshal(outerIndex) + require.NoError(t, err) + outerDigest := writeBlob(outerBytes) + + // Cache index.json points at the outer manifest index. + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []map[string]any{{"mediaType": "application/vnd.oci.image.index.v1+json", "digest": outerDigest, "size": len(outerBytes)}}, + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p.OCICacheIndex(), indexBytes, 0o644)) + + // Drop an unrelated orphan to verify it still gets swept. + orphan := writeBlob([]byte("orphan-bytes")) + // writeBlob returns a sha256: prefix; we need the hex for os.Stat. + orphanHex := orphan[7:] + // Force past the grace period. + past := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(filepath.Join(blobsDir, orphanHex), past, past)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // Live: outer index + inner manifest + config + layer = 4. + assert.Equal(t, 4, stats.LiveBlobs) + assert.Equal(t, 1, stats.DeletedBlobs, "only the orphan should be deleted") + _, err = os.Stat(filepath.Join(blobsDir, orphanHex)) + assert.True(t, os.IsNotExist(err)) +} + +func TestCollectRecursesIntoSubject(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + blobsDir := p.OCICacheBlobDir() + require.NoError(t, os.MkdirAll(blobsDir, 0o755)) + + writeBlob := func(content []byte) string { + sum := sha256.Sum256(content) + hexDigest := hex.EncodeToString(sum[:]) + require.NoError(t, os.WriteFile(filepath.Join(blobsDir, hexDigest), content, 0o644)) + return "sha256:" + hexDigest + } + + // Subject image: config + layer + manifest. + subjectConfig := []byte(`{"subject-config":true}`) + subjectLayer := []byte("subject-layer") + subjectConfigDigest := writeBlob(subjectConfig) + subjectLayerDigest := writeBlob(subjectLayer) + + subjectManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": subjectConfigDigest, "size": len(subjectConfig)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": subjectLayerDigest, "size": len(subjectLayer)}}, + } + subjectBytes, err := json.Marshal(subjectManifest) + require.NoError(t, err) + subjectDigest := writeBlob(subjectBytes) + + // Referrer manifest: has its own config + layer, points at subject. + referrerConfig := []byte(`{"referrer-config":true}`) + referrerLayer := []byte("referrer-layer") + referrerConfigDigest := writeBlob(referrerConfig) + referrerLayerDigest := writeBlob(referrerLayer) + + referrerManifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": map[string]any{"mediaType": "application/vnd.oci.image.config.v1+json", "digest": referrerConfigDigest, "size": len(referrerConfig)}, + "layers": []map[string]any{{"mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": referrerLayerDigest, "size": len(referrerLayer)}}, + "subject": map[string]any{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": subjectDigest, "size": len(subjectBytes)}, + } + referrerBytes, err := json.Marshal(referrerManifest) + require.NoError(t, err) + referrerDigest := writeBlob(referrerBytes) + + // Cache index.json references only the referrer; the subject should + // stay live via the subject link. + index := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": []map[string]any{{"mediaType": "application/vnd.oci.image.manifest.v1+json", "digest": referrerDigest, "size": len(referrerBytes)}}, + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p.OCICacheIndex(), indexBytes, 0o644)) + + c := newCollectorForTest(t, dataDir, time.Minute, time.Now()) + stats, err := c.Collect(context.Background()) + require.NoError(t, err) + + // Live: referrer manifest + referrer config + referrer layer + // + subject manifest + subject config + subject layer = 6. + assert.Equal(t, 6, stats.LiveBlobs) + assert.Equal(t, 0, stats.DeletedBlobs, "subject's config and layers must not be swept") + + // Double-check subject's transitive blobs still exist. + for _, d := range []string{subjectConfigDigest, subjectLayerDigest, subjectDigest} { + _, err := os.Stat(filepath.Join(blobsDir, d[7:])) + assert.NoError(t, err, "subject-reachable blob %s should remain", d) + } +} + +func TestNewCollectorValidatesArgs(t *testing.T) { + dataDir := t.TempDir() + p := paths.New(dataDir) + + _, err := NewCollector(nil, time.Hour, time.Minute, nil, nil) + assert.Error(t, err) + + _, err = NewCollector(p, 0, time.Minute, nil, nil) + assert.Error(t, err) + + _, err = NewCollector(p, time.Hour, -time.Minute, nil, nil) + assert.Error(t, err) + + _, err = NewCollector(p, time.Hour, time.Minute, nil, nil) + assert.NoError(t, err) +} diff --git a/lib/ocicachegc/metrics.go b/lib/ocicachegc/metrics.go new file mode 100644 index 00000000..9ba9843e --- /dev/null +++ b/lib/ocicachegc/metrics.go @@ -0,0 +1,76 @@ +package ocicachegc + +import ( + "context" + "time" + + hypotel "github.com/kernel/hypeman/lib/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Metrics holds the OTel instruments for the OCI cache collector. +type Metrics struct { + sweepsTotal metric.Int64Counter + sweepDuration metric.Float64Histogram + deletedBlobs metric.Int64Counter + deletedBytes metric.Int64Counter +} + +func newMetrics(meter metric.Meter) (*Metrics, error) { + sweepsTotal, err := meter.Int64Counter( + "hypeman_oci_cache_gc_sweeps_total", + metric.WithDescription("Total number of OCI cache GC sweeps"), + ) + if err != nil { + return nil, err + } + + sweepDuration, err := meter.Float64Histogram( + "hypeman_oci_cache_gc_sweep_duration_seconds", + metric.WithDescription("Duration of OCI cache GC sweeps"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(hypotel.CommonDurationHistogramBuckets()...), + ) + if err != nil { + return nil, err + } + + deletedBlobs, err := meter.Int64Counter( + "hypeman_oci_cache_gc_deleted_blobs_total", + metric.WithDescription("Total number of blobs deleted by the OCI cache GC"), + ) + if err != nil { + return nil, err + } + + deletedBytes, err := meter.Int64Counter( + "hypeman_oci_cache_gc_deleted_bytes_total", + metric.WithDescription("Total bytes freed by the OCI cache GC"), + metric.WithUnit("By"), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + sweepsTotal: sweepsTotal, + sweepDuration: sweepDuration, + deletedBlobs: deletedBlobs, + deletedBytes: deletedBytes, + }, nil +} + +// RecordSweep records the outcome of one sweep. +func (m *Metrics) RecordSweep(ctx context.Context, status string, duration time.Duration, stats Stats) { + if m == nil { + return + } + attrs := metric.WithAttributes(attribute.String("status", status)) + m.sweepsTotal.Add(ctx, 1, attrs) + m.sweepDuration.Record(ctx, duration.Seconds(), attrs) + if stats.DeletedBlobs > 0 { + m.deletedBlobs.Add(ctx, int64(stats.DeletedBlobs)) + m.deletedBytes.Add(ctx, stats.DeletedBytes) + } +}