feat(together): add streaming and async support with error handling#4138
Conversation
|
|
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR extends Together AI instrumentation to support async client methods and streaming responses. It adds async method configuration, response-type detection helpers, updates request-type inference to substring matching, implements streaming chunk accumulators and processors, refactors span error handling, and wires async methods into instrumentation with version-safe wrapping. ChangesAsync and Streaming Instrumentation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py (1)
139-154: 💤 Low valueOptional: prefer
types.SimpleNamespaceover dynamictype(...)for the synthesized message.The
type("obj", (object,), {...})()trick at line 144 creates a one-off class just to attach attributes.types.SimpleNamespace(or a small@dataclass) is more idiomatic and easier to read, and reads better than the dynamic-class trick when constructing a response stand-in. No behavior change.♻️ Suggested refactor
+from types import SimpleNamespace @@ class _AccumulatedChoice: def __init__(self, content, finish_reason): - self.message = type("obj", (object,), {"content": content, "role": "assistant"})() + self.message = SimpleNamespace(content=content, role="assistant") self.finish_reason = finish_reason🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py` around lines 139 - 154, The helper _build_accumulated_response creates a synthetic message using a dynamic class via type("obj",...) inside _AccumulatedChoice; replace that with a clearer, idiomatic container such as types.SimpleNamespace (or a small `@dataclass`) so the message becomes SimpleNamespace(content=content, role="assistant"); update the import to include types.SimpleNamespace at the module top, adjust _AccumulatedChoice to assign self.message = SimpleNamespace(content=content, role="assistant"), and leave _AccumulatedResponse and return behavior unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`:
- Around line 157-187: The generator in _create_stream_processor can leave the
span un-ended if iteration is cut short; wrap the for-loop and subsequent
response-handling in a try/finally so that span.end() (and calling
_handle_response when last_chunk exists) always runs even on
GeneratorExit/close/early return; preserve the existing except block behavior
(set_status(StatusCode.ERROR), record_exception, re-raise) and ensure the
finally block checks span.is_recording() to set OK and then ends the span; apply
the same try/finally pattern to _create_async_stream_processor as well.
- Around line 354-364: The async wrap loop currently uses WRAPPED_AMETHODS paths
that don't exist in the together SDK and silences failures; update
WRAPPED_AMETHODS or the wrapping logic to target the actual AsyncTogether client
methods (e.g. client.chat.completions.create and client.completions.create) and
call wrap_function_wrapper with _awrap(tracer, event_logger, ...) on those real
attributes (or dynamically detect AsyncTogether and wrap its
.chat.completions.create and .completions.create methods), and replace the bare
except that swallows errors with a debug log via the module logger (or
event_logger) inside the except so failures are visible for diagnosis while
keeping ModuleNotFoundError/AttributeError handling.
---
Nitpick comments:
In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`:
- Around line 139-154: The helper _build_accumulated_response creates a
synthetic message using a dynamic class via type("obj",...) inside
_AccumulatedChoice; replace that with a clearer, idiomatic container such as
types.SimpleNamespace (or a small `@dataclass`) so the message becomes
SimpleNamespace(content=content, role="assistant"); update the import to include
types.SimpleNamespace at the module top, adjust _AccumulatedChoice to assign
self.message = SimpleNamespace(content=content, role="assistant"), and leave
_AccumulatedResponse and return behavior unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e8ae1862-ae3e-48d9-b083-a212c4a76704
📒 Files selected for processing (1)
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py
There was a problem hiding this comment.
🧹 Nitpick comments (2)
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py (2)
142-157: 💤 Low valuePrefer
types.SimpleNamespaceover dynamic class creation.
type("obj", (object,), {...})()definescontent/roleas class attributes (not instance attributes) and creates a fresh class on every call.types.SimpleNamespace(or a small dataclass) communicates intent more clearly and avoids the class-attribute aliasing surprise if downstream code mutates these objects.♻️ Proposed fix
+from types import SimpleNamespace + def _build_accumulated_response(chunk, accumulated_content, finish_reason): """Build a response-like object from accumulated streaming data for attribute setting.""" - class _AccumulatedChoice: - def __init__(self, content, finish_reason): - self.message = type("obj", (object,), {"content": content, "role": "assistant"})() - self.finish_reason = finish_reason - - class _AccumulatedResponse: - def __init__(self, chunk, content, finish_reason): - self.model = getattr(chunk, "model", None) - self.id = getattr(chunk, "id", None) - self.choices = [_AccumulatedChoice(content, finish_reason)] - self.usage = getattr(chunk, "usage", None) - - return _AccumulatedResponse(chunk, accumulated_content, finish_reason) + message = SimpleNamespace(content=accumulated_content, role="assistant") + choice = SimpleNamespace(message=message, finish_reason=finish_reason) + return SimpleNamespace( + model=getattr(chunk, "model", None), + id=getattr(chunk, "id", None), + choices=[choice], + usage=getattr(chunk, "usage", None), + )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py` around lines 142 - 157, _In _build_accumulated_response, replace the dynamic class creation in _AccumulatedChoice (type("obj",...)) with an instance holder like types.SimpleNamespace so message.content and message.role are instance attributes rather than class attrs; update the imports to include types and construct message as types.SimpleNamespace(content=content, role="assistant") inside _AccumulatedChoice.__init__, leaving _AccumulatedResponse (model/id/choices/usage) behavior unchanged so downstream code that reads chunk.model, chunk.id, and response.choices continues to work.
134-137: ⚡ Quick winGuard
finish_reasonaccess withgetattr.
choice.finish_reasonis accessed directly; if a streaming chunk'schoiceobject ever lacks this attribute, this raisesAttributeErrormid-stream. Usinggetattrkeeps the helper resilient across SDK versions/chunk shapes.♻️ Proposed fix
- if choice.finish_reason: - finish_reason = str(choice.finish_reason.value) if hasattr( - choice.finish_reason, "value" - ) else str(choice.finish_reason) + choice_finish_reason = getattr(choice, "finish_reason", None) + if choice_finish_reason: + finish_reason = ( + str(choice_finish_reason.value) + if hasattr(choice_finish_reason, "value") + else str(choice_finish_reason) + )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py` around lines 134 - 137, The code directly accesses choice.finish_reason and then checks hasattr on its value; change this to use getattr(choice, "finish_reason", None) to avoid AttributeError for chunks that lack the attribute, then derive finish_reason by checking the returned value for a .value attribute (e.g., if fr is not None: finish_reason = str(fr.value) if hasattr(fr, "value") else str(fr) else finish_reason = None or ""). Update the logic around the existing finish_reason assignment (the block referencing choice.finish_reason and finish_reason) to use this guarded fr variable so the helper is resilient across SDK versions/chunk shapes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`:
- Around line 142-157: _In _build_accumulated_response, replace the dynamic
class creation in _AccumulatedChoice (type("obj",...)) with an instance holder
like types.SimpleNamespace so message.content and message.role are instance
attributes rather than class attrs; update the imports to include types and
construct message as types.SimpleNamespace(content=content, role="assistant")
inside _AccumulatedChoice.__init__, leaving _AccumulatedResponse
(model/id/choices/usage) behavior unchanged so downstream code that reads
chunk.model, chunk.id, and response.choices continues to work.
- Around line 134-137: The code directly accesses choice.finish_reason and then
checks hasattr on its value; change this to use getattr(choice, "finish_reason",
None) to avoid AttributeError for chunks that lack the attribute, then derive
finish_reason by checking the returned value for a .value attribute (e.g., if fr
is not None: finish_reason = str(fr.value) if hasattr(fr, "value") else str(fr)
else finish_reason = None or ""). Update the logic around the existing
finish_reason assignment (the block referencing choice.finish_reason and
finish_reason) to use this guarded fr variable so the helper is resilient across
SDK versions/chunk shapes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 056dc087-1e54-412e-b173-b2b0b54c9677
📒 Files selected for processing (1)
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py
|
Verified locally — AsyncChatCompletions and AsyncCompletions do exist in together SDK 1.5.29 (the version installed per our constraints). Added debug logging when the fallback triggers for older versions where these may not be available. |
raghavenderreddygrudhanti
left a comment
There was a problem hiding this comment.
Reviewed in CodeRabbit Review Stack
What
Adds streaming support, async method instrumentation, and proper error handling to the Together AI instrumentation package.
Why
The Together AI instrumentation was the only LLM provider package that:
stream=Trueis passed, telemetry was lostAsyncChatCompletions.createandAsyncCompletions.createwere not instrumentedEvery other major instrumentation (OpenAI, Anthropic, Groq, Ollama, Mistral) handles all three. This PR brings Together AI to parity.
Changes
Streaming Support
_create_stream_processor()— a generator that wraps the streaming response, accumulates content across chunks, and records the full response as span attributes when the stream ends_create_async_stream_processor()— async version of the abovekwargs.get('stream', False)and response type checkingAsync Support
WRAPPED_AMETHODSlist targetingAsyncChatCompletions.createandAsyncCompletions.create_awrap()async wrapper function following the same pattern as the sync wrapperawaitand async streaming responsesError Handling
_wrap()and_awrap()now catch exceptions from the API callStatusCode.ERROR, callsspan.record_exception(e), and ends the span before re-raisingTesting
Related
Summary by CodeRabbit
New Features
Improvements