diff --git a/internal/jsonrpc2/conn.go b/internal/jsonrpc2/conn.go index 571df63a..72cc7408 100644 --- a/internal/jsonrpc2/conn.go +++ b/internal/jsonrpc2/conn.go @@ -16,45 +16,6 @@ import ( "github.com/modelcontextprotocol/go-sdk/internal/json" ) -// Binder builds a connection configuration. -// This may be used in servers to generate a new configuration per connection. -// ConnectionOptions itself implements Binder returning itself unmodified, to -// allow for the simple cases where no per connection information is needed. -type Binder interface { - // Bind returns the ConnectionOptions to use when establishing the passed-in - // Connection. - // - // The connection is not ready to use when Bind is called, - // but Bind may close it without reading or writing to it. - Bind(context.Context, *Connection) ConnectionOptions -} - -// A BinderFunc implements the Binder interface for a standalone Bind function. -type BinderFunc func(context.Context, *Connection) ConnectionOptions - -func (f BinderFunc) Bind(ctx context.Context, c *Connection) ConnectionOptions { - return f(ctx, c) -} - -var _ Binder = BinderFunc(nil) - -// ConnectionOptions holds the options for new connections. -type ConnectionOptions struct { - // Framer allows control over the message framing and encoding. - // If nil, HeaderFramer will be used. - Framer Framer - // Preempter allows registration of a pre-queue message handler. - // If nil, no messages will be preempted. - Preempter Preempter - // Handler is used as the queued message handler for inbound messages. - // If nil, all responses will be ErrNotHandled. - Handler Handler - // OnInternalError, if non-nil, is called with any internal errors that occur - // while serving the connection, such as protocol errors or invariant - // violations. (If nil, internal errors result in panics.) - OnInternalError func(error) -} - // Connection manages the jsonrpc2 protocol, connecting responses back to their // calls. Connection is bidirectional; it does not have a designated server or // client end. @@ -197,9 +158,28 @@ type incomingRequest struct { cancel context.CancelFunc } -// Bind returns the options unmodified. -func (o ConnectionOptions) Bind(context.Context, *Connection) ConnectionOptions { - return o +// Reader abstracts the transport mechanics from the JSON RPC protocol. +// A Connection reads messages from the reader it was provided on construction, +// and assumes that each call to Read fully transfers a single message, +// or returns an error. +// +// A reader is not safe for concurrent use, it is expected it will be used by +// a single Connection in a safe manner. +type Reader interface { + // Read gets the next message from the stream. + Read(context.Context) (Message, error) +} + +// Writer abstracts the transport mechanics from the JSON RPC protocol. +// A Connection writes messages using the writer it was provided on construction, +// and assumes that each call to Write fully transfers a single message, +// or returns an error. +// +// A writer must be safe for concurrent use, as writes may occur concurrently +// in practice: libraries may make calls or respond to requests asynchronously. +type Writer interface { + // Write sends a message to the stream. + Write(context.Context, Message) error } // A ConnectionConfig configures a bidirectional jsonrpc2 connection. @@ -230,59 +210,16 @@ func NewConnection(ctx context.Context, cfg ConnectionConfig) *Connection { return c } -// bindConnection creates a new connection and runs it. -// -// This is used by the Dial and Serve functions to build the actual connection. -// -// The connection is closed automatically (and its resources cleaned up) when -// the last request has completed after the underlying ReadWriteCloser breaks, -// but it may be stopped earlier by calling Close (for a clean shutdown). -func bindConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder, onDone func()) *Connection { - // TODO: Should we create a new event span here? - // This will propagate cancellation from ctx; should it? - ctx := notDone{bindCtx} - - c := &Connection{ - state: inFlightState{closer: rwc}, - done: make(chan struct{}), - onDone: onDone, - } - // It's tempting to set a finalizer on c to verify that the state has gone - // idle when the connection becomes unreachable. Unfortunately, the Binder - // interface makes that unsafe: it allows the Handler to close over the - // Connection, which could create a reference cycle that would cause the - // Connection to become uncollectable. - - options := binder.Bind(bindCtx, c) - framer := options.Framer - if framer == nil { - framer = HeaderFramer() - } - c.handler = options.Handler - if c.handler == nil { - c.handler = defaultHandler{} - } - c.onInternalError = options.OnInternalError - - c.writer = framer.Writer(rwc) - reader := framer.Reader(rwc) - c.start(ctx, reader, options.Preempter) - return c -} - func (c *Connection) start(ctx context.Context, reader Reader, preempter Preempter) { c.updateInFlight(func(s *inFlightState) { select { case <-c.done: - // Bind already closed the connection; don't start a goroutine to read it. + // The connection was already closed; don't start a goroutine to read it. return default: } // The goroutine started here will continue until the underlying stream is closed. - // - // (If the Binder closed the Connection already, this should error out and - // return almost immediately.) s.reading = true go c.readIncoming(ctx, reader, preempter) }) @@ -439,18 +376,6 @@ type AsyncCall struct { // This can be used to cancel the call if needed. func (ac *AsyncCall) ID() ID { return ac.id } -// IsReady can be used to check if the result is already prepared. -// This is guaranteed to return true on a result for which Await has already -// returned, or a call that failed to send in the first place. -func (ac *AsyncCall) IsReady() bool { - select { - case <-ac.ready: - return true - default: - return false - } -} - // retire processes the response to the call. // // It is an error to call retire more than once: retire is guarded by the diff --git a/internal/jsonrpc2/frame.go b/internal/jsonrpc2/frame.go deleted file mode 100644 index 72527cb9..00000000 --- a/internal/jsonrpc2/frame.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2018 The Go MCP SDK Authors. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package jsonrpc2 - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "io" - "strconv" - "strings" - "sync" -) - -// Reader abstracts the transport mechanics from the JSON RPC protocol. -// A Conn reads messages from the reader it was provided on construction, -// and assumes that each call to Read fully transfers a single message, -// or returns an error. -// -// A reader is not safe for concurrent use, it is expected it will be used by -// a single Conn in a safe manner. -type Reader interface { - // Read gets the next message from the stream. - Read(context.Context) (Message, error) -} - -// Writer abstracts the transport mechanics from the JSON RPC protocol. -// A Conn writes messages using the writer it was provided on construction, -// and assumes that each call to Write fully transfers a single message, -// or returns an error. -// -// A writer must be safe for concurrent use, as writes may occur concurrently -// in practice: libraries may make calls or respond to requests asynchronously. -type Writer interface { - // Write sends a message to the stream. - Write(context.Context, Message) error -} - -// Framer wraps low level byte readers and writers into jsonrpc2 message -// readers and writers. -// It is responsible for the framing and encoding of messages into wire form. -// -// TODO(rfindley): rethink the framer interface, as with JSONRPC2 batching -// there is a need for Reader and Writer to be correlated, and while the -// implementation of framing here allows that, it is not made explicit by the -// interface. -// -// Perhaps a better interface would be -// -// Frame(io.ReadWriteCloser) (Reader, Writer). -type Framer interface { - // Reader wraps a byte reader into a message reader. - Reader(io.Reader) Reader - // Writer wraps a byte writer into a message writer. - Writer(io.Writer) Writer -} - -// RawFramer returns a new Framer. -// The messages are sent with no wrapping, and rely on json decode consistency -// to determine message boundaries. -func RawFramer() Framer { return rawFramer{} } - -type rawFramer struct{} -type rawReader struct{ in *json.Decoder } -type rawWriter struct { - mu sync.Mutex - out io.Writer -} - -func (rawFramer) Reader(rw io.Reader) Reader { - return &rawReader{in: json.NewDecoder(rw)} -} - -func (rawFramer) Writer(rw io.Writer) Writer { - return &rawWriter{out: rw} -} - -func (r *rawReader) Read(ctx context.Context) (Message, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - var raw json.RawMessage - if err := r.in.Decode(&raw); err != nil { - return nil, err - } - msg, err := DecodeMessage(raw) - return msg, err -} - -func (w *rawWriter) Write(ctx context.Context, msg Message) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - data, err := EncodeMessage(msg) - if err != nil { - return fmt.Errorf("marshaling message: %v", err) - } - - w.mu.Lock() - defer w.mu.Unlock() - _, err = w.out.Write(data) - return err -} - -// HeaderFramer returns a new Framer. -// The messages are sent with HTTP content length and MIME type headers. -// This is the format used by LSP and others. -func HeaderFramer() Framer { return headerFramer{} } - -type headerFramer struct{} -type headerReader struct{ in *bufio.Reader } -type headerWriter struct { - mu sync.Mutex - out io.Writer -} - -func (headerFramer) Reader(rw io.Reader) Reader { - return &headerReader{in: bufio.NewReader(rw)} -} - -func (headerFramer) Writer(rw io.Writer) Writer { - return &headerWriter{out: rw} -} - -func (r *headerReader) Read(ctx context.Context) (Message, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - firstRead := true // to detect a clean EOF below - var contentLength int64 - // read the header, stop on the first empty line - for { - line, err := r.in.ReadString('\n') - if err != nil { - if err == io.EOF { - if firstRead && line == "" { - return nil, io.EOF // clean EOF - } - err = io.ErrUnexpectedEOF - } - return nil, fmt.Errorf("failed reading header line: %w", err) - } - firstRead = false - - line = strings.TrimSpace(line) - // check we have a header line - if line == "" { - break - } - colon := strings.IndexRune(line, ':') - if colon < 0 { - return nil, fmt.Errorf("invalid header line %q", line) - } - name, value := line[:colon], strings.TrimSpace(line[colon+1:]) - switch { - case strings.EqualFold(name, "Content-Length"): - if contentLength, err = strconv.ParseInt(value, 10, 32); err != nil { - return nil, fmt.Errorf("failed parsing Content-Length: %v", value) - } - if contentLength <= 0 { - return nil, fmt.Errorf("invalid Content-Length: %v", contentLength) - } - default: - // ignoring unknown headers - } - } - if contentLength == 0 { - return nil, fmt.Errorf("missing Content-Length header") - } - data := make([]byte, contentLength) - _, err := io.ReadFull(r.in, data) - if err != nil { - return nil, err - } - msg, err := DecodeMessage(data) - return msg, err -} - -func (w *headerWriter) Write(ctx context.Context, msg Message) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - w.mu.Lock() - defer w.mu.Unlock() - - data, err := EncodeMessage(msg) - if err != nil { - return fmt.Errorf("marshaling message: %v", err) - } - _, err = fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data)) - if err == nil { - _, err = w.out.Write(data) - } - return err -} diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index 234e6ee3..76a51161 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -12,17 +12,12 @@ import ( "errors" ) -var ( - // ErrIdleTimeout is returned when serving timed out waiting for new connections. - ErrIdleTimeout = errors.New("timed out waiting for new connections") - - // ErrNotHandled is returned from a Handler or Preempter to indicate it did - // not handle the request. - // - // If a Handler returns ErrNotHandled, the server replies with - // ErrMethodNotFound. - ErrNotHandled = errors.New("JSON RPC not handled") -) +// ErrNotHandled is returned from a Handler or Preempter to indicate it did +// not handle the request. +// +// If a Handler returns ErrNotHandled, the server replies with +// ErrMethodNotFound. +var ErrNotHandled = errors.New("JSON RPC not handled") // Preempter handles messages on a connection before they are queued to the main // handler. @@ -40,15 +35,6 @@ type Preempter interface { Preempt(ctx context.Context, req *Request) (result any, err error) } -// A PreempterFunc implements the Preempter interface for a standalone Preempt function. -type PreempterFunc func(ctx context.Context, req *Request) (any, error) - -func (f PreempterFunc) Preempt(ctx context.Context, req *Request) (any, error) { - return f(ctx, req) -} - -var _ Preempter = PreempterFunc(nil) - // Handler handles messages on a connection. type Handler interface { // Handle is invoked sequentially for each incoming request that has not @@ -67,16 +53,6 @@ type Handler interface { Handle(ctx context.Context, req *Request) (result any, err error) } -type defaultHandler struct{} - -func (defaultHandler) Preempt(context.Context, *Request) (any, error) { - return nil, ErrNotHandled -} - -func (defaultHandler) Handle(context.Context, *Request) (any, error) { - return nil, ErrNotHandled -} - // A HandlerFunc implements the Handler interface for a standalone Handle function. type HandlerFunc func(ctx context.Context, req *Request) (any, error) @@ -85,37 +61,3 @@ func (f HandlerFunc) Handle(ctx context.Context, req *Request) (any, error) { } var _ Handler = HandlerFunc(nil) - -// async is a small helper for operations with an asynchronous result that you -// can wait for. -type async struct { - ready chan struct{} // closed when done - firstErr chan error // 1-buffered; contains either nil or the first non-nil error -} - -func newAsync() *async { - var a async - a.ready = make(chan struct{}) - a.firstErr = make(chan error, 1) - a.firstErr <- nil - return &a -} - -func (a *async) done() { - close(a.ready) -} - -func (a *async) wait() error { - <-a.ready - err := <-a.firstErr - a.firstErr <- err - return err -} - -func (a *async) setError(err error) { - storedErr := <-a.firstErr - if storedErr == nil { - storedErr = err - } - a.firstErr <- storedErr -} diff --git a/internal/jsonrpc2/jsonrpc2_test.go b/internal/jsonrpc2/jsonrpc2_test.go deleted file mode 100644 index 8c79300c..00000000 --- a/internal/jsonrpc2/jsonrpc2_test.go +++ /dev/null @@ -1,385 +0,0 @@ -// Copyright 2018 The Go MCP SDK Authors. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package jsonrpc2_test - -import ( - "context" - "encoding/json" - "fmt" - "path" - "reflect" - "testing" - "time" - - "github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2" -) - -var callTests = []invoker{ - call{"no_args", nil, true}, - call{"one_string", "fish", "got:fish"}, - call{"one_number", 10, "got:10"}, - call{"join", []string{"a", "b", "c"}, "a/b/c"}, - sequence{"notify", []invoker{ - notify{"set", 3}, - notify{"add", 5}, - call{"get", nil, 8}, - }}, - sequence{"preempt", []invoker{ - async{"a", "wait", "a"}, - notify{"unblock", "a"}, - collect{"a", true, false}, - }}, - sequence{"basic cancel", []invoker{ - async{"b", "wait", "b"}, - cancel{"b"}, - collect{"b", nil, true}, - }}, - sequence{"queue", []invoker{ - async{"a", "wait", "a"}, - notify{"set", 1}, - notify{"add", 2}, - notify{"add", 3}, - notify{"add", 4}, - call{"peek", nil, 0}, // accumulator will not have any adds yet - notify{"unblock", "a"}, - collect{"a", true, false}, - call{"get", nil, 10}, // accumulator now has all the adds - }}, - sequence{"fork", []invoker{ - async{"a", "fork", "a"}, - notify{"set", 1}, - notify{"add", 2}, - notify{"add", 3}, - notify{"add", 4}, - call{"get", nil, 10}, // fork will not have blocked the adds - notify{"unblock", "a"}, - collect{"a", true, false}, - }}, - sequence{"concurrent", []invoker{ - async{"a", "fork", "a"}, - notify{"unblock", "a"}, - async{"b", "fork", "b"}, - notify{"unblock", "b"}, - collect{"a", true, false}, - collect{"b", true, false}, - }}, -} - -type binder struct { - framer jsonrpc2.Framer - runTest func(*handler) -} - -type handler struct { - conn *jsonrpc2.Connection - accumulator int - waiters chan map[string]chan struct{} - calls map[string]*jsonrpc2.AsyncCall -} - -type invoker interface { - Name() string - Invoke(t *testing.T, ctx context.Context, h *handler) -} - -type notify struct { - method string - params any -} - -type call struct { - method string - params any - expect any -} - -type async struct { - name string - method string - params any -} - -type collect struct { - name string - expect any - fails bool -} - -type cancel struct { - name string -} - -type sequence struct { - name string - tests []invoker -} - -type echo call - -type cancelParams struct{ ID int64 } - -func TestConnectionRaw(t *testing.T) { - testConnection(t, jsonrpc2.RawFramer()) -} - -func TestConnectionHeader(t *testing.T) { - testConnection(t, jsonrpc2.HeaderFramer()) -} - -func testConnection(t *testing.T, framer jsonrpc2.Framer) { - ctx := context.Background() - listener, err := jsonrpc2.NetPipeListener(ctx) - if err != nil { - t.Fatal(err) - } - server := jsonrpc2.NewServer(ctx, listener, binder{framer, nil}) - defer func() { - listener.Close() - server.Wait() - }() - - for _, test := range callTests { - t.Run(test.Name(), func(t *testing.T) { - client, err := jsonrpc2.Dial(ctx, - listener.Dialer(), binder{framer, func(h *handler) { - // Sleep a little to a void a race with setting conn.writer in jsonrpc2.bindConnection. - time.Sleep(50 * time.Millisecond) - defer h.conn.Close() - test.Invoke(t, ctx, h) - if call, ok := test.(*call); ok { - // also run all simple call tests in echo mode - (*echo)(call).Invoke(t, ctx, h) - } - }}, nil) - if err != nil { - t.Fatal(err) - } - client.Wait() - }) - } -} - -func (test notify) Name() string { return test.method } -func (test notify) Invoke(t *testing.T, ctx context.Context, h *handler) { - if err := h.conn.Notify(ctx, test.method, test.params); err != nil { - t.Fatalf("%v:Notify failed: %v", test.method, err) - } -} - -func (test call) Name() string { return test.method } -func (test call) Invoke(t *testing.T, ctx context.Context, h *handler) { - results := newResults(test.expect) - if err := h.conn.Call(ctx, test.method, test.params).Await(ctx, results); err != nil { - t.Fatalf("%v:Call failed: %v", test.method, err) - } - verifyResults(t, test.method, results, test.expect) -} - -func (test echo) Invoke(t *testing.T, ctx context.Context, h *handler) { - results := newResults(test.expect) - if err := h.conn.Call(ctx, "echo", []any{test.method, test.params}).Await(ctx, results); err != nil { - t.Fatalf("%v:Echo failed: %v", test.method, err) - } - verifyResults(t, test.method, results, test.expect) -} - -func (test async) Name() string { return test.name } -func (test async) Invoke(t *testing.T, ctx context.Context, h *handler) { - h.calls[test.name] = h.conn.Call(ctx, test.method, test.params) -} - -func (test collect) Name() string { return test.name } -func (test collect) Invoke(t *testing.T, ctx context.Context, h *handler) { - o := h.calls[test.name] - results := newResults(test.expect) - err := o.Await(ctx, results) - switch { - case test.fails && err == nil: - t.Fatalf("%v:Collect was supposed to fail", test.name) - case !test.fails && err != nil: - t.Fatalf("%v:Collect failed: %v", test.name, err) - } - verifyResults(t, test.name, results, test.expect) -} - -func (test cancel) Name() string { return test.name } -func (test cancel) Invoke(t *testing.T, ctx context.Context, h *handler) { - o := h.calls[test.name] - if err := h.conn.Notify(ctx, "cancel", &cancelParams{o.ID().Raw().(int64)}); err != nil { - t.Fatalf("%v:Collect failed: %v", test.name, err) - } -} - -func (test sequence) Name() string { return test.name } -func (test sequence) Invoke(t *testing.T, ctx context.Context, h *handler) { - for _, child := range test.tests { - child.Invoke(t, ctx, h) - } -} - -// newResults makes a new empty copy of the expected type to put the results into -func newResults(expect any) any { - switch e := expect.(type) { - case []any: - var r []any - for _, v := range e { - r = append(r, reflect.New(reflect.TypeOf(v)).Interface()) - } - return r - case nil: - return nil - default: - return reflect.New(reflect.TypeOf(expect)).Interface() - } -} - -// verifyResults compares the results to the expected values -func verifyResults(t *testing.T, method string, results any, expect any) { - if expect == nil { - if results != nil { - t.Errorf("%v:Got results %+v where none expected", method, expect) - } - return - } - val := reflect.Indirect(reflect.ValueOf(results)).Interface() - if !reflect.DeepEqual(val, expect) { - t.Errorf("%v:Results are incorrect, got %+v expect %+v", method, val, expect) - } -} - -func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) jsonrpc2.ConnectionOptions { - h := &handler{ - conn: conn, - waiters: make(chan map[string]chan struct{}, 1), - calls: make(map[string]*jsonrpc2.AsyncCall), - } - h.waiters <- make(map[string]chan struct{}) - if b.runTest != nil { - go b.runTest(h) - } - return jsonrpc2.ConnectionOptions{ - Framer: b.framer, - Preempter: h, - Handler: h, - } -} - -func (h *handler) waiter(name string) chan struct{} { - waiters := <-h.waiters - defer func() { h.waiters <- waiters }() - waiter, found := waiters[name] - if !found { - waiter = make(chan struct{}) - waiters[name] = waiter - } - return waiter -} - -func (h *handler) Preempt(ctx context.Context, req *jsonrpc2.Request) (any, error) { - switch req.Method { - case "unblock": - var name string - if err := json.Unmarshal(req.Params, &name); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - close(h.waiter(name)) - return nil, nil - case "peek": - if len(req.Params) > 0 { - return nil, fmt.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams) - } - return h.accumulator, nil - case "cancel": - var params cancelParams - if err := json.Unmarshal(req.Params, ¶ms); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - h.conn.Cancel(jsonrpc2.Int64ID(params.ID)) - return nil, nil - default: - return nil, jsonrpc2.ErrNotHandled - } -} - -func (h *handler) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error) { - switch req.Method { - case "no_args": - if len(req.Params) > 0 { - return nil, fmt.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams) - } - return true, nil - case "one_string": - var v string - if err := json.Unmarshal(req.Params, &v); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - return "got:" + v, nil - case "one_number": - var v int - if err := json.Unmarshal(req.Params, &v); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - return fmt.Sprintf("got:%d", v), nil - case "set": - var v int - if err := json.Unmarshal(req.Params, &v); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - h.accumulator = v - return nil, nil - case "add": - var v int - if err := json.Unmarshal(req.Params, &v); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - h.accumulator += v - return nil, nil - case "get": - if len(req.Params) > 0 { - return nil, fmt.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams) - } - return h.accumulator, nil - case "join": - var v []string - if err := json.Unmarshal(req.Params, &v); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - return path.Join(v...), nil - case "echo": - var v []any - if err := json.Unmarshal(req.Params, &v); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - var result any - err := h.conn.Call(ctx, v[0].(string), v[1]).Await(ctx, &result) - return result, err - case "wait": - var name string - if err := json.Unmarshal(req.Params, &name); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - select { - case <-h.waiter(name): - return true, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - case "fork": - var name string - if err := json.Unmarshal(req.Params, &name); err != nil { - return nil, fmt.Errorf("%w: %s", jsonrpc2.ErrParse, err) - } - jsonrpc2.Async(ctx) - waitFor := h.waiter(name) - select { - case <-waitFor: - return true, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - default: - return nil, jsonrpc2.ErrNotHandled - } -} diff --git a/internal/jsonrpc2/net.go b/internal/jsonrpc2/net.go deleted file mode 100644 index 05db0626..00000000 --- a/internal/jsonrpc2/net.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2018 The Go MCP SDK Authors. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package jsonrpc2 - -import ( - "context" - "io" - "net" - "os" -) - -// This file contains implementations of the transport primitives that use the standard network -// package. - -// NetListenOptions is the optional arguments to the NetListen function. -type NetListenOptions struct { - NetListenConfig net.ListenConfig - NetDialer net.Dialer -} - -// NetListener returns a new Listener that listens on a socket using the net package. -func NetListener(ctx context.Context, network, address string, options NetListenOptions) (Listener, error) { - ln, err := options.NetListenConfig.Listen(ctx, network, address) - if err != nil { - return nil, err - } - return &netListener{net: ln}, nil -} - -// netListener is the implementation of Listener for connections made using the net package. -type netListener struct { - net net.Listener -} - -// Accept blocks waiting for an incoming connection to the listener. -func (l *netListener) Accept(context.Context) (io.ReadWriteCloser, error) { - return l.net.Accept() -} - -// Close will cause the listener to stop listening. It will not close any connections that have -// already been accepted. -func (l *netListener) Close() error { - addr := l.net.Addr() - err := l.net.Close() - if addr.Network() == "unix" { - rerr := os.Remove(addr.String()) - if rerr != nil && err == nil { - err = rerr - } - } - return err -} - -// Dialer returns a dialer that can be used to connect to the listener. -func (l *netListener) Dialer() Dialer { - return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{}) -} - -// NetDialer returns a Dialer using the supplied standard network dialer. -func NetDialer(network, address string, nd net.Dialer) Dialer { - return &netDialer{ - network: network, - address: address, - dialer: nd, - } -} - -type netDialer struct { - network string - address string - dialer net.Dialer -} - -func (n *netDialer) Dial(ctx context.Context) (io.ReadWriteCloser, error) { - return n.dialer.DialContext(ctx, n.network, n.address) -} - -// NetPipeListener returns a new Listener that listens using net.Pipe. -// It is only possibly to connect to it using the Dialer returned by the -// Dialer method, each call to that method will generate a new pipe the other -// side of which will be returned from the Accept call. -func NetPipeListener(ctx context.Context) (Listener, error) { - return &netPiper{ - done: make(chan struct{}), - dialed: make(chan io.ReadWriteCloser), - }, nil -} - -// netPiper is the implementation of Listener build on top of net.Pipes. -type netPiper struct { - done chan struct{} - dialed chan io.ReadWriteCloser -} - -// Accept blocks waiting for an incoming connection to the listener. -func (l *netPiper) Accept(context.Context) (io.ReadWriteCloser, error) { - // Block until the pipe is dialed or the listener is closed, - // preferring the latter if already closed at the start of Accept. - select { - case <-l.done: - return nil, net.ErrClosed - default: - } - select { - case rwc := <-l.dialed: - return rwc, nil - case <-l.done: - return nil, net.ErrClosed - } -} - -// Close will cause the listener to stop listening. It will not close any connections that have -// already been accepted. -func (l *netPiper) Close() error { - // unblock any accept calls that are pending - close(l.done) - return nil -} - -func (l *netPiper) Dialer() Dialer { - return l -} - -func (l *netPiper) Dial(ctx context.Context) (io.ReadWriteCloser, error) { - client, server := net.Pipe() - - select { - case l.dialed <- server: - return client, nil - - case <-l.done: - client.Close() - server.Close() - return nil, net.ErrClosed - } -} diff --git a/internal/jsonrpc2/serve.go b/internal/jsonrpc2/serve.go deleted file mode 100644 index 424163aa..00000000 --- a/internal/jsonrpc2/serve.go +++ /dev/null @@ -1,330 +0,0 @@ -// Copyright 2020 The Go MCP SDK Authors. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package jsonrpc2 - -import ( - "context" - "fmt" - "io" - "runtime" - "sync" - "sync/atomic" - "time" -) - -// Listener is implemented by protocols to accept new inbound connections. -type Listener interface { - // Accept accepts an inbound connection to a server. - // It blocks until either an inbound connection is made, or the listener is closed. - Accept(context.Context) (io.ReadWriteCloser, error) - - // Close closes the listener. - // Any blocked Accept or Dial operations will unblock and return errors. - Close() error - - // Dialer returns a dialer that can be used to connect to this listener - // locally. - // If a listener does not implement this it will return nil. - Dialer() Dialer -} - -// Dialer is used by clients to dial a server. -type Dialer interface { - // Dial returns a new communication byte stream to a listening server. - Dial(ctx context.Context) (io.ReadWriteCloser, error) -} - -// Server is a running server that is accepting incoming connections. -type Server struct { - listener Listener - binder Binder - async *async - - shutdownOnce sync.Once - closing int32 // atomic: set to nonzero when Shutdown is called -} - -// Dial uses the dialer to make a new connection, wraps the returned -// reader and writer using the framer to make a stream, and then builds -// a connection on top of that stream using the binder. -// -// The returned Connection will operate independently using the Preempter and/or -// Handler provided by the Binder, and will release its own resources when the -// connection is broken, but the caller may Close it earlier to stop accepting -// (or sending) new requests. -// -// If non-nil, the onDone function is called when the connection is closed. -func Dial(ctx context.Context, dialer Dialer, binder Binder, onDone func()) (*Connection, error) { - // dial a server - rwc, err := dialer.Dial(ctx) - if err != nil { - return nil, err - } - return bindConnection(ctx, rwc, binder, onDone), nil -} - -// NewServer starts a new server listening for incoming connections and returns -// it. -// This returns a fully running and connected server, it does not block on -// the listener. -// You can call Wait to block on the server, or Shutdown to get the sever to -// terminate gracefully. -// To notice incoming connections, use an intercepting Binder. -func NewServer(ctx context.Context, listener Listener, binder Binder) *Server { - server := &Server{ - listener: listener, - binder: binder, - async: newAsync(), - } - go server.run(ctx) - return server -} - -// Wait returns only when the server has shut down. -func (s *Server) Wait() error { - return s.async.wait() -} - -// Shutdown informs the server to stop accepting new connections. -func (s *Server) Shutdown() { - s.shutdownOnce.Do(func() { - atomic.StoreInt32(&s.closing, 1) - s.listener.Close() - }) -} - -// run accepts incoming connections from the listener, -// If IdleTimeout is non-zero, run exits after there are no clients for this -// duration, otherwise it exits only on error. -func (s *Server) run(ctx context.Context) { - defer s.async.done() - - var activeConns sync.WaitGroup - for { - rwc, err := s.listener.Accept(ctx) - if err != nil { - // Only Shutdown closes the listener. If we get an error after Shutdown is - // called, assume that was the cause and don't report the error; - // otherwise, report the error in case it is unexpected. - if atomic.LoadInt32(&s.closing) == 0 { - s.async.setError(err) - } - // We are done generating new connections for good. - break - } - - // A new inbound connection. - activeConns.Add(1) - _ = bindConnection(ctx, rwc, s.binder, activeConns.Done) // unregisters itself when done - } - activeConns.Wait() -} - -// NewIdleListener wraps a listener with an idle timeout. -// -// When there are no active connections for at least the timeout duration, -// calls to Accept will fail with ErrIdleTimeout. -// -// A connection is considered inactive as soon as its Close method is called. -func NewIdleListener(timeout time.Duration, wrap Listener) Listener { - l := &idleListener{ - wrapped: wrap, - timeout: timeout, - active: make(chan int, 1), - timedOut: make(chan struct{}), - idleTimer: make(chan *time.Timer, 1), - } - l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired) - return l -} - -type idleListener struct { - wrapped Listener - timeout time.Duration - - // Only one of these channels is receivable at any given time. - active chan int // count of active connections; closed when Close is called if not timed out - timedOut chan struct{} // closed when the idle timer expires - idleTimer chan *time.Timer // holds the timer only when idle -} - -// Accept accepts an incoming connection. -// -// If an incoming connection is accepted concurrent to the listener being closed -// due to idleness, the new connection is immediately closed. -func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) { - rwc, err := l.wrapped.Accept(ctx) - - select { - case n, ok := <-l.active: - if err != nil { - if ok { - l.active <- n - } - return nil, err - } - if ok { - l.active <- n + 1 - } else { - // l.wrapped.Close Close has been called, but Accept returned a - // connection. This race can occur with concurrent Accept and Close calls - // with any net.Listener, and it is benign: since the listener was closed - // explicitly, it can't have also timed out. - } - return l.newConn(rwc), nil - - case <-l.timedOut: - if err == nil { - // Keeping the connection open would leave the listener simultaneously - // active and closed due to idleness, which would be contradictory and - // confusing. Close the connection and pretend that it never happened. - rwc.Close() - } else { - // In theory the timeout could have raced with an unrelated error return - // from Accept. However, ErrIdleTimeout is arguably still valid (since we - // would have closed due to the timeout independent of the error), and the - // harm from returning a spurious ErrIdleTimeout is negligible anyway. - } - return nil, ErrIdleTimeout - - case timer := <-l.idleTimer: - if err != nil { - // The idle timer doesn't run until it receives itself from the idleTimer - // channel, so it can't have called l.wrapped.Close yet and thus err can't - // be ErrIdleTimeout. Leave the idle timer as it was and return whatever - // error we got. - l.idleTimer <- timer - return nil, err - } - - if !timer.Stop() { - // Failed to stop the timer — the timer goroutine is in the process of - // firing. Send the timer back to the timer goroutine so that it can - // safely close the timedOut channel, and then wait for the listener to - // actually be closed before we return ErrIdleTimeout. - l.idleTimer <- timer - rwc.Close() - <-l.timedOut - return nil, ErrIdleTimeout - } - - l.active <- 1 - return l.newConn(rwc), nil - } -} - -func (l *idleListener) Close() error { - select { - case _, ok := <-l.active: - if ok { - close(l.active) - } - - case <-l.timedOut: - // Already closed by the timer; take care not to double-close if the caller - // only explicitly invokes this Close method once, since the io.Closer - // interface explicitly leaves doubled Close calls undefined. - return ErrIdleTimeout - - case timer := <-l.idleTimer: - if !timer.Stop() { - // Couldn't stop the timer. It shouldn't take long to run, so just wait - // (so that the Listener is guaranteed to be closed before we return) - // and pretend that this call happened afterward. - // That way we won't leak any timers or goroutines when Close returns. - l.idleTimer <- timer - <-l.timedOut - return ErrIdleTimeout - } - close(l.active) - } - - return l.wrapped.Close() -} - -func (l *idleListener) Dialer() Dialer { - return l.wrapped.Dialer() -} - -func (l *idleListener) timerExpired() { - select { - case n, ok := <-l.active: - if ok { - panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n)) - } else { - panic("jsonrpc2: Close finished with idle timer still running") - } - - case <-l.timedOut: - panic("jsonrpc2: idleListener idle timer fired more than once") - - case <-l.idleTimer: - // The timer for this very call! - } - - // Close the Listener with all channels still blocked to ensure that this call - // to l.wrapped.Close doesn't race with the one in l.Close. - defer close(l.timedOut) - l.wrapped.Close() -} - -func (l *idleListener) connClosed() { - select { - case n, ok := <-l.active: - if !ok { - // l is already closed, so it can't close due to idleness, - // and we don't need to track the number of active connections any more. - return - } - n-- - if n == 0 { - l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired) - } else { - l.active <- n - } - - case <-l.timedOut: - panic("jsonrpc2: idleListener idle timer fired before last active connection was closed") - - case <-l.idleTimer: - panic("jsonrpc2: idleListener idle timer active before last active connection was closed") - } -} - -type idleListenerConn struct { - wrapped io.ReadWriteCloser - l *idleListener - closeOnce sync.Once -} - -func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn { - c := &idleListenerConn{ - wrapped: rwc, - l: l, - } - - // A caller that forgets to call Close may disrupt the idleListener's - // accounting, even though the file descriptor for the underlying connection - // may eventually be garbage-collected anyway. - // - // Set a (best-effort) finalizer to verify that a Close call always occurs. - // (We will clear the finalizer explicitly in Close.) - runtime.SetFinalizer(c, func(c *idleListenerConn) { - panic("jsonrpc2: IdleListener connection became unreachable without a call to Close") - }) - - return c -} - -func (c *idleListenerConn) Read(p []byte) (int, error) { return c.wrapped.Read(p) } -func (c *idleListenerConn) Write(p []byte) (int, error) { return c.wrapped.Write(p) } - -func (c *idleListenerConn) Close() error { - defer c.closeOnce.Do(func() { - c.l.connClosed() - runtime.SetFinalizer(c, nil) - }) - return c.wrapped.Close() -} diff --git a/internal/jsonrpc2/serve_test.go b/internal/jsonrpc2/serve_test.go deleted file mode 100644 index 282a6849..00000000 --- a/internal/jsonrpc2/serve_test.go +++ /dev/null @@ -1,354 +0,0 @@ -// Copyright 2020 The Go MCP SDK Authors. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package jsonrpc2_test - -import ( - "context" - "errors" - "fmt" - "runtime" - "runtime/debug" - "testing" - "time" - - "github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2" -) - -// needsLocalhostNet skips t if networking does not work for ports opened -// with "localhost". -// forked from golang.org/x/tools/internal/testenv. -func needsLocalhostNet(t testing.TB) { - switch runtime.GOOS { - case "js", "wasip1": - t.Skipf(`Listening on "localhost" fails on %s; see https://go.dev/issue/59718`, runtime.GOOS) - } -} - -func TestIdleTimeout(t *testing.T) { - needsLocalhostNet(t) - - // Use a panicking time.AfterFunc instead of context.WithTimeout so that we - // get a goroutine dump on failure. We expect the test to take on the order of - // a few tens of milliseconds at most, so 10s should be several orders of - // magnitude of headroom. - timer := time.AfterFunc(10*time.Second, func() { - debug.SetTraceback("all") - panic("TestIdleTimeout deadlocked") - }) - defer timer.Stop() - - ctx := context.Background() - - try := func(d time.Duration) (longEnough bool) { - listener, err := jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{}) - if err != nil { - t.Fatal(err) - } - - idleStart := time.Now() - listener = jsonrpc2.NewIdleListener(d, listener) - defer listener.Close() - - server := jsonrpc2.NewServer(ctx, listener, jsonrpc2.ConnectionOptions{}) - - // Exercise some connection/disconnection patterns, and then assert that when - // our timer fires, the server exits. - conn1, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{}, nil) - if err != nil { - if since := time.Since(idleStart); since < d { - t.Fatalf("conn1 failed to connect after %v: %v", since, err) - } - t.Log("jsonrpc2.Dial:", err) - return false // Took to long to dial, so the failure could have been due to the idle timeout. - } - // On the server side, Accept can race with the connection timing out. - // Send a call and wait for the response to ensure that the connection was - // actually fully accepted. - ac := conn1.Call(ctx, "ping", nil) - if err := ac.Await(ctx, nil); !errors.Is(err, jsonrpc2.ErrMethodNotFound) { - if since := time.Since(idleStart); since < d { - t.Fatalf("conn1 broken after %v: %v", since, err) - } - t.Log(`conn1.Call(ctx, "ping", nil):`, err) - conn1.Close() - return false - } - - // Since conn1 was successfully accepted and remains open, the server is - // definitely non-idle. Dialing another simultaneous connection should - // succeed. - conn2, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{}, nil) - if err != nil { - conn1.Close() - t.Fatalf("conn2 failed to connect while non-idle after %v: %v", time.Since(idleStart), err) - return false - } - // Ensure that conn2 is also accepted on the server side before we close - // conn1. Otherwise, the connection can appear idle if the server processes - // the closure of conn1 and the idle timeout before it finally notices conn2 - // in the accept queue. - // (That failure mode may explain the failure noted in - // https://go.dev/issue/49387#issuecomment-1303979877.) - ac = conn2.Call(ctx, "ping", nil) - if err := ac.Await(ctx, nil); !errors.Is(err, jsonrpc2.ErrMethodNotFound) { - t.Fatalf("conn2 broken while non-idle after %v: %v", time.Since(idleStart), err) - } - - if err := conn1.Close(); err != nil { - t.Fatalf("conn1.Close failed with error: %v", err) - } - idleStart = time.Now() - if err := conn2.Close(); err != nil { - t.Fatalf("conn2.Close failed with error: %v", err) - } - - conn3, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{}, nil) - if err != nil { - if since := time.Since(idleStart); since < d { - t.Fatalf("conn3 failed to connect after %v: %v", since, err) - } - t.Log("jsonrpc2.Dial:", err) - return false // Took to long to dial, so the failure could have been due to the idle timeout. - } - - ac = conn3.Call(ctx, "ping", nil) - if err := ac.Await(ctx, nil); !errors.Is(err, jsonrpc2.ErrMethodNotFound) { - if since := time.Since(idleStart); since < d { - t.Fatalf("conn3 broken after %v: %v", since, err) - } - t.Log(`conn3.Call(ctx, "ping", nil):`, err) - conn3.Close() - return false - } - - idleStart = time.Now() - if err := conn3.Close(); err != nil { - t.Fatalf("conn3.Close failed with error: %v", err) - } - - serverError := server.Wait() - - if !errors.Is(serverError, jsonrpc2.ErrIdleTimeout) { - t.Errorf("run() returned error %v, want %v", serverError, jsonrpc2.ErrIdleTimeout) - } - if since := time.Since(idleStart); since < d { - t.Errorf("server shut down after %v idle; want at least %v", since, d) - } - return true - } - - d := 1 * time.Millisecond - for { - t.Logf("testing with idle timeout %v", d) - if !try(d) { - d *= 2 - continue - } - break - } -} - -type msg struct { - Msg string -} - -type fakeHandler struct{} - -func (fakeHandler) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error) { - switch req.Method { - case "ping": - return &msg{"pong"}, nil - default: - return nil, jsonrpc2.ErrNotHandled - } -} - -func TestServe(t *testing.T) { - ctx := context.Background() - - tests := []struct { - name string - factory func(context.Context, testing.TB) (jsonrpc2.Listener, error) - }{ - {"tcp", func(ctx context.Context, t testing.TB) (jsonrpc2.Listener, error) { - needsLocalhostNet(t) - return jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{}) - }}, - {"pipe", func(ctx context.Context, t testing.TB) (jsonrpc2.Listener, error) { - return jsonrpc2.NetPipeListener(ctx) - }}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fake, err := test.factory(ctx, t) - if err != nil { - t.Fatal(err) - } - conn, shutdown, err := newFake(t, ctx, fake) - if err != nil { - t.Fatal(err) - } - defer shutdown() - var got msg - if err := conn.Call(ctx, "ping", &msg{"ting"}).Await(ctx, &got); err != nil { - t.Fatal(err) - } - if want := "pong"; got.Msg != want { - t.Errorf("conn.Call(...): returned %q, want %q", got, want) - } - }) - } -} - -func newFake(t *testing.T, ctx context.Context, l jsonrpc2.Listener) (*jsonrpc2.Connection, func(), error) { - server := jsonrpc2.NewServer(ctx, l, jsonrpc2.ConnectionOptions{ - Handler: fakeHandler{}, - }) - - client, err := jsonrpc2.Dial(ctx, - l.Dialer(), - jsonrpc2.ConnectionOptions{ - Handler: fakeHandler{}, - }, nil) - if err != nil { - return nil, nil, err - } - return client, func() { - if err := l.Close(); err != nil { - t.Fatal(err) - } - if err := client.Close(); err != nil { - t.Fatal(err) - } - server.Wait() - }, nil -} - -// TestIdleListenerAcceptCloseRace checks for the Accept/Close race fixed in CL 388597. -// -// (A bug in the idleListener implementation caused a successful Accept to block -// on sending to a background goroutine that could have already exited.) -func TestIdleListenerAcceptCloseRace(t *testing.T) { - ctx := context.Background() - - n := 10 - - // Each iteration of the loop appears to take around a millisecond, so to - // avoid spurious failures we'll set the watchdog for three orders of - // magnitude longer. When the bug was present, this reproduced the deadlock - // reliably on a Linux workstation when run with -count=100, which should be - // frequent enough to show up on the Go build dashboard if it regresses. - watchdog := time.Duration(n) * 1000 * time.Millisecond - timer := time.AfterFunc(watchdog, func() { - debug.SetTraceback("all") - panic(fmt.Sprintf("%s deadlocked after %v", t.Name(), watchdog)) - }) - defer timer.Stop() - - for ; n > 0; n-- { - listener, err := jsonrpc2.NetPipeListener(ctx) - if err != nil { - t.Fatal(err) - } - listener = jsonrpc2.NewIdleListener(24*time.Hour, listener) - - done := make(chan struct{}) - go func() { - conn, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{}, nil) - listener.Close() - if err == nil { - conn.Close() - } - close(done) - }() - - // Accept may return a non-nil error if Close closes the underlying network - // connection before the wrapped Accept call unblocks. However, it must not - // deadlock! - c, err := listener.Accept(ctx) - if err == nil { - c.Close() - } - <-done - } -} - -// TestCloseCallRace checks for a race resulting in a deadlock when a Call on -// one side of the connection races with a Close (or otherwise broken -// connection) initiated from the other side. -// -// (The Call method was waiting for a result from the Read goroutine to -// determine which error value to return, but the Read goroutine was waiting for -// in-flight calls to complete before reporting that result.) -func TestCloseCallRace(t *testing.T) { - ctx := context.Background() - n := 10 - - watchdog := time.Duration(n) * 1000 * time.Millisecond - timer := time.AfterFunc(watchdog, func() { - debug.SetTraceback("all") - panic(fmt.Sprintf("%s deadlocked after %v", t.Name(), watchdog)) - }) - defer timer.Stop() - - for ; n > 0; n-- { - listener, err := jsonrpc2.NetPipeListener(ctx) - if err != nil { - t.Fatal(err) - } - - pokec := make(chan *jsonrpc2.AsyncCall, 1) - - s := jsonrpc2.NewServer(ctx, listener, jsonrpc2.BinderFunc(func(_ context.Context, srvConn *jsonrpc2.Connection) jsonrpc2.ConnectionOptions { - h := jsonrpc2.HandlerFunc(func(ctx context.Context, _ *jsonrpc2.Request) (any, error) { - // Start a concurrent call from the server to the client. - // The point of this test is to ensure this doesn't deadlock - // if the client shuts down the connection concurrently. - // - // The racing Call may or may not receive a response: it should get a - // response if it is sent before the client closes the connection, and - // it should fail with some kind of "connection closed" error otherwise. - go func() { - pokec <- srvConn.Call(ctx, "poke", nil) - }() - - return &msg{"pong"}, nil - }) - return jsonrpc2.ConnectionOptions{Handler: h} - })) - - dialConn, err := jsonrpc2.Dial(ctx, listener.Dialer(), jsonrpc2.ConnectionOptions{}, nil) - if err != nil { - listener.Close() - s.Wait() - t.Fatal(err) - } - - // Calling any method on the server should provoke it to asynchronously call - // us back. While it is starting that call, we will close the connection. - if err := dialConn.Call(ctx, "ping", nil).Await(ctx, nil); err != nil { - t.Error(err) - } - if err := dialConn.Close(); err != nil { - t.Error(err) - } - - // Ensure that the Call on the server side did not block forever when the - // connection closed. - pokeCall := <-pokec - if err := pokeCall.Await(ctx, nil); err == nil { - t.Errorf("unexpected nil error from server-initited call") - } else if errors.Is(err, jsonrpc2.ErrMethodNotFound) { - // The call completed before the Close reached the handler. - } else { - // The error was something else. - t.Logf("server-initiated call completed with expected error: %v", err) - } - - listener.Close() - s.Wait() - } -} diff --git a/internal/jsonrpc2/wire.go b/internal/jsonrpc2/wire.go index c0a41bff..4d123f2c 100644 --- a/internal/jsonrpc2/wire.go +++ b/internal/jsonrpc2/wire.go @@ -28,9 +28,6 @@ var ( // The following errors are not part of the json specification, but // compliant extensions specific to this implementation. - // ErrServerOverloaded is returned when a message was refused due to a - // server being temporarily unable to accept any new messages. - ErrServerOverloaded = NewError(-32000, "overloaded") // ErrUnknown should be used for all non coded errors. ErrUnknown = NewError(-32001, "unknown error") // ErrServerClosing is returned for calls that arrive while the server is closing.