Skip to content

fix(bedrock): add aioboto3 async client instrumentation#3536

Closed
benhylak wants to merge 1 commit into
traceloop:mainfrom
benhylak:fix/bedrock-aioboto3-instrumentation
Closed

fix(bedrock): add aioboto3 async client instrumentation#3536
benhylak wants to merge 1 commit into
traceloop:mainfrom
benhylak:fix/bedrock-aioboto3-instrumentation

Conversation

@benhylak
Copy link
Copy Markdown

@benhylak benhylak commented Jan 14, 2026

Fixes #3510

Problem

aioboto3 (async boto3) clients were not being instrumented, resulting in no OpenTelemetry spans or attributes for async Bedrock API calls.

Root Cause

aioboto3 uses ClientCreatorContext.__aenter__ (async context manager) for client creation, which wasn't wrapped by the instrumentation. The existing wrappers only covered sync boto3's create_client() method.

Solution

  • Added ClientCreatorContext.__aenter__ to WRAPPED_METHODS
  • Created async wrapper _wrap_aioboto3_context() that instruments clients when they're created from the async context manager
  • Added 4 async instrumentation wrappers for all Bedrock methods:
    • _instrumented_model_invoke_async()
    • _instrumented_model_invoke_with_response_stream_async()
    • _instrumented_converse_async()
    • _instrumented_converse_stream_async()
  • Modified instrumentation loop to use appropriate wrapper based on async flag

Testing

  • Added comprehensive aioboto3 tests in test_aioboto3.py
  • All 79 tests pass (77 existing + 2 new)
  • Verified backward compatibility with sync boto3
  • aioboto3 now generates spans with all OpenTelemetry semantic convention attributes (gen_ai.system, model, tokens, content, etc.)

Before Fix

aioboto3 client.converse() call:
→ Spans created: 0 ❌

After Fix

aioboto3 client.converse() call:
→ Spans created: 1 ✅
→ Attributes:
  ✓ gen_ai.system = AWS
  ✓ gen_ai.request.model = claude-3-haiku-20240307-v1:0
  ✓ gen_ai.usage.input_tokens = 10
  ✓ gen_ai.usage.output_tokens = 5
  ✓ gen_ai.prompt.0.content = [{"text": "Tell me a joke"}]
  ✓ gen_ai.completion.0.content = Test response from aioboto3

Important

Add async instrumentation for aioboto3 clients in OpenTelemetry Bedrock package, including new async wrappers and tests.

  • Behavior:
    • Add ClientCreatorContext.__aenter__ to WRAPPED_METHODS for aioboto3 async client creation.
    • Create async wrapper _wrap_aioboto3_context() to instrument clients from async context manager.
    • Add async wrappers for Bedrock methods: _instrumented_model_invoke_async(), _instrumented_model_invoke_with_response_stream_async(), _instrumented_converse_async(), _instrumented_converse_stream_async().
    • Modify instrumentation loop in BedrockInstrumentor._instrument() to use async wrapper based on async flag.
  • Testing:
    • Add test_aioboto3.py with tests for aioboto3 async client instrumentation.
    • Verify aioboto3 generates spans with OpenTelemetry attributes.
    • Ensure backward compatibility with sync boto3.
  • Dependencies:
    • Add aioboto3 and pytest-asyncio to pyproject.toml dev dependencies.

This description was created by Ellipsis for 6081bbe. You can customize this summary. It will automatically update as commits are pushed.

Summary by CodeRabbit

  • New Features

    • Added async instrumentation support for AWS Bedrock clients, enabling monitoring of async model invocations, streaming responses, and conversation calls.
  • Tests

    • Added test coverage validating async client instrumentation and span attributes.
  • Chores

    • Updated development dependencies to support async testing.

✏️ Tip: You can customize this high-level summary in your review settings.

Fixes traceloop#3510

## Problem
aioboto3 (async boto3) clients were not being instrumented, resulting in
no OpenTelemetry spans or attributes for async Bedrock API calls.

## Root Cause
aioboto3 uses `ClientCreatorContext.__aenter__` (async context manager)
for client creation, which wasn't wrapped by the instrumentation. The
existing wrappers only covered sync boto3's `create_client()` method.

## Solution
- Added `ClientCreatorContext.__aenter__` to WRAPPED_METHODS
- Created async wrapper `_wrap_aioboto3_context()` that instruments
  clients when they're created from the async context manager
- Added 4 async instrumentation wrappers for all Bedrock methods:
  - `_instrumented_model_invoke_async()`
  - `_instrumented_model_invoke_with_response_stream_async()`
  - `_instrumented_converse_async()`
  - `_instrumented_converse_stream_async()`
- Modified instrumentation loop to use appropriate wrapper based on
  async flag

## Testing
- Added comprehensive aioboto3 tests in `test_aioboto3.py`
- All 79 tests pass (77 existing + 2 new)
- Verified backward compatibility with sync boto3
- aioboto3 now generates spans with all OpenTelemetry semantic
  convention attributes (gen_ai.system, model, tokens, content, etc.)
@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jan 14, 2026

📝 Walkthrough

Walkthrough

Added async support for aioboto3 Bedrock client instrumentation, including async wrapper functions for bedrock-runtime operations (converse, invoke_model, and streaming variants), aiobotocore async context manager integration, and corresponding test coverage.

Changes

Cohort / File(s) Summary
Core Async Instrumentation
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py
Added async wrapper functions (_wrap_aioboto3_context, _instrumented_model_invoke_async, _instrumented_model_invoke_with_response_stream_async, _instrumented_converse_async, _instrumented_converse_stream_async) to instrument aioboto3 bedrock-runtime clients. Extended WRAPPED_METHODS to include aiobotocore.session.ClientCreatorContext.aenter with async flag. Modified wrapping dispatch logic in _instrument to select async-specific wrappers based on async flag. Added asyncio and inspect imports.
Development Dependencies
packages/opentelemetry-instrumentation-bedrock/pyproject.toml
Added dev dependencies: aioboto3 (^15.5.0) and pytest-asyncio (>=0.21,<0.24).
Async Test Coverage
packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
New test module with two async test cases: test_aioboto3_converse verifies span creation and attributes for bedrock.converse operations; test_aioboto3_wrapping validates that client methods are properly wrapped.

Sequence Diagram

sequenceDiagram
    participant App as Application Code
    participant AioSession as aioboto3.Session
    participant AsyncCtx as ClientCreatorContext<br/>(__aenter__)
    participant Wrapper as _wrap_aioboto3_context
    participant BedClient as Instrumented<br/>Bedrock Client
    participant Tracer as Tracer/Metrics
    participant BedAPI as Bedrock API

    App->>AioSession: async with session.client()
    AioSession->>AsyncCtx: __aenter__()
    AsyncCtx->>Wrapper: Wrapping triggered
    Wrapper->>BedClient: Inject async instrumentation<br/>(converse, invoke_model, etc.)
    BedClient-->>App: Return instrumented client
    App->>BedClient: await client.converse(...)
    BedClient->>Tracer: Start span + record metrics
    BedClient->>BedAPI: Invoke bedrock operation
    BedAPI-->>BedClient: Response + usage tokens
    BedClient->>Tracer: End span + populate attributes<br/>(gen_ai.usage.*)
    BedClient-->>App: Return response
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Async paths now shimmer bright,
Aioboto3 runs through the night,
Bedrock converse spans take flight,
With usage tokens gleaming in sight,
No more waiting—just pure delight! 🚀✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 23.53% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main change: adding aioboto3 async client instrumentation to the bedrock package.
Linked Issues check ✅ Passed The PR fully addresses issue #3510 by implementing async client instrumentation, span generation for converse calls, and standard GenAI span attributes for aioboto3 async clients.
Out of Scope Changes check ✅ Passed All changes are directly scoped to adding aioboto3 async instrumentation support; no unrelated or out-of-scope modifications are present.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@ellipsis-dev ellipsis-dev Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Changes requested ❌

