diff --git a/docs/grpc.md b/docs/grpc.md index 9c8e5dc..ac4386e 100644 --- a/docs/grpc.md +++ b/docs/grpc.md @@ -353,10 +353,26 @@ apt install protobuf-compiler - Verify proto schema matches server's actual message format - Try without schema to see raw wire format +## Server Streaming + +`fetch` supports server-side streaming gRPC responses. Each response message is formatted and displayed as it arrives, following the same real-time streaming pattern used for SSE and NDJSON responses. + +```sh +fetch --grpc --proto-file service.proto \ + -j '{"query": "search term"}' \ + https://localhost:50051/search.SearchService/StreamResults +``` + +For streaming responses, messages are separated by blank lines in the output. Formatting and flushing happen incrementally, so results appear in real time. + +### gRPC Status + +`fetch` reports gRPC status errors from response trailers. If the server returns a non-OK gRPC status (e.g., `INTERNAL`, `NOT_FOUND`), the error is printed to stderr and the exit code is set to 1. + ## Limitations -- **Unary calls only**: Streaming RPCs are not supported -- **Single message**: Cannot send multiple messages in one request +- **Client-side and bidirectional streaming are not supported**: Only unary and server-streaming RPCs are supported +- **Single request message**: Cannot send multiple messages in one request - **gRPC-Web**: Standard gRPC protocol only, not gRPC-Web ## See Also diff --git a/integration/integration_test.go b/integration/integration_test.go index bcfd70c..f544c1b 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -1034,6 +1034,75 @@ func TestMain(t *testing.T) { assertBufContains(t, res.stdout, "grpc test") }) + t.Run("grpc streaming response", func(t *testing.T) { + // Build 3 separate gRPC-framed protobuf messages. + makeFrame := func(fieldNum protowire.Number, value string) []byte { + var protoData []byte + protoData = protowire.AppendTag(protoData, fieldNum, protowire.BytesType) + protoData = protowire.AppendString(protoData, value) + framedData := make([]byte, 5+len(protoData)) + framedData[0] = 0 // not compressed + binary.BigEndian.PutUint32(framedData[1:5], uint32(len(protoData))) + copy(framedData[5:], protoData) + return framedData + } + + frame1 := makeFrame(1, "message one") + frame2 := makeFrame(1, "message two") + frame3 := makeFrame(1, "message three") + + server := startServer(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/grpc+proto") + w.WriteHeader(200) + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "no flusher", 500) + return + } + w.Write(frame1) + flusher.Flush() + w.Write(frame2) + flusher.Flush() + w.Write(frame3) + flusher.Flush() + }) + defer server.Close() + + res := runFetch(t, fetchPath, server.URL, "--format", "on") + assertExitCode(t, 0, res) + assertBufContains(t, res.stdout, "message one") + assertBufContains(t, res.stdout, "message two") + assertBufContains(t, res.stdout, "message three") + }) + + t.Run("grpc streaming error status", func(t *testing.T) { + // Build a single gRPC-framed protobuf message. + var protoData []byte + protoData = protowire.AppendTag(protoData, 1, protowire.BytesType) + protoData = protowire.AppendString(protoData, "partial data") + framedData := make([]byte, 5+len(protoData)) + framedData[0] = 0 + binary.BigEndian.PutUint32(framedData[1:5], uint32(len(protoData))) + copy(framedData[5:], protoData) + + server := startServer(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/grpc+proto") + w.Header().Set("Trailer", "Grpc-Status, Grpc-Message") + w.WriteHeader(200) + w.Write(framedData) + w.(http.Flusher).Flush() + // Set trailers after body. + w.Header().Set("Grpc-Status", "13") // INTERNAL + w.Header().Set("Grpc-Message", "oh no!") // error message + }) + defer server.Close() + + res := runFetch(t, fetchPath, server.URL+"/pkg.Svc/Method", "--grpc", "--http", "1", "--format", "on") + assertExitCode(t, 1, res) + assertBufContains(t, res.stderr, "INTERNAL") + assertBufContains(t, res.stderr, "oh no!") + }) + t.Run("proto flags mutual exclusivity", func(t *testing.T) { // proto-file and proto-desc cannot be used together // Create temp files so we get past file existence validation diff --git a/internal/fetch/clipboard.go b/internal/fetch/clipboard.go index 565cf8c..c95ccd5 100644 --- a/internal/fetch/clipboard.go +++ b/internal/fetch/clipboard.go @@ -45,7 +45,7 @@ func newClipboardCopier(r *Request, resp *http.Response) *clipboardCopier { } contentType := getContentType(resp.Header) - if contentType == TypeSSE || contentType == TypeNDJSON { + if contentType == TypeSSE || contentType == TypeNDJSON || contentType == TypeGRPC { p := r.PrinterHandle.Stderr() core.WriteWarningMsg(p, "--copy is not supported for streaming responses") return nil diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index a6b1b30..40466b7 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -22,7 +22,6 @@ import ( "github.com/ryanfowler/fetch/internal/client" "github.com/ryanfowler/fetch/internal/core" "github.com/ryanfowler/fetch/internal/format" - fetchgrpc "github.com/ryanfowler/fetch/internal/grpc" "github.com/ryanfowler/fetch/internal/image" "github.com/ryanfowler/fetch/internal/multipart" "github.com/ryanfowler/fetch/internal/proto" @@ -338,6 +337,11 @@ func makeRequest(ctx context.Context, r *Request, c *client.Client, req *http.Re // Copy captured bytes to clipboard. cc.finish(r.PrinterHandle.Stderr()) + // Check gRPC trailer status after the body has been fully consumed. + if r.GRPC { + exitCode = checkGRPCStatus(r, resp, exitCode) + } + return exitCode, nil } @@ -367,6 +371,9 @@ func formatResponse(ctx context.Context, r *Request, resp *http.Response) (io.Re switch contentType { case TypeUnknown: return resp.Body, nil + case TypeGRPC: + // NOTE: This bypasses the isPrintable check for binary data. + return nil, format.FormatGRPCStream(resp.Body, r.responseDescriptor, p) case TypeNDJSON: // NOTE: This bypasses the isPrintable check for binary data. return nil, format.FormatNDJSON(resp.Body, p) @@ -412,21 +419,6 @@ func formatResponse(ctx context.Context, r *Request, resp *http.Response) (io.Re if format.FormatMsgPack(buf, p) == nil { buf = p.Bytes() } - case TypeGRPC: - // Unframe gRPC response before processing. - unframedBuf, _, err := fetchgrpc.Unframe(buf) - if err != nil { - // If unframing fails, try to process as raw protobuf. - unframedBuf = buf - } - if r.responseDescriptor != nil { - err = format.FormatProtobufWithDescriptor(unframedBuf, r.responseDescriptor, p) - } else { - err = format.FormatProtobuf(unframedBuf, p) - } - if err == nil { - buf = p.Bytes() - } case TypeProtobuf: var err error if r.responseDescriptor != nil { diff --git a/internal/fetch/proto.go b/internal/fetch/proto.go index 63d43b5..b067ead 100644 --- a/internal/fetch/proto.go +++ b/internal/fetch/proto.go @@ -14,6 +14,33 @@ import ( "google.golang.org/protobuf/reflect/protoreflect" ) +// checkGRPCStatus checks gRPC status from trailers (or headers for +// trailers-only responses) and prints an error to stderr if the status +// is non-OK. Returns the updated exit code. +func checkGRPCStatus(r *Request, resp *http.Response, exitCode int) int { + grpcStatus := resp.Trailer.Get("Grpc-Status") + grpcMessage := resp.Trailer.Get("Grpc-Message") + + // Fall back to headers for trailers-only error responses. + if grpcStatus == "" { + grpcStatus = resp.Header.Get("Grpc-Status") + grpcMessage = resp.Header.Get("Grpc-Message") + } + + if grpcStatus == "" || grpcStatus == "0" { + return exitCode + } + + status := fetchgrpc.ParseStatus(grpcStatus, grpcMessage) + p := r.PrinterHandle.Stderr() + core.WriteErrorMsg(p, status) + + if exitCode == 0 { + return 1 + } + return exitCode +} + // loadProtoSchema loads schema from files or descriptor set. func loadProtoSchema(r *Request) (*proto.Schema, error) { if len(r.ProtoFiles) > 0 { diff --git a/internal/format/grpc.go b/internal/format/grpc.go new file mode 100644 index 0000000..6031632 --- /dev/null +++ b/internal/format/grpc.go @@ -0,0 +1,46 @@ +package format + +import ( + "errors" + "io" + + "github.com/ryanfowler/fetch/internal/core" + "github.com/ryanfowler/fetch/internal/grpc" + + "google.golang.org/protobuf/reflect/protoreflect" +) + +// FormatGRPCStream formats a gRPC response stream by reading and formatting +// each length-prefixed frame as it arrives. This handles both unary (single +// frame) and server-streaming (multiple frames) responses. +func FormatGRPCStream(r io.Reader, md protoreflect.MessageDescriptor, p *core.Printer) error { + var written bool + for { + data, _, err := grpc.ReadFrame(r) + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + + if written { + p.WriteString("\n") + } else { + written = true + } + + if md != nil { + err = FormatProtobufWithDescriptor(data, md, p) + } else { + err = FormatProtobuf(data, p) + } + if err != nil { + // If formatting fails, return the error. + p.Reset() + return err + } + + p.Flush() + } +} diff --git a/internal/format/grpc_test.go b/internal/format/grpc_test.go new file mode 100644 index 0000000..6e54bc5 --- /dev/null +++ b/internal/format/grpc_test.go @@ -0,0 +1,88 @@ +package format + +import ( + "bytes" + "testing" + + "github.com/ryanfowler/fetch/internal/core" + "github.com/ryanfowler/fetch/internal/grpc" +) + +func TestFormatGRPCStream(t *testing.T) { + t.Run("single frame", func(t *testing.T) { + protoData := appendVarint(nil, 1, 42) + protoData = appendBytes(protoData, 2, []byte("hello")) + framed := grpc.Frame(protoData, false) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(framed), nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("multiple frames", func(t *testing.T) { + frame1 := grpc.Frame(appendVarint(nil, 1, 100), false) + frame2 := grpc.Frame(appendVarint(nil, 1, 200), false) + frame3 := grpc.Frame(appendVarint(nil, 1, 300), false) + + var buf bytes.Buffer + buf.Write(frame1) + buf.Write(frame2) + buf.Write(frame3) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(&buf, nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("empty stream", func(t *testing.T) { + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(nil), nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("empty message frame", func(t *testing.T) { + framed := grpc.Frame(nil, false) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(framed), nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) + + t.Run("error mid-stream", func(t *testing.T) { + // First frame is valid, then stream is truncated mid-header. + frame1 := grpc.Frame(appendVarint(nil, 1, 42), false) + truncated := append(frame1, 0x00, 0x00) // partial header + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(bytes.NewReader(truncated), nil, p) + if err == nil { + t.Error("expected error for truncated stream") + } + }) + + t.Run("multiple frames with multi-field messages", func(t *testing.T) { + msg1 := appendVarint(nil, 1, 10) + msg1 = appendBytes(msg1, 2, []byte("first")) + + msg2 := appendVarint(nil, 1, 20) + msg2 = appendBytes(msg2, 2, []byte("second")) + + var buf bytes.Buffer + buf.Write(grpc.Frame(msg1, false)) + buf.Write(grpc.Frame(msg2, false)) + + p := core.NewHandle(core.ColorOff).Stderr() + err := FormatGRPCStream(&buf, nil, p) + if err != nil { + t.Fatalf("FormatGRPCStream() error = %v", err) + } + }) +} diff --git a/internal/grpc/framing.go b/internal/grpc/framing.go index 429eb60..ad9a66c 100644 --- a/internal/grpc/framing.go +++ b/internal/grpc/framing.go @@ -3,6 +3,7 @@ package grpc import ( "encoding/binary" "fmt" + "io" ) // Frame wraps message in gRPC length-prefixed format. @@ -19,6 +20,43 @@ func Frame(data []byte, compressed bool) []byte { return buf } +// maxMessageSize is the maximum allowed gRPC message size. +const maxMessageSize = 64 * 1024 * 1024 // 64MB + +// ReadFrame reads a single gRPC length-prefixed frame from the reader. +// Returns the message data, whether it was compressed, and any error. +// Returns io.EOF when the reader has no more data. +func ReadFrame(r io.Reader) ([]byte, bool, error) { + var header [5]byte + _, err := io.ReadFull(r, header[:]) + if err != nil { + if err == io.ErrUnexpectedEOF { + return nil, false, fmt.Errorf("failed to read gRPC frame header: incomplete header") + } + return nil, false, err + } + + compressed := header[0] != 0 + length := binary.BigEndian.Uint32(header[1:5]) + + if length > maxMessageSize { + return nil, false, fmt.Errorf("gRPC message too large: %d bytes", length) + } + + data := make([]byte, length) + if length > 0 { + _, err = io.ReadFull(r, data) + if err != nil { + if err == io.ErrUnexpectedEOF { + return nil, false, fmt.Errorf("failed to read gRPC message: incomplete data") + } + return nil, false, err + } + } + + return data, compressed, nil +} + // Unframe extracts a gRPC length-prefixed message from the data. // Returns the message data and whether it was compressed. func Unframe(data []byte) ([]byte, bool, error) { @@ -29,8 +67,6 @@ func Unframe(data []byte) ([]byte, bool, error) { compressed := data[0] != 0 length := binary.BigEndian.Uint32(data[1:5]) - // Sanity check on length. - const maxMessageSize = 64 * 1024 * 1024 // 64MB if length > maxMessageSize { return nil, false, fmt.Errorf("gRPC message too large: %d bytes", length) } diff --git a/internal/grpc/framing_test.go b/internal/grpc/framing_test.go index 2669b15..3ef3882 100644 --- a/internal/grpc/framing_test.go +++ b/internal/grpc/framing_test.go @@ -2,6 +2,7 @@ package grpc import ( "bytes" + "io" "testing" ) @@ -146,3 +147,141 @@ func TestUnframeLargeMessageRejected(t *testing.T) { t.Error("expected error for large message") } } + +func TestReadFrame(t *testing.T) { + tests := []struct { + name string + input []byte + wantData []byte + wantCompressed bool + wantErr bool + wantEOF bool + }{ + { + name: "empty message", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x00}, + wantData: []byte{}, + wantCompressed: false, + }, + { + name: "simple uncompressed", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x03}, + wantData: []byte{0x01, 0x02, 0x03}, + wantCompressed: false, + }, + { + name: "simple compressed", + input: []byte{0x01, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x03}, + wantData: []byte{0x01, 0x02, 0x03}, + wantCompressed: true, + }, + { + name: "empty reader", + input: []byte{}, + wantEOF: true, + }, + { + name: "truncated header", + input: []byte{0x00, 0x00, 0x00}, + wantErr: true, + }, + { + name: "truncated data", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x05, 0x01, 0x02}, + wantErr: true, + }, + { + name: "oversized message", + input: []byte{0x00, 0x10, 0x00, 0x00, 0x00}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := bytes.NewReader(tt.input) + data, compressed, err := ReadFrame(r) + if tt.wantEOF { + if err != io.EOF { + t.Errorf("ReadFrame() error = %v, want io.EOF", err) + } + return + } + if (err != nil) != tt.wantErr { + t.Errorf("ReadFrame() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr { + return + } + if !bytes.Equal(data, tt.wantData) { + t.Errorf("ReadFrame() data = %v, want %v", data, tt.wantData) + } + if compressed != tt.wantCompressed { + t.Errorf("ReadFrame() compressed = %v, want %v", compressed, tt.wantCompressed) + } + }) + } +} + +func TestReadFrameMultiple(t *testing.T) { + // Create a reader with two frames. + frame1 := Frame([]byte{0x01, 0x02}, false) + frame2 := Frame([]byte{0x03, 0x04, 0x05}, true) + r := bytes.NewReader(append(frame1, frame2...)) + + // Read first frame. + data, compressed, err := ReadFrame(r) + if err != nil { + t.Fatalf("ReadFrame() first frame error = %v", err) + } + if !bytes.Equal(data, []byte{0x01, 0x02}) { + t.Errorf("first frame data = %v, want [0x01, 0x02]", data) + } + if compressed { + t.Error("first frame: expected uncompressed") + } + + // Read second frame. + data, compressed, err = ReadFrame(r) + if err != nil { + t.Fatalf("ReadFrame() second frame error = %v", err) + } + if !bytes.Equal(data, []byte{0x03, 0x04, 0x05}) { + t.Errorf("second frame data = %v, want [0x03, 0x04, 0x05]", data) + } + if !compressed { + t.Error("second frame: expected compressed") + } + + // No more frames. + _, _, err = ReadFrame(r) + if err != io.EOF { + t.Errorf("ReadFrame() after last frame: error = %v, want io.EOF", err) + } +} + +func TestReadFrameRoundTrip(t *testing.T) { + testData := [][]byte{ + {}, + {0x00}, + {0x01, 0x02, 0x03, 0x04, 0x05}, + bytes.Repeat([]byte{0xAB}, 1000), + } + + for _, data := range testData { + framed := Frame(data, false) + r := bytes.NewReader(framed) + unframed, compressed, err := ReadFrame(r) + if err != nil { + t.Errorf("ReadFrame() error = %v", err) + continue + } + if compressed { + t.Error("expected uncompressed") + } + if !bytes.Equal(unframed, data) { + t.Errorf("round trip failed: got %v, want %v", unframed, data) + } + } +}