fix: use task-stream index instead of wall clock in get_task_stream context manager (#9253) #9282

Open
MohammadYusif wants to merge 1 commit into dask:main from MohammadYusif:fix/issue-9253

@MohammadYusif

Closes #9253

  • Tests added / passed
  • Passes pre-commit run --all-files

The get_task_stream context manager bounded the tasks it collected with a
wall-clock timestamp (time() - 0.1), and collect() bisects the buffer by
comparing that boundary against each task's recorded stop time. With latency
or clock skew between the client and the workers, a task that finished inside
the block can carry a stop time earlier than the client's start boundary
and be silently dropped, so get_task_stream() returns no tasks. This is the
problem the existing # FIXME ... We should query TaskStreamPlugin.index instead.
comment points at.
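
The failure mode described above can be reproduced in a few lines. This is only a simplified sketch, not the code in distributed: collect_since_time, the record layout, and the skew value are all hypothetical, and the buffer is a plain list rather than the plugin's real buffer.

```python
from bisect import bisect_left

# Hypothetical, simplified model of a time-bounded collect(): the start
# boundary is compared against each record's "stop" timestamp.
def collect_since_time(buffer, start_time):
    stops = [rec["stop"] for rec in buffer]
    return buffer[bisect_left(stops, start_time):]

# Assumption for this example: the worker clock runs 0.5 s behind the client.
skew = -0.5
client_now = 100.0
start_boundary = client_now - 0.1  # mirrors the time() - 0.1 boundary

# The task really finishes 0.2 s after the block was entered, but its stop
# time is stamped by the lagging worker clock.
task = {"key": "inc-1", "stop": client_now + 0.2 + skew}
buffer = [task]

# The stamped stop time falls before the boundary, so the task is dropped.
collected = collect_since_time(buffer, start_boundary)
```

In this model the task ran entirely inside the block, yet collected is empty because its stamped stop time is earlier than the client's boundary.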

This PR records the scheduler's monotonic task-stream append index on entry and
collects everything appended after it on exit, removing the dependency on
synchronized clocks.
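
A minimal sketch of selecting by append position, assuming a bounded buffer plus a counter of total appends. IndexedBuffer and its methods are hypothetical names for illustration; the real collect() in task_stream.py may differ in details such as how evicted records are handled.

```python
from collections import deque

class IndexedBuffer:
    """Toy stand-in for a task-stream buffer with a monotonic append index."""

    def __init__(self, maxlen=None):
        self.buffer = deque(maxlen=maxlen)
        self.index = 0  # total records ever appended; never decreases

    def append(self, record):
        self.buffer.append(record)
        self.index += 1

    def collect(self, start_index):
        # Global index of the oldest record still held in the buffer.
        oldest = self.index - len(self.buffer)
        # Clamp to 0 in case records after start_index were already evicted.
        offset = max(start_index - oldest, 0)
        return list(self.buffer)[offset:]

stream = IndexedBuffer(maxlen=1000)
stream.append({"key": "before", "stop": 50.0})

start_index = stream.index                      # snapshot on block entry
stream.append({"key": "inside", "stop": 10.0})  # skewed, "early" stop time

# Selection ignores timestamps, so the skewed record is still returned.
inside = stream.collect(start_index)
```

Because the selection never looks at the stop field, a record with an arbitrarily skewed timestamp is still returned as long as it was appended after the snapshot.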

  • distributed/diagnostics/task_stream.py: collect() gains a start_index
    path that selects records by append position instead of timestamp.
  • distributed/scheduler.py: new get_task_stream_index RPC + start_index
    passthrough on get_task_stream. Refactored the plugin-init guard into a
    _task_stream_plugin() helper used by both methods.
  • distributed/client.py: get_task_stream/_get_task_stream forward
    start_index; the context manager snapshots the index on enter and collects
    from it on exit (sync and async), removing the FIXME comment.
  • distributed/diagnostics/tests/test_task_stream.py: tests for the index
    semantics and the clock-skew regression.
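
The client-side flow in the list above (snapshot the index on enter, collect from it on exit) might look roughly like the following. This is a sketch under assumptions: FakeScheduler and task_stream are hypothetical in-process stand-ins, and only the names get_task_stream_index and start_index come from the description above. The real client talks to the scheduler over RPC and also has an async path.

```python
from contextlib import contextmanager

class FakeScheduler:
    """Hypothetical in-process stand-in for the two scheduler calls."""

    def __init__(self):
        self.records = []

    def get_task_stream_index(self):
        # Number of records appended so far.
        return len(self.records)

    def get_task_stream(self, start_index):
        # Everything appended at or after the given position.
        return self.records[start_index:]

@contextmanager
def task_stream(scheduler):
    result = []
    start_index = scheduler.get_task_stream_index()  # snapshot on enter
    try:
        yield result
    finally:
        # Collect on exit, bounded by position rather than by a timestamp.
        result.extend(scheduler.get_task_stream(start_index=start_index))

scheduler = FakeScheduler()
scheduler.records.append({"key": "earlier"})

with task_stream(scheduler) as ts:
    scheduler.records.append({"key": "x"})
    scheduler.records.append({"key": "y"})

keys = [rec["key"] for rec in ts]
```

Only the records appended inside the with block end up in ts; the earlier record is excluded without any clock comparison.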

…ontext manager (dask#9253)

The get_task_stream context manager bounded the collected tasks with a
wall-clock timestamp (time() - 0.1). collect() then bisected the buffer by
comparing that boundary against each task's recorded stop time. When there
was latency or clock skew between the client and the workers, a task that
finished inside the block could carry a stop time earlier than the client's
start boundary and be silently dropped, so get_task_stream() returned no
tasks.

Record the scheduler's monotonic task-stream append index on entry and
collect everything appended after it on exit. This removes the dependency on
synchronized clocks entirely, as the maintainers' FIXME suggested. Adds a
get_task_stream_index scheduler RPC and a start_index path through
collect()/get_task_stream(), with tests covering the index semantics and the
clock-skew regression.
@github-actions
Contributor

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    31 files  ±0        31 suites ±0    10h 49m 37s ⏱️ +14m 21s
 4 082 tests  +2     3 966 ✅  -1      113 💤 +1     3 ❌ +2
59 249 runs  +32    56 643 ✅ +28    2 603 💤 +2     3 ❌ +2

For more details on these failures, see this check.

Results for commit 2d46d3f. ± Comparison against base commit bcad953.

This pull request skips 1 test.
distributed.deploy.tests.test_ssh ‑ test_defer_to_old


Development

Successfully merging this pull request may close these issues.

High latency can cause get_task_stream() to miss tasks
