Skip to content

[Bug] Non-deterministic history generated when an update is handled on a cold start with a specific setup #1591

@EddieWhi

Description

@EddieWhi

What are you really trying to do?

Solve a non-determinism issue in a complicated workflow. I would like to understand whether the setup is simply not supported or whether this is a bug. If the former, it would be useful to get some suggestions on how to detect, or even define, these situations so that we can avoid them in the future.

Describe the bug

I had a long journey tracking down a confusing non-determinism error in a tricky (read: over-complicated) workflow. It turns out that this particular workflow can write a non-deterministic history if it handles an update while "cold", i.e. when the worker has no cached state for it and must first replay the history. It looks like the update handler gets executed before the recorded signal handler has completed its work. There is some interaction between the initial timer, the `workflow.wait_condition(workflow.all_handlers_finished)` call (with work after it), and the await on a background task. Calling `workflow.wait_condition(workflow.all_handlers_finished)` mid-workflow might be considered bad practice or undefined behavior, and so might using a background task; I would be interested in finding out.

I've tried to reduce the offending workflow to a minimal reproduction. I can't guarantee it is truly minimal, but I think it's close.

Minimal Reproduction

"""
Python SDK non-determinism bug.

The bug requires:
    1. run() awaits a background Task after wait_condition(all_handlers_finished)
    2. A timer fires while an activity is pending (creating an empty intervening WT)
    3. An update handler reads state set by the signal handler to decide whether to
       produce workflow commands

The signal handler sets self._busy = True, awaits an activity, then sets self._busy = False.
During cold/replay start, the update handler runs before the signal handler's coroutine advances
past the activity await to execute self._busy = False.
"""

import asyncio
from datetime import timedelta
from uuid import uuid4

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker, Replayer


@activity.defn
async def do_work() -> str:
    """Activity type scheduled by ``BugWorkflow.process``.

    Both reproduction scenarios in this script register their own slow
    implementation under the same activity name ("do_work"). The workers here
    therefore never run this body; it serves as the workflow's typed activity
    reference.
    """
    result = "done"
    return result


@activity.defn
async def do_update() -> str:
    """Activity type scheduled by ``BugWorkflow.update_if_not_busy``.

    The scenarios in this script register an equivalent implementation (also
    returning "updated") under this activity name. This module-level body
    mainly serves as the workflow's typed activity reference.
    """
    result = "updated"
    return result


# Length of the initial workflow sleep in BugWorkflow.run. The scenarios wait
# slightly longer than this so the timer fires while the signal's activity is
# still pending.
TIMER = timedelta(milliseconds=2_000)


@workflow.defn
class BugWorkflow:
    """Minimal workflow that reproduces the cold-start non-determinism report.

    State:
      * ``_busy`` is set to True by the ``process`` signal before it awaits the
        ``do_work`` activity, and reset to False once that activity returns.
      * ``_task`` is a background asyncio task created in ``__init__``; ``run``
        awaits it last.

    The ``update_if_not_busy`` update reads ``_busy`` to decide whether to
    schedule the ``do_update`` activity. The commands it produces therefore
    depend on whether the signal handler has already advanced past its
    activity await when the update runs.

    NOTE(review): the reproduction depends on the exact statement/await order in
    this class. Do not reorder awaits when editing.
    """

    def __init__(self) -> None:
        self._busy = False
        # Background task started at construction time and awaited at the end
        # of run(), after the all-handlers-finished wait, so run() does not
        # return until this task completes.
        self._task = asyncio.create_task(self._block_forever())

    async def _block_forever(self) -> None:
        """Wait on a condition that is never satisfied.

        NOTE(review): despite the name, the wait is bounded by a one-hour timeout.
        When the timeout elapses, ``wait_condition`` is expected to raise a
        timeout error (asyncio.TimeoutError per the temporalio docs — confirm).
        That error would propagate out of ``await self._task`` in ``run``.
        """
        await workflow.wait_condition(lambda: False, timeout=timedelta(hours=1))

    @workflow.run
    async def run(self, input: str) -> str:
        """Workflow entry point; ``input`` is not used.

        Sleeps for ``TIMER``, then waits until
        ``workflow.all_handlers_finished()`` reports true. It then awaits the
        background task and returns "done".
        """
        # Initial timer. In the reproduction it fires while the signal's
        # activity is still pending.
        await workflow.sleep(TIMER)
        await workflow.wait_condition(workflow.all_handlers_finished)
        await self._task  # Block via task
        return "done"

    @workflow.signal
    async def process(self) -> None:
        """Mark the workflow busy for the duration of the ``do_work`` activity."""
        self._busy = True
        await workflow.execute_activity(do_work, schedule_to_close_timeout=timedelta(minutes=5))
        # Only reached once the activity result has been delivered to this
        # coroutine; until then update_if_not_busy observes _busy == True.
        self._busy = False

    @workflow.update
    async def update_if_not_busy(self) -> str:
        """Return "busy" while ``process`` is mid-activity, else run ``do_update``.

        Only the not-busy branch schedules an activity, so this handler emits a
        different set of commands depending on ``_busy`` at the moment it runs.
        """
        if self._busy:
            return "busy"
        return await workflow.execute_activity(do_update, schedule_to_close_timeout=timedelta(minutes=1))


async def works_with_hot_workflow():
    """Control scenario: the same event sequence, with the update handled by a "hot" worker.

    A single worker stays up for the whole run, so no replay precedes the
    update. Afterwards the recorded history is fetched and replayed with a
    ``Replayer`` to check that it is deterministic. The outcome is printed,
    not raised.

    Requires a Temporal server reachable at localhost:7233. Phases are sequenced
    with ``asyncio.sleep`` delays, so they assume the server and worker keep up.
    """
    # Fresh task queue per run, presumably so that workers left over from
    # earlier runs cannot pick up these tasks.
    task_queue = f"bug-{uuid4()}"
    client = await Client.connect("localhost:7233")

    # proceed: lets the slow activity finish; started: set once it is running.
    proceed = asyncio.Event()
    started = asyncio.Event()

    # Registered under the activity name "do_work" in place of the module-level
    # do_work. It heartbeats every 0.5 s until `proceed` is set, so the test can
    # hold the signal handler's activity open while the workflow timer fires.
    @activity.defn(name="do_work")
    async def do_work_slow() -> str:
        started.set()
        while not proceed.is_set():
            activity.heartbeat()
            await asyncio.sleep(0.5)
        return "done"

    # Registered under "do_update"; returns the same value as the module-level activity.
    @activity.defn(name="do_update")
    async def do_update_impl() -> str:
        return "updated"

    print("Workflow executes just fine with a hot worker (no replay before update).")

    # NOTE(review): despite this message, the update itself is sent in Phase 2 below.
    print("Phase 1: Signal, activity blocks, timer fires, activity completes + handles update")
    async with Worker(
        client, task_queue=task_queue, workflows=[BugWorkflow], activities=[do_work_slow, do_update_impl]
    ):
        wf = await client.start_workflow(BugWorkflow.run, "x", id=f"bug-{uuid4()}", task_queue=task_queue)
        await asyncio.sleep(0.5)  # Let timer start in its own WT
        await wf.signal(BugWorkflow.process)
        await asyncio.wait_for(started.wait(), timeout=10)
        print("  - Activity started (flag=True)")
        await asyncio.sleep(TIMER.total_seconds() + 2)  # Timer fires while activity pending
        print("  - Timer fired")
        proceed.set()
        # Assumes the activity result is delivered and the signal handler resets
        # _busy within this 1 s; nothing here verifies that.
        await asyncio.sleep(1)
        print("  - Activity completed (flag=False)")

        # The same, still-running worker handles the update.
        print("Phase 2: Worker still running + handles update")
        result = await wf.execute_update(BugWorkflow.update_if_not_busy)
        print(f"  - Result: '{result}' {'BUG: flag seen as True' if result == 'busy' else 'correct'}")

    # The worker context has exited; replay the recorded history offline.
    print("Phase 3: Replay")
    history = await client.get_workflow_handle(wf.id).fetch_history()
    try:
        await Replayer(workflows=[BugWorkflow]).replay_workflow(history)
        print("  - Replay passed")
    except Exception as e:
        # Report rather than raise, so the caller can go on to the next scenario.
        if "nondetermin" in str(e).lower():
            print(f"NON-DETERMINISM ERROR:  {e}")
        else:
            print(f"{e}")


