Skip to content

Commit 8befc65

Browse files
committed
internal/jsonrpc2_v2: refactor Reader/Writer and remove event dependency
In preparation for extracting the jsonrpc2_v2 package, update the Reader and Writer interfaces to not report byte counts, and remove the internal/event dependency. Change-Id: Ie6b373736fe297c8427cd4ece7ddad33e275864b Reviewed-on: https://go-review.googlesource.com/c/tools/+/678956 Reviewed-by: Alan Donovan <adonovan@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
1 parent 5e00265 commit 8befc65

File tree

6 files changed

+102
-167
lines changed

6 files changed

+102
-167
lines changed

internal/jsonrpc2_v2/conn.go

Lines changed: 11 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ import (
1313
"sync"
1414
"sync/atomic"
1515
"time"
16-
17-
"golang.org/x/tools/internal/event"
18-
"golang.org/x/tools/internal/event/keys"
19-
"golang.org/x/tools/internal/event/label"
20-
"golang.org/x/tools/internal/jsonrpc2"
2116
)
2217

2318
// Binder builds a connection configuration.
@@ -193,7 +188,6 @@ type incomingRequest struct {
193188
*Request // the request being processed
194189
ctx context.Context
195190
cancel context.CancelFunc
196-
endSpan func() // called (and set to nil) when the response is sent
197191
}
198192

