UN-3211 [FEAT] HTTP session lifecycle management for workers API clients #1782
Conversation
Summary by CodeRabbit
Walkthrough

Centralizes API client and StateStore cleanup in finally blocks; adds singleton/shared HTTP session support with task-count reset and observability; introduces Celery signal handlers for lifecycle events; increases API client pool defaults; adds test fixtures and extensive session lifecycle tests.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    Task->>API: setup_execution_context() => api_client
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
- Add _owns_session flag to prevent singleton shared session from being closed by individual clients
- Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools
- Add idempotent close() and __del__ destructor to BaseAPIClient
- Add try/finally cleanup in api-deployment and callback tasks
- Add on_worker_process_shutdown hook and early-return guard in postrun
- Add 25 unit tests for session lifecycle behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
7f18370 to 0752a37
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@workers/shared/api/internal_client.py`:
- Around line 256-268: The reset_singleton method currently swallows exceptions
when closing cls._shared_session; change this to log the exception details
instead of silently passing so FD leaks/errors are visible—catch the Exception
around cls._shared_session.close() and call logger.exception or logger.error
with the exception/context (mentioning InternalAPIClient.reset_singleton and
cls._shared_session) before continuing to set cls._shared_session,
cls._shared_base_client, cls._initialization_count, and cls._task_counter to
None/0 and logging the reset completion.
- Around line 271-294: The non-atomic update in increment_task_counter can lose
counts under threaded/eventlet/gevent worker pools; make the method thread-safe
by adding a class-level lock (e.g., _task_counter_lock = threading.Lock()) and
wrapping the read/increment/check/reset sequence in a with _task_counter_lock:
block (import threading where needed) so the operations on _task_counter and
_last_reset_time and the call to reset_singleton() are atomic; alternatively, if
you require prefork-only deployments, add a precondition/assertion at the start
of increment_task_counter that the worker is running in prefork mode and skip
changes.
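The two fixes the agent prompt asks for can be sketched together. This is an illustrative stand-in for the real `InternalAPIClient`, not the project's code: `THRESHOLD` and `_resets` are invented names (the real code reads its threshold from `WorkerConfig` and tracks observability elsewhere).

```python
import logging
import threading

logger = logging.getLogger(__name__)


class SingletonSketch:
    """Sketch of both review suggestions: log close() failures in
    reset_singleton() instead of swallowing them, and guard the task
    counter with a class-level lock so threaded/eventlet/gevent pools
    don't lose counts."""

    _shared_session = None
    _task_counter = 0
    _task_counter_lock = threading.Lock()
    _resets = 0          # observability counter, invented for this sketch
    THRESHOLD = 100      # stand-in for WorkerConfig().singleton_reset_task_threshold

    @classmethod
    def reset_singleton(cls):
        if cls._shared_session is not None:
            try:
                cls._shared_session.close()
            except Exception:
                # Surface FD leaks/close errors instead of a bare `pass`
                logger.exception("Failed to close shared session during reset")
        cls._shared_session = None
        cls._task_counter = 0
        cls._resets += 1
        logger.info("Singleton session reset complete")

    @classmethod
    def increment_task_counter(cls):
        # The lock makes the read/increment/check/reset sequence atomic
        with cls._task_counter_lock:
            cls._task_counter += 1
            if cls.THRESHOLD > 0 and cls._task_counter >= cls.THRESHOLD:
                cls.reset_singleton()
```

With the lock in place, concurrent increments cannot interleave between the read and the write, so the threshold fires exactly once per `THRESHOLD` completions.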
🧹 Nitpick comments (2)
workers/api-deployment/tasks.py (1)
263-269: Don’t swallow api_client.close() errors silently.
A debug/warn log helps diagnose lingering sessions if close fails.
🔧 Suggested tweak

```diff
-        try:
-            api_client.close()
-        except Exception:
-            pass
+        try:
+            api_client.close()
+        except Exception as e:
+            logger.debug("api_client.close() failed during cleanup: %s", e)
```

workers/callback/tasks.py (1)
1508-1512: Avoid silent failures when closing API clients.
A debug/warn log helps diagnose leaks if close fails in either finally block.
🔧 Suggested tweak

```diff
-        try:
-            context.api_client.close()
-        except Exception:
-            pass
+        try:
+            context.api_client.close()
+        except Exception as e:
+            logger.debug("api_client.close() failed during callback cleanup: %s", e)
@@
-        try:
-            api_client.close()
-        except Exception:
-            pass
+        try:
+            api_client.close()
+        except Exception as e:
+            logger.debug("api_client.close() failed during API callback cleanup: %s", e)
```

Also applies to: 1781-1784
…afe counter

- Log warning instead of silently swallowing exceptions in reset_singleton()
- Add threading.Lock around task counter increment for thread safety with threads/gevent/eventlet pools

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@workers/shared/api/internal_client.py`:
- Around line 258-296: reset_singleton() can close cls._shared_session while
other threads are mid-request; change to a "swap-then-close" approach: inside
reset_singleton() grab a session lock (use existing _task_counter_lock or add a
dedicated _session_lock), atomically replace cls._shared_session with a new
requests.Session() (or None if you want lazy re-init) and store the old session
in a local variable, then spawn a short-lived background worker (thread or
timer) that waits a configurable grace period and then closes the old session
(catching exceptions); update increment_task_counter to use the same session
lock when reading/swapping to avoid races and ensure any code that reads
_shared_session uses that lock or reads a local reference so in-flight requests
continue using the old session until it is closed after the grace period.
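The swap-then-close approach described above can be sketched as follows. `_FakeSession` is an invented stand-in for `requests.Session`, and the 0.2 s grace period is illustrative (the review suggests making it configurable).

```python
import threading
import time


class _FakeSession:
    """Stand-in for requests.Session in this sketch."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class SwapThenClose:
    """Swap the shared session under a lock, then close the old one on a
    timer after a grace period so in-flight requests can finish."""

    _session_lock = threading.Lock()
    _shared_session = None
    GRACE_SECONDS = 0.2  # illustrative; real code would read config

    @classmethod
    def get_session(cls):
        with cls._session_lock:
            if cls._shared_session is None:
                cls._shared_session = _FakeSession()
            return cls._shared_session

    @classmethod
    def reset_singleton(cls):
        # Atomically detach the old session; callers re-init lazily
        with cls._session_lock:
            old, cls._shared_session = cls._shared_session, None
        if old is not None:
            def _close_old():
                try:
                    old.close()
                except Exception:
                    pass  # log here, per the earlier review comment
            threading.Timer(cls.GRACE_SECONDS, _close_old).start()
```

In-flight callers holding a reference to the old session keep using it during the grace window; only after the timer fires is the socket pool torn down.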
🧹 Nitpick comments (1)
workers/shared/api/internal_client.py (1)
280-296: `WorkerConfig()` is instantiated inside the lock on every task completion.
Line 285 creates a new `WorkerConfig()` (parsing env vars) while holding `_task_counter_lock`. This extends the critical section unnecessarily and allocates an object per task. Consider reading the threshold once (e.g., as a class-level cached value or outside the lock). Also, `cls._task_counter = 0` on line 295 is redundant since `reset_singleton()` (line 269) already resets it.
♻️ Proposed refactor: read config outside the lock, remove redundant reset

```diff
 @classmethod
 def increment_task_counter(cls) -> None:
-    with cls._task_counter_lock:
-        cls._task_counter += 1
-
-        from shared.infrastructure.config.worker_config import WorkerConfig
+    from shared.infrastructure.config.worker_config import WorkerConfig

-        threshold = WorkerConfig().singleton_reset_task_threshold
-        if threshold > 0 and cls._task_counter >= threshold:
-            import time
+    threshold = WorkerConfig().singleton_reset_task_threshold
+    with cls._task_counter_lock:
+        cls._task_counter += 1
+        if threshold > 0 and cls._task_counter >= threshold:
+            import time

-            logger.info(
-                "Task counter reached threshold (%d/%d), resetting singleton session",
-                cls._task_counter,
-                threshold,
-            )
-            cls.reset_singleton()
-            cls._task_counter = 0
-            cls._last_reset_time = time.time()
+            logger.info(
+                "Task counter reached threshold (%d/%d), resetting singleton session",
+                cls._task_counter,
+                threshold,
+            )
+            cls.reset_singleton()
+            cls._last_reset_time = time.time()
```
… document thread-safety

- Move WorkerConfig() instantiation outside lock in increment_task_counter()
- Remove redundant _task_counter=0 (already done inside reset_singleton)
- Document thread-safety caveat in reset_singleton() docstring
- Log close failures in task cleanup instead of silently swallowing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workers/callback/tasks.py (1)
1380-1389:⚠️ Potential issue | 🟡 Minor
`api_client` created inside `_extract_callback_parameters` is not covered by the `try/finally` if extraction fails mid-way.
If `_extract_callback_parameters` raises after `create_api_client()` (line 702 or 772 in the extraction function) but before assigning to `context.api_client` (line 846), the newly created client becomes an orphan local variable. The `try/finally` at lines 1389/1507 won't run because the exception occurs before entering that block.
The `__del__` destructor (FR-1) serves as the safety net here, which is the design intent. Just flagging for awareness — if deterministic cleanup is desired, the `try/finally` could be widened to wrap `_extract_callback_parameters` as well.
🤖 Fix all issues with AI agents
In `@workers/shared/tests/test_session_lifecycle.py`:
- Around line 371-396: Replace the inline simulated guard in the tests with
calls to the real signal handler on_task_postrun (imported from workers.worker)
and mock InternalAPIClient.increment_task_counter so the handler's guard,
try/except, and logging paths are exercised; for the singleton-disabled test
call on_task_postrun(sender=None, task_id=None, **{}) and assert
increment_task_counter was not called, and for the singleton-enabled test patch
the same method and call on_task_postrun then assert increment_task_counter was
called once, ensuring the patch target matches the import path used inside
on_task_postrun.
🧹 Nitpick comments (5)
workers/shared/api/internal_client.py (2)
125-179: Singleton initialization creates and immediately discards 7 sessions.
Each specialized client's `__init__` (via `BaseAPIClient.__init__`) creates a fresh `requests.Session` with mounted `HTTPAdapter`, which `_share_session` immediately closes and replaces. For 7 specialized clients, that's 7 throwaway sessions per `InternalAPIClient` instantiation.
This isn't a bug — the sessions are properly closed — but it's wasteful, especially if `InternalAPIClient` is instantiated frequently (e.g., per-task in non-singleton mode). Consider passing an existing session into the specialized client constructors to avoid the create-then-close pattern.
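The session-injection pattern suggested above could look like this sketch. `SubClientSketch` and its constructor signature are hypothetical, not the project's actual client classes; `object()` stands in for a configured `requests.Session`.

```python
class SubClientSketch:
    """Accept an existing session instead of building one that the
    sharing step would immediately discard."""

    def __init__(self, config, session=None):
        self.config = config
        if session is not None:
            self.session = session       # reuse the caller's shared session
            self._owns_session = False   # a sharer must not close it
        else:
            self.session = self._build_session()
            self._owns_session = True

    def _build_session(self):
        # Placeholder for Session() + mounted HTTPAdapter in the real code
        return object()
```

A parent client would build one session, mount its adapters once, and pass it to all seven sub-clients, eliminating the create-then-close churn.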
286-300: `WorkerConfig()` instantiated on every task completion.
`increment_task_counter` is called via `task_postrun` signal after every task. Each call constructs a new `WorkerConfig()`, which reads environment variables. While this keeps the threshold dynamically reconfigurable, it adds overhead on every task completion.
If env-var reading becomes a concern at scale, consider caching the threshold at the class level and only refreshing it on reset.
workers/shared/tests/test_session_lifecycle.py (1)
322-348: `mock_config_singleton` fixtures are required for env setup — Ruff ARG002 is a false positive.
The `mock_config_singleton` parameter in `test_increment_counter` and `test_threshold_triggers_reset` isn't directly referenced in the test body, but it's needed because the fixture patches `os.environ` with `WORKER_SINGLETON_RESET_THRESHOLD=3` and `ENABLE_API_CLIENT_SINGLETON=true`. Without it, `WorkerConfig()` inside `increment_task_counter` would read unpatched env vars.
To silence the Ruff warning while keeping the intent clear, you could prefix with underscore:
Suggested fix

```diff
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, mock_config_singleton):  # noqa: ARG002
```

Or rename the parameter:

```diff
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, _mock_config_singleton):
```

workers/callback/tasks.py (2)
1484-1503: Use `logger.exception` for better tracebacks in error handlers.
At lines 1485-1487 and 1503, `logger.error(...)` is used to log exceptions, but `logger.exception(...)` would automatically include the traceback, which is more useful for debugging callback failures.
Suggested fix

```diff
 except Exception as e:
-    logger.error(
+    logger.exception(
         f"Unified batch callback processing failed for execution {context.execution_id}: {e}"
     )
 except Exception as cleanup_error:
-    logger.error(f"Failed to mark execution as failed: {cleanup_error}")
+    logger.exception(f"Failed to mark execution as failed: {cleanup_error}")
```

1757-1777: Same `logger.error` → `logger.exception` opportunity in API callback error path.
Lines 1758-1760 and 1777 use `logger.error` where `logger.exception` would capture the full traceback for easier debugging.
Suggested fix

```diff
 except Exception as e:
-    logger.error(
+    logger.exception(
         f"API callback processing failed for execution {execution_id}: {e}"
     )
 except Exception as update_error:
-    logger.error(f"Failed to update execution status: {update_error}")
+    logger.exception(f"Failed to update execution status: {update_error}")
```
…strun handler

Tests now call the real worker.on_task_postrun() signal handler instead of simulating the guard logic inline, catching divergence if the handler's guard, try/except, or import path changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@workers/shared/tests/test_session_lifecycle.py`:
- Around line 243-263: test_default_pool_size uses patch.dict(..., clear=False)
so an externally-set API_CLIENT_POOL_SIZE can leak into the test; ensure the
test removes any inherited value before instantiating WorkerConfig by
deleting/unsetting API_CLIENT_POOL_SIZE in the patched context (so
WorkerConfig().api_client_pool_size is forced to use the hardcoded default).
Locate test_default_pool_size and adjust the patched block to explicitly remove
API_CLIENT_POOL_SIZE from os.environ (e.g., pop if present) before creating
WorkerConfig and asserting api_client_pool_size == 10.
🧹 Nitpick comments (3)
workers/shared/tests/test_session_lifecycle.py (3)
322-348: Prefix unused fixture parameters with `_` to suppress Ruff ARG002.
`mock_config_singleton` is correctly used for its env-patching side effect, but Ruff flags it as unused. Prefixing with `_` is the idiomatic pytest convention for fixtures consumed only for side effects.

```diff
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, _mock_config_singleton):
-    def test_threshold_triggers_reset(self, mock_config_singleton):
+    def test_threshold_triggers_reset(self, _mock_config_singleton):
```

Alternatively, apply `@pytest.mark.usefixtures("mock_config_singleton")` at the class or method level to avoid the parameter entirely.
415-424: Extract the repeated sub-client attribute list into a constant.
The same 8-element list appears three times in this class. If a sub-client is added or renamed in `InternalAPIClient`, only some lists may get updated, causing silent test gaps.
Suggested refactor
Define once at module or class level:

```python
_SUB_CLIENT_ATTRS = [
    "base_client",
    "execution_client",
    "file_client",
    "webhook_client",
    "organization_client",
    "tool_client",
    "workflow_client",
    "usage_client",
]
```

Then reference `_SUB_CLIENT_ATTRS` in all three test methods.
Also applies to: 466-475, 488-497
523-526: Prefix unused unpacked variables with `_` to suppress Ruff RUF059.
The unpacked values aren't needed in these cleanup-focused tests.

```diff
-    ) as (cfg, client):
+    ) as (_cfg, _client):
```

Also applies to: 541-545
@muhammad-ali-e I also suggest wiring up the tests to run with tox on every PR to catch regressions
…ove close() logging Cache the singleton_reset_task_threshold to avoid re-importing WorkerConfig on every task increment. Promote api_client.close() failure logs from debug to warning for better production visibility. Update tests to reset cached threshold. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r imports Refactor api-deployment tasks to handle setup failures early with proper cleanup, move shared imports to module level in worker.py, and fix type annotations in client_factory. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Greptile Summary

This PR adds HTTP session lifecycle management to all worker API clients: explicit cleanup in finally blocks plus opt-in shared-session (singleton) support with periodic reset.

Key observations:
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Task as Celery Task
    participant IAC as InternalAPIClient
    participant BAC as BaseAPIClient
    participant Session as requests.Session
    Note over Task,Session: Per-task execution (default: singleton disabled)
    Task->>IAC: __init__(config)
    IAC->>BAC: BaseAPIClient(config) [_owns_session=True]
    BAC->>Session: Session() + HTTPAdapter(pool_size)
    Task->>Task: execute work
    Task->>IAC: close() [try/finally]
    IAC->>BAC: base_client.close()
    BAC->>Session: session.close() [_closed=True]
    Note over Task,Session: Singleton mode (ENABLE_API_CLIENT_SINGLETON=true)
    Task->>IAC: __init__(config)
    IAC->>BAC: BaseAPIClient(config)
    IAC->>BAC: _owns_session = False (shared)
    BAC-->>IAC: _shared_session stored
    Task->>IAC: close() [try/finally]
    IAC-->>Task: no-op (shared session preserved)
    Note over Task,Session: Periodic reset (WORKER_SINGLETON_RESET_THRESHOLD tasks)
    Task->>IAC: on_task_postrun signal
    IAC->>IAC: increment_task_counter()
    IAC->>IAC: counter >= threshold?
    IAC->>Session: reset_singleton() → session.close()
    IAC->>IAC: _shared_session = None
    Note over Task,Session: Worker shutdown
    Task->>IAC: on_worker_process_shutdown
    IAC->>Session: reset_singleton() → session.close()
    IAC->>BAC: ClientFactory.reset_shared_state() → close()
```
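The singleton-mode no-op `close()` in the diagram hinges on the `_owns_session` flag plus an idempotent `_closed` guard. A minimal sketch of that behavior (not the project's actual `BaseAPIClient`; the session parameter stands in for `requests.Session`):

```python
class BaseClientSketch:
    """Close the session only if this client owns it, exactly once."""

    def __init__(self, session):
        self.session = session
        self._owns_session = True   # set to False when sharing the singleton
        self._closed = False

    def close(self):
        if self._closed:
            return  # idempotent: a second close() is a no-op
        if self._owns_session and self.session is not None:
            self.session.close()
        self._closed = True

    def __del__(self):
        # Safety net if a task skips try/finally cleanup
        try:
            self.close()
        except Exception:
            pass
```

With `_owns_session = False`, per-task `close()` calls leave the shared session's connection pool intact; only `reset_singleton()` tears it down.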
Prompt To Fix All With AI

This is a comment left during a code review.
Path: workers/shared/api/internal_client.py
Line: 136-138
Comment:
**Misleading comment contradicts the code**
The comment on line 137 says "The first client owns the session" but the very next line sets `_owns_session = False`, meaning ownership is explicitly *denied*. This is the opposite of what the comment implies and will confuse future maintainers who read it together with the `_owns_session` docstring in `BaseAPIClient` ("Track whether this client owns its session").
Consider replacing with something that matches the intent:
```suggestion
self.base_client = BaseAPIClient(self.config)
# Defer session ownership to reset_singleton(); no individual client
# should close the shared session on close() or __del__.
self.base_client._owns_session = False
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: workers/callback/tasks.py
Line: 1718-1722
Comment:
**`api_client` may be undefined when `finally` runs**
`api_client` is assigned at the very start of `process_batch_callback_api` before the `try` block:
```python
api_client = create_api_client(organization_id)
logger.info(f"Created organization-scoped API client: {organization_id}")
try:
...
finally:
try:
api_client.close() # ← safe because api_client is pre-assigned
```
As currently structured this is safe: if `create_api_client()` raises, we never enter the `try` and the `finally` never runs. However, the `logger.info(...)` call between creation and the `try` block is an implicit code gap — any future refactor that moves the `api_client` assignment *inside* the `try` block would silently introduce a `NameError` in the `finally`.
A defensive guard (matching the pattern already used in `_process_batch_callback_core`) would make intent explicit:
```suggestion
finally:
if "api_client" in dir() and api_client is not None:
try:
api_client.close()
except Exception as e:
logger.warning("api_client.close() failed during cleanup: %s", e)
```
Or, more simply, move `api_client = create_api_client(...)` to the first line inside the `try` block and initialize `api_client = None` before it.
How can I resolve this? If you propose a fix, please make it concise.
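The None-initialization option at the end of the comment generalizes to a small pattern. `run_with_cleanup`, `create_client`, and `work` are hypothetical names used only for illustration of the guard the review recommends:

```python
import logging

logger = logging.getLogger(__name__)


def run_with_cleanup(create_client, work):
    """Initialize api_client to None before the try block so the finally
    clause is safe even if client creation itself raises."""
    api_client = None
    try:
        api_client = create_client()
        return work(api_client)
    finally:
        if api_client is not None:
            try:
                api_client.close()
            except Exception as e:
                logger.warning("api_client.close() failed during cleanup: %s", e)
```

If `create_client()` raises, `api_client` is still bound (to `None`), so the `finally` block runs without a `NameError` and simply skips the close.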
---
This is a comment left during a code review.
Path: workers/shared/api/internal_client.py
Line: 289-304
Comment:
**`cls._last_reset_time` set inside the lock but `reset_singleton()` does not clear it**
`cls._last_reset_time = time.time()` is correctly set inside `_task_counter_lock` after calling `reset_singleton()`. However, `reset_singleton()` does not clear `_last_reset_time`, which is inconsistent with its resetting of every other class-level counter/cache field (`_task_counter`, `_cached_reset_threshold`, `_initialization_count`).
This means `_last_reset_time` always reflects the *last* periodic reset, even after a manual `reset_singleton()` call from `on_worker_process_shutdown`. If observability tooling later relies on this value to determine "was the session ever reset?", it will read stale data after a clean shutdown + restart within the same process lifetime.
```python
# In reset_singleton(), alongside the other resets:
cls._last_reset_time = None
```
How can I resolve this? If you propose a fix, please make it concise.

Reviews (2): Last reviewed commit: "Merge branch 'main' into feat/UN-3211-FE..." | Re-trigger Greptile
Test Results

Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
```python
if cls._cached_reset_threshold is None:
    cls._cached_reset_threshold = WorkerConfig().singleton_reset_task_threshold

with cls._task_counter_lock:
    cls._task_counter += 1
    if (
        cls._cached_reset_threshold > 0
        and cls._task_counter >= cls._cached_reset_threshold
    ):
        logger.info(
            "Task counter reached threshold (%d/%d), resetting singleton session",
            cls._task_counter,
            cls._cached_reset_threshold,
        )
        cls.reset_singleton()
        cls._last_reset_time = time.time()
```
cls._last_reset_time set inside the lock but reset_singleton() does not clear it
cls._last_reset_time = time.time() is correctly set inside _task_counter_lock after calling reset_singleton(). However, reset_singleton() does not clear _last_reset_time, which is inconsistent with its resetting of every other class-level counter/cache field (_task_counter, _cached_reset_threshold, _initialization_count).
This means _last_reset_time always reflects the last periodic reset, even after a manual reset_singleton() call from on_worker_process_shutdown. If observability tooling later relies on this value to determine "was the session ever reset?", it will read stale data after a clean shutdown + restart within the same process lifetime.
```python
# In reset_singleton(), alongside the other resets:
cls._last_reset_time = None
```

Prompt To Fix With AI
This is a comment left during a code review.
Path: workers/shared/api/internal_client.py
Line: 289-304
Comment:
**`cls._last_reset_time` set inside the lock but `reset_singleton()` does not clear it**
`cls._last_reset_time = time.time()` is correctly set inside `_task_counter_lock` after calling `reset_singleton()`. However, `reset_singleton()` does not clear `_last_reset_time`, which is inconsistent with its resetting of every other class-level counter/cache field (`_task_counter`, `_cached_reset_threshold`, `_initialization_count`).
This means `_last_reset_time` always reflects the *last* periodic reset, even after a manual `reset_singleton()` call from `on_worker_process_shutdown`. If observability tooling later relies on this value to determine "was the session ever reset?", it will read stale data after a clean shutdown + restart within the same process lifetime.
```python
# In reset_singleton(), alongside the other resets:
cls._last_reset_time = None
```
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
P2 level, can consider later.



What

- Wire `API_CLIENT_POOL_SIZE` into HTTPAdapter connection pools
- Add `_owns_session` flag to prevent singleton shared sessions from being closed by individual clients

Why

- `API_CLIENT_POOL_SIZE` config existed but was never wired into HTTPAdapter (dead config)
- `on_task_postrun` signal handler ran uselessly on every task when singleton mode was disabled

How

- `base_client.py`: Added `_owns_session` flag, idempotent `close()`, `__del__` destructor, wired pool size into HTTPAdapter
- `internal_client.py`: Set `_owns_session=False` on all clients sharing the singleton session
- `api-deployment/tasks.py`: Added try/finally with `api_client.close()` for missing cleanup
- `callback/tasks.py`: try/finally cleanup in callback task functions
- `worker.py`: Early-return guard in `on_task_postrun` when singleton disabled; `on_worker_process_shutdown` hook
- `worker_config.py`: Default pool size 10, singleton reset threshold config

Can this PR break any existing features?
`ENABLE_API_CLIENT_SINGLETON=false` remains the default. Pool size default stays at 10.

Database Migrations

Env Config

- `API_CLIENT_POOL_SIZE` — now actually wired in (default: 10, unchanged)
- `WORKER_SINGLETON_RESET_THRESHOLD` — already documented in sample.env (default: 1000)
- `ENABLE_API_CLIENT_SINGLETON` — existing, unchanged (default: false)

Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
`cd workers && PYTHONPATH=.:../unstract .venv/bin/python -m pytest shared/tests/ -v`

🤖 Generated with Claude Code