async def non_determinism_with_cold_start():
    """Failing scenario: the update is handled by a "cold" worker that must replay first.

    Phase 1 drives the signal -> activity -> timer sequence and then shuts that
    worker down. Phase 2 starts a second worker with ``max_cached_workflows=0``
    (workflow cache disabled), so the update is handled only after the workflow
    is replayed from history. Phase 3 replays the final history with a
    ``Replayer`` and prints the outcome rather than raising.

    Requires a Temporal server reachable at localhost:7233. Phases are sequenced
    with ``asyncio.sleep`` delays, so they assume the server and worker keep up.
    """
    # Fresh task queue per run, presumably so that workers left over from
    # earlier runs cannot pick up these tasks.
    task_queue = f"bug-{uuid4()}"
    client = await Client.connect("localhost:7233")

    # proceed: lets the slow activity finish; started: set once it is running.
    proceed = asyncio.Event()
    started = asyncio.Event()

    # Registered under the activity name "do_work" in place of the module-level
    # do_work. It heartbeats every 0.5 s until `proceed` is set, so the test can
    # hold the signal handler's activity open while the workflow timer fires.
    @activity.defn(name="do_work")
    async def do_work_slow() -> str:
        started.set()
        while not proceed.is_set():
            activity.heartbeat()
            await asyncio.sleep(0.5)
        return "done"

    # Registered under "do_update"; returns the same value as the module-level activity.
    @activity.defn(name="do_update")
    async def do_update_impl() -> str:
        return "updated"

    print("Phase 1: Signal, activity blocks, timer fires, activity completes")
    async with Worker(
        client, task_queue=task_queue, workflows=[BugWorkflow], activities=[do_work_slow, do_update_impl]
    ):
        wf = await client.start_workflow(BugWorkflow.run, "x", id=f"bug-{uuid4()}", task_queue=task_queue)
        await asyncio.sleep(0.5)  # Let timer start in its own WT
        await wf.signal(BugWorkflow.process)
        await asyncio.wait_for(started.wait(), timeout=10)
        print("  - Activity started (flag=True)")
        await asyncio.sleep(TIMER.total_seconds() + 2)  # Timer fires while activity pending
        print("  - Timer fired")
        proceed.set()
        # Assumes the activity result is delivered and the signal handler resets
        # _busy within this 1 s; nothing here verifies that.
        await asyncio.sleep(1)
        print("  - Activity completed (flag=False)")
    # Leaving the context shuts the Phase 1 worker down, so no cached copy of
    # the workflow survives into Phase 2.

    # New worker with the workflow cache disabled. It holds no in-memory state
    # for this workflow and must replay the history before running the update.
    print("Phase 2: Cold worker replays + handles update")
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[BugWorkflow],
        activities=[do_work_slow, do_update_impl],
        max_cached_workflows=0,
    ):
        result = await wf.execute_update(BugWorkflow.update_if_not_busy)
        print(
            f"  - Result: '{result}' {'BUG: flag seen as True after cold start/replay' if result == 'busy' else 'correct'}"
        )

    # Replay the final history (including the update) offline to detect the
    # non-determinism recorded by the cold worker.
    print("Phase 3: Replay")
    history = await client.get_workflow_handle(wf.id).fetch_history()
    try:
        await Replayer(workflows=[BugWorkflow]).replay_workflow(history)
        print("  - Replay passed")
    except Exception as e:
        # Report rather than raise; the message is classified by substring.
        if "nondetermin" in str(e).lower():
            print(f"NON-DETERMINISM ERROR:  {e}")
        else:
            print(f"{e}")


async def main():
    """Run the hot-worker scenario, then the cold-worker scenario.

    Each scenario is preceded by its banner line. A single empty line separates
    the two runs, and the runs execute strictly one after the other.
    """
    scenarios = (
        ("=== Testing with hot worker ===", works_with_hot_workflow),
        ("=== Testing with cold worker ===", non_determinism_with_cold_start),
    )
    for position, (banner, scenario) in enumerate(scenarios):
        if position > 0:
            print("")
        print(banner)
        await scenario()


if __name__ == "__main__":
    # When executed as a script, run both scenarios (hot, then cold). Each one
    # connects to a Temporal server at localhost:7233.
    asyncio.run(main())

Environment/Versions

  • OS and processor: Linux x64
  • Temporal Version: Docker temporalio/temporal:1.7.0 with Python SDK 1.27.0
  • Docker

Additional context

None

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions