From 47861cedb42e53285788ab5d3f85973c9161f282 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Sun, 1 Feb 2026 11:08:03 -0800 Subject: [PATCH] Add gRPC server-side streaming response support Switch gRPC response handling from buffered single-frame processing to an incremental frame reader that reads and formats each message as it arrives. This uses the same streaming pattern as SSE and NDJSON formatters, and works for both unary (single frame) and server-streaming (multiple frames) responses. Also adds gRPC trailer status reporting: non-OK grpc-status trailers are now printed to stderr with the appropriate error message, and the exit code is set to 1. --- docs/grpc.md | 20 ++++- integration/integration_test.go | 69 ++++++++++++++++ internal/fetch/clipboard.go | 2 +- internal/fetch/fetch.go | 24 ++---- internal/fetch/proto.go | 27 +++++++ internal/format/grpc.go | 46 +++++++++++ internal/format/grpc_test.go | 88 ++++++++++++++++++++ internal/grpc/framing.go | 40 ++++++++- internal/grpc/framing_test.go | 139 ++++++++++++++++++++++++++++++++ 9 files changed, 434 insertions(+), 21 deletions(-) create mode 100644 internal/format/grpc.go create mode 100644 internal/format/grpc_test.go diff --git a/docs/grpc.md b/docs/grpc.md index 9c8e5dc..ac4386e 100644 --- a/docs/grpc.md +++ b/docs/grpc.md @@ -353,10 +353,26 @@ apt install protobuf-compiler - Verify proto schema matches server's actual message format - Try without schema to see raw wire format +## Server Streaming + +`fetch` supports server-side streaming gRPC responses. Each response message is formatted and displayed as it arrives, following the same real-time streaming pattern used for SSE and NDJSON responses. + +```sh +fetch --grpc --proto-file service.proto \ + -j '{"query": "search term"}' \ + https://localhost:50051/search.SearchService/StreamResults +``` + +For streaming responses, messages are separated by blank lines in the output. Formatting and flushing happen incrementally, so results appear in real time. + +### gRPC Status + +`fetch` reports gRPC status errors from response trailers. If the server returns a non-OK gRPC status (e.g., `INTERNAL`, `NOT_FOUND`), the error is printed to stderr and the exit code is set to 1. + ## Limitations -- **Unary calls only**: Streaming RPCs are not supported -- **Single message**: Cannot send multiple messages in one request +- **Client-side and bidirectional streaming are not supported**: Only unary and server-streaming RPCs are supported +- **Single request message**: Cannot send multiple messages in one request - **gRPC-Web**: Standard gRPC protocol only, not gRPC-Web ## See Also diff --git a/integration/integration_test.go b/integration/integration_test.go index bcfd70c..f544c1b 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -1034,6 +1034,75 @@ func TestMain(t *testing.T) { assertBufContains(t, res.stdout, "grpc test") }) + t.Run("grpc streaming response", func(t *testing.T) { + // Build 3 separate gRPC-framed protobuf messages. + makeFrame := func(fieldNum protowire.Number, value string) []byte { + var protoData []byte + protoData = protowire.AppendTag(protoData, fieldNum, protowire.BytesType) + protoData = protowire.AppendString(protoData, value) + framedData := make([]byte, 5+len(protoData)) + framedData[0] = 0 // not compressed + binary.BigEndian.PutUint32(framedData[1:5], uint32(len(protoData))) + copy(framedData[5:], protoData) + return framedData + } + + frame1 := makeFrame(1, "message one") + frame2 := makeFrame(1, "message two") + frame3 := makeFrame(1, "message three") + + server := startServer(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/grpc+proto") + w.WriteHeader(200) + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "no flusher", 500) + return + } + w.Write(frame1) + flusher.Flush() + w.Write(frame2) + flusher.Flush() + w.Write(frame3) + flusher.Flush() + }) + defer server.Close() + + res := runFetch(t, fetchPath, server.URL, "--format", "on") + assertExitCode(t, 0, res) + assertBufContains(t, res.stdout, "message one") + assertBufContains(t, res.stdout, "message two") + assertBufContains(t, res.stdout, "message three") + }) + + t.Run("grpc streaming error status", func(t *testing.T) { + // Build a single gRPC-framed protobuf message. + var protoData []byte + protoData = protowire.AppendTag(protoData, 1, protowire.BytesType) + protoData = protowire.AppendString(protoData, "partial data") + framedData := make([]byte, 5+len(protoData)) + framedData[0] = 0 + binary.BigEndian.PutUint32(framedData[1:5], uint32(len(protoData))) + copy(framedData[5:], protoData) + + server := startServer(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/grpc+proto") + w.Header().Set("Trailer", "Grpc-Status, Grpc-Message") + w.WriteHeader(200) + w.Write(framedData) + w.(http.Flusher).Flush() + // Set trailers after body. + w.Header().Set("Grpc-Status", "13") // INTERNAL + w.Header().Set("Grpc-Message", "oh no!") // error message + }) + defer server.Close() + + res := runFetch(t, fetchPath, server.URL+"/pkg.Svc/Method", "--grpc", "--http", "1", "--format", "on") + assertExitCode(t, 1, res) + assertBufContains(t, res.stderr, "INTERNAL") + assertBufContains(t, res.stderr, "oh no!") + }) + t.Run("proto flags mutual exclusivity", func(t *testing.T) { // proto-file and proto-desc cannot be used together // Create temp files so we get past file existence validation diff --git a/internal/fetch/clipboard.go b/internal/fetch/clipboard.go index 565cf8c..c95ccd5 100644 --- a/internal/fetch/clipboard.go +++ b/internal/fetch/clipboard.go @@ -45,7 +45,7 @@ func newClipboardCopier(r *Request, resp *http.Response) *clipboardCopier { } contentType := getContentType(resp.Header) - if contentType == TypeSSE || contentType == TypeNDJSON { + if contentType == TypeSSE || contentType == TypeNDJSON || contentType == TypeGRPC { p := r.PrinterHandle.Stderr() core.WriteWarningMsg(p, "--copy is not supported for streaming responses") return nil diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index a6b1b30..40466b7 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -22,7 +22,6 @@ import ( "github.com/ryanfowler/fetch/internal/client" "github.com/ryanfowler/fetch/internal/core" "github.com/ryanfowler/fetch/internal/format" - fetchgrpc "github.com/ryanfowler/fetch/internal/grpc" "github.com/ryanfowler/fetch/internal/image" "github.com/ryanfowler/fetch/internal/multipart" "github.com/ryanfowler/fetch/internal/proto" @@ -338,6 +337,11 @@ func makeRequest(ctx context.Context, r *Request, c *client.Client, req *http.Re // Copy captured bytes to clipboard. cc.finish(r.PrinterHandle.Stderr()) + // Check gRPC trailer status after the body has been fully consumed. + if r.GRPC { + exitCode = checkGRPCStatus(r, resp, exitCode) + } + return exitCode, nil } @@ -367,6 +371,9 @@ func formatResponse(ctx context.Context, r *Request, resp *http.Response) (io.Re switch contentType { case TypeUnknown: return resp.Body, nil + case TypeGRPC: + // NOTE: This bypasses the isPrintable check for binary data. + return nil, format.FormatGRPCStream(resp.Body, r.responseDescriptor, p) case TypeNDJSON: // NOTE: This bypasses the isPrintable check for binary data. return nil, format.FormatNDJSON(resp.Body, p) @@ -412,21 +419,6 @@ func formatResponse(ctx context.Context, r *Request, resp *http.Response) (io.Re if format.FormatMsgPack(buf, p) == nil { buf = p.Bytes() } - case TypeGRPC: - // Unframe gRPC response before processing. - unframedBuf, _, err := fetchgrpc.Unframe(buf) - if err != nil { - // If unframing fails, try to process as raw protobuf. - unframedBuf = buf - } - if r.responseDescriptor != nil { - err = format.FormatProtobufWithDescriptor(unframedBuf, r.responseDescriptor, p) - } else { - err = format.FormatProtobuf(unframedBuf, p) - } - if err == nil { - buf = p.Bytes() - } case TypeProtobuf: var err error if r.responseDescriptor != nil { diff --git a/internal/fetch/proto.go b/internal/fetch/proto.go index 63d43b5..b067ead 100644 --- a/internal/fetch/proto.go +++ b/internal/fetch/proto.go @@ -14,6 +14,33 @@ import ( "google.golang.org/protobuf/reflect/protoreflect" ) +// checkGRPCStatus checks gRPC status from trailers (or headers for +// trailers-only responses) and prints an error to stderr if the status +// is non-OK. Returns the updated exit code. +func checkGRPCStatus(r *Request, resp *http.Response, exitCode int) int { + grpcStatus := resp.Trailer.Get("Grpc-Status") + grpcMessage := resp.Trailer.Get("Grpc-Message") + + // Fall back to headers for trailers-only error responses. + if grpcStatus == "" { + grpcStatus = resp.Header.Get("Grpc-Status") + grpcMessage = resp.Header.Get("Grpc-Message") + } + + if grpcStatus == "" || grpcStatus == "0" { + return exitCode + } + + status := fetchgrpc.ParseStatus(grpcStatus, grpcMessage) + p := r.PrinterHandle.Stderr() + core.WriteErrorMsg(p, status) + + if exitCode == 0 { + return 1 + } + return exitCode +} + // loadProtoSchema loads schema from files or descriptor set. func loadProtoSchema(r *Request) (*proto.Schema, error) { if len(r.ProtoFiles) > 0 { diff --git a/internal/format/grpc.go b/internal/format/grpc.go new file mode 100644 index 0000000..6031632 --- /dev/null +++ b/internal/format/grpc.go @@ -0,0 +1,46 @@ +package format + +import ( + "errors" + "io" + + "github.com/ryanfowler/fetch/internal/core" + "github.com/ryanfowler/fetch/internal/grpc" + + "google.golang.org/protobuf/reflect/protoreflect" +) + +// FormatGRPCStream formats a gRPC response stream by reading and formatting +// each length-prefixed frame as it arrives. This handles both unary (single +// frame) and server-streaming (multiple frames) responses. +func FormatGRPCStream(r io.Reader, md protoreflect.MessageDescriptor, p *core.Printer) error { + var written bool + for { + data, _, err := grpc.ReadFrame(r) + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + + if written { + p.WriteString("\n") + } else { + written = true + } + + if md != nil { + err = FormatProtobufWithDescriptor(data, md, p) + } else { + err = FormatProtobuf(data, p) + } + if err != nil { + // If formatting fails, return the error. + p.Reset() + return err + } + + p.Flush() + } +} diff --git a/internal/format/grpc_test.go b/internal/format/grpc_test.go new file mode 100644 index 0000000..6e54bc5 --- /dev/null +++ b/internal/format/grpc_test.go @@ -0,0 +1,88 @@ +package format + +import ( + "bytes" + "testing" + + "github.com/ryanfowler/fetch/internal/core" + "github.com/ryanfowler/fetch/internal/grpc" +) + +func TestFormatGRPCStream(t *testing.T) { + t.Run("single frame", func(t *testing.T) { + protoData := appendVarint(nil, 1, 42) + protoData = appendBytes(protoData, 2, []byte("hello")) + framed := grpc.Frame(protoData, false) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(framed), nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("multiple frames", func(t *testing.T) { + frame1 := grpc.Frame(appendVarint(nil, 1, 100), false) + frame2 := grpc.Frame(appendVarint(nil, 1, 200), false) + frame3 := grpc.Frame(appendVarint(nil, 1, 300), false) + + var buf bytes.Buffer + buf.Write(frame1) + buf.Write(frame2) + buf.Write(frame3) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(&buf, nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("empty stream", func(t *testing.T) { + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(nil), nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("empty message frame", func(t *testing.T) { + framed := grpc.Frame(nil, false) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(framed), nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("error mid-stream", func(t *testing.T) { + // First frame is valid, then stream is truncated mid-header. + frame1 := grpc.Frame(appendVarint(nil, 1, 42), false) + truncated := append(frame1, 0x00, 0x00) // partial header + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(truncated), nil, p) + if err == nil { + t.Error("expected error for truncated stream") + } + }) + + t.Run("multiple frames with multi-field messages", func(t *testing.T) { + msg1 := appendVarint(nil, 1, 10) + msg1 = appendBytes(msg1, 2, []byte("first")) + + msg2 := appendVarint(nil, 1, 20) + msg2 = appendBytes(msg2, 2, []byte("second")) + + var buf bytes.Buffer + buf.Write(grpc.Frame(msg1, false)) + buf.Write(grpc.Frame(msg2, false)) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(&buf, nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) +} diff --git a/internal/grpc/framing.go b/internal/grpc/framing.go index 429eb60..ad9a66c 100644 --- a/internal/grpc/framing.go +++ b/internal/grpc/framing.go @@ -3,6 +3,7 @@ package grpc import ( "encoding/binary" "fmt" + "io" ) // Frame wraps message in gRPC length-prefixed format. @@ -19,6 +20,43 @@ func Frame(data []byte, compressed bool) []byte { return buf } +// maxMessageSize is the maximum allowed gRPC message size. +const maxMessageSize = 64 * 1024 * 1024 // 64MB + +// ReadFrame reads a single gRPC length-prefixed frame from the reader. +// Returns the message data, whether it was compressed, and any error. +// Returns io.EOF when the reader has no more data. +func ReadFrame(r io.Reader) ([]byte, bool, error) { + var header [5]byte + _, err := io.ReadFull(r, header[:]) + if err != nil { + if err == io.ErrUnexpectedEOF { + return nil, false, fmt.Errorf("failed to read gRPC frame header: incomplete header") + } + return nil, false, err + } + + compressed := header[0] != 0 + length := binary.BigEndian.Uint32(header[1:5]) + + if length > maxMessageSize { + return nil, false, fmt.Errorf("gRPC message too large: %d bytes", length) + } + + data := make([]byte, length) + if length > 0 { + _, err = io.ReadFull(r, data) + if err != nil { + if err == io.ErrUnexpectedEOF { + return nil, false, fmt.Errorf("failed to read gRPC message: incomplete data") + } + return nil, false, err + } + } + + return data, compressed, nil +} + // Unframe extracts a gRPC length-prefixed message from the data. // Returns the message data and whether it was compressed. func Unframe(data []byte) ([]byte, bool, error) { @@ -29,8 +67,6 @@ func Unframe(data []byte) ([]byte, bool, error) { compressed := data[0] != 0 length := binary.BigEndian.Uint32(data[1:5]) - // Sanity check on length. - const maxMessageSize = 64 * 1024 * 1024 // 64MB if length > maxMessageSize { return nil, false, fmt.Errorf("gRPC message too large: %d bytes", length) } diff --git a/internal/grpc/framing_test.go b/internal/grpc/framing_test.go index 2669b15..3ef3882 100644 --- a/internal/grpc/framing_test.go +++ b/internal/grpc/framing_test.go @@ -2,6 +2,7 @@ package grpc import ( "bytes" + "io" "testing" ) @@ -146,3 +147,141 @@ func TestUnframeLargeMessageRejected(t *testing.T) { t.Error("expected error for large message") } } + +func TestReadFrame(t *testing.T) { + tests := []struct { + name string + input []byte + wantData []byte + wantCompressed bool + wantErr bool + wantEOF bool + }{ + { + name: "empty message", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x00}, + wantData: []byte{}, + wantCompressed: false, + }, + { + name: "simple uncompressed", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x03}, + wantData: []byte{0x01, 0x02, 0x03}, + wantCompressed: false, + }, + { + name: "simple compressed", + input: []byte{0x01, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x03}, + wantData: []byte{0x01, 0x02, 0x03}, + wantCompressed: true, + }, + { + name: "empty reader", + input: []byte{}, + wantEOF: true, + }, + { + name: "truncated header", + input: []byte{0x00, 0x00, 0x00}, + wantErr: true, + }, + { + name: "truncated data", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x05, 0x01, 0x02}, + wantErr: true, + }, + { + name: "oversized message", + input: []byte{0x00, 0x10, 0x00, 0x00, 0x00}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := bytes.NewReader(tt.input) + data, compressed, err := ReadFrame(r) + if tt.wantEOF { + if err != io.EOF { + t.Errorf("ReadFrame() error = %v, want io.EOF", err) + } + return + } + if (err != nil) != tt.wantErr { + t.Errorf("ReadFrame() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr { + return + } + if !bytes.Equal(data, tt.wantData) { + t.Errorf("ReadFrame() data = %v, want %v", data, tt.wantData) + } + if compressed != tt.wantCompressed { + t.Errorf("ReadFrame() compressed = %v, want %v", compressed, tt.wantCompressed) + } + }) + } +} + +func TestReadFrameMultiple(t *testing.T) { + // Create a reader with two frames. + frame1 := Frame([]byte{0x01, 0x02}, false) + frame2 := Frame([]byte{0x03, 0x04, 0x05}, true) + r := bytes.NewReader(append(frame1, frame2...)) + + // Read first frame. + data, compressed, err := ReadFrame(r) + if err != nil { + t.Fatalf("ReadFrame() first frame error = %v", err) + } + if !bytes.Equal(data, []byte{0x01, 0x02}) { + t.Errorf("first frame data = %v, want [0x01, 0x02]", data) + } + if compressed { + t.Error("first frame: expected uncompressed") + } + + // Read second frame. + data, compressed, err = ReadFrame(r) + if err != nil { + t.Fatalf("ReadFrame() second frame error = %v", err) + } + if !bytes.Equal(data, []byte{0x03, 0x04, 0x05}) { + t.Errorf("second frame data = %v, want [0x03, 0x04, 0x05]", data) + } + if !compressed { + t.Error("second frame: expected compressed") + } + + // No more frames. + _, _, err = ReadFrame(r) + if err != io.EOF { + t.Errorf("ReadFrame() after last frame: error = %v, want io.EOF", err) + } +} + +func TestReadFrameRoundTrip(t *testing.T) { + testData := [][]byte{ + {}, + {0x00}, + {0x01, 0x02, 0x03, 0x04, 0x05}, + bytes.Repeat([]byte{0xAB}, 1000), + } + + for _, data := range testData { + framed := Frame(data, false) + r := bytes.NewReader(framed) + unframed, compressed, err := ReadFrame(r) + if err != nil { + t.Errorf("ReadFrame() error = %v", err) + continue + } + if compressed { + t.Error("expected uncompressed") + } + if !bytes.Equal(unframed, data) { + t.Errorf("round trip failed: got %v, want %v", unframed, data) + } + } +}