fix(bedrock): add aioboto3 async client instrumentation#3536
Conversation
Fixes traceloop#3510 ## Problem aioboto3 (async boto3) clients were not being instrumented, resulting in no OpenTelemetry spans or attributes for async Bedrock API calls. ## Root Cause aioboto3 uses `ClientCreatorContext.__aenter__` (async context manager) for client creation, which wasn't wrapped by the instrumentation. The existing wrappers only covered sync boto3's `create_client()` method. ## Solution - Added `ClientCreatorContext.__aenter__` to WRAPPED_METHODS - Created async wrapper `_wrap_aioboto3_context()` that instruments clients when they're created from the async context manager - Added 4 async instrumentation wrappers for all Bedrock methods: - `_instrumented_model_invoke_async()` - `_instrumented_model_invoke_with_response_stream_async()` - `_instrumented_converse_async()` - `_instrumented_converse_stream_async()` - Modified instrumentation loop to use appropriate wrapper based on async flag ## Testing - Added comprehensive aioboto3 tests in `test_aioboto3.py` - All 79 tests pass (77 existing + 2 new) - Verified backward compatibility with sync boto3 - aioboto3 now generates spans with all OpenTelemetry semantic convention attributes (gen_ai.system, model, tokens, content, etc.)
|
|
📝 WalkthroughWalkthroughAdded async support for aioboto3 Bedrock client instrumentation, including async wrapper functions for bedrock-runtime operations (converse, invoke_model, and streaming variants), aiobotocore async context manager integration, and corresponding test coverage. Changes
Sequence DiagramsequenceDiagram
participant App as Application Code
participant AioSession as aioboto3.Session
participant AsyncCtx as ClientCreatorContext<br/>(__aenter__)
participant Wrapper as _wrap_aioboto3_context
participant BedClient as Instrumented<br/>Bedrock Client
participant Tracer as Tracer/Metrics
participant BedAPI as Bedrock API
App->>AioSession: async with session.client()
AioSession->>AsyncCtx: __aenter__()
AsyncCtx->>Wrapper: Wrapping triggered
Wrapper->>BedClient: Inject async instrumentation<br/>(converse, invoke_model, etc.)
BedClient-->>App: Return instrumented client
App->>BedClient: await client.converse(...)
BedClient->>Tracer: Start span + record metrics
BedClient->>BedAPI: Invoke bedrock operation
BedAPI-->>BedClient: Response + usage tokens
BedClient->>Tracer: End span + populate attributes<br/>(gen_ai.usage.*)
BedClient-->>App: Return response
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Changes requested ❌
Reviewed everything up to 6081bbe in 2 minutes and 43 seconds. Click for details.
- Reviewed
296lines of code in3files - Skipped
1files when reviewing. - Skipped posting
4draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py:108
- Draft comment:
Ensure that the package path 'aiobotocore.session' for ClientCreatorContext matches the aioboto3 version in use. This is critical for proper async client instrumentation. - Reason this comment was not posted:
Comment did not seem useful. Confidence is useful =0%<= threshold50%The comment is asking the author to ensure compatibility betweenaiobotocore.sessionandaioboto3versions, which is related to dependencies. The comment does not provide a specific suggestion or ask for a test to be written, and it is more of a cautionary note. This violates the rule against asking the author to ensure compatibility without a specific actionable suggestion.
2. packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py:333
- Draft comment:
Async wrappers (e.g. _instrumented_model_invoke_async) use a synchronous 'with' for starting spans. If the tracer supports async context management, consider using 'async with' for better compatibility in async code. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 15% vs. threshold = 50% This comment is speculative because it starts with "If the tracer supports..." - it's not stating definitively that there's a problem. The rules explicitly say "Do NOT make speculative comments, such as 'If X, then Y is an issue'. Only comment if it is definitely an issue." Additionally, the comment doesn't provide evidence that the OpenTelemetry tracer actually supports async context management or that using synchronous 'with' is causing problems. The code appears to be working as intended - the 'with' statement is synchronous but that doesn't mean it's wrong in an async context. Without seeing documentation or evidence that the tracer has an async context manager interface, this is just a suggestion without strong evidence. Perhaps OpenTelemetry tracers do support async context managers and this is a legitimate best practice for async code. The comment might be pointing out a real compatibility or performance issue that I'm not aware of. Even if OpenTelemetry tracers support async context managers, the comment is phrased speculatively ("If the tracer supports...") rather than definitively. Without strong evidence that this is causing an actual problem or that it's required, this falls under speculative comments that should be deleted per the rules. The PR author likely tested this code and it works with synchronous context managers. This comment should be deleted because it's speculative (uses "If the tracer supports...") and doesn't provide strong evidence of an actual issue. The rules explicitly state not to make speculative comments.
3. packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py:82
- Draft comment:
Test 'test_aioboto3_wrapping' correctly asserts that the client methods are wrapped. This confirms that both sync and async instrumentation are applied as expected. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
4. packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py:108
- Draft comment:
Typographical note: The comment on this block refers to 'aioboto3' client creation, but the package specified in the dictionary is 'aiobotocore.session'. Please verify if this is intentional or if the naming should be aligned for consistency. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 20% vs. threshold = 50% This comment is asking the PR author to "verify" something, which is explicitly against the rules ("Do NOT ask the PR author to confirm their intention, to explain, to double-check things, to ensure the behavior is intended"). The comment is not suggesting a clear code change - it's just pointing out that the comment mentions aioboto3 while the code uses aiobotocore. While there is a technical distinction (aioboto3 wraps aiobotocore), this is more of a documentation/comment clarity issue rather than a functional code issue. The comment doesn't suggest what should be changed or why it matters functionally. It's asking for verification rather than pointing out a definite problem. The comment might be highlighting a legitimate documentation inconsistency that could confuse future maintainers. If the inline comment is misleading about what's actually being wrapped, that could be a valid issue to fix. However, it's phrased as "please verify" which is explicitly against the rules. While there may be a documentation inconsistency, the comment violates the rule about not asking the PR author to verify or confirm things. The comment should either suggest a specific change to the comment text or be removed. Since it's asking for verification rather than pointing out a definite problem with a clear fix, it should be deleted. This comment should be deleted because it asks the PR author to "verify" something, which violates the rules. It's not suggesting a clear code change, just pointing out a potential naming inconsistency in a comment that may or may not be intentional.
Workflow ID: wflow_rV5Qx5jbfintddhm
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
| client.converse_stream, tracer, metric_params, event_logger | ||
| ) | ||
| except Exception as e: | ||
| logger.debug(f"Failed to instrument aioboto3 client: {e}") |
There was a problem hiding this comment.
Consider expanding the debug log in the exception block so that failures in instrumentation of _wrap_aioboto3_context are easier to diagnose.
| logger.debug(f"Failed to instrument aioboto3 client: {e}") | |
| logger.exception(f"Failed to instrument aioboto3 client: {e}") |
| """OpenTelemetry Bedrock instrumentation""" | ||
|
|
||
| import asyncio | ||
| import inspect |
There was a problem hiding this comment.
The inspect module is imported but not used. Remove it to clean up unused dependencies.
| import inspect |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`:
- Around line 3-4: The file imports unused modules asyncio and inspect at the
top; remove the two import statements (the "asyncio" and "inspect" imports) from
the module-level imports in __init__.py so that no unused imports remain, verify
there are no references to asyncio or inspect elsewhere in that module after
removal, and run linting (flake8) to confirm the unused-import warning is
resolved.
- Around line 333-346: The async instrument wrapper
_instrumented_model_invoke_async currently calls the sync _handle_call which
uses ReusableStreamingBody and response.get("body").read(), causing failures for
aioboto3 async streaming bodies; implement an async counterpart (e.g.,
_handle_call_async) that accepts the same params (span, kwargs, response,
metric_params, event_logger), awaits async body reads when response["body"] is
an async stream (await response["body"].read()), avoids wrapping async streams
with ReusableStreamingBody, and preserves the same telemetry/metric/event
behavior; then update _instrumented_model_invoke_async to await
_handle_call_async(response...) instead of calling the sync _handle_call so
async invoke_model responses are handled correctly.
- Around line 349-364: The async wrappers
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async are calling synchronous handlers
(_handle_stream_call and _handle_converse_stream) that wrap response bodies with
the synchronous StreamingWrapper (using iter()/next()), which breaks for
aioboto3 async iterators; implement async-aware handlers (e.g.,
_handle_stream_call_async and _handle_converse_stream_async) that create an
AsyncStreamingWrapper implementing __aiter__ and __anext__ (consuming async for
/ anext) instead of iter()/next(), and update
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async to call these new async handlers (and pass
the same kwargs/metric_params/event_logger) so async streaming responses are
handled without TypeError.
In
`@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`:
- Around line 2-4: Remove the unused top-level import "asyncio" from the imports
block (delete the line "import asyncio") so only used modules remain (e.g.,
"import os" and "from unittest.mock import AsyncMock, patch"); update the import
block accordingly and run linters/tests to confirm no unused-import errors
remain.
- Around line 49-54: The test assigns an unused variable "response" when calling
client.converse with client._make_api_call patched; replace the unused
assignment with an underscore (await client.converse(...)) or add meaningful
assertions on the returned value to use it (referencing client, _make_api_call,
and converse) so the test no longer contains an unused-variable. Ensure the
patch still uses AsyncMock(return_value=mock_response) and that any added
assertions validate mock_response fields or behavior.
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py (1)
236-258: Missing error metrics for async wrapper.The sync
_wrapfunction (lines 189-202) recordsduration_histogramandexception_countermetrics when client creation fails. The async_wrap_aioboto3_contextonly logs the exception at debug level without recording metrics, creating an inconsistency in observability between sync and async code paths.Proposed fix to add error metrics
if is_bedrock: try: + start_time = time.time() # Instrument the client's methods with async wrappers client.invoke_model = _instrumented_model_invoke_async( client.invoke_model, tracer, metric_params, event_logger ) client.invoke_model_with_response_stream = ( _instrumented_model_invoke_with_response_stream_async( client.invoke_model_with_response_stream, tracer, metric_params, event_logger, ) ) client.converse = _instrumented_converse_async( client.converse, tracer, metric_params, event_logger ) client.converse_stream = _instrumented_converse_stream_async( client.converse_stream, tracer, metric_params, event_logger ) except Exception as e: + end_time = time.time() + duration = end_time - start_time if "start_time" in locals() else 0 + attributes = {"error.type": e.__class__.__name__} + if duration > 0 and metric_params.duration_histogram: + metric_params.duration_histogram.record(duration, attributes=attributes) + if metric_params.exception_counter: + metric_params.exception_counter.add(1, attributes=attributes) logger.debug(f"Failed to instrument aioboto3 client: {e}") return client
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
packages/opentelemetry-instrumentation-bedrock/poetry.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pypackages/opentelemetry-instrumentation-bedrock/pyproject.tomlpackages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pypackages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py (3)
packages/opentelemetry-instrumentation-llamaindex/opentelemetry/instrumentation/llamaindex/utils.py (1)
_with_tracer_wrapper(16-23)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (1)
_with_tracer_wrapper(116-123)packages/opentelemetry-instrumentation-weaviate/opentelemetry/instrumentation/weaviate/wrapper.py (2)
_with_tracer_wrapper(14-23)_wrap(34-51)
🪛 Flake8 (7.3.0)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py
[error] 3-3: 'asyncio' imported but unused
(F401)
[error] 4-4: 'inspect' imported but unused
(F401)
packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
[error] 2-2: 'asyncio' imported but unused
(F401)
[error] 50-50: local variable 'response' is assigned to but never used
(F841)
🪛 Ruff (0.14.11)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py
212-212: Unused function argument: to_wrap
(ARG001)
214-214: Unused function argument: instance
(ARG001)
256-256: Do not catch blind exception: Exception
(BLE001)
packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py
14-14: Unused function argument: instrument_legacy
(ARG001)
14-14: Unused function argument: log_exporter
(ARG001)
50-50: Local variable response is assigned to but never used
Remove assignment to unused variable response
(F841)
73-73: Unused function argument: instrument_legacy
(ARG001)
🔇 Additional comments (5)
packages/opentelemetry-instrumentation-bedrock/pyproject.toml (1)
37-38: LGTM!The dev dependencies for
aioboto3andpytest-asyncioare appropriately added to support the new async instrumentation tests.packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py (1)
72-87: LGTM!Good test coverage for verifying that the async client methods are properly wrapped by the instrumentation.
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py (3)
108-114: LGTM!The wrapped method entry for
aiobotocore.session.ClientCreatorContext.__aenter__correctly targets the async context manager entry point with theasyncflag for proper wrapper selection.
774-790: LGTM!Clean implementation of wrapper selection based on the async flag. The default
Falseensures backward compatibility with existing sync wrappers.
367-381: LGTM!The async converse wrapper correctly mirrors the sync implementation with proper
awaitusage and span management.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| import asyncio | ||
| import inspect |
There was a problem hiding this comment.
Remove unused imports asyncio and inspect.
These imports are not used anywhere in the file. Per Flake8 rules in the coding guidelines, they should be removed.
Proposed fix
"""OpenTelemetry Bedrock instrumentation"""
-import asyncio
-import inspect
import json
import logging📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import asyncio | |
| import inspect | |
| """OpenTelemetry Bedrock instrumentation""" | |
| import json | |
| import logging |
🧰 Tools
🪛 Flake8 (7.3.0)
[error] 3-3: 'asyncio' imported but unused
(F401)
[error] 4-4: 'inspect' imported but unused
(F401)
🤖 Prompt for AI Agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 3 - 4, The file imports unused modules asyncio and inspect at the
top; remove the two import statements (the "asyncio" and "inspect" imports) from
the module-level imports in __init__.py so that no unused imports remain, verify
there are no references to asyncio or inspect elsewhere in that module after
removal, and run linting (flake8) to confirm the unused-import warning is
resolved.
| def _instrumented_model_invoke_async(fn, tracer, metric_params, event_logger): | ||
| @wraps(fn) | ||
| async def with_instrumentation(*args, **kwargs): | ||
| if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY): | ||
| return await fn(*args, **kwargs) | ||
|
|
||
| with tracer.start_as_current_span( | ||
| _BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT | ||
| ) as span: | ||
| response = await fn(*args, **kwargs) | ||
| _handle_call(span, kwargs, response, metric_params, event_logger) | ||
| return response | ||
|
|
||
| return with_instrumentation |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find and examine the ReusableStreamingBody class
fd -t f "\.py$" | xargs grep -l "class ReusableStreamingBody"Repository: traceloop/openllmetry
Length of output: 292
🏁 Script executed:
#!/bin/bash
# Also search for the _handle_call function definition
rg "def _handle_call" -A 40 packages/opentelemetry-instrumentation-bedrock/Repository: traceloop/openllmetry
Length of output: 5490
🏁 Script executed:
#!/bin/bash
# Check the full context of the __init__.py file to see how _handle_call is used
wc -l packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 165
🏁 Script executed:
#!/bin/bash
# Read the ReusableStreamingBody class
cat -n packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/reusable_streaming_body.pyRepository: traceloop/openllmetry
Length of output: 2035
🏁 Script executed:
#!/bin/bash
# Get the full async function context
sed -n '333,346p' packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 646
🏁 Script executed:
#!/bin/bash
# Check if there's a separate async version of _handle_call
rg "def _handle_call" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py -B 2 -A 50 | head -60Repository: traceloop/openllmetry
Length of output: 1970
🏁 Script executed:
#!/bin/bash
# Search for async versions of _handle_call
rg "_handle_call" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py | grep -i asyncRepository: traceloop/openllmetry
Length of output: 47
🏁 Script executed:
#!/bin/bash
# Check the sync version (_instrumented_model_invoke) to compare
rg "def _instrumented_model_invoke\(" -A 15 packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 624
🏁 Script executed:
#!/bin/bash
# Search for aioboto3 or asyncio related code to understand async body structure
rg "aioboto3|asyncio|async def" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 641
🏁 Script executed:
#!/bin/bash
# Verify that aioboto3 returns different response body types than botocore
rg "from aioboto3|import aioboto3" packages/opentelemetry-instrumentation-bedrock/Repository: traceloop/openllmetry
Length of output: 303
🏁 Script executed:
#!/bin/bash
# Check if there's any try/except or special handling for async bodies
sed -n '447,483p' packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py | cat -nRepository: traceloop/openllmetry
Length of output: 1683
Create an async version of _handle_call for async response handling.
The _instrumented_model_invoke_async function calls the synchronous _handle_call after awaiting an aioboto3 async response. However, _handle_call is entirely synchronous—it wraps the response body with ReusableStreamingBody (a sync-only wrapper) and immediately calls response.get("body").read(), which cannot handle async streaming bodies from aioboto3. This will fail when processing async invoke_model responses.
Create an async version of _handle_call that properly handles async streaming bodies, or ensure that aioboto3 responses are handled separately without synchronous stream operations.
🤖 Prompt for AI Agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 333 - 346, The async instrument wrapper
_instrumented_model_invoke_async currently calls the sync _handle_call which
uses ReusableStreamingBody and response.get("body").read(), causing failures for
aioboto3 async streaming bodies; implement an async counterpart (e.g.,
_handle_call_async) that accepts the same params (span, kwargs, response,
metric_params, event_logger), awaits async body reads when response["body"] is
an async stream (await response["body"].read()), avoids wrapping async streams
with ReusableStreamingBody, and preserves the same telemetry/metric/event
behavior; then update _instrumented_model_invoke_async to await
_handle_call_async(response...) instead of calling the sync _handle_call so
async invoke_model responses are handled correctly.
| def _instrumented_model_invoke_with_response_stream_async( | ||
| fn, tracer, metric_params, event_logger | ||
| ): | ||
| @wraps(fn) | ||
| async def with_instrumentation(*args, **kwargs): | ||
| if context_api.get_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY): | ||
| return await fn(*args, **kwargs) | ||
|
|
||
| span = tracer.start_span(_BEDROCK_INVOKE_SPAN_NAME, kind=SpanKind.CLIENT) | ||
|
|
||
| response = await fn(*args, **kwargs) | ||
| _handle_stream_call(span, kwargs, response, metric_params, event_logger) | ||
|
|
||
| return response | ||
|
|
||
| return with_instrumentation |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cd /tmp && find . -name "*.py" -path "*bedrock*" -type f | head -20Repository: traceloop/openllmetry
Length of output: 47
🏁 Script executed:
git ls-files | grep -i bedrock | head -20Repository: traceloop/openllmetry
Length of output: 1995
🏁 Script executed:
cat /tmp/repo_structure.txt 2>/dev/null || git ls-files | grep bedrock | head -30Repository: traceloop/openllmetry
Length of output: 3011
🏁 Script executed:
find . -path "*opentelemetry/instrumentation/bedrock*" -type fRepository: traceloop/openllmetry
Length of output: 1189
🏁 Script executed:
rg "_handle_stream_call|_handle_converse_stream" --type py -B 5 -A 15Repository: traceloop/openllmetry
Length of output: 18511
🏁 Script executed:
sed -n '330,410p' packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 2804
🌐 Web query:
aioboto3 bedrock invoke_model_with_response_stream async iterator streaming
💡 Result:
Short answer + example: yes — call invoke_model_with_response_stream on an async Bedrock client and iterate the returned EventStream asynchronously; each event contains a single top‑level key (usually "chunk" with bytes you decode/JSON‑parse). Example pattern (aioboto3 / types-aioboto3 style):
- Create async client with aioboto3.
- await client.invoke_model_with_response_stream(...)
- async for event in response["body"]: handle event (e.g. event["chunk"]["bytes"])
Minimal example:
import json
import aioboto3
import asyncio
async def stream_bedrock(model_id, payload_json):
async with aioboto3.client("bedrock-runtime", region_name="us-east-1") as client:
resp = await client.invoke_model_with_response_stream(
modelId=model_id,
body=json.dumps(payload_json).encode("utf-8"),
accept="application/vnd.amazon.eventstream",
contentType="application/json",
)
async for event in resp["body"]:
# events have one top-level key per event (chunk / errors / etc.)
if "chunk" in event:
chunk_bytes = event["chunk"]["bytes"]
chunk = json.loads(chunk_bytes)
# e.g. stream deltas: chunk.get("delta")
print(chunk, end="", flush=True)
elif "modelStreamErrorException" in event:
raise RuntimeError(event["modelStreamErrorException"])
asyncio.run(stream_bedrock("amazon.titan-text-express-v1", {"inputText":"Hello"}))
References: AWS InvokeModelWithResponseStream API (streaming response/event shape) and Python examples. [1][2][3]
References
[1] InvokeModelWithResponseStream — Amazon Bedrock API Reference.
[2] Boto3 / invoke_model_with_response_stream (EventStream response shape).
[3] AWS Python SDK example: invoke_model_with_response_stream (streaming code example).
🏁 Script executed:
rg -A 30 "^def _handle_stream_call" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 1054
🏁 Script executed:
rg -A 30 "^def _handle_converse_stream" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 1483
🏁 Script executed:
rg -A 50 "def stream_done" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py | head -80Repository: traceloop/openllmetry
Length of output: 1813
🏁 Script executed:
rg "on_stream_event|add_stream_event|response\['body'\]" packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pyRepository: traceloop/openllmetry
Length of output: 47
🏁 Script executed:
cat packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/streaming_wrapper.pyRepository: traceloop/openllmetry
Length of output: 2582
Async streaming handlers are incompatible with aioboto3 async iterators.
The async wrappers _instrumented_model_invoke_with_response_stream_async (lines 349-364) and _instrumented_converse_stream_async (lines 384-397) call synchronous handlers (_handle_stream_call and _handle_converse_stream) that wrap the response body with StreamingWrapper, a synchronous wrapper using iter() and next() calls. However, aioboto3 returns async iterators for streaming responses, which require async for iteration and cannot be consumed by synchronous iterators. This will raise TypeError at runtime when these code paths execute. The async streaming wrappers need dedicated async-aware stream handlers instead of reusing the sync ones.
🤖 Prompt for AI Agents
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.py`
around lines 349 - 364, The async wrappers
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async are calling synchronous handlers
(_handle_stream_call and _handle_converse_stream) that wrap response bodies with
the synchronous StreamingWrapper (using iter()/next()), which breaks for
aioboto3 async iterators; implement async-aware handlers (e.g.,
_handle_stream_call_async and _handle_converse_stream_async) that create an
AsyncStreamingWrapper implementing __aiter__ and __anext__ (consuming async for
/ anext) instead of iter()/next(), and update
_instrumented_model_invoke_with_response_stream_async and
_instrumented_converse_stream_async to call these new async handlers (and pass
the same kwargs/metric_params/event_logger) so async streaming responses are
handled without TypeError.
| import asyncio | ||
| import os | ||
| from unittest.mock import AsyncMock, patch |
There was a problem hiding this comment.
Remove unused import asyncio.
The asyncio module is imported but not used in this file. Per Flake8 rules in the coding guidelines, this should be removed.
Proposed fix
"""Tests for aioboto3 async client instrumentation"""
-import asyncio
import os
from unittest.mock import AsyncMock, patch🧰 Tools
🪛 Flake8 (7.3.0)
[error] 2-2: 'asyncio' imported but unused
(F401)
🤖 Prompt for AI Agents
In `@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`
around lines 2 - 4, Remove the unused top-level import "asyncio" from the
imports block (delete the line "import asyncio") so only used modules remain
(e.g., "import os" and "from unittest.mock import AsyncMock, patch"); update the
import block accordingly and run linters/tests to confirm no unused-import
errors remain.
| with patch.object(client, '_make_api_call', new=AsyncMock(return_value=mock_response)): | ||
| response = await client.converse( | ||
| modelId="anthropic.claude-3-haiku-20240307-v1:0", | ||
| messages=messages, | ||
| inferenceConfig={"temperature": 0.5}, | ||
| ) |
There was a problem hiding this comment.
Unused variable response.
The response variable is assigned but never used. Consider using _ to indicate it's intentionally discarded, or add assertions on the response if needed.
Proposed fix
with patch.object(client, '_make_api_call', new=AsyncMock(return_value=mock_response)):
- response = await client.converse(
+ await client.converse(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
messages=messages,
inferenceConfig={"temperature": 0.5},
)🧰 Tools
🪛 Flake8 (7.3.0)
[error] 50-50: local variable 'response' is assigned to but never used
(F841)
🪛 Ruff (0.14.11)
50-50: Local variable response is assigned to but never used
Remove assignment to unused variable response
(F841)
🤖 Prompt for AI Agents
In `@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`
around lines 49 - 54, The test assigns an unused variable "response" when
calling client.converse with client._make_api_call patched; replace the unused
assignment with an underscore (await client.converse(...)) or add meaningful
assertions on the returned value to use it (referencing client, _make_api_call,
and converse) so the test no longer contains an unused-variable. Ensure the
patch still uses AsyncMock(return_value=mock_response) and that any added
assertions validate mock_response fields or behavior.
|
Thanks for the PR! Closing in favor of #4135, which lands aioboto3 async support for all 4 Bedrock runtime methods. Really appreciate you opening this — it gave us a head start on what the wrapping needed to look like. |
Fixes #3510
Problem
aioboto3 (async boto3) clients were not being instrumented, resulting in no OpenTelemetry spans or attributes for async Bedrock API calls.
Root Cause
aioboto3 uses
ClientCreatorContext.__aenter__(async context manager) for client creation, which wasn't wrapped by the instrumentation. The existing wrappers only covered sync boto3'screate_client()method.Solution
ClientCreatorContext.__aenter__to WRAPPED_METHODS_wrap_aioboto3_context()that instruments clients when they're created from the async context manager_instrumented_model_invoke_async()_instrumented_model_invoke_with_response_stream_async()_instrumented_converse_async()_instrumented_converse_stream_async()Testing
test_aioboto3.pyBefore Fix
After Fix
Important
Add async instrumentation for aioboto3 clients in OpenTelemetry Bedrock package, including new async wrappers and tests.
ClientCreatorContext.__aenter__toWRAPPED_METHODSfor aioboto3 async client creation._wrap_aioboto3_context()to instrument clients from async context manager._instrumented_model_invoke_async(),_instrumented_model_invoke_with_response_stream_async(),_instrumented_converse_async(),_instrumented_converse_stream_async().BedrockInstrumentor._instrument()to use async wrapper based on async flag.test_aioboto3.pywith tests for aioboto3 async client instrumentation.aioboto3andpytest-asynciotopyproject.tomldev dependencies.This description was created by
for 6081bbe. You can customize this summary. It will automatically update as commits are pushed.
Summary by CodeRabbit
New Features
Tests
Chores
✏️ Tip: You can customize this high-level summary in your review settings.