Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 23 additions & 98 deletions internal/jsonrpc2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,6 @@ import (
"github.com/modelcontextprotocol/go-sdk/internal/json"
)

// Binder builds a connection configuration.
// This may be used in servers to generate a new configuration per connection.
// ConnectionOptions itself implements Binder returning itself unmodified, to
// allow for the simple cases where no per connection information is needed.
type Binder interface {
// Bind returns the ConnectionOptions to use when establishing the passed-in
// Connection.
//
// The connection is not ready to use when Bind is called,
// but Bind may close it without reading or writing to it.
Bind(context.Context, *Connection) ConnectionOptions
}

// A BinderFunc implements the Binder interface for a standalone Bind function.
type BinderFunc func(context.Context, *Connection) ConnectionOptions

func (f BinderFunc) Bind(ctx context.Context, c *Connection) ConnectionOptions {
return f(ctx, c)
}

var _ Binder = BinderFunc(nil)

// ConnectionOptions holds the options for new connections.
type ConnectionOptions struct {
// Framer allows control over the message framing and encoding.
// If nil, HeaderFramer will be used.
Framer Framer
// Preempter allows registration of a pre-queue message handler.
// If nil, no messages will be preempted.
Preempter Preempter
// Handler is used as the queued message handler for inbound messages.
// If nil, all responses will be ErrNotHandled.
Handler Handler
// OnInternalError, if non-nil, is called with any internal errors that occur
// while serving the connection, such as protocol errors or invariant
// violations. (If nil, internal errors result in panics.)
OnInternalError func(error)
}

// Connection manages the jsonrpc2 protocol, connecting responses back to their
// calls. Connection is bidirectional; it does not have a designated server or
// client end.
Expand Down Expand Up @@ -197,9 +158,28 @@ type incomingRequest struct {
cancel context.CancelFunc
}

// Bind returns the options unmodified.
func (o ConnectionOptions) Bind(context.Context, *Connection) ConnectionOptions {
return o
// Reader abstracts the transport mechanics from the JSON RPC protocol.
// A Connection reads messages from the reader it was provided on construction,
// and assumes that each call to Read fully transfers a single message,
// or returns an error.
//
// A reader is not safe for concurrent use, it is expected it will be used by
// a single Connection in a safe manner.
type Reader interface {
// Read gets the next message from the stream.
Read(context.Context) (Message, error)
}

// Writer abstracts the transport mechanics from the JSON RPC protocol.
// A Connection writes messages using the writer it was provided on construction,
// and assumes that each call to Write fully transfers a single message,
// or returns an error.
//
// A writer must be safe for concurrent use, as writes may occur concurrently
// in practice: libraries may make calls or respond to requests asynchronously.
type Writer interface {
// Write sends a message to the stream.
Write(context.Context, Message) error
}

// A ConnectionConfig configures a bidirectional jsonrpc2 connection.
Expand Down Expand Up @@ -230,59 +210,16 @@ func NewConnection(ctx context.Context, cfg ConnectionConfig) *Connection {
return c
}

// bindConnection creates a new connection and runs it.
//
// This is used by the Dial and Serve functions to build the actual connection.
//
// The connection is closed automatically (and its resources cleaned up) when
// the last request has completed after the underlying ReadWriteCloser breaks,
// but it may be stopped earlier by calling Close (for a clean shutdown).
func bindConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder, onDone func()) *Connection {
// TODO: Should we create a new event span here?
// This will propagate cancellation from ctx; should it?
ctx := notDone{bindCtx}

c := &Connection{
state: inFlightState{closer: rwc},
done: make(chan struct{}),
onDone: onDone,
}
// It's tempting to set a finalizer on c to verify that the state has gone
// idle when the connection becomes unreachable. Unfortunately, the Binder
// interface makes that unsafe: it allows the Handler to close over the
// Connection, which could create a reference cycle that would cause the
// Connection to become uncollectable.

options := binder.Bind(bindCtx, c)
framer := options.Framer
if framer == nil {
framer = HeaderFramer()
}
c.handler = options.Handler
if c.handler == nil {
c.handler = defaultHandler{}
}
c.onInternalError = options.OnInternalError

c.writer = framer.Writer(rwc)
reader := framer.Reader(rwc)
c.start(ctx, reader, options.Preempter)
return c
}

func (c *Connection) start(ctx context.Context, reader Reader, preempter Preempter) {
c.updateInFlight(func(s *inFlightState) {
select {
case <-c.done:
// Bind already closed the connection; don't start a goroutine to read it.
// The connection was already closed; don't start a goroutine to read it.
return
default:
}

// The goroutine started here will continue until the underlying stream is closed.
//
// (If the Binder closed the Connection already, this should error out and
// return almost immediately.)
s.reading = true
go c.readIncoming(ctx, reader, preempter)
})
Expand Down Expand Up @@ -439,18 +376,6 @@ type AsyncCall struct {
// This can be used to cancel the call if needed.
func (ac *AsyncCall) ID() ID { return ac.id }

// IsReady can be used to check if the result is already prepared.
// This is guaranteed to return true on a result for which Await has already
// returned, or a call that failed to send in the first place.
func (ac *AsyncCall) IsReady() bool {
select {
case <-ac.ready:
return true
default:
return false
}
}

// retire processes the response to the call.
//
// It is an error to call retire more than once: retire is guarded by the
Expand Down
208 changes: 0 additions & 208 deletions internal/jsonrpc2/frame.go

This file was deleted.

Loading
Loading