feat(bedrock): add async support for aioboto3#4135
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (2)
📒 Files selected for processing (9)
🚧 Files skipped from review as they are similar to previous changes (9)
📝 WalkthroughWalkthroughAdds async aiobotocore support for Bedrock instrumentation: intercepts async client creation, wraps async Bedrock APIs (invoke_model*, converse*), buffers async responses, parses event streams with guaranteed span closure, adds tests and a sample async workflow, and hardens uninstrumentation. ChangesBedrock Async Instrumentation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py (1)
393-417: ⚡ Quick winAdd explicit early-break stream-consumption regression tests.
PR notes mention fixing span leaks on early break; add tests that break after first chunk/event and still assert span closure + attributes/log integrity.
Also applies to: 498-514
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py` around lines 393 - 417, Add explicit regression tests that consume only the first event from the response stream and then break early to ensure spans are closed and logs/attributes are exported; modify the existing test_aioboto3_invoke_stream_legacy (and the analogous test at lines 498-514) to iterate async for event in response["body"], read the first chunk, break immediately, then ensure you exit the context (await ctx.__aexit__(None, None, None)) and assert via span_exporter and log_exporter that the span is finished and contains the expected attributes/log entries; keep the same client creation helper (_patch_make_api_call and _fake_invoke_stream_response) and use span_exporter, log_exporter assertions to validate no span leaks on early-break consumption.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`:
- Around line 274-279: The test currently iterates the logs returned by
log_exporter.get_finished_logs() and asserts on any existing log bodies but can
false-pass if no logs are emitted; update each such block (the one using logs =
log_exporter.get_finished_logs(), the subsequent for log in logs loop and body =
dict(log.log_record.body)) to first assert a minimum expected log count (e.g.,
assert len(logs) > 0 or assert len([l for l in logs if l.log_record.body]) > 0)
before validating bodies, mirroring the with-content variants; apply the same
change to the other occurrences flagged (the similar blocks around the other
ranges) so the test fails when no logs are emitted.
- Around line 93-97: The mock _AsyncReadableBody.read currently returns
self._raw without consuming it; change read so that when amt is None it returns
the full buffer and sets self._raw to b'' (i.e., consume the bytes), and when
amt is provided slice and remove bytes from self._raw as the existing branch
does; update the mock implementation in the test helper where
_AsyncReadableBody.read is defined to match botocore/aiobotocore StreamingBody
semantics. Also strengthen the log assertions referenced (tests that inspect
no-content logs at the indicated assertions) by asserting that the logs list is
non-empty before checking their contents so the test fails if no logs were
emitted.
---
Nitpick comments:
In
`@packages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.py`:
- Around line 393-417: Add explicit regression tests that consume only the first
event from the response stream and then break early to ensure spans are closed
and logs/attributes are exported; modify the existing
test_aioboto3_invoke_stream_legacy (and the analogous test at lines 498-514) to
iterate async for event in response["body"], read the first chunk, break
immediately, then ensure you exit the context (await ctx.__aexit__(None, None,
None)) and assert via span_exporter and log_exporter that the span is finished
and contains the expected attributes/log entries; keep the same client creation
helper (_patch_make_api_call and _fake_invoke_stream_response) and use
span_exporter, log_exporter assertions to validate no span leaks on early-break
consumption.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 92e63e19-30c4-4168-9d8f-b67795b6c91a
⛔ Files ignored due to path filters (2)
packages/opentelemetry-instrumentation-bedrock/uv.lockis excluded by!**/*.lockpackages/sample-app/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (7)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pypackages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/reusable_streaming_body.pypackages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/streaming_wrapper.pypackages/opentelemetry-instrumentation-bedrock/pyproject.tomlpackages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.pypackages/sample-app/pyproject.tomlpackages/sample-app/sample_app/async_bedrock_example.py
1. Async exception logging is asymmetric with sync (
|
62af08f to
7039b7f
Compare
|
Topics addressed: #1 — Async exception logging asymmetric with sync
#2 — Only 1 of 12 tests asserts token attributes
#3 — No committed tests for the claimed edge-case fixes
#7 — Mocks bypass real aiobotocore body lifecycle
Bonus — preexisting sync bug found while addressing #2
Bonus — sync regression locks for the above fix
Verification: 380/380 bedrock package tests pass, ruff clean. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/span_utils.py`:
- Around line 647-655: The token-extraction logic in span_utils.py incorrectly
uses truthy `or` fallbacks so empty dicts or zero counts get treated as missing;
update the logic in the block that computes `usage`, `total_prompt_tokens`, and
`total_completion_tokens` to check presence/None explicitly: set `usage =
response_body.get("usage") if "usage" in response_body else
response_body.get("metadata", {}).get("usage", {})` (or equivalent presence
check), and when extracting `inputTokens`/`outputTokens` use `if value is not
None` or `get(..., None)` checks before falling back to header values so that 0
is accepted and empty dicts from `usage` don't fall through. Ensure
`total_prompt_tokens` and `total_completion_tokens` use these explicit None
checks on `usage.get("inputTokens")` and `usage.get("outputTokens")` and only
then fallback to `headers.get(...)`.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 14843c1a-b781-4425-8230-4a7be43aa42b
⛔ Files ignored due to path filters (2)
packages/opentelemetry-instrumentation-bedrock/uv.lockis excluded by!**/*.lockpackages/sample-app/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (9)
packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/__init__.pypackages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/reusable_streaming_body.pypackages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/span_utils.pypackages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/streaming_wrapper.pypackages/opentelemetry-instrumentation-bedrock/pyproject.tomlpackages/opentelemetry-instrumentation-bedrock/tests/traces/test_aioboto3.pypackages/opentelemetry-instrumentation-bedrock/tests/traces/test_nova.pypackages/sample-app/pyproject.tomlpackages/sample-app/sample_app/async_bedrock_example.py
✅ Files skipped from review due to trivial changes (1)
- packages/opentelemetry-instrumentation-bedrock/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (4)
- packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/reusable_streaming_body.py
- packages/sample-app/pyproject.toml
- packages/sample-app/sample_app/async_bedrock_example.py
- packages/opentelemetry-instrumentation-bedrock/opentelemetry/instrumentation/bedrock/init.py
7039b7f to
7b2b084
Compare
Fixes #3510
Bedrock’s OpenTelemetry instrumentation worked for boto3 (sync) but produced zero spans when users called Bedrock through aioboto3. This PR adds full async support — all four Bedrock Runtime methods (converse, converse_stream, invoke_model, invoke_model_with_response_stream)
now emit OTel-semconv-compliant spans, prompts, completions, token counts, and finish reasons.
The fix is symmetric to the existing sync path: same downstream helpers, same span attributes, same event-emitter pipeline, same semconv compliance — async just adds the missing async hook + four async-aware wrappers.
Tests: Added tests/traces/test_aioboto3.py — 12 mock-based tests covering all 4 async methods × 3 attribute modes (legacy / events+content / events+no-content), matching the sync test_nova.py coverage. Uses mocks instead of VCR cassettes because vcrpy's aiohttp stub can't replay aiobotocore responses (vcrpy#927 (kevin1024/vcrpy#927)).
Example: Added packages/sample-app/sample_app/async_bedrock_example.py — a research-assistant @workflow chaining all 4 async methods (converse → converse_stream → invoke_model → invoke_model_with_response_stream), so the resulting Traceloop trace shows a real-life async Bedrock flow.
Stress test (local-only, not committed): Wrote async_bedrock_stress_paths.py exercising 4 edge-case paths the doc example doesn't cover — invalid model (exception recording), concurrent asyncio.gather (no state leakage between simultaneous calls), early break from streaming
(span still ends with partial content, no ERROR status per OTel guidance), and SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY (instrumentation correctly skipped). This caught a real bug in converse_stream where the span would leak on early break, which we then fixed.
Code review (adversarial subagent): Spawned a code-reviewer subagent with explicit instructions to find bugs in the new async code. It flagged ~17 concerns; we fixed the ones introduced by this PR and deferred the ones that pre-exist in the sync path. Fixed:
AsyncStreamingWrapper span leak on early break (try/finally + _done guard), BufferedAsyncBody buffer-mutation bug (immutable buffer + cursor), aiobotocore body never closed (now released before replacement), InstrumentedClientContext.aenter not exception-safe (wrapped
patching block so the inner ctx is properly exited on failure), _handle_async_call passing a fake response dict to emit_choice_events (now passes the real response), and _handle_async_call swallowing exceptions at debug log level (now error).
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Chores