fix(memos-local-plugin): prevent orphan episode scan from closing active sessions; add episode delete API #1546
Starfie1d1272 wants to merge 18 commits into MemTensor:main
Conversation
…ive sessions; add episode delete API
Two related fixes:
1. Orphan episode protection (2 files):
- core/pipeline/memory-core.ts: init() now checks session.meta.closedAt
before treating open episodes as orphans. Episodes from sessions
that haven't been explicitly closed are no longer abandoned on
bridge restart.
- core/session/manager.ts: closeSession() now stamps session.meta.closedAt
so future init() calls can distinguish 'explicitly closed' from
'crashed and might reconnect'.
2. WebUI Tasks page bulk delete (3 files):
- core/storage/repos/episodes.ts: added deleteById() method
- core/pipeline/memory-core.ts: added deleteEpisode() / deleteEpisodes()
- agent-contract/memory-core.ts: added interface signatures
- server/routes/session.ts: DELETE /api/v1/episodes now calls
deleteEpisode (actually removes the row + cascading traces)
instead of closeEpisode (no-op on already-closed episodes).
Added POST /api/v1/episodes/delete for bulk operations.
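For illustration, a minimal sketch of the revised orphan scan described above. The repo helper names and signatures (`listOpenEpisodes`, `getSessionsByIds`, `closeEpisode`) are assumptions; the real logic lives in core/pipeline/memory-core.ts.

```ts
// Sketch only: repo helper names are assumptions, not the actual API.
type Session = { id: string; meta: { closedAt?: string } };
type Episode = { id: string; sessionId: string; status: "open" | "closed" };

async function scanOrphanEpisodes(repo: {
  listOpenEpisodes(): Promise<Episode[]>;
  getSessionsByIds(ids: string[]): Promise<Session[]>;
  closeEpisode(id: string, reason: string): Promise<void>;
}): Promise<void> {
  const open = await repo.listOpenEpisodes();
  // Batch-fetch owning sessions to avoid an N+1 query per episode.
  const sessionIds = [...new Set(open.map((e) => e.sessionId))];
  const sessions = new Map(
    (await repo.getSessionsByIds(sessionIds)).map((s) => [s.id, s] as const),
  );
  for (const episode of open) {
    const session = sessions.get(episode.sessionId);
    // Only treat the episode as orphaned when its session was explicitly
    // closed (meta.closedAt stamped) or no longer exists at all.
    const orphaned = !session || session.meta.closedAt !== undefined;
    if (orphaned) {
      // Reason string here is illustrative.
      await repo.closeEpisode(episode.id, "session_closed:orphan_scan");
    }
  }
}
```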
…rmes session restart

Three fixes for the Hermes bridge adapter:
1. Use tsx runtime instead of node --experimental-strip-types
2. PID file to prevent duplicate bridge processes
3. Bridge lifetime tracking via register_bridge()
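The PID-file guard itself is implemented in Python (daemon_manager.py); the TypeScript sketch below only illustrates the check-then-register idea, with a hypothetical `PID_FILE` path and helper names.

```ts
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname } from "node:path";

const PID_FILE = "/tmp/memos-bridge.pid"; // hypothetical location

// Returns true if another bridge process already owns the PID file.
function bridgeAlreadyRunning(): boolean {
  if (!existsSync(PID_FILE)) return false;
  const pid = Number(readFileSync(PID_FILE, "utf8").trim());
  if (!Number.isFinite(pid)) return false; // stale or garbled PID file
  try {
    process.kill(pid, 0); // signal 0: liveness probe, does not kill
    return true;
  } catch {
    return false; // process is gone, PID file is stale
  }
}

function registerBridge(): void {
  mkdirSync(dirname(PID_FILE), { recursive: true }); // parent dir may not exist
  writeFileSync(PID_FILE, String(process.pid));
}
```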
…d of hardcoding

Previously the bridge reported '2.0.0-alpha.1' regardless of the actual package version. Now it reads from package.json at startup.
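A minimal sketch of that version sourcing, assuming package.json can be resolved next to the bridge entry point (bridge.cts compiles to CommonJS, so `__dirname` is available).

```ts
import { readFileSync } from "node:fs";
import { join } from "node:path";

// Read the package version once at startup instead of hardcoding it.
const pkg = JSON.parse(readFileSync(join(__dirname, "package.json"), "utf8")) as {
  version: string;
};
const BRIDGE_VERSION: string = pkg.version; // reported during the init handshake
```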
Pull request overview
Improves Hermes memos-local-plugin session stability by refining startup orphan-episode handling, and adds hard-delete support for episodes (including a bulk-delete HTTP endpoint) to fix Tasks page deletions.
Changes:
- Refine `init()` orphan-episode scan to only close open episodes when the owning session is explicitly closed (or missing).
- Add episode hard-delete capabilities end-to-end (repo → core → HTTP routes), including a bulk delete route.
- Stamp `session.meta.closedAt` on session close so restarts can distinguish "explicitly closed" vs "may reconnect".
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| apps/memos-local-plugin/server/routes/session.ts | Switch single-episode delete to hard-delete and add bulk delete endpoint. |
| apps/memos-local-plugin/core/storage/repos/episodes.ts | Add deleteById() to physically remove episodes (FK cascade expected). |
| apps/memos-local-plugin/core/session/manager.ts | Stamp meta.closedAt on closeSession() to support orphan detection. |
| apps/memos-local-plugin/core/pipeline/memory-core.ts | Update orphan filtering on init; implement deleteEpisode(s) in the core. |
| apps/memos-local-plugin/bridge.cts | Update bridge version sourcing and daemon/stdin lifetime handling. |
| apps/memos-local-plugin/agent-contract/memory-core.ts | Extend MemoryCore contract with deleteEpisode / deleteEpisodes. |
| apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py | Add PID-file-based singleton bridge management helpers. |
| apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py | Use PID-file singleton management when spawning/closing the bridge. |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Prevent deleting episodes that are currently open/in-flight, which could cause FK violations and corrupt in-memory pipeline state.

Co-Authored-By: Copilot <copilot-pull-request-reviewer[bot]@users.noreply.github.com>
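A sketch of what such a guard looks like. The function name follows the later `assertEpisodeDeletable` helper, but the signature and the local `ConflictError` stand-in are assumptions; the real core throws its `MemosError` with a conflict code, which the HTTP routes surface as 409.

```ts
// Sketch: refuse to hard-delete an episode that is still open/in-flight.
class ConflictError extends Error {}

interface EpisodeRow {
  id: string;
  status: "open" | "closed";
}

function assertEpisodeDeletable(episode: EpisodeRow): void {
  if (episode.status === "open") {
    // Deleting an in-flight episode could violate FK constraints and leave
    // the in-memory pipeline pointing at a row that no longer exists.
    throw new ConflictError(`episode ${episode.id} is still open`);
  }
}
```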
… TCP

Changes:
- bridge_client.py: add TCP transport mode, connecting to the daemon bridge at 127.0.0.1:18911 instead of spawning a subprocess. Supports transparent reconnection on connection loss. Falls back to stdio subprocess mode when TCP params are not provided.
- __init__.py (MemTensorProvider): connect via TCP to the daemon bridge; on Hermes CLI exit, skip episode.close + session.close so the daemon bridge owns the session and the pipeline can finalize episodes naturally.

Why: previously each Hermes CLI session spawned its own bridge subprocess, which died on CLI exit → all open episodes abandoned as 'session_closed:client'. With TCP mode the daemon bridge (a persistent background process) keeps the session alive, and episodes get properly finalized by the pipeline.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 10 comments.
…ddress Copilot review

Changes:
- bridge/tcp.ts: new TCP JSON-RPC server (line-delimited, multi-client)
- bridge.cts: start TCP server when --daemon --tcp=<port> is given; daemon mode now waits for TCP server stop instead of blocking forever
- memory-core.ts: batch-fetch sessions to avoid N+1 orphan scan (Copilot suggestion); add SQLite fallback check in assertEpisodeDeletable (Copilot suggestion)
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
@copilot apply changes based on the comments in this thread
…k, double newline, port validation

Changes per Copilot review (8 comments):
- bridge_client.py: fix double-newline in _SocketTransport.write_line()
- bridge.cts: validate --tcp port with Number.isFinite() before use
- __init__.py: TCP mode with stdio fallback (try TCP first, spawn subprocess on failure); restore episode.close + session.close in on_session_end (Copilot correctly noted closedAt must be stamped)
- daemon_manager.py: add mkdir(parents=True) before writing PID file (prevents crash when parent dir doesn't exist)
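For the --tcp port check, a sketch of the kind of validation described; the shape is illustrative, not the actual bridge.cts code.

```ts
// Validate --tcp=<port> before using it.
function parseTcpPort(raw: string | undefined): number | null {
  if (raw === undefined) return null;
  const port = Number(raw);
  if (!Number.isFinite(port) || !Number.isInteger(port) || port < 1 || port > 65535) {
    return null; // caller warns and falls back to the stdio transport
  }
  return port;
}
```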
- bridge_client.py: close old transport before reconnect
- bridge/tcp.ts: await server.close() in TcpServerHandle.close()
- agent-contract/memory-core.ts: deleteEpisode/deleteEpisodes no longer optional
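A sketch of what awaiting the underlying server close can look like inside a handle's close(); the helper name and the explicit client teardown are assumptions based on the commit message.

```ts
import type { Server, Socket } from "node:net";

// Close all client sockets, then wait for the server to finish closing.
async function closeTcpHandle(server: Server, clients: Set<Socket>): Promise<void> {
  for (const sock of clients) sock.destroy();
  clients.clear();
  await new Promise<void>((resolve, reject) =>
    server.close((err) => (err ? reject(err) : resolve())),
  );
}
```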
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
- bridge/tcp.ts: wrap server.listen in a promise; expose 'ready' so callers catch EADDRINUSE; remove stale server-level error handler
- bridge.cts: await tcpServer.ready inside try/catch
- bridge_client.py: enable TCP mode when either tcp_host or tcp_port is provided (not just tcp_host)
- server/routes/session.ts: return { ok: true, deleted } for backward compatibility with callers expecting { ok: true }
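A sketch of the 'ready' promise pattern from the first bullet, simplified from what bridge/tcp.ts would need:

```ts
import { createServer, type Server } from "node:net";

// Expose a `ready` promise so callers can await the bind and handle errors
// such as EADDRINUSE themselves instead of crashing on an unhandled event.
function listenWithReady(host: string, port: number): { server: Server; ready: Promise<void> } {
  const server = createServer();
  const ready = new Promise<void>((resolve, reject) => {
    const onError = (err: Error) => reject(err); // e.g. EADDRINUSE
    server.once("error", onError);
    server.listen(port, host, () => {
      server.off("error", onError); // bind succeeded; later errors are runtime errors
      resolve();
    });
  });
  return { server, ready };
}
```

The caller side can then await `ready` inside try/catch and close the half-started server on failure, which lines up with the later bridge.cts follow-up commit.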
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
…leanup

- bridge.cts: add shuttingDown guard to prevent double-shutdown
- bridge/tcp.ts: validate typeof msg.method === 'string'; sock.destroy() on error to prevent resource leak
- bridge/tcp.ts: check sock.destroyed before destroying
- core/pipeline/memory-core.ts: extract deleteClosedEpisode helper with no-op regression guard
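A minimal sketch of the shuttingDown guard mentioned for bridge.cts; the surrounding cleanup steps are omitted and the parameter shape is an assumption.

```ts
// A second shutdown call (for example a signal arriving after stdin already
// closed) becomes a no-op instead of re-running the cleanup sequence.
let shuttingDown = false;

async function shutdown(tcpServer?: { close(): Promise<void> }): Promise<void> {
  if (shuttingDown) return;
  shuttingDown = true;
  await tcpServer?.close();
  process.exit(0);
}
```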
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
```diff
 routes.set("DELETE /api/v1/episodes", async (ctx) => {
   const id = ctx.url.searchParams.get("episodeId");
   if (!id) {
     writeError(ctx, 400, "invalid_argument", "episodeId is required");
     return;
   }
-  await deps.core.closeEpisode(id as EpisodeId);
-  return { ok: true };
+  const result = await deps.core.deleteEpisode(id as EpisodeId);
+  return { ok: true, deleted: result.deleted };
 });

+routes.set("POST /api/v1/episodes/delete", async (ctx) => {
+  const body = parseJson<{ ids?: unknown }>(ctx);
+  const ids = Array.isArray(body.ids)
+    ? body.ids.filter((v): v is string => typeof v === "string" && v.length > 0)
+    : [];
+  if (ids.length === 0) {
+    writeError(ctx, 400, "invalid_argument", "ids[] is required");
+    return;
+  }
+  const result = await deps.core.deleteEpisodes(ids as EpisodeId[]);
+  return { ok: true, deleted: result.deleted };
+});
```
New deletion endpoints (DELETE /api/v1/episodes now physically deletes; new POST /api/v1/episodes/delete bulk delete) aren’t covered by the existing HTTP server unit tests (tests/unit/server/http.test.ts currently covers the GET endpoints). Adding route-level tests for both single and bulk delete (including closed vs open episode behavior) would help ensure the WebUI doesn’t regress back to “successful delete but rows remain in DB”.
Acknowledged. Adding route-level tests for the delete endpoints would be valuable, but since the existing http.test.ts covers GET endpoints with the same handler pattern and the delete logic is already covered by memory-core.test.ts, this can be deferred to a follow-up PR.
```ts
export function startTcpServer(options: TcpServerOptions): TcpServerHandle {
  const { core, host, port } = options;
  const dispatch = makeDispatcher(core);

  const clients = new Set<Socket>();
  let closed = false;
  let doneResolve: () => void;
  const donePromise = new Promise<void>((resolve) => {
    doneResolve = resolve;
  });

  // Subscribe to events + logs and broadcast to all connected clients.
  const eventsUnsub = core.subscribeEvents((e) => {
    broadcast({ jsonrpc: "2.0", method: RPC_METHODS.EVENTS_NOTIFY, params: e });
  });
  const logsUnsub = core.subscribeLogs((r) => {
    broadcast({ jsonrpc: "2.0", method: RPC_METHODS.LOGS_FORWARD, params: r });
  });

  function broadcast(obj: unknown): void {
    const payload = JSON.stringify(obj) + "\n";
    for (const sock of clients) {
      try {
        sock.write(payload);
      } catch {
        /* best-effort per client */
      }
    }
  }

  function errorResponse(
    id: JsonRpcRequest["id"] | null,
    code: number,
    message: string,
    data?: unknown,
  ): JsonRpcFailure {
    return {
      jsonrpc: "2.0",
      id: id ?? null,
      error: { code, message, data: data as any },
    };
  }

  function writeLine(sock: Socket, obj: unknown): void {
    try {
      sock.write(JSON.stringify(obj) + "\n");
    } catch {
      /* ignore */
    }
  }

  async function handleLine(sock: Socket, line: string): Promise<void> {
    const trimmed = line.trim();
    if (trimmed.length === 0) return;

    let msg: JsonRpcRequest | null = null;
    try {
      msg = JSON.parse(trimmed) as JsonRpcRequest;
    } catch (err) {
      writeLine(
        sock,
        errorResponse(null, JSONRPC_PARSE_ERROR, "invalid JSON", {
          text: err instanceof Error ? err.message : String(err),
        }),
      );
      return;
    }

    if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || typeof msg.method !== "string") {
      writeLine(sock, errorResponse(msg?.id ?? null, JSONRPC_INVALID_REQUEST, "not JSON-RPC 2.0"));
      return;
    }

    try {
      const result = await dispatch(msg.method, msg.params);
      if (msg.id !== undefined && msg.id !== null) {
        const ok: JsonRpcSuccess = { jsonrpc: "2.0", id: msg.id, result };
        writeLine(sock, ok);
      }
    } catch (err) {
      const code = rpcCodeForError(errorCodeOf(err));
      const mErr =
        err instanceof MemosError
          ? err
          : new MemosError("internal", err instanceof Error ? err.message : String(err));
      writeLine(sock, errorResponse(msg.id ?? null, code, mErr.message, mErr.toJSON()));
      process.stderr.write(`bridge.tcp.dispatch.err ${msg.method}: ${mErr.message}\n`);
    }
  }
```
This PR adds a new JSON-RPC transport (bridge/tcp.ts), but there are currently only bridge transport tests for stdio (tests/unit/bridge/stdio.test.ts). Adding unit tests for TCP framing/dispatch (basic request/response, notification broadcast, and invalid JSON handling) would reduce the risk of transport regressions and subtle protocol incompatibilities.
Acknowledged. TCP transport unit tests would be useful for regression prevention, but the transport is already exercised by the daemon smoke tests (Hermes provider connecting via TCP). Can be added in a follow-up PR alongside the existing stdio.test.ts pattern.
…r import

- bridge.cts: close tcpServer on startup failure; warn and skip TCP when --daemon not set; daemon mode now also falls back to stdio when TCP port is not given
- bridge/tcp.ts: isListening flag for error handling (no more removeAllListeners); runtime errors are logged not swallowed
- __init__.py: import BridgeError so TCP→stdio fallback actually works
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
```ts
function broadcast(obj: unknown): void {
  const payload = JSON.stringify(obj) + "\n";
  for (const sock of clients) {
    try {
      sock.write(payload);
    } catch {
      /* best-effort per client */
    }
  }
}
```
broadcast() writes to every socket without handling backpressure (sock.write(...) return value / drain), so a slow or stuck client can cause unbounded buffering and increased memory usage when logs/events are frequent. Consider tracking per-socket write state (pause/drop clients when write() returns false) or queueing with a bounded buffer and disconnecting slow consumers.
Acknowledged. In practice broadcast payloads are short JSON lines (~100-500 bytes) and clients are localhost-only, so backpressure is extremely unlikely. Adding write() return-value tracking per-socket can be done as a follow-up optimization.
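Should the follow-up be picked up later, a sketch of the reviewer's suggestion: honour write()'s return value and drop clients whose buffered bytes exceed a bound. MAX_BUFFERED_BYTES and the drop policy are assumptions, not part of this PR.

```ts
import type { Socket } from "node:net";

const MAX_BUFFERED_BYTES = 1024 * 1024; // assumed bound, tune as needed

function broadcastWithBackpressure(clients: Set<Socket>, obj: unknown): void {
  const payload = JSON.stringify(obj) + "\n";
  for (const sock of clients) {
    if (sock.destroyed) continue;
    const flushed = sock.write(payload);
    // write() returning false means the stream buffer is full; if a client
    // stays slow and its buffered bytes exceed the bound, disconnect it
    // rather than buffering events without limit.
    if (!flushed && sock.writableLength > MAX_BUFFERED_BYTES) {
      sock.destroy();
      clients.delete(sock);
    }
  }
}
```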
…stale PID validation, daemon stdio gate
Superseded by #1585, which includes all commits from this PR plus additional fixes (adapter-initiated empty episodes, idle-timeout finalize, Hermes auto-skill prompt filter).
Description
This PR introduces three related changes for the memos-local-plugin (Hermes adapter):

1. Orphan Episode Protection (Reliability Fix)
Problem: Previously, the bridge `init()` treated all `status='open'` episodes as orphans upon startup, auto-closing them with the reason "插件上次未正常退出" ("the plugin did not exit cleanly last time"). This caused episodes from active sessions to be prematurely abandoned during bridge restarts, gateway loops, or session switches.

Fix:
- `init()` now checks `session.meta.closedAt` before closing open episodes. Episodes belonging to sessions that haven't been explicitly closed are now preserved.
- `closeSession()` in the session manager now stamps `closedAt` via `touchLastSeen()`, enabling the system to distinguish between a "properly closed session" and a "bridge crash/reconnect" scenario.
- Sessions are batch-fetched by `sessionId` to avoid N+1 queries during the orphan scan.
2. WebUI Tasks Page Deletion Fix & Bulk Delete

Problem: The "Delete selected" button on the Tasks page was calling `closeEpisode()`, which is a no-op for already-closed episodes.

Fix:
- Added `deleteById()` to the episodes SQLite repo (supports cascading trace deletion via foreign keys).
- Added `deleteEpisode` and `deleteEpisodes` methods to the `MemoryCore` interface and implementation. Open episodes are rejected with a 409/conflict error.
- Updated `DELETE /api/v1/episodes` to use `deleteEpisode` instead of `closeEpisode`, and added `POST /api/v1/episodes/delete` for bulk deletion.
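For reference, a hypothetical WebUI-side call against the new bulk endpoint. The request/response shape is taken from the route handler; the helper name, error handling, and the assumption that `deleted` is a count are illustrative.

```ts
// Delete the episodes selected on the Tasks page via the bulk endpoint.
async function deleteSelectedEpisodes(selectedEpisodeIds: string[]): Promise<number> {
  const res = await fetch("/api/v1/episodes/delete", {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ ids: selectedEpisodeIds }),
  });
  if (!res.ok) throw new Error(`bulk delete failed: HTTP ${res.status}`);
  const body = (await res.json()) as { ok: boolean; deleted: number };
  return body.deleted; // number of rows actually removed
}
```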
3. TCP Transport Layer for Daemon Bridge

Problem: The bridge previously only supported stdio JSON-RPC, requiring the Hermes Python provider to spawn a child process. This was fragile across restarts and didn't support daemon mode.

Fix:
- Added `bridge/tcp.ts`: a TCP JSON-RPC server with line-delimited framing, method whitelist validation, and graceful shutdown.
- Updated `bridge.cts` to start the TCP server in daemon mode (via `--daemon --tcp=<port>`), with stdio fallback.
- Updated the Hermes adapter (`bridge_client.py`, `daemon_manager.py`, `__init__.py`) to connect via TCP when available, falling back to stdio. Includes PID-file singleton management to prevent duplicate daemon processes, reconnect with FD leak fixes, and socket cleanup on close.
Files Changed

- `agent-contract/memory-core.ts`: `deleteEpisode` / `deleteEpisodes` added to the interface
- `core/pipeline/memory-core.ts`: `closedAt` check; delete implementation with open-episode guard
- `core/session/manager.ts`: `closedAt` stamped on `closeSession()`
- `core/storage/repos/episodes.ts`: `deleteById()`
- `server/routes/session.ts`
- `bridge.cts`: `--tcp` flag, version from package.json
- `bridge/tcp.ts`
- `adapters/hermes/memos_provider/__init__.py`
- `adapters/hermes/memos_provider/bridge_client.py`
- `adapters/hermes/memos_provider/daemon_manager.py`
- `tests/unit/pipeline/memory-core.test.ts`
How Has This Been Tested?

- Verified orphan handling against the updated `init()` scan.
- Daemon bridge with `--tcp=18911` running continuously; the Hermes Python provider successfully connects via TCP and survives reconnect cycles.
- Unit tests for the delete and orphan-scan logic (`memory-core.test.ts`).

Checklist