Add in-memory state expiration to StateManagerMemory #6201
FarhanAliRaza wants to merge 3 commits into reflex-dev:main
Extract reusable expiration logic into StateManagerExpiration base class that tracks token access times and purges expired states using a deadline-ordered heap. Integrate it into StateManagerMemory with a background asyncio task that automatically cleans up idle client states.
Remove dead _token_last_touched dict, replace hand-rolled task scheduling with ensure_task, move heap compaction off the hot path, and fix touch ordering in get_state/set_state.
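The deadline-ordered heap described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `ExpirationTracker`, `touch`, and `purge_expired` are hypothetical names, and stale heap entries are handled by lazy deletion (an assumption about how superseded deadlines are skipped).

```python
import heapq


class ExpirationTracker:
    """Hypothetical helper: tracks per-token deadlines with a min-heap."""

    def __init__(self, token_expiration: float):
        self.token_expiration = token_expiration  # idle lifetime in seconds
        self._expires_at = {}  # token -> latest deadline
        self._heap = []  # (deadline, token) pairs; may contain stale entries

    def touch(self, token: str, now: float) -> None:
        """Record an access and push a fresh deadline for the token."""
        expires_at = now + self.token_expiration
        self._expires_at[token] = expires_at
        heapq.heappush(self._heap, (expires_at, token))

    def purge_expired(self, now: float) -> list:
        """Pop every entry whose deadline has passed; return the expired tokens."""
        expired = []
        while self._heap and self._heap[0][0] <= now:
            expires_at, token = heapq.heappop(self._heap)
            # Lazy deletion: skip entries superseded by a later touch.
            if self._expires_at.get(token) != expires_at:
                continue
            del self._expires_at[token]
            expired.append(token)
        return expired
```

Every touch pushes a new heap entry, so a frequently accessed token leaves behind one stale entry per access until the purge loop pops it.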
Greptile Summary
This PR adds automatic in-memory state expiration to
Key changes:
Issues found:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Client
participant StateManagerMemory
participant StateManagerExpiration
participant ExpirationTask
Client->>StateManagerMemory: modify_state(token)
StateManagerMemory->>StateManagerMemory: create per-token Lock → _states_locks[token]
StateManagerMemory->>StateManagerMemory: acquire _states_locks[token]
StateManagerMemory->>StateManagerExpiration: get_state(token) → _touch_token(token)
StateManagerExpiration->>StateManagerExpiration: update _token_expires_at, push heap entry
StateManagerExpiration->>ExpirationTask: _token_activity.set() (if deadline changed)
StateManagerMemory-->>Client: yield state
Client->>StateManagerMemory: exit modify_state context
StateManagerMemory->>StateManagerExpiration: _notify_token_unlocked(token)
StateManagerExpiration->>ExpirationTask: re-push heap entry if pending
loop Background expiration loop
ExpirationTask->>StateManagerExpiration: _purge_expired_tokens(now)
alt token expired and lock NOT held
StateManagerExpiration->>StateManagerExpiration: _purge_token(token) — removes states, locks, metadata
else token expired but lock IS held
StateManagerExpiration->>StateManagerExpiration: add to _pending_locked_expirations
end
ExpirationTask->>StateManagerExpiration: _wait_for_token_activity(timeout)
end
heapq.heappush(self._token_expiration_heap, (expires_at, token))
if (
Missing human-readable time comment on expiration calculation
The time calculation should be annotated to clarify the unit.
Suggested change:
- heapq.heappush(self._token_expiration_heap, (expires_at, token))
- if (
+ touched_at = time.time()
+ expires_at = touched_at + self.token_expiration  # token_expiration seconds from now
Rule Used: When using time-based calculations in code, includ... (source)
Learnt From
reflex-dev/flexgen#2190
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
from reflex.state import BaseState, _split_substate_key
from reflex.utils.tasks import ensure_task
Missing human-readable time comment
The retry constant's value should be annotated even though the variable name already includes SECONDS, for consistency with the project convention.
_EXPIRATION_ERROR_RETRY_SECONDS = 1.0  # 1 second retry after expiration worker error
Rule Used: When using time-based calculations in code, includ... (source)
Learnt From
reflex-dev/flexgen#2190
)
_expiration_task: asyncio.Task | None = dataclasses.field(default=None, init=False)

async def _expire_states_once(self):
Private field should use dataclasses.field(init=False)
_expiration_task is an internal implementation detail, but as written it will be exposed as an __init__ parameter for the dataclass. It should be wrapped in a dataclasses.field with init=False to prevent callers from accidentally passing it:
Suggested change:
- async def _expire_states_once(self):
+ _expiration_task: asyncio.Task | None = dataclasses.field(default=None, init=False)
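To illustrate the point about `init=False`, here is a small standalone sketch. `ManagerSketch` and its `token_expiration` default are hypothetical stand-ins, not the real `StateManagerMemory` definition; only the `dataclasses.field(default=None, init=False)` pattern is taken from the review.

```python
import asyncio
import dataclasses
import inspect
from typing import Optional


@dataclasses.dataclass
class ManagerSketch:
    """Hypothetical stand-in for a dataclass-based state manager."""

    # Hypothetical public setting: idle lifetime in seconds (30 minutes).
    token_expiration: int = 1800

    # init=False keeps this internal field out of the generated __init__.
    _expiration_task: Optional[asyncio.Task] = dataclasses.field(
        default=None, init=False
    )


# Only the public field appears in the constructor signature.
init_params = list(inspect.signature(ManagerSketch).parameters)
```

With this declaration, `ManagerSketch(_expiration_task=...)` raises `TypeError`, so callers cannot inject a task by accident, while the attribute still exists on every instance with its default of `None`.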
A running app harness configured to use StateManagerMemory.
"""
monkeypatch.setenv("REFLEX_STATE_MANAGER_MODE", "memory")
monkeypatch.setenv("REFLEX_REDIS_TOKEN_EXPIRATION", "1")
Redis-specific env var used for memory state manager configuration
REFLEX_REDIS_TOKEN_EXPIRATION is a Redis-specific environment variable being used here to configure the memory state manager's expiration. This works because _default_token_expiration reads get_config().redis_token_expiration, but it is semantically misleading — a reader unfamiliar with the code might expect it to be ignored when running in memory mode.
Consider adding a comment clarifying that the memory manager reuses the same config field, or open a follow-up to introduce a dedicated REFLEX_TOKEN_EXPIRATION env var shared by all managers.
Suggested change:
- monkeypatch.setenv("REFLEX_REDIS_TOKEN_EXPIRATION", "1")
+ # The memory manager reads the same config field as Redis for token expiration.
+ monkeypatch.setenv("REFLEX_REDIS_TOKEN_EXPIRATION", "1")
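If the follow-up with a dedicated variable were pursued, the lookup might look something like the sketch below. Everything here is an assumption for illustration: `resolve_token_expiration` and `DEFAULT_TOKEN_EXPIRATION` are hypothetical, `REFLEX_TOKEN_EXPIRATION` does not exist yet, and Reflex's real config loading goes through `get_config()` rather than reading the environment directly.

```python
import os

# Hypothetical default: 1 hour, expressed in seconds.
DEFAULT_TOKEN_EXPIRATION = 3600


def resolve_token_expiration(environ=os.environ) -> int:
    """Prefer the proposed shared variable, then fall back to the Redis-named one."""
    for name in ("REFLEX_TOKEN_EXPIRATION", "REFLEX_REDIS_TOKEN_EXPIRATION"):
        value = environ.get(name)
        if value:
            return int(value)
    return DEFAULT_TOKEN_EXPIRATION
```

Keeping the Redis-named variable as a fallback would let existing deployments and tests keep working while the shared name is introduced.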
…L is set
Previously, StateManager.create always overrode the mode to REDIS when a Redis URL was detected, ignoring an explicitly configured memory mode. Now it only auto-promotes to REDIS when state_manager_mode was not explicitly set. Adds a test verifying the explicit mode is honored.
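The mode-selection rule in this commit message can be sketched as a small pure function. This is not the code from `StateManager.create`; `Mode`, `choose_mode`, and the `explicitly_set` flag are hypothetical names standing in for however the real config tracks whether `state_manager_mode` was set by the user.

```python
import enum
from typing import Optional


class Mode(enum.Enum):
    """Hypothetical stand-in for the state manager mode setting."""

    MEMORY = "memory"
    REDIS = "redis"


def choose_mode(configured: Mode, explicitly_set: bool, redis_url: Optional[str]) -> Mode:
    """Auto-promote to Redis only when the user did not pick a mode themselves."""
    if redis_url is not None and not explicitly_set:
        return Mode.REDIS
    return configured
```

Under the old behavior the `explicitly_set` check would be absent, so any detected Redis URL forced `Mode.REDIS` even when memory mode had been requested.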
masenf left a comment
i appreciate the enthusiasm here, but the implementation is too complex.
i think you can get away with just tracking the _token_expires_at dict, which should be a quick O(1) operation to update in the hot path.
there's not really a need to track _token_activity, because that's the opposite signal of expiration and just causes code to run more often in the hot path.
instead, an asyncio.sleep is probably the best proxy for expiration.
when the expiration task wakes up, it should just scan all of the (token, expires_at) items, purge any tokens that are expired, and then sleep until the next token is set to expire, which you definitively know from iterating through all the latest expiration times.
i suspect the most tracked tokens you'd see in a reflex server with default settings is <10k, so iterating through all of the tokens is probably not much worse than popping a bunch of those tombstone expiration entries off the priority queue from each time the token was touched.
For the _pending_locked_expirations, i don't think you need to handle these as special. If the expiration task finds that the lock is held, just ignore that token and move on. When determining how long to sleep next, do not consider locked token expiration times (as it will probably be updated when the lock is released).
Separately, i think the expiration task management stuff would also be better inside the StateManagerExpiration class, so all that logic lives together
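A rough sketch of the simpler design suggested in this review might look like the following. It is an interpretation, not code from the PR: `SimpleExpiration`, `touch`, `purge_expired`, and `run` are hypothetical names, and only the `_token_expires_at` dict and the use of `asyncio.sleep` come from the comment itself.

```python
import asyncio
import time
from typing import Optional


class SimpleExpiration:
    """Hypothetical sketch of the scan-and-sleep approach."""

    def __init__(self, token_expiration: float):
        self.token_expiration = token_expiration  # idle lifetime in seconds
        self.states = {}  # token -> state object
        self.locks = {}  # token -> asyncio.Lock
        self._token_expires_at = {}  # token -> deadline (epoch seconds)

    def touch(self, token: str) -> None:
        # O(1) update on the hot path: overwrite the deadline.
        self._token_expires_at[token] = time.time() + self.token_expiration

    def purge_expired(self, now: float) -> Optional[float]:
        """Purge expired, unlocked tokens; return the next deadline to wait for."""
        next_deadline = None
        for token, expires_at in list(self._token_expires_at.items()):
            lock = self.locks.get(token)
            if lock is not None and lock.locked():
                # Lock held: ignore this token and leave it out of the sleep
                # calculation, since its deadline is likely to be refreshed.
                continue
            if expires_at <= now:
                self.states.pop(token, None)
                self.locks.pop(token, None)
                del self._token_expires_at[token]
            elif next_deadline is None or expires_at < next_deadline:
                next_deadline = expires_at
        return next_deadline

    async def run(self) -> None:
        """Background loop: scan, then sleep until the next known deadline."""
        while True:
            next_deadline = self.purge_expired(time.time())
            if next_deadline is None:
                delay = self.token_expiration  # nothing eligible: wait one period
            else:
                delay = max(next_deadline - time.time(), 0.0)
            await asyncio.sleep(delay)
```

No wake-up signal is needed in this sketch because a token touched while the task sleeps gets a deadline of at least one full expiration period from now, which cannot be earlier than any deadline the task is already waiting on. One trade-off under these assumptions: a token whose lock is released without a fresh touch may linger until the next wake-up.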
closes #3021