Skip to content

fix(memos-local-plugin): prevent orphan episode scan from closing active sessions; add episode delete API#1546

Closed
Starfie1d1272 wants to merge 18 commits intoMemTensor:mainfrom
Starfie1d1272:main
Closed

fix(memos-local-plugin): prevent orphan episode scan from closing active sessions; add episode delete API#1546
Starfie1d1272 wants to merge 18 commits intoMemTensor:mainfrom
Starfie1d1272:main

Conversation

@Starfie1d1272
Copy link
Copy Markdown

@Starfie1d1272 Starfie1d1272 commented Apr 27, 2026

Description

This PR introduces three related changes for the memos-local-plugin (Hermes adapter):

1. Orphan Episode Protection (Reliability Fix)

Problem: Previously, the bridge init() treated all status='open' episodes as orphans upon startup, auto-closing them with the reason "插件上次未正常退出". This caused episodes from active sessions to be prematurely abandoned during bridge restarts, gateway loops, or session switches.

Fix:

  • Updated init() to check session.meta.closedAt before closing open episodes. Episodes belonging to sessions that haven't been explicitly closed are now preserved.
  • Updated closeSession() in the session manager to stamp closedAt via touchLastSeen(), enabling the system to distinguish between a "properly closed session" and a "bridge crash/reconnect" scenario.
  • Batch-fetches session rows for distinct sessionIds to avoid N+1 queries during the orphan scan.

2. WebUI Tasks Page Deletion Fix & Bulk Delete

Problem: The "Delete selected" button on the Tasks page was calling closeEpisode(), which is a no-op for already-closed episodes.

Fix:

  • Repository Level: Added deleteById() to the episodes SQLite repo (supports cascading trace deletion via Foreign Keys).
  • Core Level: Added deleteEpisode and deleteEpisodes methods to the MemoryCore interface and implementation. Open episodes are rejected with a 409/conflict error.
  • API Level: Changed DELETE /api/v1/episodes to use deleteEpisode instead of closeEpisode, and added POST /api/v1/episodes/delete for bulk deletion.

3. TCP Transport Layer for Daemon Bridge

Problem: The bridge previously only supported stdio JSON-RPC, requiring the Hermes Python provider to spawn a child process. This was fragile across restarts and didn't support daemon mode.

Fix:

  • Added bridge/tcp.ts — a TCP JSON-RPC server with line-delimited framing, method whitelist validation, and graceful shutdown.
  • Updated bridge.cts to start the TCP server in daemon mode (via --daemon --tcp=<port>), with stdio fallback.
  • Updated the Hermes Python provider (bridge_client.py, daemon_manager.py, __init__.py) to connect via TCP when available, falling back to stdio. Includes PID-file singleton management to prevent duplicate daemon processes, reconnect with FD leak fixes, and socket cleanup on close.

Files Changed

File Change
agent-contract/memory-core.ts Added deleteEpisode / deleteEpisodes to interface
core/pipeline/memory-core.ts Orphan filter with closedAt check; delete implementation with open-episode guard
core/session/manager.ts Stamp closedAt on closeSession()
core/storage/repos/episodes.ts Added deleteById()
server/routes/session.ts Fixed delete route + bulk delete endpoint
bridge.cts Daemon mode, --tcp flag, version from package.json
bridge/tcp.ts New. TCP JSON-RPC server with framing, method validation, shutdown
adapters/hermes/memos_provider/__init__.py TCP mode with stdio fallback
adapters/hermes/memos_provider/bridge_client.py TCP transport, reconnect, PID-file management
adapters/hermes/memos_provider/daemon_manager.py New. PID-file singleton bridge lifecycle
tests/unit/pipeline/memory-core.test.ts Unit tests for orphan init behavior and delete APIs

How Has This Been Tested?

  • Orphan Fix: Created episodes in an active session → killed bridge → restarted. All 20+ open episodes survived the init() scan.
  • Delete Fix: Deletion on closed episodes physically removed rows from SQLite (previously no-op).
  • TCP Transport: Daemon mode with --tcp=18911 running continuously. Hermes Python provider successfully connects via TCP, survives reconnect cycles.
  • Unit Tests: Added 7 new tests covering orphan init filtering and deleteEpisode/deleteEpisodes (see updated memory-core.test.ts).

Checklist

  • I have performed a self-review of my own code.
  • I have commented my code, particularly in complex logic areas.
  • I have added tests that prove my fix is effective.
  • I have created related documentation in MemOS-Docs.

…ive sessions; add episode delete API

