Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions ts/docs/architecture/agent-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,24 @@ export function instantiate(): AppAgent {
}
```

Agents that allocate expensive resources (browser instances, network
connections, child processes) on `initializeAgentContext` should also implement
`closeAgentContext` to release them when the agent is disabled or the session
ends. The dispatcher calls `closeAgentContext` automatically; omitting it leaks
the resource for the lifetime of the process.

```typescript
// Example: agent that holds a long-lived resource
export function instantiate(): AppAgent {
return { initializeAgentContext, closeAgentContext, executeAction };
}
```

**Examples:** `weather`, `photo`, `list`, `image`, `video`, `utility`

**When to choose:** any integration with a well-defined, enumerable set of
actions — REST APIs, CLI tools, file operations, data queries.

**Examples:** `weather`, `photo`, `list`, `image`, `video`

---

### 2. `external-api` — REST / OAuth Bridge
Expand Down
12 changes: 9 additions & 3 deletions ts/docs/architecture/dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,9 +681,15 @@ The dispatcher uses structured error handling at several levels:

- **Command lock** — A `Limiter` ensures only one command executes at a
time. Concurrent requests queue behind the lock.
- **Cancellation** — Each request gets an `AbortController`. Calling
`cancelCommand(requestId)` signals the abort, which propagates through
the translation and execution pipeline.
- **Cancellation** — Each request gets an `AbortController` whose signal
propagates through the translation and execution pipeline (LLM fetch,
streaming chunks, cache validation). Two cancellation paths exist:
- `cancelCommand(requestId)` — cancel by the server-assigned UUID, available
after `setUserRequest()` fires. Used by Escape/Ctrl+C once a request is running.
- `cancelCommandByClientId(clientRequestId)` — cancel by the client-assigned
id passed as the second argument to `processCommand()`. This AbortController
is created before the command lock is acquired, so it can abort a command
that is queued behind another in-flight command before `setUserRequest()` fires.
- **Unknown actions** — When no agent matches, the dispatcher displays
an error and uses semantic search to suggest the closest matching
agents/schemas.
Expand Down
67 changes: 61 additions & 6 deletions ts/packages/agentRpc/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,41 @@ import { ChannelProvider } from "./common.js";
import { getObjectProperty, uint8ArrayToBase64 } from "@typeagent/common-utils";
import { AgentInterfaceFunctionName } from "./server.js";

/**
* Race a promise against an AbortSignal. If the signal fires before the
* promise settles, throw an AbortError immediately (the underlying work
* continues in the background — the caller does not wait for it).
*/
function raceWithSignal<T>(
promise: Promise<T>,
signal: AbortSignal | undefined,
): Promise<T> {
if (!signal) {
return promise;
}
return new Promise<T>((resolve, reject) => {
const onAbort = () =>
reject(
signal.reason ??
new DOMException(
"The operation was aborted.",
"AbortError",
),
);
signal.addEventListener("abort", onAbort, { once: true });
promise.then(
(v) => {
signal.removeEventListener("abort", onAbort);
resolve(v);
},
(e) => {
signal.removeEventListener("abort", onAbort);
reject(e);
},
);
});
}

type ShimContext =
| {
contextId: number;
Expand Down Expand Up @@ -472,12 +507,32 @@ export async function createAgentRpcClient(
action: TypeAgentAction,
context: ActionContext<ShimContext>,
) {
return withActionContextAsync(context, (contextParams) =>
rpc.invoke("executeAction", {
...contextParams,
action,
}),
);
return withActionContextAsync(context, (contextParams) => {
const signal = context.abortSignal;
if (signal) {
const onAbort = () =>
rpc.send("cancelAction", {
actionContextId: contextParams.actionContextId,
});
signal.addEventListener("abort", onAbort, { once: true });
return raceWithSignal(
rpc.invoke("executeAction", {
...contextParams,
action,
}),
signal,
).finally(() => {
signal.removeEventListener("abort", onAbort);
});
}
return raceWithSignal(
rpc.invoke("executeAction", {
...contextParams,
action,
}),
signal,
);
});
Comment thread
GeorgeNgMsft marked this conversation as resolved.
},
validateWildcardMatch(
action: AppAction,
Expand Down
34 changes: 29 additions & 5 deletions ts/packages/agentRpc/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,14 @@ export function createAgentRpcServer(
if (agent.executeAction === undefined) {
throw new Error("Invalid invocation of executeAction");
}
return agent.executeAction(
param.action,
getActionContextShim(param),
);
const shim = getActionContextShim(param);
try {
return await agent.executeAction(param.action, shim);
} finally {
if (param.actionContextId !== undefined) {
actionAbortControllers.delete(param.actionContextId);
}
}
},
async validateWildcardMatch(param): Promise<any> {
if (agent.validateWildcardMatch === undefined) {
Expand Down Expand Up @@ -264,6 +268,8 @@ export function createAgentRpcServer(
},
};

const actionAbortControllers = new Map<number, AbortController>();

const agentCallHandlers: AgentCallFunctions = {
async streamPartialAction(
param: Partial<ContextParams> & {
Expand All @@ -284,6 +290,9 @@ export function createAgentRpcServer(
getActionContextShim(param),
);
},
cancelAction(param: { actionContextId: number }): void {
actionAbortControllers.get(param.actionContextId)?.abort();
},
};

const rpc = createRpc<
Expand Down Expand Up @@ -600,6 +609,18 @@ export function createAgentRpcServer(
"Invalid action context param: missing actionContextId",
);
}
// Reuse the controller already registered for this actionContextId if
// one exists. Multiple RPC entry points (executeAction,
// streamPartialAction, executeCommand, handleChoice) can build a shim
// for the same id, and overwriting the controller would orphan the
// signal handed to the in-flight executeAction — cancelAction would
// then abort the wrong one. The owning call (executeAction) is
// responsible for deleting the entry when it completes.
let abortController = actionAbortControllers.get(actionContextId);
if (abortController === undefined) {
abortController = new AbortController();
actionAbortControllers.set(actionContextId, abortController);
}
const sessionContext = getSessionContextShim(param);
const actionIO: ActionIO = {
setDisplay(content: DisplayContent): void {
Expand Down Expand Up @@ -634,6 +655,9 @@ export function createAgentRpcServer(
streamingContext: undefined,
activityContext: param.activityContext,
isFromReasoningLoop: param.isFromReasoningLoop ?? false,
get abortSignal() {
return abortController.signal;
},
get sessionContext() {
return sessionContext;
},
Expand Down Expand Up @@ -672,6 +696,6 @@ export function createAgentRpcServer(

export type AgentInterfaceFunctionName =
| keyof AgentInvokeFunctions
| keyof AgentCallFunctions;
| Exclude<keyof AgentCallFunctions, "cancelAction">;

export type AgentControlMessage = "exit";
1 change: 1 addition & 0 deletions ts/packages/agentRpc/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export type AgentCallFunctions = {
delta: string | undefined;
},
) => void;
cancelAction: (param: { actionContextId: number }) => void;
};

export type AgentInvokeFunctions = {
Expand Down
4 changes: 4 additions & 0 deletions ts/packages/agents/utility/jest.config.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

module.exports = require("../../../jest.config.js");
8 changes: 7 additions & 1 deletion ts/packages/agents/utility/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
"asc": "asc -i ./src/utilitySchema.mts -o ./dist/utilitySchema.pas.json -t UtilityAction",
"build": "concurrently npm:tsc npm:asc npm:agc",
"clean": "rimraf --glob dist *.tsbuildinfo *.done.build.log",
"jest-esm": "node --no-warnings --experimental-vm-modules ./node_modules/jest/bin/jest.js",
"prettier": "prettier --check . --ignore-path ../../../.prettierignore",
"prettier:fix": "prettier --write . --ignore-path ../../../.prettierignore",
"tsc": "tsc -b"
"test": "npm run test:local",
"test:local": "pnpm run jest-esm --testPathPattern=\".*[.]spec[.]js\"",
"tsc": "tsc -b src test"
},
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.2.101",
Expand All @@ -36,9 +39,12 @@
"puppeteer-extra-plugin-stealth": "^2.11.2"
},
"devDependencies": {
"@jest/globals": "^29.7.0",
"@typeagent/action-schema-compiler": "workspace:*",
"@types/jest": "^29.5.7",
"action-grammar-compiler": "workspace:*",
"concurrently": "^9.1.2",
"jest": "^29.7.0",
"prettier": "^3.5.3",
"rimraf": "^6.0.1",
"typescript": "~5.4.5"
Expand Down
Loading
Loading