Reviewed everything up to 6081bbe in 2 minutes and 43 seconds. Click for details.
  • Reviewed 296 lines of code in 3 files
  • Skipped 1 files when reviewing.
  • Skipped posting 4 draft comments. View those below.
  • Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py:108
  • Draft comment:
    Ensure that the package path 'aiobotocore.session' for ClientCreatorContext matches the aioboto3 version in use. This is critical for proper async client instrumentation.
  • Reason this comment was not posted:
    Comment did not seem useful. Confidence is useful = 0% <= threshold 50% The comment is asking the author to ensure compatibility between aiobotocore.session and aioboto3 versions, which is related to dependencies. The comment does not provide a specific suggestion or ask for a test to be written, and it is more of a cautionary note. This violates the rule against asking the author to ensure compatibility without a specific actionable suggestion.
2. packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py:333
  • Draft comment:
    Async wrappers (e.g. _instrumented_model_invoke_async) use a synchronous 'with' for starting spans. If the tracer supports async context management, consider using 'async with' for better compatibility in async code.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 15% vs. threshold = 50% This comment is speculative because it starts with "If the tracer supports..." - it's not stating definitively that there's a problem. The rules explicitly say "Do NOT make speculative comments, such as 'If X, then Y is an issue'. Only comment if it is definitely an issue." Additionally, the comment doesn't provide evidence that the OpenTelemetry tracer actually supports async context management or that using synchronous 'with' is causing problems. The code appears to be working as intended - the 'with' statement is synchronous but that doesn't mean it's wrong in an async context. Without seeing documentation or evidence that the tracer has an async context manager interface, this is just a suggestion without strong evidence. Perhaps OpenTelemetry tracers do support async context managers and this is a legitimate best practice for async code. The comment might be pointing out a real compatibility or performance issue that I'm not aware of. Even if OpenTelemetry tracers support async context managers, the comment is phrased speculatively ("If the tracer supports...") rather than definitively. Without strong evidence that this is causing an actual problem or that it's required, this falls under speculative comments that should be deleted per the rules. The PR author likely tested this code and it works with synchronous context managers. This comment should be deleted because it's speculative (uses "If the tracer supports...") and doesn't provide strong evidence of an actual issue. The rules explicitly state not to make speculative comments.
3. packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py:82
  • Draft comment:
    Test 'test_aioboto3_wrapping' correctly asserts that the client methods are wrapped. This confirms that both sync and async instrumentation are applied as expected.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50% None
4. packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py:108
  • Draft comment:
    Typographical note: The comment on this block refers to 'aioboto3' client creation, but the package specified in the dictionary is 'aiobotocore.session'. Please verify if this is intentional or if the naming should be aligned for consistency.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 20% vs. threshold = 50% This comment is asking the PR author to "verify" something, which is explicitly against the rules ("Do NOT ask the PR author to confirm their intention, to explain, to double-check things, to ensure the behavior is intended"). The comment is not suggesting a clear code change - it's just pointing out that the comment mentions aioboto3 while the code uses aiobotocore. While there is a technical distinction (aioboto3 wraps aiobotocore), this is more of a documentation/comment clarity issue rather than a functional code issue. The comment doesn't suggest what should be changed or why it matters functionally. It's asking for verification rather than pointing out a definite problem. The comment might be highlighting a legitimate documentation inconsistency that could confuse future maintainers. If the inline comment is misleading about what's actually being wrapped, that could be a valid issue to fix. However, it's phrased as "please verify" which is explicitly against the rules. While there may be a documentation inconsistency, the comment violates the rule about not asking the PR author to verify or confirm things. The comment should either suggest a specific change to the comment text or be removed. Since it's asking for verification rather than pointing out a definite problem with a clear fix, it should be deleted. This comment should be deleted because it asks the PR author to "verify" something, which violates the rules. It's not suggesting a clear code change, just pointing out a potential naming inconsistency in a comment that may or may not be intentional.

Workflow ID: wflow_rV5Qx5jbfintddhm

You can customize Ellipsis by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.

client.converse_stream, tracer, metric_params, event_logger
)
except Exception as e:
logger.debug(f"Failed to instrument aioboto3 client: {e}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider expanding the debug log in the exception block so that failures in instrumentation of _wrap_aioboto3_context are easier to diagnose.

Suggested change
logger.debug(f"Failed to instrument aioboto3 client: {e}")
logger.exception(f"Failed to instrument aioboto3 client: {e}")

"""OpenTelemetry Bedrock instrumentation"""

import asyncio
import inspect
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inspect module is imported but not used. Remove it to clean up unused dependencies.

Suggested change
import inspect

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Fix all issues with AI agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`:
- Around line 3-4: The file imports unused modules asyncio and inspect at the
top; remove the two import statements (the "asyncio" and "inspect" imports) from
the module-level imports in __init__.py so that no unused imports remain, verify
there are no references to asyncio or inspect elsewhere in that module after
removal, and run linting (flake8) to confirm the unused-import warning is
resolved.
- Around line 333-346: The async instrument wrapper
_instrumented_model_invoke_async currently calls the sync _handle_call which
uses ReusableStreamingBody and response.get("body").read(), causing failures for
aioboto3 async streaming bodies; implement an async counterpart (e.g.,
_handle_call_async) that accepts the same params (span, kwargs, response,
metric_params, event_logger), awaits async body reads when response["body"] is
an async stream (await response["body"].read()), avoids wrapping async streams
with ReusableStreamingBody, and preserves the same telemetry/metric/event
behavior; then update _instrumented_model_invoke_async to await
_handle_call_async(response...) instead of calling the sync _handle_call so
async invoke_model responses are handled correctly.
- Around line 349-364: The async wrappers
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async are calling synchronous handlers
(_handle_stream_call and _handle_converse_stream) that wrap response bodies with
the synchronous StreamingWrapper (using iter()/next()), which breaks for
aioboto3 async iterators; implement async-aware handlers (e.g.,
_handle_stream_call_async and _handle_converse_stream_async) that create an
AsyncStreamingWrapper implementing __aiter__ and __anext__ (consuming async for
/ anext) instead of iter()/next(), and update
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async to call these new async handlers (and pass
the same kwargs/metric_params/event_logger) so async streaming responses are
handled without TypeError.

In
`@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`:
- Around line 2-4: Remove the unused top-level import "asyncio" from the imports
block (delete the line "import asyncio") so only used modules remain (e.g.,
"import os" and "from unittest.mock import AsyncMock, patch"); update the import
block accordingly and run linters/tests to confirm no unused-import errors
remain.
- Around line 49-54: The test assigns an unused variable "response" when calling
client.converse with client._make_api_call patched; replace the unused
assignment with an underscore (await client.converse(...)) or add meaningful
assertions on the returned value to use it (referencing client, _make_api_call,
and converse) so the test no longer contains an unused-variable. Ensure the
patch still uses AsyncMock(return_value=mock_response) and that any added
assertions validate mock_response fields or behavior.
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py (1)

236-258: Missing error metrics for async wrapper.

The sync _wrap function (lines 189-202) records duration_histogram and exception_counter metrics when client creation fails. The async _wrap_aioboto3_context only logs the exception at debug level without recording metrics, creating an inconsistency in observability between sync and async code paths.

Proposed fix to add error metrics
     if is_bedrock:
         try:
+            start_time = time.time()
             # Instrument the client's methods with async wrappers
             client.invoke_model = _instrumented_model_invoke_async(
                 client.invoke_model, tracer, metric_params, event_logger
             )
             client.invoke_model_with_response_stream = (
                 _instrumented_model_invoke_with_response_stream_async(
                     client.invoke_model_with_response_stream,
                     tracer,
                     metric_params,
                     event_logger,
                 )
             )
             client.converse = _instrumented_converse_async(
                 client.converse, tracer, metric_params, event_logger
             )
             client.converse_stream = _instrumented_converse_stream_async(
                 client.converse_stream, tracer, metric_params, event_logger
             )
         except Exception as e:
+            end_time = time.time()
+            duration = end_time - start_time if "start_time" in locals() else 0
+            attributes = {"error.type": e.__class__.__name__}
+            if duration > 0 and metric_params.duration_histogram:
+                metric_params.duration_histogram.record(duration, attributes=attributes)
+            if metric_params.exception_counter:
+                metric_params.exception_counter.add(1, attributes=attributes)
             logger.debug(f"Failed to instrument aioboto3 client: {e}")
 
     return client
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8e52c97 and 6081bbe.

⛔ Files ignored due to path filters (1)
  • packages/opentelemetry-instrumentation-bedrock/poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py
  • packages/opentelemetry-instrumentation-bedrock/pyproject.toml
  • packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules

Files:

  • packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py
  • packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py (3)
packages/opentelemetry-instrumentation-llamaindex/opentelemetry/instrumentation/llamaindex/utils.py (1)
  • _with_tracer_wrapper (16-23)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (1)
  • _with_tracer_wrapper (116-123)
packages/opentelemetry-instrumentation-weaviate/opentelemetry/instrumentation/weaviate/wrapper.py (2)
  • _with_tracer_wrapper (14-23)
  • _wrap (34-51)
🪛 Flake8 (7.3.0)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

[error] 3-3: 'asyncio' imported but unused

(F401)


[error] 4-4: 'inspect' imported but unused

(F401)

packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py

[error] 2-2: 'asyncio' imported but unused

(F401)


[error] 50-50: local variable 'response' is assigned to but never used

(F841)

🪛 Ruff (0.14.11)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

212-212: Unused function argument: to_wrap

(ARG001)


214-214: Unused function argument: instance

(ARG001)


256-256: Do not catch blind exception: Exception

(BLE001)

packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py

14-14: Unused function argument: instrument_legacy

(ARG001)


14-14: Unused function argument: log_exporter

(ARG001)


50-50: Local variable response is assigned to but never used

Remove assignment to unused variable response

(F841)


73-73: Unused function argument: instrument_legacy

(ARG001)

🔇 Additional comments (5)
packages/opentelemetry-instrumentation-bedrock/pyproject.toml (1)

37-38: LGTM!

The dev dependencies for aioboto3 and pytest-asyncio are appropriately added to support the new async instrumentation tests.

packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py (1)

72-87: LGTM!

Good test coverage for verifying that the async client methods are properly wrapped by the instrumentation.

packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py (3)

108-114: LGTM!

The wrapped method entry for aiobotocore.session.ClientCreatorContext.__aenter__ correctly targets the async context manager entry point with the async flag for proper wrapper selection.


774-790: LGTM!

Clean implementation of wrapper selection based on the async flag. The default False ensures backward compatibility with existing sync wrappers.


367-381: LGTM!

The async converse wrapper correctly mirrors the sync implementation with proper await usage and span management.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +3 to +4
import asyncio
import inspect
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unused imports asyncio and inspect.

These imports are not used anywhere in the file. Per Flake8 rules in the coding guidelines, they should be removed.

Proposed fix
 """OpenTelemetry Bedrock instrumentation"""
 
-import asyncio
-import inspect
 import json
 import logging
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import asyncio
import inspect
"""OpenTelemetry Bedrock instrumentation"""
import json
import logging
🧰 Tools
🪛 Flake8 (7.3.0)

[error] 3-3: 'asyncio' imported but unused

(F401)


[error] 4-4: 'inspect' imported but unused

(F401)

🤖 Prompt for AI Agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 3 - 4, The file imports unused modules asyncio and inspect at the
top; remove the two import statements (the "asyncio" and "inspect" imports) from
the module-level imports in __init__.py so that no unused imports remain, verify
there are no references to asyncio or inspect elsewhere in that module after
removal, and run linting (flake8) to confirm the unused-import warning is
resolved.

Comment on lines +333 to +346
def _instrumented_model_invoke_async(fn, tracer, metric_params, event_logger):
@wraps(fn)
async def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return await fn(*args, **kwargs)

with tracer.start_as_current_span(
_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT
) as span:
response = await fn(*args, **kwargs)
_handle_call(span, kwargs, response, metric_params, event_logger)
return response

return with_instrumentation
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find and examine the ReusableStreamingBody class
fd -t f "\.py$" | xargs grep -l "class ReusableStreamingBody"

Repository: traceloop/openllmetry

Length of output: 292


🏁 Script executed:

#!/bin/bash
# Also search for the _handle_call function definition
rg "def _handle_call" -A 40 packages/opentelemetry-instrumentation-bedrock/

Repository: traceloop/openllmetry

Length of output: 5490


🏁 Script executed:

#!/bin/bash
# Check the full context of the __init__.py file to see how _handle_call is used
wc -l packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 165


🏁 Script executed:

#!/bin/bash
# Read the ReusableStreamingBody class
cat -n packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/reusable_streaming_body.py

Repository: traceloop/openllmetry

Length of output: 2035


🏁 Script executed:

#!/bin/bash
# Get the full async function context
sed -n '333,346p' packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 646


🏁 Script executed:

#!/bin/bash
# Check if there's a separate async version of _handle_call
rg "def _handle_call" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py -B 2 -A 50 | head -60

Repository: traceloop/openllmetry

Length of output: 1970


🏁 Script executed:

#!/bin/bash
# Search for async versions of _handle_call
rg "_handle_call" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py | grep -i async

Repository: traceloop/openllmetry

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Check the sync version (_instrumented_model_invoke) to compare
rg "def _instrumented_model_invoke\(" -A 15 packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 624


🏁 Script executed:

#!/bin/bash
# Search for aioboto3 or asyncio related code to understand async body structure
rg "aioboto3|asyncio|async def" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 641


🏁 Script executed:

#!/bin/bash
# Verify that aioboto3 returns different response body types than botocore
rg "from aioboto3|import aioboto3" packages/opentelemetry-instrumentation-bedrock/

Repository: traceloop/openllmetry

Length of output: 303


🏁 Script executed:

#!/bin/bash
# Check if there's any try/except or special handling for async bodies
sed -n '447,483p' packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py | cat -n

Repository: traceloop/openllmetry

Length of output: 1683


Create an async version of _handle_call for async response handling.

The _instrumented_model_invoke_async function calls the synchronous _handle_call after awaiting an aioboto3 async response. However, _handle_call is entirely synchronous—it wraps the response body with ReusableStreamingBody (a sync-only wrapper) and immediately calls response.get("body").read(), which cannot handle async streaming bodies from aioboto3. This will fail when processing async invoke_model responses.

Create an async version of _handle_call that properly handles async streaming bodies, or ensure that aioboto3 responses are handled separately without synchronous stream operations.

🤖 Prompt for AI Agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 333 - 346, The async instrument wrapper
_instrumented_model_invoke_async currently calls the sync _handle_call which
uses ReusableStreamingBody and response.get("body").read(), causing failures for
aioboto3 async streaming bodies; implement an async counterpart (e.g.,
_handle_call_async) that accepts the same params (span, kwargs, response,
metric_params, event_logger), awaits async body reads when response["body"] is
an async stream (await response["body"].read()), avoids wrapping async streams
with ReusableStreamingBody, and preserves the same telemetry/metric/event
behavior; then update _instrumented_model_invoke_async to await
_handle_call_async(response...) instead of calling the sync _handle_call so
async invoke_model responses are handled correctly.

Comment on lines +349 to +364
def _instrumented_model_invoke_with_response_stream_async(
fn, tracer, metric_params, event_logger
):
@wraps(fn)
async def with_instrumentation(*args, **kwargs):
if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY):
return await fn(*args, **kwargs)

span = tracer.start_span(_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT)

response = await fn(*args, **kwargs)
_handle_stream_call(span, kwargs, response, metric_params, event_logger)

return response

return with_instrumentation
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cd /tmp && find . -name "*.py" -path "*bedrock*" -type f | head -20

Repository: traceloop/openllmetry

Length of output: 47


🏁 Script executed:

git ls-files | grep -i bedrock | head -20

Repository: traceloop/openllmetry

Length of output: 1995


🏁 Script executed:

cat /tmp/repo_structure.txt 2>/dev/null || git ls-files | grep bedrock | head -30

Repository: traceloop/openllmetry

Length of output: 3011


🏁 Script executed:

find . -path "*opentelemetry/instrumentation/bedrock*" -type f

Repository: traceloop/openllmetry

Length of output: 1189


🏁 Script executed:

rg "_handle_stream_call|_handle_converse_stream" --type py -B 5 -A 15

Repository: traceloop/openllmetry

Length of output: 18511


🏁 Script executed:

sed -n '330,410p' packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 2804


🌐 Web query:

aioboto3 bedrock invoke_model_with_response_stream async iterator streaming

💡 Result:

Short answer + example: yes — call invoke_model_with_response_stream on an async Bedrock client and iterate the returned EventStream asynchronously; each event contains a single top‑level key (usually "chunk" with bytes you decode/JSON‑parse). Example pattern (aioboto3 / types-aioboto3 style):

  • Create async client with aioboto3.
  • await client.invoke_model_with_response_stream(...)
  • async for event in response["body"]: handle event (e.g. event["chunk"]["bytes"])

Minimal example:

import json
import aioboto3
import asyncio

async def stream_bedrock(model_id, payload_json):
    async with aioboto3.client("bedrock-runtime", region_name="us-east-1") as client:
        resp = await client.invoke_model_with_response_stream(
            modelId=model_id,
            body=json.dumps(payload_json).encode("utf-8"),
            accept="application/vnd.amazon.eventstream",
            contentType="application/json",
        )
        async for event in resp["body"]:
            # events have one top-level key per event (chunk / errors / etc.)
            if "chunk" in event:
                chunk_bytes = event["chunk"]["bytes"]
                chunk = json.loads(chunk_bytes)
                # e.g. stream deltas: chunk.get("delta")
                print(chunk, end="", flush=True)
            elif "modelStreamErrorException" in event:
                raise RuntimeError(event["modelStreamErrorException"])

asyncio.run(stream_bedrock("amazon.titan-text-express-v1", {"inputText":"Hello"}))

References: AWS InvokeModelWithResponseStream API (streaming response/event shape) and Python examples. [1][2][3]

References
[1] InvokeModelWithResponseStream — Amazon Bedrock API Reference.
[2] Boto3 / invoke_model_with_response_stream (EventStream response shape).
[3] AWS Python SDK example: invoke_model_with_response_stream (streaming code example).


🏁 Script executed:

rg -A 30 "^def _handle_stream_call" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 1054


🏁 Script executed:

rg -A 30 "^def _handle_converse_stream" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 1483


🏁 Script executed:

rg -A 50 "def stream_done" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py | head -80

Repository: traceloop/openllmetry

Length of output: 1813


🏁 Script executed:

rg "on_stream_event|add_stream_event|response\['body'\]" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py

Repository: traceloop/openllmetry

Length of output: 47


🏁 Script executed:

cat packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/streaming_wrapper.py

Repository: traceloop/openllmetry

Length of output: 2582


Async streaming handlers are incompatible with aioboto3 async iterators.

The async wrappers _instrumented_model_invoke_with_response_stream_async (lines 349-364) and _instrumented_converse_stream_async (lines 384-397) call synchronous handlers (_handle_stream_call and _handle_converse_stream) that wrap the response body with StreamingWrapper, a synchronous wrapper using iter() and next() calls. However, aioboto3 returns async iterators for streaming responses, which require async for iteration and cannot be consumed by synchronous iterators. This will raise TypeError at runtime when these code paths execute. The async streaming wrappers need dedicated async-aware stream handlers instead of reusing the sync ones.

🤖 Prompt for AI Agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 349 - 364, The async wrappers
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async are calling synchronous handlers
(_handle_stream_call and _handle_converse_stream) that wrap response bodies with
the synchronous StreamingWrapper (using iter()/next()), which breaks for
aioboto3 async iterators; implement async-aware handlers (e.g.,
_handle_stream_call_async and _handle_converse_stream_async) that create an
AsyncStreamingWrapper implementing __aiter__ and __anext__ (consuming async for
/ anext) instead of iter()/next(), and update
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async to call these new async handlers (and pass
the same kwargs/metric_params/event_logger) so async streaming responses are
handled without TypeError.

Comment on lines +2 to +4
import asyncio
import os
from unittest.mock import AsyncMock, patch
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unused import asyncio.

The asyncio module is imported but not used in this file. Per Flake8 rules in the coding guidelines, this should be removed.

Proposed fix
 """Tests for aioboto3 async client instrumentation"""
-import asyncio
 import os
 from unittest.mock import AsyncMock, patch
🧰 Tools
🪛 Flake8 (7.3.0)

[error] 2-2: 'asyncio' imported but unused

(F401)

🤖 Prompt for AI Agents
In `@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`
around lines 2 - 4, Remove the unused top-level import "asyncio" from the
imports block (delete the line "import asyncio") so only used modules remain
(e.g., "import os" and "from unittest.mock import AsyncMock, patch"); update the
import block accordingly and run linters/tests to confirm no unused-import
errors remain.

Comment on lines +49 to +54
with patch.object(client, '_make_api_call', new=AsyncMock(return_value=mock_response)):
response = await client.converse(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
messages=messages,
inferenceConfig={"temperature": 0.5},
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unused variable response.

The response variable is assigned but never used. Consider using _ to indicate it's intentionally discarded, or add assertions on the response if needed.

Proposed fix
         with patch.object(client, '_make_api_call', new=AsyncMock(return_value=mock_response)):
-            response = await client.converse(
+            await client.converse(
                 modelId="anthropic.claude-3-haiku-20240307-v1:0",
                 messages=messages,
                 inferenceConfig={"temperature": 0.5},
             )
🧰 Tools
🪛 Flake8 (7.3.0)

[error] 50-50: local variable 'response' is assigned to but never used

(F841)

🪛 Ruff (0.14.11)

50-50: Local variable response is assigned to but never used

Remove assignment to unused variable response

(F841)

🤖 Prompt for AI Agents
In `@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`
around lines 49 - 54, The test assigns an unused variable "response" when
calling client.converse with client._make_api_call patched; replace the unused
assignment with an underscore (await client.converse(...)) or add meaningful
assertions on the returned value to use it (referencing client, _make_api_call,
and converse) so the test no longer contains an unused-variable. Ensure the
patch still uses AsyncMock(return_value=mock_response) and that any added
assertions validate mock_response fields or behavior.

@dvirski
Copy link
Copy Markdown
Contributor

dvirski commented May 19, 2026

Thanks for the PR! Closing in favor of #4135, which lands aioboto3 async support for all 4 Bedrock runtime methods. Really appreciate you opening this — it gave us a head start on what the wrapping needed to look like.

@dvirski dvirski closed this May 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

🐛 Bug Report: Add support to aioboto3 + bedrock converse

3 participants