Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c99ca34
feat: implement subscriptions/listen with SSE support for long-lived …
guglielmo-san Jun 11, 2026
0066ac9
refactor: remove subscription-based notification tests and cleanup un…
guglielmo-san Jun 11, 2026
8974819
wip
guglielmo-san Jun 12, 2026
d2803b7
refactor: encapsulate subscription management with activeSubscription…
guglielmo-san Jun 12, 2026
3ca0f0c
refactor: move subscription management from Server to ServerSession t…
guglielmo-san Jun 15, 2026
c729ae9
refactor: remove redundant SSE priming and simplify server session ca…
guglielmo-san Jun 15, 2026
c9557ad
refactor: remove stale comment regarding SSE stream response handling
guglielmo-san Jun 15, 2026
c3863e1
Merge branch 'main' into guglielmoc/SEP-2575_subscriptions_listen
guglielmo-san Jun 15, 2026
5d5b8c8
feat: implement stream identification for subscriptions/listen and si…
guglielmo-san Jun 18, 2026
6eb46c7
refactor: partition sessions by protocol version to restrict new noti…
guglielmo-san Jun 18, 2026
ccb0a6e
feat: implement SEP-2575 resource subscription management in Client a…
guglielmo-san Jun 18, 2026
894078e
feat: implement subscriptions/listen stream support for MCP protocol …
guglielmo-san Jun 19, 2026
280a725
Merge branch 'main' into guglielmoc/SEP-2575_subscriptions_listen
guglielmo-san Jun 19, 2026
c11e27e
refactor: remove unused toolCache field from MCP client struct
guglielmo-san Jun 19, 2026
d380bad
fix: allow outgoing cancellation notifications during shutdown and en…
guglielmo-san Jun 21, 2026
5581826
refactor: centralize call cancellation logic and add subscription not…
guglielmo-san Jun 21, 2026
f1845d3
refactor: propagate request context to notification timeout and remov…
guglielmo-san Jun 21, 2026
809e288
refactor: encapsulate ephemeral connection parameters into ephemeralC…
guglielmo-san Jun 21, 2026
3f5167f
Merge branch 'main' into guglielmoc/SEP-2575_subscriptions_listen
guglielmo-san Jun 22, 2026
34fc702
feat: introduce PropagateCancellation to jsonrpc2 and MCP to support …
guglielmo-san Jun 22, 2026
85a0277
docs: expand documentation for ConnectionConfig.PropagateCancellation…
guglielmo-san Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions internal/jsonrpc2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,30 @@ type ConnectionConfig struct {
Bind func(*Connection) Handler // required
OnDone func() // optional
OnInternalError func(error) // optional

// PropagateCancellation controls whether cancellation of the context
// passed to [NewConnection] is observable by request handlers.
//
// By default (false), the connection wraps that context (see [notDone])
// so handlers' Done channels do not fire when the connection's root
// context is cancelled. Cancellation of an in-flight handler is then
// expected to flow only through the jsonrpc2 layer's explicit channels
// (the [Preempter] reacting to the peer's cancel notification, or a
// transport read/write failure cancelling every in-flight request).
//
// Set this true when cancellation of the connection's context is itself
// a meaningful signal that handlers should react to (for example, when
// the connection is tied to a carrier that owns it and whose end means
// the request is cancelled).
PropagateCancellation bool // optional
}

// NewConnection creates a new [Connection] object and starts processing
// incoming messages.
func NewConnection(ctx context.Context, cfg ConnectionConfig) *Connection {
ctx = notDone{ctx}
if !cfg.PropagateCancellation {
ctx = notDone{ctx}
}

c := &Connection{
state: inFlightState{closer: cfg.Closer},
Expand Down Expand Up @@ -530,6 +548,14 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter
ac.retire(&Response{ID: id, Error: err})
}
s.outgoingCalls = nil

// Cancel any incoming requests still in flight: with the reader gone we
// cannot receive cancellation notifications, and likely cannot write a
// response either, so parked handlers have nothing useful left to do.
// Mirrors the equivalent cleanup on write failure.
for _, r := range s.incomingByID {
r.cancel()
}
})
}

