Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/strands/tools/executors/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ async def _execute(
task_events = [asyncio.Event() for _ in tool_uses]
stop_event = object()

# Build a mapping from toolUseId to its original request index so we can
# restore request order after concurrent execution completes.
tool_use_id_to_index: dict[str, int] = {
str(tool_use.get("toolUseId")): idx for idx, tool_use in enumerate(tool_uses)
}

tasks = []
try:
for task_id, tool_use in enumerate(tool_uses):
Expand Down Expand Up @@ -86,6 +92,14 @@ async def _execute(
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

# Re-order tool_results to match the original request order.
# The base ``_stream`` method appends results as each task finishes,
# so when tools complete out-of-order the list reflects completion
# order rather than request order.
tool_results.sort(
key=lambda r: tool_use_id_to_index.get(str(r.get("toolUseId")), 0)
)

async def _task(
self,
agent: "Agent",
Expand Down
21 changes: 21 additions & 0 deletions tests/strands/tools/executors/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ async def test_concurrent_executor_execute(
assert tru_results == exp_results


@pytest.mark.asyncio
async def test_concurrent_executor_preserves_request_order(
executor, agent, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context, alist
):
"""tool_results must follow the original tool_uses order, not completion order."""
# temperature_tool is synchronous and completes instantly.
# We put it second to verify it doesn't overtake the first tool in tool_results.
tool_uses = [
{"name": "temperature_tool", "toolUseId": "id-first", "input": {}},
{"name": "weather_tool", "toolUseId": "id-second", "input": {}},
]
stream = executor._execute(
agent, tool_uses, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context
)
await alist(stream)

# The slow_tool result should appear first because it was requested first,
# even though weather_tool completes sooner.
assert [r["toolUseId"] for r in tool_results] == ["id-first", "id-second"]


@pytest.mark.asyncio
async def test_concurrent_executor_interrupt(
executor, agent, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context, alist
Expand Down