-
Notifications
You must be signed in to change notification settings - Fork 808
fix: handle BaseException in trace spans to prevent span leaks on Key… #2068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -138,6 +138,7 @@ async def event_loop_cycle( | |
| custom_trace_attributes=agent.trace_attributes, | ||
| ) | ||
| invocation_state["event_loop_cycle_span"] = cycle_span | ||
| model_events: AsyncGenerator[TypedEvent, None] | None = None | ||
|
|
||
| with trace_api.use_span(cycle_span, end_on_exit=False): | ||
| try: | ||
|
|
@@ -153,15 +154,21 @@ async def event_loop_cycle( | |
| model_events = _handle_model_execution( | ||
| agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context | ||
| ) | ||
| async for model_event in model_events: | ||
| if not isinstance(model_event, ModelStopReason): | ||
| yield model_event | ||
| try: | ||
| async for model_event in model_events: | ||
| if not isinstance(model_event, ModelStopReason): | ||
| yield model_event | ||
| finally: | ||
| await model_events.aclose() | ||
|
|
||
| stop_reason, message, *_ = model_event["stop"] | ||
| yield ModelMessageEvent(message=message) | ||
| except Exception as e: | ||
| tracer.end_span_with_error(cycle_span, str(e), e) | ||
| raise | ||
| except BaseException as e: | ||
| tracer.end_span_with_error(cycle_span, str(e), e) | ||
| raise | ||
|
|
||
| try: | ||
| if stop_reason == "max_tokens": | ||
|
|
@@ -238,6 +245,9 @@ async def event_loop_cycle( | |
| yield ForceStopEvent(reason=e) | ||
| logger.exception("cycle failed") | ||
| raise EventLoopException(e, invocation_state["request_state"]) from e | ||
| except BaseException as e: | ||
| tracer.end_span_with_error(cycle_span, str(e), e) | ||
| raise | ||
|
|
||
|
|
||
| async def recurse_event_loop( | ||
|
|
@@ -323,6 +333,7 @@ async def _handle_model_execution( | |
| system_prompt=agent.system_prompt, | ||
| system_prompt_content=agent._system_prompt_content, | ||
| ) | ||
| streamed_events: AsyncGenerator[TypedEvent, None] | None = None | ||
| with trace_api.use_span(model_invoke_span, end_on_exit=False): | ||
| try: | ||
| await agent.hooks.invoke_callbacks_async( | ||
|
|
@@ -338,18 +349,22 @@ async def _handle_model_execution( | |
| else: | ||
| tool_specs = agent.tool_registry.get_all_tool_specs() | ||
|
|
||
| async for event in stream_messages( | ||
| agent.model, | ||
| agent.system_prompt, | ||
| agent.messages, | ||
| tool_specs, | ||
| system_prompt_content=agent._system_prompt_content, | ||
| tool_choice=structured_output_context.tool_choice, | ||
| invocation_state=invocation_state, | ||
| model_state=agent._model_state, | ||
| cancel_signal=agent._cancel_signal, | ||
| ): | ||
| yield event | ||
| try: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would it be better to move
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix it. |
||
| streamed_events = stream_messages( | ||
| agent.model, | ||
| agent.system_prompt, | ||
| agent.messages, | ||
| tool_specs, | ||
| system_prompt_content=agent._system_prompt_content, | ||
| tool_choice=structured_output_context.tool_choice, | ||
| invocation_state=invocation_state, | ||
| model_state=agent._model_state, | ||
| cancel_signal=agent._cancel_signal, | ||
| ) | ||
| async for event in streamed_events: | ||
| yield event | ||
| finally: | ||
| await streamed_events.aclose() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Currently this is safe because Suggestion: Add a guard: finally:
if streamed_events is not None:
await streamed_events.aclose()The same pattern at line 162 for |
||
|
|
||
| stop_reason, message, usage, metrics = event["stop"] | ||
| invocation_state.setdefault("request_state", {}) | ||
|
|
@@ -417,6 +432,9 @@ async def _handle_model_execution( | |
| # No retry requested, raise the exception | ||
| yield ForceStopEvent(reason=e) | ||
| raise e | ||
| except BaseException as e: | ||
| tracer.end_span_with_error(model_invoke_span, str(e), e) | ||
| raise | ||
|
|
||
| try: | ||
| # Add message in trace and mark the end of the stream messages trace | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue: The
except Exceptionandexcept BaseExceptionhandlers here have identical bodies — both calltracer.end_span_with_errorand re-raise. SinceExceptionis a subclass ofBaseException, theexcept BaseExceptionhandler alone would catch both.Suggestion: Consolidate into a single handler:
Note: The other two locations (lines 233-250 and 398-430) are correctly separated since the
except Exceptionhandlers do additional work (wrapping inEventLoopException, retry logic, etc.) that should not apply toBaseExceptionsubclasses.