feat: Python UDFs: per-session inlining toggle and strict refusal setting#1546
feat: Python UDFs: per-session inlining toggle and strict refusal setting#1546timsaucer wants to merge 14 commits into
Conversation
…fusal
Adds a per-session toggle that turns inline Python UDF encoding on or
off, plus the supporting plumbing to make it usable through
pickle.dumps.
Codec layer:
* PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining
bool (default true) and a with_python_udf_inlining(enabled) builder.
Each try_encode_udf{,af,wf} short-circuits to inner when the toggle
is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic
on a strict codec returns a clean Execution error instead of
invoking cloudpickle.loads. The refusal message names the UDF and
the wire family so an operator can see at a glance whether to
re-encode the bytes or register the UDF on the receiver.
Session layer:
* PySessionContext::with_python_udf_inlining(enabled) returns a new
session whose stacked logical + physical codecs both carry the
toggle. The Arc<SessionState> is cloned (cheap), only the codec
pair is rebuilt, so registrations and config stay attached.
* SessionContext.with_python_udf_inlining(*, enabled) is the Python
wrapper. enabled is keyword-only because positional booleans at
the call site read as opaque.
Sender-side context:
* datafusion.ipc gains set_sender_ctx / get_sender_ctx /
clear_sender_ctx thread-locals. Expr.__reduce__ now consults
get_sender_ctx() to pick the codec for outbound pickles, which is
the only path through which a strict session affects pickle.dumps
(the protocol calls __reduce__ with no arguments). Without a
sender context the default codec is used.
Tests:
* test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers
both directions of the toggle plus the explicit-ctx fast path),
TestWorkerCtxLifecycle (set/clear/threading), and
TestSenderCtxLifecycle.
* New test_pickle_multiprocessing.py + helpers exercise the full
driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx
installed in the worker initializer.
* CI workflow gets a 30-minute timeout-minutes backstop so a hung
pickle worker can't block the matrix indefinitely.
User-guide docs and the runnable examples land in PR4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
d680e12 to
14178db
Compare
Rewrite with_python_udf_inlining docstring for readability and remove references to /user-guide/io/distributing_work, which does not exist yet. Keep security warning inline as a .. warning:: Security block, matching the existing pattern in Expr.to_bytes / from_bytes / __reduce__. The central doc will land in a follow-on PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per CLAUDE.md, every Python function needs a docstring example. Adds examples to with_python_udf_inlining, set_sender_ctx, clear_sender_ctx, and get_sender_ctx. Also clarifies that with_python_udf_inlining returns a new SessionContext and leaves the original unchanged, matching the with_logical_extension_codec pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* codec: strict refusal routes through `read_framed_payload` so malformed inline bytes surface their own diagnostic; the "inlining is disabled" message now fires only when the payload would have decoded. * codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining` cross-link for rustdoc rendering. * expr: hoist `get_sender_ctx` import to module top; note that `__reduce__` also drives `copy.copy` / `copy.deepcopy`. * context: accept `with_python_udf_inlining` positionally or as kwarg (drop `*,`). * tests: replace size-ratio heuristic with semantic check for the `DFPYUDF` family prefix; switch single-batch closure test to `pool.apply`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`) to match the documented call style and the existing doctests/tests. - `refuse_if_inline` and the three `try_decode_python_*` decoders short- circuit on a `starts_with(family)` check before `Python::attach`, so plans whose UDFs are not Python-defined no longer pay a GIL acquisition per decode call. Semantics preserved: `strip_wire_header` already returns `Ok(None)` when the prefix does not match. - `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in `try`/`finally` and notes that the thread-local holds a strong reference to the installed `SessionContext` until cleared. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Multiprocessing forkserver/spawn hang was diagnosed and fixed: workers could not import `tests._pickle_multiprocessing_helpers` because `pytest --import-mode=importlib` does not add the test parent dir to `sys.path`. The fix (appending the parent dir to `sys.path` so it is inherited by mp workers without shadowing the installed `datafusion` wheel) is retained. This commit drops the diagnostic scaffolding that was added to identify the hang point: - `_diag` + per-import / per-task log writes to /tmp - `snapshot_processes` and the `threading.Timer` that captured worker state mid-hang - `diag_init` Pool initializer - "Dump multiprocessing diagnostic log" CI step Pre-existing infrastructure is kept: per-test `@pytest.mark.timeout(120)` (backed by `pytest-timeout` dev dep) and the job-level `timeout-minutes: 30` backstop on the test matrix. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a per-session switch to control whether Python UDF definitions are inlined into serialized expressions, enabling “strict” decode behavior that refuses inline (cloudpickled) payloads, and introduces a driver-side sender context so pickle.dumps(Expr) can honor that session configuration.
Changes:
- Introduce
SessionContext.with_python_udf_inlining(enabled=...)and plumb the toggle through the Rust logical/physical codecs to gate inline encode/decode (and refuse inline payloads when disabled). - Add
datafusion.ipcsender-context APIs (set_sender_ctx/get_sender_ctx/clear_sender_ctx) and wireExpr.__reduce__to use the sender context for pickling. - Add unit tests (including multiprocessing coverage) plus CI/test harness guardrails (
pytest-timeout, workflow timeout).
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
crates/core/src/codec.rs |
Adds the inlining toggle to Python codecs and strict refusal logic for inline UDF payloads. |
crates/core/src/context.rs |
Exposes a Rust PySessionContext.with_python_udf_inlining constructor-style method. |
python/datafusion/context.py |
Adds public Python API SessionContext.with_python_udf_inlining. |
python/datafusion/expr.py |
Makes pickling honor a driver-side sender context via get_sender_ctx() and updates serialization docs. |
python/datafusion/ipc.py |
Adds sender-context thread-local APIs and expands driver/worker distribution docs. |
python/tests/test_pickle_expr.py |
Adds coverage for strict inlining behavior and sender/worker context lifecycle semantics. |
python/tests/test_pickle_multiprocessing.py |
Adds cross-process pickle tests across multiprocessing start methods with timeouts. |
python/tests/_pickle_multiprocessing_helpers.py |
Provides importable worker helpers for multiprocessing tests (not pytest-collected). |
.github/workflows/test.yml |
Adds a job-level timeout to prevent hung multiprocessing runs from stalling CI. |
pyproject.toml |
Adds pytest-timeout to dev dependencies. |
uv.lock |
Locks pytest-timeout and updates the resolved dev dependency set. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Address PR review feedback: - codec.rs: rewrite strict-refusal error to present the two real remediations (sender re-encode by-name + receiver register; or receiver enables inlining, accepting cloudpickle risk) instead of bundling registration with both-side inlining. - expr.py: qualify to_bytes docstring so Python UDF self-contained behavior is conditional on with_python_udf_inlining being enabled. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| // codec time, not a planner-stage failure. Downstream error | ||
| // classification keys off the variant — surfacing this as a planner | ||
| // error would mis-route it into "fix your SQL" buckets. | ||
| datafusion::error::DataFusionError::Execution(format!( |
There was a problem hiding this comment.
It would be nice if there was a page on this so we could include a url with even more context. I think your descriptions in the previous two prs are good but I suspect someone will stumble upon this with very little context as a general user and thinking about how to mitigate that. Can be wrapped in next PR or even a follow on for further nits/clarifiaction.
| #[derive(Debug)] | ||
| pub struct PythonLogicalCodec { | ||
| inner: Arc<dyn LogicalExtensionCodec>, | ||
| python_udf_inlining: bool, |
There was a problem hiding this comment.
NIT: throughout this UDF seems to be used in two contexts. UDF as short hand for scalar UDF and UDF as short hand for the broader set of all flavors of user functions udf,udaf,udwf. Everytime I've been reviewing these I kind of forget that and have to revisit things to make sure I'm doing the right mapping in my head. Maybe that's expected and standard. No change request more of just an FYI in case there are thoughts on how to resolve this overloading for potentially making it easier to bring on future people.
| def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: | ||
| """Control whether Python UDFs are embedded in serialized expressions. | ||
|
|
||
| When ``enabled=True`` (the default), serialized expressions carry |
There was a problem hiding this comment.
Enabled isn't an optional argument with a default to true here. So either this is misaligned or it's saying that the class has things enabled by default which isn't clear in the docstring.
| >>> from datafusion import SessionContext | ||
| >>> strict = SessionContext().with_python_udf_inlining(enabled=False) | ||
| >>> isinstance(strict, SessionContext) | ||
| True |
There was a problem hiding this comment.
This doesn't really demonstrate this functionality
| :meth:`SessionContext.with_python_udf_inlining` to every pickled | ||
| expression on this thread: | ||
|
|
||
| .. code-block:: python |
There was a problem hiding this comment.
NIT: I don't think doc test hits code-block, I think it needs the >>> prefix and since this looks standalone executable might be nice to make it in that format
| encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied | ||
| ``ctx``. | ||
|
|
||
| The thread-local holds a strong reference to the installed |
There was a problem hiding this comment.
I typically think of thread-local as an adjective. So I think you mean the thread-local sender context which helps differentiate from the _local.ctx
| >>> set_sender_ctx(driver) | ||
| >>> get_sender_ctx() is driver | ||
| True | ||
| >>> clear_sender_ctx() |
There was a problem hiding this comment.
Does skipping this step cause other doc tests to fail and this is basically cleanup? If not looks unneeded for this doc string
Which issue does this PR close?
Addresses part of #1517
This is PR 3 of 4. The subsequent PRs target this branch's tip until it merges.
Follow up PR:
Rationale for this change
PRs 1 and 2 ship Python UDFs inline through the codec. There is a follow-on need: Untrusted-input decoding. A receiver that may read
Expr.from_bytesinput from an untrusted source must refuse to invokecloudpickle.loadson the inline payload. (pickle.loadson untrusted input is still unsafe regardless of this toggle — see the security note in the docstrings.)We resolve this by an on/off switch at the codec level. The codec already sits on every session, so the toggle is naturally per-session.
What changes are included in this PR?
SessionContextlevel to enable/disable Python inlining of UDFs, which gets passed through to the codec layer.Are there any user-facing changes?
Yes, but it's pure addition:
SessionContext.with_python_udf_inliningis a new method.datafusion.ipc.set_sender_ctx/get_sender_ctx/clear_sender_ctxare new functions for propagating a configured session throughpickle.dumps.The user-guide page documenting the full pattern (and the multiprocessing / Ray runnable examples) lands in PR 4 of this series.