199193
// Bind returns the options unmodified.
@@ -293,15 +287,9 @@ func (c *Connection) start(ctx context.Context, reader Reader, preempter Preempt
293287
// The params will be marshaled to JSON before sending over the wire, and will
294288
// be handed to the method invoked.
295289
func (c *Connection) Notify(ctx context.Context, method string, params any) (err error) {
296-
ctx, done := event.Start(ctx, method,
297-
jsonrpc2.Method.Of(method),
298-
jsonrpc2.RPCDirection.Of(jsonrpc2.Outbound),
299-
)
300290
attempted := false
301291

302292
defer func() {
303-
labelStatus(ctx, err)
304-
done()
305293
if attempted {
306294
c.updateInFlight(func(s *inFlightState) {
307295
s.outgoingNotifications--
@@ -332,7 +320,6 @@ func (c *Connection) Notify(ctx context.Context, method string, params any) (err
332320
return fmt.Errorf("marshaling notify parameters: %v", err)
333321
}
334322

335-
event.Metric(ctx, jsonrpc2.Started.Of(1))
336323
return c.write(ctx, notify)
337324
}
338325

@@ -344,17 +331,10 @@ func (c *Connection) Notify(ctx context.Context, method string, params any) (err
344331
func (c *Connection) Call(ctx context.Context, method string, params any) *AsyncCall {
345332
// Generate a new request identifier.
346333
id := Int64ID(atomic.AddInt64(&c.seq, 1))
347-
ctx, endSpan := event.Start(ctx, method,
348-
jsonrpc2.Method.Of(method),
349-
jsonrpc2.RPCDirection.Of(jsonrpc2.Outbound),
350-
jsonrpc2.RPCID.Of(fmt.Sprintf("%q", id)),
351-
)
352334

353335
ac := &AsyncCall{
354-
id: id,
355-
ready: make(chan struct{}),
356-
ctx: ctx,
357-
endSpan: endSpan,
336+
id: id,
337+
ready: make(chan struct{}),
358338
}
359339
// When this method returns, either ac is retired, or the request has been
360340
// written successfully and the call is awaiting a response (to be provided by
@@ -381,7 +361,6 @@ func (c *Connection) Call(ctx context.Context, method string, params any) *Async
381361
return ac
382362
}
383363

384-
event.Metric(ctx, jsonrpc2.Started.Of(1))
385364
if err := c.write(ctx, call); err != nil {
386365
// Sending failed. We will never get a response, so deliver a fake one if it
387366
// wasn't already retired by the connection breaking.
@@ -400,10 +379,8 @@ func (c *Connection) Call(ctx context.Context, method string, params any) *Async
400379

401380
type AsyncCall struct {
402381
id ID
403-
ready chan struct{} // closed after response has been set and span has been ended
382+
ready chan struct{} // closed after response has been set
404383
response *Response
405-
ctx context.Context // for event logging only
406-
endSpan func() // close the tracing span when all processing for the message is complete
407384
}
408385

409386
// ID used for this call.
@@ -431,12 +408,6 @@ func (ac *AsyncCall) retire(response *Response) {
431408
}
432409

433410
ac.response = response
434-
labelStatus(ac.ctx, response.Error)
435-
ac.endSpan()
436-
// Allow the trace context, which may retain a lot of reachable values,
437-
// to be garbage-collected.
438-
ac.ctx, ac.endSpan = nil, nil
439-
440411
close(ac.ready)
441412
}
442413

@@ -525,18 +496,15 @@ func (c *Connection) Close() error {
525496
func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter Preempter) {
526497
var err error
527498
for {
528-
var (
529-
msg Message
530-
n int64
531-
)
532-
msg, n, err = reader.Read(ctx)
499+
var msg Message
500+
msg, err = reader.Read(ctx)
533501
if err != nil {
534502
break
535503
}
536504

537505
switch msg := msg.(type) {
538506
case *Request:
539-
c.acceptRequest(ctx, msg, n, preempter)
507+
c.acceptRequest(ctx, msg, preempter)
540508

541509
case *Response:
542510
c.updateInFlight(func(s *inFlightState) {
@@ -568,28 +536,14 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter
568536

569537
// acceptRequest either handles msg synchronously or enqueues it to be handled
570538
// asynchronously.
571-
func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes int64, preempter Preempter) {
572-
// Add a span to the context for this request.
573-
labels := append(make([]label.Label, 0, 3), // Make space for the ID if present.
574-
jsonrpc2.Method.Of(msg.Method),
575-
jsonrpc2.RPCDirection.Of(jsonrpc2.Inbound),
576-
)
577-
if msg.IsCall() {
578-
labels = append(labels, jsonrpc2.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
579-
}
580-
ctx, endSpan := event.Start(ctx, msg.Method, labels...)
581-
event.Metric(ctx,
582-
jsonrpc2.Started.Of(1),
583-
jsonrpc2.ReceivedBytes.Of(msgBytes))
584-
539+
func (c *Connection) acceptRequest(ctx context.Context, msg *Request, preempter Preempter) {
585540
// In theory notifications cannot be cancelled, but we build them a cancel
586541
// context anyway.
587-
ctx, cancel := context.WithCancel(ctx)
542+
reqCtx, cancel := context.WithCancel(ctx)
588543
req := &incomingRequest{
589544
Request: msg,
590-
ctx: ctx,
545+
ctx: reqCtx,
591546
cancel: cancel,
592-
endSpan: endSpan,
593547
}
594548

595549
// If the request is a call, add it to the incoming map so it can be
@@ -722,10 +676,6 @@ func (c *Connection) processResult(from any, req *incomingRequest, result any, e
722676
err = fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method)
723677
}
724678

725-
if req.endSpan == nil {
726-
return c.internalErrorf("%#v produced a duplicate %q Response", from, req.Method)
727-
}
728-
729679
if result != nil && err != nil {
730680
c.internalErrorf("%#v returned a non-nil result with a non-nil error for %s:\n%v\n%#v", from, req.Method, err, result)
731681
result = nil // Discard the spurious result and respond with err.
@@ -761,16 +711,11 @@ func (c *Connection) processResult(from any, req *incomingRequest, result any, e
761711
if err != nil {
762712
// TODO: can/should we do anything with this error beyond writing it to the event log?
763713
// (Is this the right label to attach to the log?)
764-
event.Label(req.ctx, keys.Err.Of(err))
765714
}
766715
}
767716

768-
labelStatus(req.ctx, err)
769-
770-
// Cancel the request and finalize the event span to free any associated resources.
717+
// Cancel the request to free any associated resources.
771718
req.cancel()
772-
req.endSpan()
773-
req.endSpan = nil
774719
c.updateInFlight(func(s *inFlightState) {
775720
if s.incoming == 0 {
776721
panic("jsonrpc2_v2: processResult called when incoming count is already zero")
@@ -785,8 +730,7 @@ func (c *Connection) processResult(from any, req *incomingRequest, result any, e
785730
func (c *Connection) write(ctx context.Context, msg Message) error {
786731
writer := <-c.writer
787732
defer func() { c.writer <- writer }()
788-
n, err := writer.Write(ctx, msg)
789-
event.Metric(ctx, jsonrpc2.SentBytes.Of(n))
733+
err := writer.Write(ctx, msg)
790734

791735
if err != nil && ctx.Err() == nil {
792736
// The call to Write failed, and since ctx.Err() is nil we can't attribute
@@ -823,15 +767,6 @@ func (c *Connection) internalErrorf(format string, args ...any) error {
823767
return fmt.Errorf("%w: %v", ErrInternal, err)
824768
}
825769

826-
// labelStatus labels the status of the event in ctx based on whether err is nil.
827-
func labelStatus(ctx context.Context, err error) {
828-
if err == nil {
829-
event.Label(ctx, jsonrpc2.StatusCode.Of("OK"))
830-
} else {
831-
event.Label(ctx, jsonrpc2.StatusCode.Of("ERROR"))
832-
}
833-
}
834-
835770
// notDone is a context.Context wrapper that returns a nil Done channel.
836771
type notDone struct{ ctx context.Context }
837772

internal/jsonrpc2_v2/frame.go

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
// a single Conn in a safe manner.
2323
type Reader interface {
2424
// Read gets the next message from the stream.
25-
Read(context.Context) (Message, int64, error)
25+
Read(context.Context) (Message, error)
2626
}
2727

2828
// Writer abstracts the transport mechanics from the JSON RPC protocol.
@@ -33,7 +33,7 @@ type Reader interface {
3333
// a single Conn in a safe manner.
3434
type Writer interface {
3535
// Write sends a message to the stream.
36-
Write(context.Context, Message) (int64, error)
36+
Write(context.Context, Message) error
3737
}
3838

3939
// Framer wraps low level byte readers and writers into jsonrpc2 message
@@ -72,32 +72,32 @@ func (rawFramer) Writer(rw io.Writer) Writer {
7272
return &rawWriter{out: rw}
7373
}
7474

75-
func (r *rawReader) Read(ctx context.Context) (Message, int64, error) {
75+
func (r *rawReader) Read(ctx context.Context) (Message, error) {
7676
select {
7777
case <-ctx.Done():
78-
return nil, 0, ctx.Err()
78+
return nil, ctx.Err()
7979
default:
8080
}
8181
var raw json.RawMessage
8282
if err := r.in.Decode(&raw); err != nil {
83-
return nil, 0, err
83+
return nil, err
8484
}
8585
msg, err := DecodeMessage(raw)
86-
return msg, int64(len(raw)), err
86+
return msg, err
8787
}
8888

89-
func (w *rawWriter) Write(ctx context.Context, msg Message) (int64, error) {
89+
func (w *rawWriter) Write(ctx context.Context, msg Message) error {
9090
select {
9191
case <-ctx.Done():
92-
return 0, ctx.Err()
92+
return ctx.Err()
9393
default:
9494
}
9595
data, err := EncodeMessage(msg)
9696
if err != nil {
97-
return 0, fmt.Errorf("marshaling message: %v", err)
97+
return fmt.Errorf("marshaling message: %v", err)
9898
}
99-
n, err := w.out.Write(data)
100-
return int64(n), err
99+
_, err = w.out.Write(data)
100+
return err
101101
}
102102

103103
// HeaderFramer returns a new Framer.
@@ -117,76 +117,76 @@ func (headerFramer) Writer(rw io.Writer) Writer {
117117
return &headerWriter{out: rw}
118118
}
119119

120-
func (r *headerReader) Read(ctx context.Context) (Message, int64, error) {
120+
func (r *headerReader) Read(ctx context.Context) (Message, error) {
121121
select {
122122
case <-ctx.Done():
123-
return nil, 0, ctx.Err()
123+
return nil, ctx.Err()
124124
default:
125125
}
126-
var total, length int64
126+
127+
firstRead := true // to detect a clean EOF below
128+
var contentLength int64
127129
// read the header, stop on the first empty line
128130
for {
129131
line, err := r.in.ReadString('\n')
130-
total += int64(len(line))
131132
if err != nil {
132133
if err == io.EOF {
133-
if total == 0 {
134-
return nil, 0, io.EOF
134+
if firstRead && line == "" {
135+
return nil, io.EOF // clean EOF
135136
}
136137
err = io.ErrUnexpectedEOF
137138
}
138-
return nil, total, fmt.Errorf("failed reading header line: %w", err)
139+
return nil, fmt.Errorf("failed reading header line: %w", err)
139140
}
141+
firstRead = false
142+
140143
line = strings.TrimSpace(line)
141144
// check we have a header line
142145
if line == "" {
143146
break
144147
}
145148
colon := strings.IndexRune(line, ':')
146149
if colon < 0 {
147-
return nil, total, fmt.Errorf("invalid header line %q", line)
150+
return nil, fmt.Errorf("invalid header line %q", line)
148151
}
149152
name, value := line[:colon], strings.TrimSpace(line[colon+1:])
150153
switch name {
151154
case "Content-Length":
152-
if length, err = strconv.ParseInt(value, 10, 32); err != nil {
153-
return nil, total, fmt.Errorf("failed parsing Content-Length: %v", value)
155+
if contentLength, err = strconv.ParseInt(value, 10, 32); err != nil {
156+
return nil, fmt.Errorf("failed parsing Content-Length: %v", value)
154157
}
155-
if length <= 0 {
156-
return nil, total, fmt.Errorf("invalid Content-Length: %v", length)
158+
if contentLength <= 0 {
159+
return nil, fmt.Errorf("invalid Content-Length: %v", contentLength)
157160
}
158161
default:
159162
// ignoring unknown headers
160163
}
161164
}
162-
if length == 0 {
163-
return nil, total, fmt.Errorf("missing Content-Length header")
165+
if contentLength == 0 {
166+
return nil, fmt.Errorf("missing Content-Length header")
164167
}
165-
data := make([]byte, length)
166-
n, err := io.ReadFull(r.in, data)
167-
total += int64(n)
168+
data := make([]byte, contentLength)
169+
_, err := io.ReadFull(r.in, data)
168170
if err != nil {
169-
return nil, total, err
171+
return nil, err
170172
}
171173
msg, err := DecodeMessage(data)
172-
return msg, total, err
174+
return msg, err
173175
}
174176

175-
func (w *headerWriter) Write(ctx context.Context, msg Message) (int64, error) {
177+
func (w *headerWriter) Write(ctx context.Context, msg Message) error {
176178
select {
177179
case <-ctx.Done():
178-
return 0, ctx.Err()
180+
return ctx.Err()
179181
default:
180182
}
181183
data, err := EncodeMessage(msg)
182184
if err != nil {
183-
return 0, fmt.Errorf("marshaling message: %v", err)
185+
return fmt.Errorf("marshaling message: %v", err)
184186
}
185-
n, err := fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data))
186-
total := int64(n)
187+
_, err = fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data))
187188
if err == nil {
188-
n, err = w.out.Write(data)
189-
total += int64(n)
189+
_, err = w.out.Write(data)
190190
}
191-
return total, err
191+
return err
192192
}

internal/mcp/conformance_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func runServerTest(t *testing.T, test *conformanceTest) {
118118
}
119119

120120
writeMsg := func(msg jsonrpc2.Message) {
121-
if _, err := cStream.Write(ctx, msg); err != nil {
121+
if err := cStream.Write(ctx, msg); err != nil {
122122
t.Fatalf("Write failed: %v", err)
123123
}
124124
}
@@ -145,7 +145,7 @@ func runServerTest(t *testing.T, test *conformanceTest) {
145145
// next incoming response.
146146
nextResponse := func() (*jsonrpc2.Response, error, bool) {
147147
for {
148-
msg, _, err := cStream.Read(ctx)
148+
msg, err := cStream.Read(ctx)
149149
if err != nil {
150150
// TODO(rfindley): we don't document (or want to document) that the in
151151
// memory transports use a net.Pipe. How can users detect this failure?

0 commit comments

Comments
 (0)