diff --git a/src/strands/tools/executors/concurrent.py b/src/strands/tools/executors/concurrent.py index 835e5abff..9566df50f 100644 --- a/src/strands/tools/executors/concurrent.py +++ b/src/strands/tools/executors/concurrent.py @@ -48,6 +48,12 @@ async def _execute( task_events = [asyncio.Event() for _ in tool_uses] stop_event = object() + # Build a mapping from toolUseId to its original request index so we can + # restore request order after concurrent execution completes. + tool_use_id_to_index: dict[str, int] = { + str(tool_use.get("toolUseId")): idx for idx, tool_use in enumerate(tool_uses) + } + tasks = [] try: for task_id, tool_use in enumerate(tool_uses): @@ -86,6 +92,14 @@ async def _execute( task.cancel() await asyncio.gather(*tasks, return_exceptions=True) + # Re-order tool_results to match the original request order. + # The base ``_stream`` method appends results as each task finishes, + # so when tools complete out-of-order the list reflects completion + # order rather than request order. + tool_results.sort( + key=lambda r: tool_use_id_to_index.get(str(r.get("toolUseId")), 0) + ) + async def _task( self, agent: "Agent", diff --git a/tests/strands/tools/executors/test_concurrent.py b/tests/strands/tools/executors/test_concurrent.py index a8ac05830..f0b959dcf 100644 --- a/tests/strands/tools/executors/test_concurrent.py +++ b/tests/strands/tools/executors/test_concurrent.py @@ -41,6 +41,27 @@ async def test_concurrent_executor_execute( assert tru_results == exp_results +@pytest.mark.asyncio +async def test_concurrent_executor_preserves_request_order( + executor, agent, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context, alist +): + """tool_results must follow the original tool_uses order, not completion order.""" + # temperature_tool is synchronous and completes instantly. + # We put it second to verify it doesn't overtake the first tool in tool_results. + tool_uses = [ + {"name": "temperature_tool", "toolUseId": "id-first", "input": {}}, + {"name": "weather_tool", "toolUseId": "id-second", "input": {}}, + ] + stream = executor._execute( + agent, tool_uses, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context + ) + await alist(stream) + + # The slow_tool result should appear first because it was requested first, + # even though weather_tool completes sooner. + assert [r["toolUseId"] for r in tool_results] == ["id-first", "id-second"] + + @pytest.mark.asyncio async def test_concurrent_executor_interrupt( executor, agent, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context, alist