Skip to content

feat(together): add streaming and async support with error handling#4138

Open
raghavenderreddygrudhanti wants to merge 3 commits into
traceloop:mainfrom
raghavenderreddygrudhanti:feat/together-streaming-and-async
Open

feat(together): add streaming and async support with error handling#4138
raghavenderreddygrudhanti wants to merge 3 commits into
traceloop:mainfrom
raghavenderreddygrudhanti:feat/together-streaming-and-async

Conversation

@raghavenderreddygrudhanti
Copy link
Copy Markdown

@raghavenderreddygrudhanti raghavenderreddygrudhanti commented May 13, 2026

What

Adds streaming support, async method instrumentation, and proper error handling to the Together AI instrumentation package.

Why

The Together AI instrumentation was the only LLM provider package that:

  • Had no streaming support — when stream=True is passed, telemetry was lost
  • Had no async supportAsyncChatCompletions.create and AsyncCompletions.create were not instrumented
  • Had no error handling — if the API call raised an exception, the span leaked without recording the error (related to 🐛 Bug Report: Errors are not logged #412)

Every other major instrumentation (OpenAI, Anthropic, Groq, Ollama, Mistral) handles all three. This PR brings Together AI to parity.

Changes

Streaming Support

  • Added _create_stream_processor() — a generator that wraps the streaming response, accumulates content across chunks, and records the full response as span attributes when the stream ends
  • Added _create_async_stream_processor() — async version of the above
  • Detects streaming via kwargs.get('stream', False) and response type checking

Async Support

  • Added WRAPPED_AMETHODS list targeting AsyncChatCompletions.create and AsyncCompletions.create
  • Added _awrap() async wrapper function following the same pattern as the sync wrapper
  • Properly handles await and async streaming responses

Error Handling

  • Both _wrap() and _awrap() now catch exceptions from the API call
  • On error: sets StatusCode.ERROR, calls span.record_exception(e), and ends the span before re-raising
  • Streaming processors also catch errors mid-stream and properly close spans

Testing

  • All 6 existing tests pass ✅
  • Lint passes ✅
  • Backward compatible — no changes to existing behavior for non-streaming, sync calls

Related

Summary by CodeRabbit

  • New Features

    • Added support for async client methods and async streaming responses in Together AI instrumentation.
  • Improvements

    • Streaming telemetry now accumulates streamed chunks and final finish reason for richer completion attributes.
    • Centralized span lifecycle with explicit status setting and exception recording for clearer traces.
    • Request-type inference improved via method-name substring matching.
    • Instrumentation/uninstrumentation made more resilient to missing async APIs across versions.

Review Change Stack

@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 13, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: de00dcc2-0b6c-4599-8e09-8960bd06590b

📥 Commits

Reviewing files that changed from the base of the PR and between 2d026d9 and 53a529c.

📒 Files selected for processing (1)
  • packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/init.py

📝 Walkthrough

Walkthrough

This PR extends Together AI instrumentation to support async client methods and streaming responses. It adds async method configuration, response-type detection helpers, updates request-type inference to substring matching, implements streaming chunk accumulators and processors, refactors span error handling, and wires async methods into instrumentation with version-safe wrapping.

Changes

Async and Streaming Instrumentation

Layer / File(s) Summary
Async and streaming response detection
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py
WRAPPED_AMETHODS maps async client methods and _is_streaming_response / _is_async_streaming_response detect iterator types; _llm_request_type_by_method now infers request type via substring matching instead of exact method-name equality.
Streaming processors and wrapper implementations
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py
_extract_message_content, _build_accumulated_response, and streaming processors iterate chunks, accumulate content/finish reason, and set span attributes. Refactored _wrap adds explicit try/except, streaming detection, and span termination; new _awrap async wrapper mirrors _wrap behavior including async streaming and consistent ERROR-status/exception-recording on failures.
Async method instrumentation wiring
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py
_instrument and _uninstrument wrap and unwrap async methods with ModuleNotFoundError/AttributeError suppression to gracefully handle unavailable async APIs across client versions.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

A rabbit hops through async streams,
Gathering chunks in moonlit beams,
Spans end tidy, errors caught—
Together traces every thought. 🐇✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main changes: adding streaming support, async method instrumentation, and enhanced error handling to the Together AI instrumentation package.
Docstring Coverage ✅ Passed Docstring coverage is 83.33% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py (1)

139-154: 💤 Low value

Optional: prefer types.SimpleNamespace over dynamic type(...) for the synthesized message.

The type("obj", (object,), {...})() trick at line 144 creates a one-off class just to attach attributes. types.SimpleNamespace (or a small @dataclass) is more idiomatic and easier to read, and reads better than the dynamic-class trick when constructing a response stand-in. No behavior change.

♻️ Suggested refactor
+from types import SimpleNamespace
@@
     class _AccumulatedChoice:
         def __init__(self, content, finish_reason):
-            self.message = type("obj", (object,), {"content": content, "role": "assistant"})()
+            self.message = SimpleNamespace(content=content, role="assistant")
             self.finish_reason = finish_reason
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`
around lines 139 - 154, The helper _build_accumulated_response creates a
synthetic message using a dynamic class via type("obj",...) inside
_AccumulatedChoice; replace that with a clearer, idiomatic container such as
types.SimpleNamespace (or a small `@dataclass`) so the message becomes
SimpleNamespace(content=content, role="assistant"); update the import to include
types.SimpleNamespace at the module top, adjust _AccumulatedChoice to assign
self.message = SimpleNamespace(content=content, role="assistant"), and leave
_AccumulatedResponse and return behavior unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`:
- Around line 157-187: The generator in _create_stream_processor can leave the
span un-ended if iteration is cut short; wrap the for-loop and subsequent
response-handling in a try/finally so that span.end() (and calling
_handle_response when last_chunk exists) always runs even on
GeneratorExit/close/early return; preserve the existing except block behavior
(set_status(StatusCode.ERROR), record_exception, re-raise) and ensure the
finally block checks span.is_recording() to set OK and then ends the span; apply
the same try/finally pattern to _create_async_stream_processor as well.
- Around line 354-364: The async wrap loop currently uses WRAPPED_AMETHODS paths
that don't exist in the together SDK and silences failures; update
WRAPPED_AMETHODS or the wrapping logic to target the actual AsyncTogether client
methods (e.g. client.chat.completions.create and client.completions.create) and
call wrap_function_wrapper with _awrap(tracer, event_logger, ...) on those real
attributes (or dynamically detect AsyncTogether and wrap its
.chat.completions.create and .completions.create methods), and replace the bare
except that swallows errors with a debug log via the module logger (or
event_logger) inside the except so failures are visible for diagnosis while
keeping ModuleNotFoundError/AttributeError handling.

---

Nitpick comments:
In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`:
- Around line 139-154: The helper _build_accumulated_response creates a
synthetic message using a dynamic class via type("obj",...) inside
_AccumulatedChoice; replace that with a clearer, idiomatic container such as
types.SimpleNamespace (or a small `@dataclass`) so the message becomes
SimpleNamespace(content=content, role="assistant"); update the import to include
types.SimpleNamespace at the module top, adjust _AccumulatedChoice to assign
self.message = SimpleNamespace(content=content, role="assistant"), and leave
_AccumulatedResponse and return behavior unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e8ae1862-ae3e-48d9-b083-a212c4a76704

📥 Commits

Reviewing files that changed from the base of the PR and between 6d3e696 and 4122319.

📒 Files selected for processing (1)
  • packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py (2)

142-157: 💤 Low value

Prefer types.SimpleNamespace over dynamic class creation.

type("obj", (object,), {...})() defines content/role as class attributes (not instance attributes) and creates a fresh class on every call. types.SimpleNamespace (or a small dataclass) communicates intent more clearly and avoids the class-attribute aliasing surprise if downstream code mutates these objects.

♻️ Proposed fix
+from types import SimpleNamespace
+
 def _build_accumulated_response(chunk, accumulated_content, finish_reason):
     """Build a response-like object from accumulated streaming data for attribute setting."""

-    class _AccumulatedChoice:
-        def __init__(self, content, finish_reason):
-            self.message = type("obj", (object,), {"content": content, "role": "assistant"})()
-            self.finish_reason = finish_reason
-
-    class _AccumulatedResponse:
-        def __init__(self, chunk, content, finish_reason):
-            self.model = getattr(chunk, "model", None)
-            self.id = getattr(chunk, "id", None)
-            self.choices = [_AccumulatedChoice(content, finish_reason)]
-            self.usage = getattr(chunk, "usage", None)
-
-    return _AccumulatedResponse(chunk, accumulated_content, finish_reason)
+    message = SimpleNamespace(content=accumulated_content, role="assistant")
+    choice = SimpleNamespace(message=message, finish_reason=finish_reason)
+    return SimpleNamespace(
+        model=getattr(chunk, "model", None),
+        id=getattr(chunk, "id", None),
+        choices=[choice],
+        usage=getattr(chunk, "usage", None),
+    )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`
around lines 142 - 157, _In _build_accumulated_response, replace the dynamic
class creation in _AccumulatedChoice (type("obj",...)) with an instance holder
like types.SimpleNamespace so message.content and message.role are instance
attributes rather than class attrs; update the imports to include types and
construct message as types.SimpleNamespace(content=content, role="assistant")
inside _AccumulatedChoice.__init__, leaving _AccumulatedResponse
(model/id/choices/usage) behavior unchanged so downstream code that reads
chunk.model, chunk.id, and response.choices continues to work.

134-137: ⚡ Quick win

Guard finish_reason access with getattr.

choice.finish_reason is accessed directly; if a streaming chunk's choice object ever lacks this attribute, this raises AttributeError mid-stream. Using getattr keeps the helper resilient across SDK versions/chunk shapes.

♻️ Proposed fix
-            if choice.finish_reason:
-                finish_reason = str(choice.finish_reason.value) if hasattr(
-                    choice.finish_reason, "value"
-                ) else str(choice.finish_reason)
+            choice_finish_reason = getattr(choice, "finish_reason", None)
+            if choice_finish_reason:
+                finish_reason = (
+                    str(choice_finish_reason.value)
+                    if hasattr(choice_finish_reason, "value")
+                    else str(choice_finish_reason)
+                )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`
around lines 134 - 137, The code directly accesses choice.finish_reason and then
checks hasattr on its value; change this to use getattr(choice, "finish_reason",
None) to avoid AttributeError for chunks that lack the attribute, then derive
finish_reason by checking the returned value for a .value attribute (e.g., if fr
is not None: finish_reason = str(fr.value) if hasattr(fr, "value") else str(fr)
else finish_reason = None or ""). Update the logic around the existing
finish_reason assignment (the block referencing choice.finish_reason and
finish_reason) to use this guarded fr variable so the helper is resilient across
SDK versions/chunk shapes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In
`@packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py`:
- Around line 142-157: _In _build_accumulated_response, replace the dynamic
class creation in _AccumulatedChoice (type("obj",...)) with an instance holder
like types.SimpleNamespace so message.content and message.role are instance
attributes rather than class attrs; update the imports to include types and
construct message as types.SimpleNamespace(content=content, role="assistant")
inside _AccumulatedChoice.__init__, leaving _AccumulatedResponse
(model/id/choices/usage) behavior unchanged so downstream code that reads
chunk.model, chunk.id, and response.choices continues to work.
- Around line 134-137: The code directly accesses choice.finish_reason and then
checks hasattr on its value; change this to use getattr(choice, "finish_reason",
None) to avoid AttributeError for chunks that lack the attribute, then derive
finish_reason by checking the returned value for a .value attribute (e.g., if fr
is not None: finish_reason = str(fr.value) if hasattr(fr, "value") else str(fr)
else finish_reason = None or ""). Update the logic around the existing
finish_reason assignment (the block referencing choice.finish_reason and
finish_reason) to use this guarded fr variable so the helper is resilient across
SDK versions/chunk shapes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 056dc087-1e54-412e-b173-b2b0b54c9677

📥 Commits

Reviewing files that changed from the base of the PR and between 4122319 and 2d026d9.

📒 Files selected for processing (1)
  • packages/opentelemetry-instrumentation-together/opentelemetry/instrumentation/together/__init__.py

@raghavenderreddygrudhanti
Copy link
Copy Markdown
Author

Verified locally — AsyncChatCompletions and AsyncCompletions do exist in together SDK 1.5.29 (the version installed per our constraints). Added debug logging when the fallback triggers for older versions where these may not be available.

Copy link
Copy Markdown
Author

@raghavenderreddygrudhanti raghavenderreddygrudhanti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants