Epic: Pipeline SDK (livepeer-runner) #8

@rickstaa

Outcome

Developers go from "I have a Python ML model" to a discoverable BYOC capability on Livepeer in under 5 minutes — surfaced in the Developer Dashboard ready for any caller to invoke.

The Pipeline SDK is the authoring surface that makes this possible: write a Python class, get a containerised, BYOC-compatible, schema-described capability.

Spec

Design lives in livepeer-specs / pipeline-sdk.md. Update the spec rather than this issue body when the design moves.

Architecture decisions (monorepo + PEP 420 namespace packages, three distributions: livepeer-runner / livepeer-client / livepeer-trickle) are captured in the spec; see its Architecture section and the companion Client SDK packaging section.

Roadmap

Each step yields a working SDK strictly more capable than the previous one.

  • C1 — Pipeline base + serve() + hello-world BYOC E2E
  • C2 — setup() lifecycle + HuggingFace sentiment example
  • C3 — FastAPI HTTP layer (/health, /predict, /docs, /openapi.json)
  • C4 — Pydantic BaseModel for inputs / outputs via signature introspection
  • C5 — Image upscale example (binary I/O via Pydantic Base64Bytes)
  • C6 — /health state machine matching go-livepeer's HealthCheck wire format
  • C7 — SSE auto-detection from generator predict() + LLM chat example
  • C8 — LivePipeline for trickle transport (real-time video) — see breakdown below
  • C9 — livepeer push CLI + livepeer.yaml manifest
  • C10 — Schema as Docker image label (org.livepeer.pipeline.schema)
  • C11 — Agent-friendly docs (AGENTS.md, expanded Pipeline docstring, examples/runner/_template/)
  • C12 — Migrate to monorepo with PEP 420 namespace packages — see client SDK packaging spec. Coordinated with #9.
  • C13 — Container self-registration to orch /capability/register — env-gated, wired into serve() lifespan, lenient on failure (degrade /health, keep FastAPI serving). Deregister on shutdown. Retry on conn-refused / timeout / 5xx; fail fast on 400 / 404 / 405.
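
The retry policy described for C13 can be sketched as a small classifier. This is only an illustration under stated assumptions: the names `RegisterOutcome` and `classify_register_attempt` are hypothetical and not part of the SDK, and the handling of status codes or exceptions that C13 does not list (for example 401 or 403) is a guess that defaults to failing fast.

```python
from enum import Enum
from typing import Optional


class RegisterOutcome(Enum):
    OK = "ok"
    RETRY = "retry"
    FAIL_FAST = "fail_fast"


def classify_register_attempt(
    status: Optional[int] = None,
    exc: Optional[BaseException] = None,
) -> RegisterOutcome:
    """Decide what to do after one registration attempt (hypothetical helper)."""
    if exc is not None:
        # conn-refused and timeout are retryable per C13; treating every other
        # exception as fail-fast is an assumption of this sketch.
        retryable = isinstance(exc, (ConnectionRefusedError, TimeoutError))
        return RegisterOutcome.RETRY if retryable else RegisterOutcome.FAIL_FAST
    if status is None:
        raise ValueError("provide either status or exc")
    if 200 <= status < 300:
        return RegisterOutcome.OK
    if 500 <= status < 600:
        # 5xx: the orchestrator may recover, so retry.
        return RegisterOutcome.RETRY
    # 400 / 404 / 405 fail fast per C13; other codes land here by assumption.
    return RegisterOutcome.FAIL_FAST
```

In either failure case the container would keep serving and only degrade /health, as the roadmap item states; the classifier just decides whether another attempt is worth scheduling.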

C8 breakdown — LivePipeline

  • Step 1: skeleton (HTTP routes + ABC) — 04cc697
  • Step 2: bytes-through (validate trickle wire) — 831ee44
  • Step 3: frame-loop dispatch + runner.frames namespace — 9688f67
  • Step 5: live_grayscale example + chroma assertion + ffplay viewer — 9ef95d9 series
  • Step 4 — _LiveSession + lifecycle (the only outstanding piece)
    • _LiveSession class encapsulating per-session state
    • Periodic heartbeat on events_url (gateway liveness signal)
    • emit_event(payload) user-facing helper
    • emit_data(payload) helper for data_url when enable_data_output=true
    • on_stream_stop lifecycle hook
    • Drain runner-side state on stop — measured: no leak (RSS plateaus ~170 MB after 25 sessions). State drain unnecessary.
    • Unified error surface — three error sources (subscribe / publish / user process_video raise) log distinctly today with no consistent state propagation. Add _record_error(source, exc, severity): structured ErrorEvent schema (severity ∈ WARN / ERROR / FATAL, source, message, timestamp, consecutive), per-source budget escalating WARN→ERROR after N consecutive failures, flip pipeline._state = ERROR on terminal failures, push events via events_url. Quick first step (~5 LOC): flip _state on TricklePublisherTerminalError. Full schema after live_transcribe surfaces real failure modes.
    • Verify the live-viewer demo can be brought back once heartbeat lands (state drain was measured as unnecessary, see above)
    • Pydantic param schema for LivePipeline — today on_stream_start(params) and on_params_update(params) receive dict[str, Any]; users parse / validate manually. The batch Pipeline already supports typed params via signature introspection (C4). Extend the same to LivePipeline: let users type on_stream_start(params: MyParams) and have the runner introspect, validate at the HTTP boundary, and emit a meaningful /openapi.json schema for /stream/start's caller-supplied params (today the schema only describes the orchestrator's protocol fields). Required for the developer dashboard / client SDK to render param controls for live capabilities.
    • Enrich heartbeat payload with PipelineStatus (ai-runner pattern) — today's heartbeat is minimal {"type": "heartbeat", "timestamp"} keep-alive only. ai-runner's report_status_loop uses the same trickle push as both keep-alive AND status report (state, FPS, last_error, restart_count, last_params). One mechanism, dual purpose. Add FPS counters to MediaOutput / MediaPublish and swap heartbeat payload for the rich shape. Keeps /health minimal (k8s contract).
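
A minimal sketch of the unified error surface from Step 4, assuming the fields listed above (severity, source, message, timestamp, consecutive). Everything here is illustrative: `ErrorRecorder`, `record_success`, the default budget of 3, and the choice to treat FATAL as the "terminal" case are assumptions, and the `events` list stands in for the push to events_url.

```python
import time
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum


class Severity(str, Enum):
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"


@dataclass
class ErrorEvent:
    severity: Severity
    source: str
    message: str
    timestamp: float
    consecutive: int


class ErrorRecorder:
    """Hypothetical stand-in for the pipeline-side _record_error logic."""

    def __init__(self, budget: int = 3) -> None:
        self.budget = budget              # N consecutive failures before WARN -> ERROR
        self.state = "OK"                 # stand-in for pipeline._state
        self.events = []                  # stand-in for pushing to events_url
        self._consecutive = defaultdict(int)

    def record_error(self, source: str, exc: BaseException,
                     severity: Severity = Severity.WARN) -> ErrorEvent:
        self._consecutive[source] += 1
        count = self._consecutive[source]
        # Per-source budget: escalate once the Nth consecutive failure is seen.
        if severity is Severity.WARN and count >= self.budget:
            severity = Severity.ERROR
        # Assumption: only FATAL counts as a terminal failure.
        if severity is Severity.FATAL:
            self.state = "ERROR"
        event = ErrorEvent(severity, source, str(exc), time.time(), count)
        self.events.append(event)
        return event

    def record_success(self, source: str) -> None:
        # A successful operation resets that source's consecutive counter.
        self._consecutive[source] = 0
```

The "quick first step" from the bullet would then amount to calling `record_error("publish", exc, Severity.FATAL)` wherever TricklePublisherTerminalError is caught.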

Companion issues — runner / examples polish

Targeted issues spun off from this epic. Two blocking, one cosmetic for full live_transcribe fidelity (5/5 transcripts delivered to SSE).

Blocking (data-loss bugs)

  • #12 — SDK-side. _resolve_next_seq returns -1 on probe failure; combined with the publisher's +1 increment this duplicate-POSTs to seg 0 on every trickle channel, dropping the first record. Observable on live_transcribe as missing transcript[1]. Fix: one line (return 0 instead of -1) + demote the warning to debug.
  • Upstream: livepeer/go-livepeer#3924 — gateway's data subscriber tears down too early on /stream/stop, dropping the final emit_data from on_stream_stop. Observable on live_transcribe as missing transcript[4] (orch log shows client disconnected on the final POST). Fix: bounded drain loop in byoc/trickle.go:startDataSubscribe.

Together these account for both observed transcript losses (runner emits 5 → SSE delivers 3 today). Either one fixed independently → 4/5 delivered.

Cosmetic (log noise, no functional impact)

  • Upstream: livepeer/go-livepeer#3922 — spurious ERROR-level logs at every clean /stream/stop (5 fix sites: ffmpeg subprocess output, trickle preconnect, rtmp2segment probe, orch trickle handler). Operations work; logs just look scary. Pure log-level demotion (if ctx.Err() != nil { debug }).

Examples follow-ups

  • Live viewer tool for live_grayscale — bring back the webcam-pushed live viewer (deleted because the current PyAV decode→user→encode loop can't sustain real-time webcam load: ring buffer drains, mediamtx kicks the egress publisher). Today the example uses synthetic testsrc + capture-to-file + replay. Bring back the webcam viewer once C8 Step 4 lands.

  • Worked example covering full LivePipeline lifecycle — live_grayscale exercises the SDK plumbing but only overrides process_video. live_transcribe (Whisper STT) and live_depth (DepthAnything V2) now exercise more of the lifecycle (setup, on_stream_start, process_audio, emit_data, emit_event, on_stream_stop). Still TODO: a test.sh that subscribes to data_url from the caller side and asserts structured records arrive — needs start_byoc_job from #6.

  • Exercise on_params_update in an example — the only LivePipeline hook with no live demo. Fires on mid-stream parameter changes (caller pushes new params to /stream/params without restart). Smallest viable demo: extend live_detect to accept {"detection_threshold": 0.5} mid-stream and update the YOLO confidence cutoff in-place, with a test.sh step that pushes a new threshold mid-run and asserts the emitted records reflect it. Alternatively document the hook in the SDK README and defer the example until a real use case demands it.

  • Migrate /stream/params and /stream/stop to control_url subscribe — per the spec, the long-term shape is one HTTP endpoint (/stream/start) plus everything else over the trickle plane. Today BYOC already publishes params + keepalives to control_url (byoc/trickle.go:539) but the orchestrator HTTP-forwards each message via /stream/params (byoc/stream_orchestrator.go:421). Migration must be coordinated upstream: orchestrator drops the HTTP-forwarding step + runner adds trickle subscribe in lockstep. Blocked on: trickle control-channel size / changeover bug (see "Future protocol work" below).

  • Production-grade live transcribe example — live_transcribe is intentionally the minimal lifecycle demo with explicit 3 s chunking + vad_filter=True; first-transcript latency is ~3 s and word boundaries can split mid-window. For users who actually want production live transcription, add a separate example using whisper_streaming's OnlineASRProcessor (LocalAgreement-2 → ~1 s latency, cleaner boundaries) — same LivePipeline lifecycle, different process_audio body. Same folder shape as live_transcribe, marketed as "production transcribe". Possibly also covers VAD-driven segmentation and emit_data partial-vs-final transcript distinction.

  • Recover from orchestrator capability drop — gateway sometimes drops the orchestrator from its capability pool after stream failures (Retrying stream with a different orchestrator err=unknown swap reason, then no orchestrators available, ending stream). Once dropped, every subsequent /process/stream/start either 400s or kills mid-flight, until register_capability is re-run manually. Investigate (a) re-register watchdog, (b) healthcheck-driven re-register hook, or (c) push a fix upstream in go-livepeer's gateway swap-orch logic.

  • Switch examples to -network offchain once go-livepeer #3906 lands. Current compose files run with -network arbitrum-one-mainnet -ethUrl https://arb1.arbitrum.io/rpc -ethPassword secret-password and rely on pricePerUnit=0 so no real on-chain payment occurs — but the gateway still polls Arbitrum for orchestrator stake lookups (db_discovery.go), and the public RPC throttles it, leaving 429 Too Many Requests lines all over the gateway log. Tracked upstream as livepeer/go-livepeer#3905. When the PR merges, drop -ethUrl and -ethPassword from each example's docker-compose.yml (5 files) and replace -network arbitrum-one-mainnet with bare -network offchain. Eliminates the 429 noise entirely.

  • Assert grayscale, not just bytes-received — live_grayscale/test.sh now extracts U / V plane averages via ffprobe signalstats and asserts ≈128 (chroma-zero = grayscale).

  • End state: retire examples/runner/, replace with unit tests, move worked examples to a separate repo — once the SDK stabilizes, delete the in-tree examples/runner/ folder. The lifecycle/coverage value those examples currently provide (setup, on_stream_start, process_video / process_audio, emit_data, emit_event, on_stream_stop, error paths) gets reified as proper unit tests inside livepeer-python-gateway. The worked examples themselves (live_grayscale, live_transcribe, live_depth, replicate_flux, …) move to a standalone livepeer/pipeline-examples repo that depends on the published livepeer.runner package as a normal pip dependency. Aligns examples with how external developers actually consume the SDK, decouples example evolution from SDK release cadence, and keeps this repo focused on the runner itself.
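
The chroma assertion mentioned for live_grayscale/test.sh boils down to a small numeric check: in 8-bit YUV, neutral chroma sits at 128, so a grayscale frame has U and V averages near that value. The sketch below assumes the per-frame averages have already been extracted (for instance from ffprobe signalstats output); the function name and the tolerance of 2.0 are illustrative choices, not what the script actually uses.

```python
from statistics import mean
from typing import Sequence


def is_grayscale(u_avgs: Sequence[float], v_avgs: Sequence[float],
                 neutral: float = 128.0, tolerance: float = 2.0) -> bool:
    """Return True when mean U and V chroma both sit within tolerance of neutral."""
    if not u_avgs or not v_avgs:
        raise ValueError("need at least one U and one V sample")
    return (abs(mean(u_avgs) - neutral) <= tolerance
            and abs(mean(v_avgs) - neutral) <= tolerance)
```

A small tolerance allows for encoder rounding while still rejecting frames that carry real colour.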

Performance & future improvements

Captured while building examples that surfaced specific optimization opportunities. Not roadmap-blocking; revisit when concrete use cases demand them.

  • Parallel process_video / process_audio execution — the frame loop today dispatches both hooks sequentially in a single async for-loop, so heavy inference in one stalls the other. Refactor into two queues fed from one decoder, drained in separate tasks. Not needed for live_detect or live_transcribe today; triggered by a real pipeline that needs it (e.g. a Moondream2-class video model running alongside whisper). Contract change ("frames may interleave across hooks") so deserves explicit design before flipping.

  • live_describe — VLM-driven video understanding (GPU) — natural-language scene description via Moondream2 (~1.6 GB, ~300 ms / inference on GPU). Same LivePipeline lifecycle as live_detect, swaps YOLO for a vision-language model that emits descriptions instead of bounding boxes. Mirrors live_depth's GPU pattern. Compelling for "real video understanding" positioning; not strictly needed since live_detect already demonstrates multi-modal LivePipeline.

  • Concurrent inference patterns documented in SDK README — three-tier pattern users adopt as inference cost grows: (1) asyncio.to_thread for offloading individual inference calls so the main loop stays responsive, (2) asyncio.create_task for fire-and-forget windowed work (transcribe → emit when done, decouples inference latency from frame cadence), (3) separate process / IPC for GPU-isolated heavy models. Should land alongside the parallel process_* refactor above so users understand which knob to reach for.
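
To make the "two queues fed from one decoder" idea concrete, here is a rough sketch that also uses tier (1), asyncio.to_thread, for the hook calls. It is not the runner's frame loop: `fake_decoder`, `fan_out`, `drain`, and the toy hooks are all hypothetical stand-ins, and a `None` sentinel marks end of stream purely for the demo.

```python
import asyncio


async def fake_decoder():
    """Stand-in for the real decoder: yields interleaved (kind, item) pairs."""
    for i in range(3):
        yield "video", f"v{i}"
        yield "audio", f"a{i}"


def process_video(frame: str) -> str:   # toy hook, pretend this is heavy inference
    return frame.upper()


def process_audio(frame: str) -> str:   # toy hook
    return frame.upper()


async def fan_out(decoder, video_q: asyncio.Queue, audio_q: asyncio.Queue) -> None:
    # One decoder feeds two bounded queues; a slow hook only backs up its own queue
    # until that queue fills.
    async for kind, item in decoder:
        await (video_q if kind == "video" else audio_q).put(item)
    await video_q.put(None)
    await audio_q.put(None)


async def drain(queue: asyncio.Queue, hook, results: list) -> None:
    # Each queue is drained by its own task; the sync hook runs in a worker thread
    # so the event loop stays responsive.
    while (item := await queue.get()) is not None:
        results.append(await asyncio.to_thread(hook, item))


async def run_demo():
    video_q, audio_q = asyncio.Queue(maxsize=4), asyncio.Queue(maxsize=4)
    video_out, audio_out = [], []
    await asyncio.gather(
        fan_out(fake_decoder(), video_q, audio_q),
        drain(video_q, process_video, video_out),
        drain(audio_q, process_audio, audio_out),
    )
    return video_out, audio_out
```

Order is preserved within each hook, but video and audio results may interleave differently from the input, which is the contract change the first bullet flags.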

Runner SDK code-quality improvements

Findings from a focused code review of src/livepeer_gateway/runner/. The Bugs items are real correctness issues worth fixing before C13 (auto-registration) lands so we don't bake them into a lifecycle path.

Bugs

  • Async-generator pipelines silently broken (serve.py:225) — inspect.isgeneratorfunction() returns False for async def generators. A user writing async def run(self, ...): yield ... falls into the sync-call branch; StreamingResponse can't iterate the resulting async-gen object. Fix: detect inspect.isasyncgenfunction() and use an async SSE formatter, or reject async generators with a clear error.
  • Sync run() blocks the event loop (serve.py:84) — pipeline.run(...) is invoked directly inside an async FastAPI handler. Any CPU/IO-bound run() (sentiment, every HF pipeline) stalls /health and concurrent requests. Fix: await asyncio.to_thread(pipeline.run, ...) when run is sync; keep the direct call only for async def or generators.
  • /stream/start concurrent-session race (serve.py:131) — two concurrent calls both pass the _session is None check, both construct _LiveSession, second overwrites first → orphaned tasks + publishers. Wrap session creation with an asyncio.Lock on the pipeline.
  • result.frame AttributeError on raw PyAV return (live_pipeline.py:353) — a user who returns a raw av.VideoFrame (natural after PyAV work) hits AttributeError because the SDK expects the wrapper. Either accept both (getattr(result, "frame", result)) or document+enforce the wrapper.
  • on_params_update semantics undocumented — delta or full replacement? live_tint reads as delta; serve.py:209 does full replace. Pick one and document. Compounding: session.params is mutated BEFORE the hook runs, so a raising hook leaves partial state. Roll back on exception.
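
The first two bugs share one root cause: the handler has to branch on what kind of callable run() is. A hedged sketch of that dispatch follows, using only the standard inspect and asyncio calls named in the fixes above. `classify_run` and `call_run` are hypothetical helpers, and collecting generator output into a list is a demo shortcut; a real handler would stream the chunks instead.

```python
import asyncio
import inspect


def classify_run(fn) -> str:
    """Return which calling convention a user-supplied run() needs."""
    # Check async generators first: isgeneratorfunction() is False for them.
    if inspect.isasyncgenfunction(fn):
        return "async_generator"
    if inspect.isgeneratorfunction(fn):
        return "generator"
    if inspect.iscoroutinefunction(fn):
        return "coroutine"
    return "sync"


async def call_run(fn, *args, **kwargs):
    kind = classify_run(fn)
    if kind == "async_generator":
        return [chunk async for chunk in fn(*args, **kwargs)]
    if kind == "generator":
        return list(fn(*args, **kwargs))
    if kind == "coroutine":
        return await fn(*args, **kwargs)
    # Plain sync function: run it in a worker thread so the event loop
    # (and therefore /health) is not blocked.
    return await asyncio.to_thread(fn, *args, **kwargs)


# Toy callables covering the four cases.
def double(x):
    return x * 2


async def double_async(x):
    return x * 2


def count_sync(n):
    yield from range(n)


async def count_async(n):
    for i in range(n):
        yield i
```

Rejecting async generators with a clear error, the alternative fix mentioned above, would replace the first branch of `call_run` with a raise.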

Code quality

  • Private cross-module imports (serve.py:13-22) — _LiveSession, _run_frame_loop, _has_user_processing imported across module boundaries with underscore prefix. They're not private anymore; either drop the underscore or move the /stream/* handler factory into live_pipeline.py.
  • Duplicate introspection (live_pipeline.py:288-299) — _emit_flags and _has_user_processing recompute the same process_* hook overrides. Collapse into one function returning (emit_video, emit_audio, has_user_processing).
  • **kwargs silently swallowed (serve.py:38-56) — a user writing def run(self, **kwargs) (allowed by the ABC) gets an empty input model and silently loses request body. Either reject VAR_KEYWORD/VAR_POSITIONAL with a clear error, or treat **kwargs as extra="allow" on the generated model.
  • OpenAPI schema misses non-BaseModel returns (serve.py:230-231) — list[Foo] or Foo | None is silently dropped from the schema. Widen detection via pydantic.TypeAdapter, or document the limitation.
  • /health reads private _state across two classes (serve.py:105) — pipeline._state is a private attribute on Pipeline and LivePipeline (two unrelated classes that happen to share the name). Lift into a shared base or Protocol, or expose a public state property.
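
For the **kwargs item, the "reject with a clear error" option could look roughly like this. The helper name `input_field_names` is made up for illustration, and the sketch only covers the rejection path, not the extra="allow" alternative.

```python
import inspect


def input_field_names(run_fn) -> list:
    """List the parameters an input model would be built from, rejecting *args/**kwargs."""
    names = []
    for name, param in inspect.signature(run_fn).parameters.items():
        if name == "self":
            continue
        if param.kind in (inspect.Parameter.VAR_POSITIONAL,
                          inspect.Parameter.VAR_KEYWORD):
            # Fail loudly instead of generating an empty input model.
            raise TypeError(
                f"run() parameter '{name}' is variadic; declare explicit, "
                "typed parameters so the request schema can be generated"
            )
        names.append(name)
    return names
```

Raising at serve() time rather than at request time means the author sees the problem before the container ever accepts traffic.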

Naming / readability (bundle with C8 perf refactor)

  • Rename MediaOutput/MediaPublish → media_in/media_out — current names describe verbs (output, publish) but trickle direction is the opposite, which is a foot-gun every time someone reads the code. Cross-cutting refactor; bundle with the parallel process_video/process_audio work in Performance & future improvements since both touch the frame loop.
  • Unify /stream/* response shapes — {"status": "started", "gateway_request_id": ...} vs {"status": "ok"} vs {"status": "stopped"} across handlers. Pick one Pydantic response model.

Framework adapters (deferred — build on demand)

Migration paths for users from existing ML frameworks. Each ships as its own pip package with its own foreign dep, isolated from core SDK. Build only when a real migration ask shows up.

  • livepeer-runner-cog — wraps cog.BasePredictor
  • livepeer-runner-fal — wraps fal.App
  • livepeer-runner-modal — wraps Modal @app.function
  • livepeer-runner-bentoml — wraps @bentoml.service
  • livepeer-runner-comfyscript

Future protocol work (cross-team, gated on C9 + upstream go-livepeer)

  • Fix trickle control-channel size / segment-changeover bug (upstream) — control_url params updates fail silently or get truncated when payload is more than small JSON (~1 MB practical ceiling observed). Hunch is segment-changeover behavior during large writes — possibly FirstByteTimeout, pipe buffering on segment boundaries, or chunk-write semantics across the rollover. Workaround in byoc/stream_gateway.go:1007-1009 switched stop / params to HTTP POST after the bug bit on base64-binary payloads. Blocking dependency for the "migrate to control_url subscribe" follow-up. Upstream go-livepeer change.

  • Capability identity via OCI digest — replace free-form capability names with content-hashed references like byoc/<repo>@sha256:<digest>. Aligns BYOC with Replicate's reproducibility model. SDK side: livepeer push captures the digest at publish time and bakes it into the manifest. Upstream side: orchestrator registration + gateway routing + OrchestratorInfo carry the digest.

  • Cosign / Sigstore signing of capability digests — optional layer on top of digest pinning. Publisher signs the digest, gateway verifies signature against publisher's key.

  • Name:version aliases over digest-pinned wire — Replicate-style mutable names (byoc/text-reverser:v2) that resolve to a digest at lookup time. Wire protocol always pins the digest; aliases are a UX layer.
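
The digest-pinned reference and alias ideas can be illustrated with a short parser. This is a sketch only: the repo-name character set in the regex, the `CapabilityRef` type, and the dict-based alias index are assumptions, since the wire format is not specified yet. The one firm fact used is that a sha256 digest is 64 lowercase hex characters.

```python
import re
from typing import Dict, NamedTuple

# Assumed shape: byoc/<repo>@sha256:<64 hex chars>
DIGEST_REF = re.compile(
    r"^byoc/(?P<repo>[a-z0-9][a-z0-9._/-]*)@sha256:(?P<digest>[0-9a-f]{64})$"
)


class CapabilityRef(NamedTuple):
    repo: str
    digest: str


def parse_capability_ref(ref: str) -> CapabilityRef:
    """Parse a digest-pinned capability reference or raise ValueError."""
    match = DIGEST_REF.match(ref)
    if match is None:
        raise ValueError(f"not a digest-pinned capability reference: {ref!r}")
    return CapabilityRef(match["repo"], match["digest"])


def resolve_alias(alias: str, index: Dict[str, str]) -> CapabilityRef:
    """Resolve a mutable name:version alias to its pinned reference (UX layer only)."""
    try:
        pinned = index[alias]
    except KeyError:
        raise LookupError(f"unknown capability alias: {alias!r}") from None
    # Whatever the alias says, only the digest-pinned form goes on the wire.
    return parse_capability_ref(pinned)
```

For example, an index entry mapping byoc/text-reverser:v2 to a pinned reference lets callers keep a friendly name while routing stays reproducible.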
