What are you really trying to do?
Solve a non-determinism issue in a complicated workflow. I would like to understand whether this setup is simply not supported or whether this is a bug. If the former, it would be useful to get some suggestions on how to detect, or even define, these situations so we can avoid them in future.
Describe the bug
I had a long journey tracking down a confusing non-determinism error in a tricky (read: over-complicated) workflow. It turns out the workflow can write a non-deterministic history if it handles an update when "cold" (i.e. on a worker that has no cached state for the workflow and must replay its history first). It looks like the update handler gets executed before the recorded signal handler has completed its work. There is some interaction between the initial timer, the workflow.wait_condition(workflow.all_handlers_finished) call (with work after it), and the await on a background task. Things like using workflow.wait_condition(workflow.all_handlers_finished) mid-workflow might be considered bad practice or undefined behaviour (UB), and so might using a background task. I would be interested in finding out.
I've tried to reduce the offending workflow to a minimal reproduction. I can't guarantee it is truly minimal, but I think it's close.
Minimal Reproduction
"""
Python SDK non-determinism bug.
The bug requires:
1. run() awaits a background Task after wait_condition(all_handlers_finished)
2. A timer fires while an activity is pending (creating an empty intervening WT)
3. An update handler reads state set by the signal handler to decide whether to
produce workflow commands
The signal handler sets self._busy = True, awaits an activity, then sets self._busy = False.
During cold/replay start, the update handler runs before the signal handler's coroutine advances
past the activity await to execute self._busy = False.
"""
import asyncio
from datetime import timedelta
from uuid import uuid4

from temporalio import activity, workflow
from temporalio.api.enums.v1 import EventType
from temporalio.client import Client
from temporalio.worker import Worker, Replayer
@activity.defn
async def do_work() -> str:
    """Module-level ``do_work`` activity referenced by ``BugWorkflow.process``.

    The scenario workers register their own implementation under the same
    activity name; this definition simply reports completion.
    """
    outcome = "done"
    return outcome
@activity.defn
async def do_update() -> str:
    """Module-level ``do_update`` activity referenced by ``BugWorkflow.update_if_not_busy``.

    The scenario workers register their own implementation under the same
    activity name; this definition simply reports the update as applied.
    """
    outcome = "updated"
    return outcome
# Length of the workflow's initial sleep; the scenarios let it fire while the
# signal handler's activity is still pending.
TIMER = timedelta(milliseconds=2_000)
@workflow.defn
class BugWorkflow:
    """Minimal workflow reproducing the reported cold-start non-determinism.

    ``run`` sleeps for ``TIMER``, waits until every signal/update handler has
    finished, then awaits a background task that does not finish until its
    one-hour wait elapses. The ``process`` signal handler sets ``_busy`` around
    an activity, and the ``update_if_not_busy`` update handler only schedules an
    activity when ``_busy`` is False. Whether the update emits a command therefore
    depends on how far the signal handler has progressed when the update runs.
    """

    def __init__(self) -> None:
        # True while the ``process`` signal handler is awaiting its activity.
        self._busy: bool = False
        # Background task awaited at the end of ``run``; it is started when the
        # workflow instance is constructed, not from ``run`` itself.
        self._task = asyncio.create_task(self._block_forever())

    async def _block_forever(self) -> None:
        """Wait on a condition that is never true, bounded by a one-hour timeout."""
        # NOTE(review): per the SDK docs, wait_condition raises asyncio.TimeoutError
        # when the timeout elapses, which would then surface from ``await self._task``
        # in ``run`` — confirm that is acceptable for this repro.
        await workflow.wait_condition(lambda: False, timeout=timedelta(hours=1))

    @workflow.run
    async def run(self, input: str) -> str:
        """Workflow entry point. ``input`` is unused; returns "done" once ``_task`` finishes."""
        # Initial timer; the repro relies on it firing while ``process``'s activity is pending.
        await workflow.sleep(TIMER)
        # Wait until no signal/update handler is still running, then keep working afterwards.
        await workflow.wait_condition(workflow.all_handlers_finished)
        await self._task  # Block via task
        return "done"

    @workflow.signal
    async def process(self) -> None:
        """Run the ``do_work`` activity with ``_busy`` set to True until it completes."""
        self._busy = True
        await workflow.execute_activity(do_work, schedule_to_close_timeout=timedelta(minutes=5))
        # Only reached once the activity result has been delivered to the workflow.
        self._busy = False

    @workflow.update
    async def update_if_not_busy(self) -> str:
        """Return "busy" while ``process`` is mid-activity, else run ``do_update`` and return its result.

        NOTE(review): this reads state written by a possibly still-running signal
        handler. Per the repro description, on a cold worker (replay before the
        update) this handler can observe ``_busy`` as True even though the activity
        result is already recorded. The "busy" branch then emits no command, whereas
        the hot path schedules ``do_update``; that difference is the reported mismatch.
        """
        if self._busy:
            return "busy"
        return await workflow.execute_activity(do_update, schedule_to_close_timeout=timedelta(minutes=1))
def _make_slow_activities(started: asyncio.Event, proceed: asyncio.Event) -> list:
    """Build the worker-side ``do_work``/``do_update`` activity implementations.

    ``do_work`` sets ``started`` as soon as it runs, then heartbeats every 0.5 s
    until ``proceed`` is set, so the driver controls exactly when it finishes.
    """

    @activity.defn(name="do_work")
    async def do_work_slow() -> str:
        started.set()
        while not proceed.is_set():
            activity.heartbeat()
            await asyncio.sleep(0.5)
        return "done"

    @activity.defn(name="do_update")
    async def do_update_impl() -> str:
        return "updated"

    return [do_work_slow, do_update_impl]


async def _wait_until_processed(handle, event_type, *, timeout: float = 30.0) -> None:
    """Wait until ``event_type`` is in the history and a later workflow task has completed.

    A workflow task completing after the event means a worker has actually run the
    workflow code reacting to it. Polling the history makes each scenario step
    deterministic, where fixed ``asyncio.sleep`` delays only made it likely.

    Raises:
        asyncio.TimeoutError: if that point is not reached within ``timeout`` seconds.
    """

    async def poll() -> None:
        while True:
            history = await handle.fetch_history()
            event_seen = False
            for event in history.events:
                if event.event_type == event_type:
                    event_seen = True
                elif event_seen and event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
                    return
            await asyncio.sleep(0.2)

    await asyncio.wait_for(poll(), timeout=timeout)


async def _drive_phase_one(handle, started: asyncio.Event, proceed: asyncio.Event) -> None:
    """Signal the workflow, hold its activity open across the timer, then release it."""
    # Let the timer start in its own workflow task before the signal arrives.
    await _wait_until_processed(handle, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
    await handle.signal(BugWorkflow.process)
    await asyncio.wait_for(started.wait(), timeout=10)
    print(" - Activity started (flag=True)")
    # The timer fires, and its otherwise-empty workflow task completes, while the activity is pending.
    await _wait_until_processed(handle, EventType.EVENT_TYPE_TIMER_FIRED)
    print(" - Timer fired")
    proceed.set()
    # The signal handler resumes and clears its flag in the task that delivers the activity result.
    await _wait_until_processed(handle, EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
    print(" - Activity completed (flag=False)")


async def _replay_and_report(handle) -> None:
    """Replay the workflow's recorded history locally and print the outcome."""
    print("Phase 3: Replay")
    history = await handle.fetch_history()
    try:
        await Replayer(workflows=[BugWorkflow]).replay_workflow(history)
    except Exception as e:  # Diagnostic script: report any replay failure instead of crashing.
        if "nondetermin" in str(e).lower():
            print(f"NON-DETERMINISM ERROR: {e}")
        else:
            # Include the type: some exceptions (e.g. timeouts) have an empty message.
            print(f"{type(e).__name__}: {e}")
    else:
        print(" - Replay passed")


async def _terminate_quietly(handle) -> None:
    """Terminate the repro workflow so it does not stay open on the server after the run."""
    try:
        await handle.terminate(reason="reproduction finished")
    except Exception as e:  # Best-effort cleanup; must not mask an earlier error from the scenario.
        print(f" - Cleanup: could not terminate {handle.id}: {e}")


async def works_with_hot_workflow():
    """Hot-worker scenario: one running worker handles the signal, the timer and the update.

    Prints each phase, the update result, and whether replaying the resulting
    history succeeds. The workflow is terminated afterwards.
    """
    task_queue = f"bug-{uuid4()}"
    client = await Client.connect("localhost:7233")
    started, proceed = asyncio.Event(), asyncio.Event()
    activities = _make_slow_activities(started, proceed)
    print("Workflow executes just fine with a hot worker (no replay before update).")
    print("Phase 1: Signal, activity blocks, timer fires, activity completes + handles update")
    async with Worker(client, task_queue=task_queue, workflows=[BugWorkflow], activities=activities):
        wf = await client.start_workflow(BugWorkflow.run, "x", id=f"bug-{uuid4()}", task_queue=task_queue)
        try:
            await _drive_phase_one(wf, started, proceed)
            print("Phase 2: Worker still running + handles update")
            result = await wf.execute_update(BugWorkflow.update_if_not_busy)
            print(f" - Result: '{result}' {'BUG: flag seen as True' if result == 'busy' else 'correct'}")
            await _replay_and_report(wf)
        finally:
            await _terminate_quietly(wf)


async def non_determinism_with_cold_start():
    """Cold-worker scenario: a fresh worker that must replay first handles the update.

    Phase 1 runs on one worker. That worker is then shut down, and the update is
    handled by a second worker with ``max_cached_workflows=0``, which replays the
    history before running the update handler. The workflow is terminated afterwards.
    """
    task_queue = f"bug-{uuid4()}"
    client = await Client.connect("localhost:7233")
    started, proceed = asyncio.Event(), asyncio.Event()
    activities = _make_slow_activities(started, proceed)
    print("Phase 1: Signal, activity blocks, timer fires, activity completes")
    wf = None
    try:
        async with Worker(client, task_queue=task_queue, workflows=[BugWorkflow], activities=activities):
            wf = await client.start_workflow(BugWorkflow.run, "x", id=f"bug-{uuid4()}", task_queue=task_queue)
            # Must finish before this worker shuts down, so the cold worker sees only the update.
            await _drive_phase_one(wf, started, proceed)
        print("Phase 2: Cold worker replays + handles update")
        async with Worker(
            client,
            task_queue=task_queue,
            workflows=[BugWorkflow],
            activities=activities,
            max_cached_workflows=0,
        ):
            result = await wf.execute_update(BugWorkflow.update_if_not_busy)
            print(
                f" - Result: '{result}' {'BUG: flag seen as True after cold start/replay' if result == 'busy' else 'correct'}"
            )
        await _replay_and_report(wf)
    finally:
        if wf is not None:
            await _terminate_quietly(wf)
async def main():
    """Run the hot-worker scenario, then the cold-worker scenario, separated by a blank line."""
    scenarios = (
        ("=== Testing with hot worker ===", works_with_hot_workflow),
        ("=== Testing with cold worker ===", non_determinism_with_cold_start),
    )
    for position, (banner, scenario) in enumerate(scenarios):
        if position > 0:
            print("")
        print(banner)
        await scenario()


if __name__ == "__main__":
    asyncio.run(main())
Environment/Versions
- OS and processor: Linux x64
- Temporal Version: Docker temporalio/temporal:1.7.0 with Python SDK 1.27.0
- Docker
Additional context
None
What are you really trying to do?
Solve a non-determinism issue in a complicated workflow. I would like to understand whether this setup is simply not supported or whether this is a bug. If the former, it would be useful to get some suggestions on how to detect, or even define, these situations so we can avoid them in future.
Describe the bug
I had a long journey tracking down a confusing non-determinism error in a tricky (read: over-complicated) workflow. It turns out the workflow can write a non-deterministic history if it handles an update when "cold" (i.e. on a worker that has no cached state for the workflow and must replay its history first). It looks like the update handler gets executed before the recorded signal handler has completed its work. There is some interaction between the initial timer, the workflow.wait_condition(workflow.all_handlers_finished) call (with work after it), and the await on a background task. Things like using workflow.wait_condition(workflow.all_handlers_finished) mid-workflow might be considered bad practice or undefined behaviour (UB), and so might using a background task. I would be interested in finding out.
I've tried to reduce the offending workflow to a minimal reproduction. I can't guarantee it is truly minimal, but I think it's close.
Minimal Reproduction
Environment/Versions
Additional context
None