Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/cmd/api/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ type ApiService struct {
monitorMu sync.Mutex
lifecycleCtx context.Context
lifecycleCancel context.CancelFunc

// Reader for durable S2 telemetry storage. Nil when S2 is not configured.
telemetryReader *events.S2Reader
}

// s2Enabled reports whether durable S2 telemetry storage is configured.
func (s *ApiService) s2Enabled() bool {
return s.telemetryReader != nil
}

var _ oapi.StrictServerInterface = (*ApiService)(nil)
Expand All @@ -105,6 +113,7 @@ func New(
telemetrySession *telemetry.TelemetrySession,
eventStream *events.EventStream,
displayNum int,
telemetryReader *events.S2Reader,
) (*ApiService, error) {
switch {
case recordManager == nil:
Expand Down Expand Up @@ -140,6 +149,7 @@ func New(
cdpMonitor: mon,
lifecycleCtx: ctx,
lifecycleCancel: cancel,
telemetryReader: telemetryReader,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func newTelemetrySession(t *testing.T) (*telemetry.TelemetrySession, *events.Eve
func newSvc(t *testing.T, mgr recorder.RecordManager) (*ApiService, error) {
t.Helper()
ts, es := newTelemetrySession(t)
return New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0)
return New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0, nil)
}

func TestApiService_PatchChromiumFlags(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/display_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact
func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService {
t.Helper()
ts, es := newTelemetrySession(t)
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0)
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0, nil)
require.NoError(t, err)
return svc
}
Expand Down
156 changes: 155 additions & 1 deletion server/cmd/api/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/kernel/kernel-images/server/lib/events"
"github.com/kernel/kernel-images/server/lib/logger"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
)

Expand All @@ -35,7 +36,7 @@ func (s *ApiService) PublishTelemetryEvent(_ context.Context, req oapi.PublishTe
if cat, ok := events.CategoryForType(body.Type); ok {
ev.Category = cat
} else if body.Category != nil {
cat := oapi.TelemetryEventCategory(*body.Category)
cat := *body.Category
if !cat.Valid() {
return oapi.PublishTelemetryEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "invalid category"}}, nil
}
Expand Down Expand Up @@ -136,6 +137,159 @@ func (s *ApiService) StreamTelemetryEvents(ctx context.Context, req oapi.StreamT
return oapi.StreamTelemetryEvents200TexteventStreamResponse{Body: pr, Headers: headers}, nil
}

const (
// defaultSince is the window start used when neither since nor offset is
// given: a duration meaning "this long ago", matching the public API default.
defaultSince = "5m"
// defaultPageSize / maxPageSize bound how many records one page reads.
defaultPageSize = 20
maxPageSize = 100
)

// ReadTelemetryEvents handles GET /telemetry/events.
// Reads one page of archived telemetry envelopes for this browser from durable
// S2 storage in ascending sequence order, applying the category filter. The
// X-Has-More / X-Next-Offset response headers carry the pagination cursor.
// Returns an empty list when S2 storage is not configured.
func (s *ApiService) ReadTelemetryEvents(ctx context.Context, req oapi.ReadTelemetryEventsRequestObject) (oapi.ReadTelemetryEventsResponseObject, error) {
log := logger.FromContext(ctx)

if !s.s2Enabled() {
return readTelemetryEventsOKResponse{}, nil
}

opts, err := buildReadOptions(req.Params)
if err != nil {
return oapi.ReadTelemetryEvents400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: err.Error()}}, nil
}
result, err := s.telemetryReader.Read(ctx, opts, log)
if err != nil {
log.Error("failed to read telemetry events from S2", "err", err)
return oapi.ReadTelemetryEvents500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to read telemetry events"}}, nil
}

// has_more / next cursor track the raw stream position, independent of the
// category filter, so a filtered page may come back empty while more remain.
envs := filterByCategory(result.Envelopes, req.Params.Category)

return readTelemetryEventsOKResponse{envs: envs, nextSeqNum: result.NextSeqNum, hasMore: result.HasMore}, nil
}

// buildReadOptions maps query params to a bounded, paginated read. offset is the
// opaque cursor and takes precedence over since as the start (until still bounds
// the page); since/until accept an RFC-3339 timestamp or a duration like "5m".
func buildReadOptions(p oapi.ReadTelemetryEventsParams) (events.ReadOptions, error) {
var opts events.ReadOptions

switch {
case p.Offset != nil:
// Offset is the cursor; no request validator is mounted, so clamp a
// negative value to 0 rather than wrap it into a huge uint64.
seq := uint64(max(*p.Offset, 0))
opts.SeqNum = &seq
case p.Since != nil:
ms, err := parseTimeParam(*p.Since)
if err != nil {
return opts, fmt.Errorf("since: %w", err)
}
opts.Timestamp = &ms
case p.Until != nil:
// until-only: read from the start of the stream (seqnum 0, clamped to the
// oldest retained record) up to until, rather than anchoring the start at
// defaultSince ago which would silently empty a far-past window.
seq := uint64(0)
opts.SeqNum = &seq
default:
// No bounds given at all: default the start to defaultSince ago.
ms, err := parseTimeParam(defaultSince)
if err != nil {
return opts, fmt.Errorf("since: %w", err)
}
opts.Timestamp = &ms
}

if p.Until != nil {
ms, err := parseTimeParam(*p.Until)
if err != nil {
return opts, fmt.Errorf("until: %w", err)
}
opts.Until = &ms
}

count := uint64(pageSize(p.Limit))
opts.Count = &count
return opts, nil
}

// parseTimeParam parses an RFC-3339 timestamp or a non-negative duration like
// "5m" (interpreted as that long before now) into unix milliseconds.
func parseTimeParam(s string) (uint64, error) {
if d, err := time.ParseDuration(s); err == nil {
if d < 0 {
return 0, fmt.Errorf("duration must not be negative: %q", s)
}
return uint64(time.Now().Add(-d).UnixMilli()), nil
}
if t, err := time.Parse(time.RFC3339, s); err == nil {
return uint64(t.UnixMilli()), nil
}
return 0, fmt.Errorf("invalid time value %q: want an RFC-3339 timestamp or a duration like 5m", s)
}

// pageSize clamps a requested limit into [1, maxPageSize], defaulting when unset.
func pageSize(limit *int) int {
switch {
case limit == nil:
return defaultPageSize
case *limit < 1:
return 1
case *limit > maxPageSize:
return maxPageSize
default:
return *limit
}
}

func filterByCategory(envs []events.Envelope, cats *[]oapi.TelemetryEventCategory) []events.Envelope {
if cats == nil || len(*cats) == 0 {
return envs
}
want := make(map[oapi.TelemetryEventCategory]struct{}, len(*cats))
for _, c := range *cats {
want[c] = struct{}{}
}
out := make([]events.Envelope, 0, len(envs))
for _, e := range envs {
if _, ok := want[e.Event.Category]; ok {
out = append(out, e)
}
}
return out
}

// readTelemetryEventsOKResponse serializes a page of events.Envelope directly,
// matching the SSE stream and publish endpoints. The pagination cursor rides in
// the X-Has-More / X-Next-Offset headers (X-Next-Offset only when there is more).
type readTelemetryEventsOKResponse struct {
envs []events.Envelope
nextSeqNum uint64
hasMore bool
}

