diff --git a/src/openai/lib/streaming/_assistants.py b/src/openai/lib/streaming/_assistants.py index 6efb3ca3f1..7347c2bd9a 100644 --- a/src/openai/lib/streaming/_assistants.py +++ b/src/openai/lib/streaming/_assistants.py @@ -980,13 +980,24 @@ def accumulate_event( def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: for key, delta_value in delta.items(): if key not in acc: - acc[key] = delta_value - continue + # delta lists of dicts can contain multiple entries with the same + # `index`, so we initialise to [] and let the merge loop handle it + if is_list(delta_value) and delta_value and is_dict(delta_value[0]): + acc_delta_value: list[object] = [] + acc[key] = acc_delta_value + else: + acc[key] = delta_value + continue acc_value = acc[key] if acc_value is None: - acc[key] = delta_value - continue + if is_list(delta_value) and delta_value and is_dict(delta_value[0]): + acc_delta_value: list[object] = [] # type: ignore[no-redef] + acc_value = acc_delta_value + acc[key] = acc_value + else: + acc[key] = delta_value + continue # the `index` property is used in arrays of objects so it should # not be accumulated like other values e.g. @@ -1007,7 +1018,7 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> elif is_list(acc_value) and is_list(delta_value): # for lists of non-dictionary items we'll only ever get new entries # in the array, existing entries will never be changed - if all(isinstance(x, (str, int, float)) for x in acc_value): + if acc_value and all(isinstance(x, (str, int, float)) for x in acc_value): acc_value.extend(delta_value) continue diff --git a/src/openai/lib/streaming/_deltas.py b/src/openai/lib/streaming/_deltas.py index a5e1317612..f00b1fe5e6 100644 --- a/src/openai/lib/streaming/_deltas.py +++ b/src/openai/lib/streaming/_deltas.py @@ -6,13 +6,24 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: for key, delta_value in delta.items(): if key not in acc: - acc[key] = delta_value - continue + # delta lists of dicts can contain multiple entries with the same + # `index`, so we initialise to [] and let the merge loop handle it + if is_list(delta_value) and delta_value and is_dict(delta_value[0]): + acc_delta_value: list[object] = [] + acc[key] = acc_delta_value + else: + acc[key] = delta_value + continue acc_value = acc[key] if acc_value is None: - acc[key] = delta_value - continue + if is_list(delta_value) and delta_value and is_dict(delta_value[0]): + acc_delta_value: list[object] = [] # type: ignore[no-redef] + acc_value = acc_delta_value + acc[key] = acc_value + else: + acc[key] = delta_value + continue # the `index` property is used in arrays of objects so it should # not be accumulated like other values e.g. @@ -33,7 +44,7 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> elif is_list(acc_value) and is_list(delta_value): # for lists of non-dictionary items we'll only ever get new entries # in the array, existing entries will never be changed - if all(isinstance(x, (str, int, float)) for x in acc_value): + if acc_value and all(isinstance(x, (str, int, float)) for x in acc_value): acc_value.extend(delta_value) continue diff --git a/src/openai/lib/streaming/chat/_completions.py b/src/openai/lib/streaming/chat/_completions.py index 5f072cafbd..2e677276e3 100644 --- a/src/openai/lib/streaming/chat/_completions.py +++ b/src/openai/lib/streaming/chat/_completions.py @@ -415,7 +415,7 @@ def _accumulate_chunk(self, chunk: ChatCompletionChunk) -> ParsedChatCompletionS type_=ParsedChoiceSnapshot, value={ **choice.model_dump(exclude_unset=True, exclude={"delta"}), - "message": choice.delta.to_dict(), + "message": accumulate_delta({}, cast("dict[object, object]", choice.delta.to_dict())), }, ), ) @@ -744,7 +744,7 @@ def _convert_initial_chunk_into_snapshot(chunk: ChatCompletionChunk) -> ParsedCh for choice in chunk.choices: choices[choice.index] = { **choice.model_dump(exclude_unset=True, exclude={"delta"}), - "message": choice.delta.to_dict(), + "message": accumulate_delta({}, cast("dict[object, object]", choice.delta.to_dict())), } return cast( diff --git a/tests/lib/test_deltas.py b/tests/lib/test_deltas.py new file mode 100644 index 0000000000..f3caa70632 --- /dev/null +++ b/tests/lib/test_deltas.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from typing import Any, cast + +from openai.lib.streaming._deltas import accumulate_delta + + +def test_duplicate_indexes_in_unseeded_slot_are_merged() -> None: + acc: dict[object, object] = {} + accumulate_delta( + acc, + { + "tool_calls": [ + {"index": 0, "id": "call_abc", "function": {"name": "get_weather"}, "type": "function"}, + {"index": 0, "function": {"arguments": '{"city"'}}, + {"index": 0, "function": {"arguments": ': "Reykjavik"}'}}, + ], + }, + ) + tool_calls = cast("list[Any]", acc["tool_calls"]) + assert len(tool_calls) == 1 + assert tool_calls[0]["id"] == "call_abc" + assert tool_calls[0]["function"]["name"] == "get_weather" + assert tool_calls[0]["function"]["arguments"] == '{"city": "Reykjavik"}' + + +def test_unique_indexes_preserved() -> None: + acc: dict[object, object] = {} + accumulate_delta( + acc, + { + "tool_calls": [ + {"index": 0, "id": "call_1", "function": {"name": "fn_a"}, "type": "function"}, + {"index": 1, "id": "call_2", "function": {"name": "fn_b"}, "type": "function"}, + ], + }, + ) + tool_calls = cast("list[Any]", acc["tool_calls"]) + assert len(tool_calls) == 2 + assert tool_calls[0]["id"] == "call_1" + assert tool_calls[1]["id"] == "call_2"