Expand Down Expand Up @@ -724,9 +750,12 @@ func (c *Connection) write(ctx context.Context, msg Message) error {
var err error
// Fail writes immediately if the connection is shutting down.
//
// TODO(rfindley): should we allow cancellation notifications through? It
// could be the case that writes can still succeed.
// Allow outgoing "notifications" forwarded by the Notify method.
// This will allow to send the cancelled notification when the client is shutting down.
c.updateInFlight(func(s *inFlightState) {
if req, ok := msg.(*Request); ok && !req.IsCall() && s.outgoingNotifications > 0 {
return
}
err = s.shuttingDown(ErrServerClosing)
})
if err == nil {
Expand Down Expand Up @@ -771,6 +800,15 @@ func (c *Connection) internalErrorf(format string, args ...any) error {
}

// notDone is a context.Context wrapper that returns a nil Done channel.
//
// Request handlers' contexts are derived from the connection's root context,
// which by default is wrapped in notDone so a transport-level cancellation
// does not implicitly cancel every in-flight handler. Cancellation of an
// in-flight handler is instead expected to flow only through the jsonrpc2
// layer's explicit channels: the [Preempter] calling [Connection.Cancel] in
// response to the peer's cancel notification, or the transport itself
// failing (the read loop exits on EOF or a write fails) — both of which
// cancel every in-flight incoming request in turn.
type notDone struct{ ctx context.Context }

func (ic notDone) Value(key any) any {
Expand Down
120 changes: 115 additions & 5 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,27 @@ func (c *Client) Connect(ctx context.Context, t Transport, opts *ClientSessionOp
if hc, ok := cs.mcpConn.(clientConnection); ok {
hc.sessionUpdated(cs.state)
}
subscribeParams := &SubscriptionsListenParams{}
if c.opts.ToolListChangedHandler != nil {
subscribeParams.Notifications.ToolsListChanged = true
}
if c.opts.PromptListChangedHandler != nil {
subscribeParams.Notifications.PromptsListChanged = true
}
if c.opts.ResourceListChangedHandler != nil {
subscribeParams.Notifications.ResourcesListChanged = true
}
if subscribeParams.Notifications.ToolsListChanged ||
subscribeParams.Notifications.PromptsListChanged ||
subscribeParams.Notifications.ResourcesListChanged {
// ClientSession.Close cancels the listenCtx context to send notifications/cancelled.
listenCtx, cancelListen := context.WithCancel(context.Background())
cs.listenCancel = cancelListen
if err := cs.subscriptionsListen(listenCtx, subscribeParams); err != nil {
cancelListen()
return nil, fmt.Errorf("opening subscriptions/listen: %w", err)
}
}
return cs, nil
}

Expand Down Expand Up @@ -414,6 +435,7 @@ type ClientSession struct {
conn *jsonrpc2.Connection
client *Client
keepaliveCancel context.CancelFunc
listenCancel context.CancelFunc
mcpConn Connection

// No mutex is (currently) required to guard the session state, because it is
Expand All @@ -430,6 +452,15 @@ type ClientSession struct {
// Pending URL elicitations waiting for completion notifications.
pendingElicitationsMu sync.Mutex
pendingElicitations map[string]chan struct{}

// resourceSubsMu guards resourceSubs.
resourceSubsMu sync.Mutex
// resourceSubs maps a subscribed resource URI to the cancel func of the
// goroutine running its dedicated subscriptions/listen stream. Populated
// only under SEP-2575; the legacy protocol routes Subscribe and
// Unsubscribe straight to the resources/subscribe and resources/unsubscribe
// RPCs and leaves this map untouched.
resourceSubs map[string]context.CancelFunc
}

type clientSessionState struct {
Expand Down Expand Up @@ -496,6 +527,10 @@ func (cs *ClientSession) Close() error {
if cs.keepaliveCancel != nil {
cs.keepaliveCancel()
}
if cs.listenCancel != nil {
cs.listenCancel()
}
cs.cancelAllResourceSubscriptions()
err := cs.conn.Close()

if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) {
Expand Down Expand Up @@ -1083,6 +1118,7 @@ var clientMethodInfos = map[string]methodInfo{
notificationLoggingMessage: newClientMethodInfo(clientMethod((*Client).callLoggingHandler), notification),
notificationProgress: newClientMethodInfo(clientSessionMethod((*ClientSession).callProgressNotificationHandler), notification),
notificationElicitationComplete: newClientMethodInfo(clientMethod((*Client).callElicitationCompleteHandler), notification|missingParamsOK),
notificationSubscriptionsAck: newClientMethodInfo(clientMethod((*Client).callSubscriptionsAckHandler), notification|missingParamsOK),
}

func (cs *ClientSession) sendingMethodInfos() map[string]methodInfo {
Expand Down Expand Up @@ -1279,17 +1315,91 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) (
// Subscribe sends a "resources/subscribe" request to the server, asking for
// notifications when the specified resource changes.
func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error {
_, err := handleSend[*emptyResult](ctx, methodSubscribe, newClientRequest(cs, orZero[Params](params)))
return err
if !cs.usesNewProtocol() {
_, err := handleSend[*emptyResult](ctx, methodSubscribe, newClientRequest(cs, orZero[Params](params)))
return err
}
if params == nil || params.URI == "" {
return fmt.Errorf("Subscribe: missing URI")
}
uri := params.URI

var listenCtx context.Context
cs.resourceSubsMu.Lock()
if _, exists := cs.resourceSubs[uri]; !exists {
var cancel context.CancelFunc
listenCtx, cancel = context.WithCancel(context.Background())
if cs.resourceSubs == nil {
cs.resourceSubs = make(map[string]context.CancelFunc)
}
cs.resourceSubs[uri] = cancel
}
cs.resourceSubsMu.Unlock()
if listenCtx == nil {
// Already subscribed to this URI
return nil
}

return cs.subscriptionsListen(listenCtx, &SubscriptionsListenParams{
Notifications: NotificationSubscriptions{
ResourceSubscriptions: []string{uri},
},
})
}

// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling
// a previous subscription.
// Unsubscribe cancels a previous [ClientSession.Subscribe] for params.URI.
//
// Under the legacy protocol it sends a "resources/unsubscribe" request.
//
// Under SEP-2575 it cancels the background "subscriptions/listen" stream
// opened by Subscribe for the URI. Unsubscribe is idempotent: calling it for
// a URI that is not currently subscribed is a no-op.
func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error {
_, err := handleSend[*emptyResult](ctx, methodUnsubscribe, newClientRequest(cs, orZero[Params](params)))
if !cs.usesNewProtocol() {
_, err := handleSend[*emptyResult](ctx, methodUnsubscribe, newClientRequest(cs, orZero[Params](params)))
return err
}
if params == nil || params.URI == "" {
return fmt.Errorf("Unsubscribe: missing URI")
}
cs.resourceSubsMu.Lock()
cancel, ok := cs.resourceSubs[params.URI]
delete(cs.resourceSubs, params.URI)
cs.resourceSubsMu.Unlock()
if ok {
cancel()
}
return nil
}

// cancelAllResourceSubscriptions cancels every active SEP-2575 resource
// subscription opened via Subscribe. The listen goroutines exit
// asynchronously as their contexts unwind. Called from Close.
func (cs *ClientSession) cancelAllResourceSubscriptions() {
cs.resourceSubsMu.Lock()
subs := cs.resourceSubs
cs.resourceSubs = nil
cs.resourceSubsMu.Unlock()
for _, cancel := range subs {
cancel()
}
}

// SubscriptionsListen opens a SEP-2575 "subscriptions/listen" stream.
//
// The server's first message on the stream is "notifications/subscriptions/acknowledged";
// subsequent opted-in notifications (e.g. tools/list_changed) are delivered through the
// usual handlers registered in [ClientOptions].
func (cs *ClientSession) subscriptionsListen(ctx context.Context, params *SubscriptionsListenParams) error {
params = injectRequestMeta(cs, params)
_, err := handleSend[*emptyResult](ctx, methodSubscriptionsListen, newClientRequest(cs, orZero[Params](params)))
return err
}

func (c *Client) callSubscriptionsAckHandler(context.Context, *ClientRequest[*SubscriptionsAcknowledgedParams]) (Result, error) {
return nil, nil
}

func (c *Client) callToolChangedHandler(ctx context.Context, req *ToolListChangedRequest) (Result, error) {
if cs, ok := req.GetSession().(*ClientSession); ok {
cs.toolsCache.invalidate()
Expand Down
Loading
Loading