func (r readTelemetryEventsOKResponse) VisitReadTelemetryEventsResponse(w http.ResponseWriter) error {
w.Header().Set("X-Has-More", strconv.FormatBool(r.hasMore))
if r.hasMore {
w.Header().Set("X-Next-Offset", strconv.FormatUint(r.nextSeqNum, 10))
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
envs := r.envs
if envs == nil {
envs = []events.Envelope{}
}
return json.NewEncoder(w).Encode(envs)
}

// publishTelemetryEventOKResponse serializes events.Envelope directly so the response
// is identical in shape to the SSE stream frames.
type publishTelemetryEventOKResponse struct{ env events.Envelope }
Expand Down
120 changes: 115 additions & 5 deletions server/cmd/api/api/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -56,7 +58,7 @@ func TestEventLifecycle(t *testing.T) {
}()

// Publish a custom event. Unknown types must carry an explicit category.
sys := oapi.PublishEventRequestCategorySystem
sys := oapi.TelemetryEventCategorySystem
resp, err := svc.PublishTelemetryEvent(ctx, oapi.PublishTelemetryEventRequestObject{
Body: &oapi.PublishEventRequest{Type: "test.event", Category: &sys},
})
Expand Down Expand Up @@ -88,7 +90,7 @@ func TestPublishDroppedWhenTelemetryInactive(t *testing.T) {
ctx := context.Background()
svc := newTestService(t, newMockRecordManager())

sys := oapi.PublishEventRequestCategorySystem
sys := oapi.TelemetryEventCategorySystem
resp, err := svc.PublishTelemetryEvent(ctx, oapi.PublishTelemetryEventRequestObject{
Body: &oapi.PublishEventRequest{Type: "test.event", Category: &sys},
})
Expand Down Expand Up @@ -119,7 +121,7 @@ func TestPublishKnownTypeCategoryIsServerAuthoritative(t *testing.T) {

// api_call is a known type that maps to the control category. A caller
// supplying a different category must be overridden by the server.
console := oapi.PublishEventRequestCategoryConsole
console := oapi.TelemetryEventCategoryConsole
resp, err := svc.PublishTelemetryEvent(ctx, oapi.PublishTelemetryEventRequestObject{
Body: &oapi.PublishEventRequest{Type: "api_call", Category: &console},
})
Expand Down Expand Up @@ -149,7 +151,7 @@ func TestPublishDroppedWhenCategoryDisabled(t *testing.T) {
})
require.NoError(t, err)

page := oapi.PublishEventRequestCategoryPage
page := oapi.TelemetryEventCategoryPage
resp, err := svc.PublishTelemetryEvent(ctx, oapi.PublishTelemetryEventRequestObject{
Body: &oapi.PublishEventRequest{Type: "test.page", Category: &page},
})
Expand All @@ -161,7 +163,7 @@ func TestPublishDroppedWhenCategoryDisabled(t *testing.T) {
// telemetry session. Seqs run 1..n on a fresh stream.
func publishTestEvents(ctx context.Context, t *testing.T, svc *ApiService, n int) {
t.Helper()
sys := oapi.PublishEventRequestCategorySystem
sys := oapi.TelemetryEventCategorySystem
for i := 0; i < n; i++ {
resp, err := svc.PublishTelemetryEvent(ctx, oapi.PublishTelemetryEventRequestObject{
Body: &oapi.PublishEventRequest{Type: "test.event", Category: &sys},
Expand Down Expand Up @@ -296,3 +298,111 @@ func TestStreamResumeAfterLastEventIDUnchanged(t *testing.T) {
id := streamFirstID(t, svc, oapi.StreamTelemetryEventsParams{LastEventID: ptrOf("5")})
assert.Equal(t, uint64(6), id, "Last-Event-ID without replay must behave as before and resume after seq 5")
}

func TestReadTelemetryEventsS2Disabled(t *testing.T) {
t.Parallel()
svc := newTestService(t, newMockRecordManager()) // s2 creds empty -> disabled

resp, err := svc.ReadTelemetryEvents(context.Background(), oapi.ReadTelemetryEventsRequestObject{})
require.NoError(t, err)
ok, isOK := resp.(readTelemetryEventsOKResponse)
require.True(t, isOK, "expected 200 response when S2 is disabled")

rec := httptest.NewRecorder()
require.NoError(t, ok.VisitReadTelemetryEventsResponse(rec))
assert.Equal(t, http.StatusOK, rec.Code)
// Empty result must serialize as [] not null, or the Python SDK chokes.
assert.JSONEq(t, `[]`, rec.Body.String())
assert.Equal(t, "false", rec.Header().Get("X-Has-More"))
assert.Empty(t, rec.Header().Get("X-Next-Offset"), "no cursor when there is no more")
}

func TestFilterByCategory(t *testing.T) {
t.Parallel()
mk := func(c oapi.TelemetryEventCategory) events.Envelope {
return events.Envelope{Event: events.Event{Category: c}}
}
envs := []events.Envelope{mk(events.Console), mk(events.Network), mk(events.Console)}

assert.Len(t, filterByCategory(envs, nil), 3, "nil filter keeps everything")

cats := []oapi.TelemetryEventCategory{events.Console}
assert.Len(t, filterByCategory(envs, &cats), 2)
}

func TestPageSize(t *testing.T) {
t.Parallel()
assert.Equal(t, defaultPageSize, pageSize(nil), "unset defaults")

ptr := func(n int) *int { return &n }
assert.Equal(t, 50, pageSize(ptr(50)))
assert.Equal(t, 1, pageSize(ptr(0)), "clamped up to 1")
assert.Equal(t, 1, pageSize(ptr(-5)), "clamped up to 1")
assert.Equal(t, maxPageSize, pageSize(ptr(5000)), "clamped down to max")
}

func TestBuildReadOptions(t *testing.T) {
t.Parallel()
strp := func(s string) *string { return &s }

// No params: start defaults to ~defaultSince ago, no end bound, default page.
opts, err := buildReadOptions(oapi.ReadTelemetryEventsParams{})
require.NoError(t, err)
require.NotNil(t, opts.Timestamp)
assert.Nil(t, opts.SeqNum)
assert.Nil(t, opts.Until)
require.NotNil(t, opts.Count)
assert.Equal(t, uint64(defaultPageSize), *opts.Count)

// since/until accept RFC-3339 timestamps (→ unix ms); limit bounds the page.
limit := 25
opts, err = buildReadOptions(oapi.ReadTelemetryEventsParams{
Since: strp("2020-01-01T00:00:00Z"),
Until: strp("2020-01-02T00:00:00Z"),
Limit: &limit,
})
require.NoError(t, err)
require.NotNil(t, opts.Timestamp)
assert.Equal(t, uint64(1577836800000), *opts.Timestamp)
require.NotNil(t, opts.Until)
assert.Equal(t, uint64(1577923200000), *opts.Until)
require.NotNil(t, opts.Count)
assert.Equal(t, uint64(25), *opts.Count)

// since also accepts a duration meaning "this long ago".
opts, err = buildReadOptions(oapi.ReadTelemetryEventsParams{Since: strp("5m")})
require.NoError(t, err)
require.NotNil(t, opts.Timestamp)
assert.InDelta(t, time.Now().Add(-5*time.Minute).UnixMilli(), int64(*opts.Timestamp), 5000)

// offset is the cursor and takes precedence over since (SeqNum start, no Timestamp).
offset := int64(4213)
opts, err = buildReadOptions(oapi.ReadTelemetryEventsParams{Offset: &offset, Since: strp("5m")})
require.NoError(t, err)
require.NotNil(t, opts.SeqNum)
assert.Equal(t, uint64(4213), *opts.SeqNum)
assert.Nil(t, opts.Timestamp, "since is ignored when offset is set")

// until-only: start from seqnum 0 (the beginning), not anchored at now-5m,
// so a far-past until doesn't silently yield an empty page.
opts, err = buildReadOptions(oapi.ReadTelemetryEventsParams{Until: strp("2020-01-01T00:00:00Z")})
require.NoError(t, err)
require.NotNil(t, opts.SeqNum)
assert.Equal(t, uint64(0), *opts.SeqNum, "until-only reads from the start of the stream")
assert.Nil(t, opts.Timestamp)
require.NotNil(t, opts.Until)
assert.Equal(t, uint64(1577836800000), *opts.Until)

// Negative offset clamps to 0 rather than wrapping into a huge uint64.
neg := int64(-1)
opts, err = buildReadOptions(oapi.ReadTelemetryEventsParams{Offset: &neg})
require.NoError(t, err)
require.NotNil(t, opts.SeqNum)
assert.Equal(t, uint64(0), *opts.SeqNum)

// Unparseable / negative time values are rejected (→ 400 at the handler).
_, err = buildReadOptions(oapi.ReadTelemetryEventsParams{Since: strp("nonsense")})
assert.Error(t, err)
_, err = buildReadOptions(oapi.ReadTelemetryEventsParams{Until: strp("-5m")})
assert.Error(t, err, "negative duration rejected")
}
Loading
Loading