Two related fixes:

1. Orphan episode protection (2 files):
   - core/pipeline/memory-core.ts: init() now checks session.meta.closedAt
     before treating open episodes as orphans. Episodes from sessions
     that haven't been explicitly closed are no longer abandoned on
     bridge restart.
   - core/session/manager.ts: closeSession() now stamps session.meta.closedAt
     so future init() calls can distinguish 'explicitly closed' from
     'crashed and might reconnect'.

2. WebUI Tasks page bulk delete (3 files):
   - core/storage/repos/episodes.ts: added deleteById() method
   - core/pipeline/memory-core.ts: added deleteEpisode() / deleteEpisodes()
   - agent-contract/memory-core.ts: added interface signatures
   - server/routes/session.ts: DELETE /api/v1/episodes now calls
     deleteEpisode (actually removes the row + cascading traces)
     instead of closeEpisode (no-op on already-closed episodes).
     Added POST /api/v1/episodes/delete for bulk operations.
…rmes session restart

Three fixes for the Hermes bridge adapter:

1. Use tsx runtime instead of node --experimental-strip-types
2. PID file to prevent duplicate bridge processes
3. Bridge lifetime tracking via register_bridge()
…d of hardcoding

Previously the bridge reported '2.0.0-alpha.1' regardless of the
actual package version. Now it reads from package.json at startup.
Copilot AI review requested due to automatic review settings April 27, 2026 05:50
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Improves Hermes memos-local-plugin session stability by refining startup orphan-episode handling, and adds hard-delete support for episodes (including a bulk-delete HTTP endpoint) to fix Tasks page deletions.

Changes:

  • Refine init() orphan-episode scan to only close open episodes when the owning session is explicitly closed (or missing).
  • Add episode hard-delete capabilities end-to-end (repo → core → HTTP routes), including a bulk delete route.
  • Stamp session.meta.closedAt on session close so restarts can distinguish “explicitly closed” vs “may reconnect”.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
apps/memos-local-plugin/server/routes/session.ts Switch single-episode delete to hard-delete and add bulk delete endpoint.
apps/memos-local-plugin/core/storage/repos/episodes.ts Add deleteById() to physically remove episodes (FK cascade expected).
apps/memos-local-plugin/core/session/manager.ts Stamp meta.closedAt on closeSession() to support orphan detection.
apps/memos-local-plugin/core/pipeline/memory-core.ts Update orphan filtering on init; implement deleteEpisode(s) in the core.
apps/memos-local-plugin/bridge.cts Update bridge version sourcing and daemon/stdin lifetime handling.
apps/memos-local-plugin/agent-contract/memory-core.ts Extend MemoryCore contract with deleteEpisode / deleteEpisodes.
apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py Add PID-file-based singleton bridge management helpers.
apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py Use PID-file singleton management when spawning/closing the bridge.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/agent-contract/memory-core.ts
Comment thread apps/memos-local-plugin/core/session/manager.ts Outdated
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Starfie1d1272 and others added 4 commits April 27, 2026 14:19
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Prevent deleting episodes that are currently open/in-flight,
which could cause FK violations and corrupt in-memory pipeline state.

Co-Authored-By: Copilot <copilot-pull-request-reviewer[bot]@users.noreply.github.com>
… TCP

Changes:
- bridge_client.py: add TCP transport mode — connect to daemon bridge
  at 127.0.0.1:18911 instead of spawning a subprocess. Supports
  transparent reconnection on connection loss. Falls back to stdio
  subprocess mode when TCP params are not provided.
- __init__.py (MemTensorProvider): connect via TCP to daemon bridge;
  on Hermes CLI exit, skip episode.close + session.close so the daemon
  bridge owns the session and the pipeline can finalize episodes naturally.

Why: previously each Hermes CLI session spawned its own bridge subprocess,
which died on CLI exit → all open episodes abandoned as 'session_closed:client'.
With TCP mode the daemon bridge (persistent background process) keeps the
session alive, and episodes get properly finalized by the pipeline.
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 10 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py Outdated
Comment thread apps/memos-local-plugin/bridge.cts Outdated
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts Outdated
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/server/routes/session.ts Outdated
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py Outdated
Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py Outdated
Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py Outdated
…ddress Copilot review

Changes:
- bridge/tcp.ts: new TCP JSON-RPC server (line-delimited, multi-client)
- bridge.cts: start TCP server when --daemon --tcp=<port> is given;
  daemon mode now waits for TCP server stop instead of blocking forever
- memory-core.ts: batch-fetch sessions to avoid N+1 orphan scan (Copilot suggestion);
  add SQLite fallback check in assertEpisodeDeletable (Copilot suggestion)
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py Outdated
Comment thread apps/memos-local-plugin/bridge.cts Outdated
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/bridge/tcp.ts Outdated
@Starfie1d1272
Copy link
Copy Markdown
Author

@copilot apply changes based on the comments in this thread

…k, double newline, port validation

Changes per Copilot review (8 comments):
- bridge_client.py: fix double-newline in _SocketTransport.write_line()
- bridge.cts: validate --tcp port with Number.isFinite() before use
- __init__.py: TCP mode with stdio fallback (try TCP first, spawn
  subprocess on failure); restore episode.close + session.close in
  on_session_end (Copilot correctly noted closedAt must be stamped)
- daemon_manager.py: add mkdir(parents=True) before writing PID file
  (prevents crash when parent dir doesn't exist)
bridge_client.py: close old transport before reconnect
bridge/tcp.ts: await server.close() in TcpServerHandle.close()
agent-contract/memory-core.ts: deleteEpisode/deleteEpisodes no longer optional
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment thread apps/memos-local-plugin/bridge.cts Outdated
Comment thread apps/memos-local-plugin/bridge/tcp.ts Outdated
Comment thread apps/memos-local-plugin/bridge.cts
Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py Outdated
Comment thread apps/memos-local-plugin/server/routes/session.ts Outdated
bridge/tcp.ts: wrap server.listen in a promise; expose 'ready' so
callers catch EADDRINUSE; remove stale server-level error handler
bridge.cts: await tcpServer.ready inside try/catch
bridge_client.py: enable TCP mode when either tcp_host or tcp_port
  is provided (not just tcp_host)
server/routes/session.ts: return { ok: true, deleted } for backward
  compatibility with callers expecting { ok: true }
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/bridge.cts Outdated
Comment thread apps/memos-local-plugin/bridge/tcp.ts Outdated
Comment thread apps/memos-local-plugin/bridge/tcp.ts
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts Outdated
…leanup

bridge.cts: add shuttingDown guard to prevent double-shutdown
bridge/tcp.ts: validate typeof msg.method === 'string';
  sock.destroy() on error to prevent resource leak
bridge/tcp.ts: check sock.destroyed before destroying
core/pipeline/memory-core.ts: extract deleteClosedEpisode helper
  with no-op regression guard
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/bridge.cts
Comment thread apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Comment thread apps/memos-local-plugin/bridge/tcp.ts Outdated
Comment thread apps/memos-local-plugin/bridge.cts Outdated
Comment thread apps/memos-local-plugin/core/pipeline/memory-core.ts
Comment on lines 48 to 69
routes.set("DELETE /api/v1/episodes", async (ctx) => {
const id = ctx.url.searchParams.get("episodeId");
if (!id) {
writeError(ctx, 400, "invalid_argument", "episodeId is required");
return;
}
await deps.core.closeEpisode(id as EpisodeId);
return { ok: true };
const result = await deps.core.deleteEpisode(id as EpisodeId);
return { ok: true, deleted: result.deleted };
});

routes.set("POST /api/v1/episodes/delete", async (ctx) => {
const body = parseJson<{ ids?: unknown }>(ctx);
const ids = Array.isArray(body.ids)
? body.ids.filter((v): v is string => typeof v === "string" && v.length > 0)
: [];
if (ids.length === 0) {
writeError(ctx, 400, "invalid_argument", "ids[] is required");
return;
}
const result = await deps.core.deleteEpisodes(ids as EpisodeId[]);
return { ok: true, deleted: result.deleted };
});
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New deletion endpoints (DELETE /api/v1/episodes now physically deletes; new POST /api/v1/episodes/delete bulk delete) aren’t covered by the existing HTTP server unit tests (tests/unit/server/http.test.ts currently covers the GET endpoints). Adding route-level tests for both single and bulk delete (including closed vs open episode behavior) would help ensure the WebUI doesn’t regress back to “successful delete but rows remain in DB”.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. Adding route-level tests for the delete endpoints would be valuable, but since the existing http.test.ts covers GET endpoints with the same handler pattern and the delete logic is already covered by memory-core.test.ts, this can be deferred to a follow-up PR.

Comment on lines +52 to +140
export function startTcpServer(options: TcpServerOptions): TcpServerHandle {
const { core, host, port } = options;
const dispatch = makeDispatcher(core);

const clients = new Set<Socket>();
let closed = false;
let doneResolve: () => void;
const donePromise = new Promise<void>((resolve) => {
doneResolve = resolve;
});

// Subscribe to events + logs and broadcast to all connected clients.
const eventsUnsub = core.subscribeEvents((e) => {
broadcast({ jsonrpc: "2.0", method: RPC_METHODS.EVENTS_NOTIFY, params: e });
});
const logsUnsub = core.subscribeLogs((r) => {
broadcast({ jsonrpc: "2.0", method: RPC_METHODS.LOGS_FORWARD, params: r });
});

function broadcast(obj: unknown): void {
const payload = JSON.stringify(obj) + "\n";
for (const sock of clients) {
try {
sock.write(payload);
} catch {
/* best-effort per client */
}
}
}

function errorResponse(
id: JsonRpcRequest["id"] | null,
code: number,
message: string,
data?: unknown,
): JsonRpcFailure {
return {
jsonrpc: "2.0",
id: id ?? null,
error: { code, message, data: data as any },
};
}

function writeLine(sock: Socket, obj: unknown): void {
try {
sock.write(JSON.stringify(obj) + "\n");
} catch {
/* ignore */
}
}

async function handleLine(sock: Socket, line: string): Promise<void> {
const trimmed = line.trim();
if (trimmed.length === 0) return;

let msg: JsonRpcRequest | null = null;
try {
msg = JSON.parse(trimmed) as JsonRpcRequest;
} catch (err) {
writeLine(
sock,
errorResponse(null, JSONRPC_PARSE_ERROR, "invalid JSON", {
text: err instanceof Error ? err.message : String(err),
}),
);
return;
}

if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || typeof msg.method !== "string") {
writeLine(sock, errorResponse(msg?.id ?? null, JSONRPC_INVALID_REQUEST, "not JSON-RPC 2.0"));
return;
}

try {
const result = await dispatch(msg.method, msg.params);
if (msg.id !== undefined && msg.id !== null) {
const ok: JsonRpcSuccess = { jsonrpc: "2.0", id: msg.id, result };
writeLine(sock, ok);
}
} catch (err) {
const code = rpcCodeForError(errorCodeOf(err));
const mErr =
err instanceof MemosError
? err
: new MemosError("internal", err instanceof Error ? err.message : String(err));
writeLine(sock, errorResponse(msg.id ?? null, code, mErr.message, mErr.toJSON()));
process.stderr.write(`bridge.tcp.dispatch.err ${msg.method}: ${mErr.message}\n`);
}
}
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR adds a new JSON-RPC transport (bridge/tcp.ts), but there are currently only bridge transport tests for stdio (tests/unit/bridge/stdio.test.ts). Adding unit tests for TCP framing/dispatch (basic request/response, notification broadcast, and invalid JSON handling) would reduce the risk of transport regressions and subtle protocol incompatibilities.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. TCP transport unit tests would be useful for regression prevention, but the transport is already exercised by the daemon smoke tests (Hermes provider connecting via TCP). Can be added in a follow-up PR alongside the existing stdio.test.ts pattern.

…r import

bridge.cts: close tcpServer on startup failure; warn and skip TCP
  when --daemon not set; daemon mode now also falls back to stdio when
  TCP port is not given
bridge/tcp.ts: isListening flag for error handling (no more
  removeAllListeners); runtime errors are logged not swallowed
__init__.py: import BridgeError so TCP→stdio fallback actually works
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/bridge.cts
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread apps/memos-local-plugin/bridge.cts
Comment on lines +71 to +79
function broadcast(obj: unknown): void {
const payload = JSON.stringify(obj) + "\n";
for (const sock of clients) {
try {
sock.write(payload);
} catch {
/* best-effort per client */
}
}
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broadcast() writes to every socket without handling backpressure (sock.write(...) return value / drain), so a slow or stuck client can cause unbounded buffering and increased memory usage when logs/events are frequent. Consider tracking per-socket write state (pause/drop clients when write() returns false) or queueing with a bounded buffer and disconnecting slow consumers.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. In practice broadcast payloads are short JSON lines (~100-500 bytes) and clients are localhost-only, so backpressure is extremely unlikely. Adding write() return-value tracking per-socket can be done as a follow-up optimization.

@Starfie1d1272
Copy link
Copy Markdown
Author

Superseded by #1585 which includes all commits from this PR plus additional fixes (adapter-initiated empty episodes, idle-timeout finalize, Hermes auto-skill prompt filter).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants