
Add in-memory state expiration to StateManagerMemory#6201

Open
FarhanAliRaza wants to merge 3 commits into reflex-dev:main from FarhanAliRaza:mem-expiration

Conversation

FarhanAliRaza (Collaborator) commented Mar 20, 2026

Extract reusable expiration logic into StateManagerExpiration base class that tracks token access times and purges expired states using a deadline-ordered heap. Integrate it into StateManagerMemory with a background asyncio task that automatically cleans up idle client states.

All Submissions:

  • Have you followed the guidelines stated in CONTRIBUTING.md file?
  • Have you checked to ensure there aren't any other open Pull Requests for the desired change?

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

New Feature Submission:

  • Does your submission pass the tests?
  • Have you linted your code locally prior to submission?

Changes To Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

closes #3021

codspeed-hq bot commented Mar 20, 2026

Merging this PR will not alter performance

✅ 8 untouched benchmarks


Comparing FarhanAliRaza:mem-expiration (2b0e097) with main (7ee3026)


Remove dead _token_last_touched dict, replace hand-rolled task
scheduling with ensure_task, move heap compaction off the hot path,
and fix touch ordering in get_state/set_state.
greptile-apps bot (Contributor) commented Mar 20, 2026

Greptile Summary

This PR adds automatic in-memory state expiration to StateManagerMemory by introducing a StateManagerExpiration base class that tracks token access times via a deadline-ordered min-heap and defers cleanup of locked tokens. A long-running asyncio background task wakes on token activity or deadline arrival to purge idle client states.

Key changes:

  • _expiration.py: new StateManagerExpiration dataclass with heap-based bookkeeping, stale-entry compaction, and lock-deferral semantics
  • memory.py: StateManagerMemory now inherits from StateManagerExpiration, touches tokens on get_state/set_state, notifies on unlock, and manages the expiration background task lifecycle
  • Unit and integration tests covering eviction, deadline refresh, locked-token deferral, and task cancellation
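The deadline-ordered heap bookkeeping described above can be sketched roughly as follows (a simplified illustration with hypothetical names, not the PR's actual classes). Each touch records the token's latest deadline in a dict and pushes a fresh (deadline, token) entry onto the heap; the purge pops entries in deadline order and skips stale entries whose token was touched again afterward:

```python
import heapq
import time


class TokenExpirationSketch:
    """Illustrative sketch of deadline-ordered heap bookkeeping for
    token expiration (hypothetical names, not the PR's code)."""

    def __init__(self, token_expiration: float):
        self.token_expiration = token_expiration
        # Latest deadline per token; the single source of truth.
        self._token_expires_at: dict[str, float] = {}
        # (deadline, token) entries; may contain stale entries for
        # tokens that were touched again after the push.
        self._heap: list[tuple[float, str]] = []

    def touch(self, token: str) -> None:
        # Refresh the token's deadline; any older heap entry for this
        # token becomes a stale tombstone.
        expires_at = time.time() + self.token_expiration
        self._token_expires_at[token] = expires_at
        heapq.heappush(self._heap, (expires_at, token))

    def purge_expired(self, now: float) -> list[str]:
        purged = []
        while self._heap and self._heap[0][0] <= now:
            deadline, token = heapq.heappop(self._heap)
            # Stale entry: the token was touched again after this push,
            # so its live deadline no longer matches.
            if self._token_expires_at.get(token) != deadline:
                continue
            del self._token_expires_at[token]
            purged.append(token)
        return purged
```

The tombstone trick keeps touches O(log n) without needing to delete arbitrary heap entries; the cost is that repeatedly touched tokens accumulate stale entries that the purge loop must pop and discard.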

Issues found:

  • Race condition in modify_state (P1): After a per-token lock is inserted into _states_locks and _state_manager_lock is released, the expiration background task can concurrently call _purge_token, removing the lock entry before modify_state reaches async with self._states_locks[token], causing a KeyError. Saving a local reference to the Lock object before releasing _state_manager_lock would eliminate this window.
  • Missing human-readable time comments on _locked_expiration_poll_interval = 0.1 and the expires_at calculation in _touch_token (per project convention)
  • _expiration_task not declared with init=False in the dataclass, inadvertently exposing it as an __init__ parameter
  • Misleading env var in integration test: REFLEX_REDIS_TOKEN_EXPIRATION is used to configure the memory manager's expiration window; a comment explaining the shared config field would improve readability

Confidence Score: 3/5

  • Not safe to merge without fixing the race condition in modify_state that can cause a KeyError under concurrent expiration.
  • The core expiration logic is well-designed and tests are thorough, but there is a real (if low-probability) race condition in modify_state where the expiration task can delete a per-token lock entry between when it is created and when modify_state acquires it, producing an unhandled KeyError. Remaining issues are style/convention violations.
  • reflex/istate/manager/memory.py — specifically the modify_state lock-creation-to-acquisition window.

Important Files Changed

Filename: Overview

reflex/istate/manager/_expiration.py: New base class implementing token expiration bookkeeping via a min-heap; logic is sound but has minor style issues: missing human-readable time comments on _locked_expiration_poll_interval and the expires_at calculation.
reflex/istate/manager/memory.py: Integrates expiration into StateManagerMemory with a background asyncio task; contains a race condition in modify_state where a just-created per-token lock can be purged before it is acquired, leading to a KeyError. Also, _expiration_task should use dataclasses.field(init=False).
tests/units/istate/manager/test_expiration.py: Comprehensive unit tests covering eviction, expiration refresh, locked-token deferral, and task cancellation; well structured, with a _poll_until helper to avoid flaky timing checks.
tests/integration/test_memory_state_manager_expiration.py: End-to-end integration test verifying state expiration and reset; uses REFLEX_REDIS_TOKEN_EXPIRATION to configure the memory manager, which is semantically misleading and should be commented.

Sequence Diagram

sequenceDiagram
    participant Client
    participant StateManagerMemory
    participant StateManagerExpiration
    participant ExpirationTask

    Client->>StateManagerMemory: modify_state(token)
    StateManagerMemory->>StateManagerMemory: create per-token Lock → _states_locks[token]
    StateManagerMemory->>StateManagerMemory: acquire _states_locks[token]
    StateManagerMemory->>StateManagerExpiration: get_state(token) → _touch_token(token)
    StateManagerExpiration->>StateManagerExpiration: update _token_expires_at, push heap entry
    StateManagerExpiration->>ExpirationTask: _token_activity.set() (if deadline changed)
    StateManagerMemory-->>Client: yield state
    Client->>StateManagerMemory: exit modify_state context
    StateManagerMemory->>StateManagerExpiration: _notify_token_unlocked(token)
    StateManagerExpiration->>ExpirationTask: re-push heap entry if pending

    loop Background expiration loop
        ExpirationTask->>StateManagerExpiration: _purge_expired_tokens(now)
        alt token expired and lock NOT held
            StateManagerExpiration->>StateManagerExpiration: _purge_token(token) — removes states, locks, metadata
        else token expired but lock IS held
            StateManagerExpiration->>StateManagerExpiration: add to _pending_locked_expirations
        end
        ExpirationTask->>StateManagerExpiration: _wait_for_token_activity(timeout)
    end

Comments Outside Diff (1)

  1. reflex/istate/manager/memory.py, lines 117-127

    P1 Race condition: KeyError between lock creation and lock acquisition

    There is a race condition between the per-token lock being inserted into _states_locks and being acquired. After the async with self._state_manager_lock block exits (releasing _state_manager_lock), the event loop can switch to _expire_states_once, which calls _purge_expired_tokens synchronously. If the token's deadline has already passed, _purge_token removes it from _states_locks. When control returns to modify_state, the lookup self._states_locks[token] raises a KeyError.

    The reproducing sequence is:

    1. Token's expiration has elapsed (it's in the expiry heap)
    2. A new request calls modify_state — no lock exists yet, so one is created and stored
    3. _state_manager_lock is released (implicit await in __aexit__)
    4. _expire_states_once runs synchronously and calls _purge_token(token), popping the entry from _states_locks
    5. modify_state resumes and evaluates self._states_locks[token], raising a KeyError

    A simple fix is to save a local reference to the lock object before releasing _state_manager_lock so that the subsequent async with uses the object directly instead of re-looking it up in the dict:

    lock = self._states_locks.get(token)
    if lock is None:
        async with self._state_manager_lock:
            lock = self._states_locks.get(token)
            if lock is None:
                lock = asyncio.Lock()
                self._states_locks[token] = lock
    
    try:
        async with lock:
            yield await self.get_state(token)
    finally:
        self._notify_token_unlocked(token)

    Holding a local reference to lock prevents the KeyError even if _purge_token removes the entry from _states_locks in the interim, since the task still holds its own reference to the Lock object.

Last reviewed commit: "Simplify StateManage..."

Comment on lines +78 to +79
heapq.heappush(self._token_expiration_heap, (expires_at, token))
if (

P2 Missing human-readable time comment on expiration calculation

The time calculation should be annotated to clarify the unit.

Suggested change

touched_at = time.time()
expires_at = touched_at + self.token_expiration  # token_expiration seconds from now

Rule Used: When using time-based calculations in code, includ... (source)

Learnt From
reflex-dev/flexgen#2190

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

from reflex.state import BaseState, _split_substate_key
from reflex.utils.tasks import ensure_task



P2 Missing human-readable time comment

The retry constant's value should be annotated even though the variable name already includes SECONDS, for consistency with the project convention.

Suggested change
_EXPIRATION_ERROR_RETRY_SECONDS = 1.0 # 1 second retry after expiration worker error

Rule Used: When using time-based calculations in code, includ... (source)

Learnt From
reflex-dev/flexgen#2190


)
_expiration_task: asyncio.Task | None = dataclasses.field(default=None, init=False)

async def _expire_states_once(self):

P2 Private field should use dataclasses.field(init=False)

_expiration_task is an internal implementation detail, but as written it will be exposed as an __init__ parameter for the dataclass. It should be wrapped in a dataclasses.field with init=False to prevent callers from accidentally passing it:

Suggested change

_expiration_task: asyncio.Task | None = dataclasses.field(default=None, init=False)

A running app harness configured to use StateManagerMemory.
"""
monkeypatch.setenv("REFLEX_STATE_MANAGER_MODE", "memory")
monkeypatch.setenv("REFLEX_REDIS_TOKEN_EXPIRATION", "1")

P2 Redis-specific env var used for memory state manager configuration

REFLEX_REDIS_TOKEN_EXPIRATION is a Redis-specific environment variable being used here to configure the memory state manager's expiration. This works because _default_token_expiration reads get_config().redis_token_expiration, but it is semantically misleading — a reader unfamiliar with the code might expect it to be ignored when running in memory mode.

Consider adding a comment clarifying that the memory manager reuses the same config field, or open a follow-up to introduce a dedicated REFLEX_TOKEN_EXPIRATION env var shared by all managers.

Suggested change
monkeypatch.setenv("REFLEX_REDIS_TOKEN_EXPIRATION", "1")
# The memory manager reads the same config field as Redis for token expiration.
monkeypatch.setenv("REFLEX_REDIS_TOKEN_EXPIRATION", "1")


…L is set

Previously, StateManager.create always overrode the mode to REDIS when a
Redis URL was detected, ignoring an explicitly configured memory mode.
Now it only auto-promotes to REDIS when state_manager_mode was not
explicitly set. Adds a test verifying the explicit mode is honored.
masenf (Collaborator) left a comment

i appreciate the enthusiasm here, but the implementation is too complex.

i think you can get away with just tracking the _token_expires_at dict, which should be a quick O(1) operation to update in the hot path.

there's not really a need to track _token_activity, because that's the opposite signal of expiration and just causes code to run more often in the hot path.

instead, an asyncio.sleep is probably the best proxy for expiration.

when the expiration task wakes up, it should just scan all of the (token, expires_at) items, purge any tokens that are expired, and then sleep until the next token is set to expire, which you definitively know from iterating through all the latest expiration times.

i suspect the most tracked tokens you'd see in a reflex server with default settings is <10k, so iterating through all of the tokens is probably not much worse than popping a bunch of those tombstone expiration entries off the priority queue from each time the token was touched.


For the _pending_locked_expirations, i don't think you need to handle these as special. If the expiration task finds that the lock is held, just ignore that token and move on. When determining how long to sleep next, do not consider locked token expiration times (as it will probably be updated when the lock is released).


Separately, i think the expiration task management stuff would also be better inside the StateManagerExpiration class, so all that logic lives together



Development

Successfully merging this pull request may close these issues.

[REF-2503] Memory StateManager should have some sort of expiration

2 participants