Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions docs/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,26 @@ apt install protobuf-compiler
- Verify proto schema matches server's actual message format
- Try without schema to see raw wire format

## Server Streaming

`fetch` supports server-side streaming gRPC responses. Each response message is formatted and displayed as it arrives, following the same real-time streaming pattern used for SSE and NDJSON responses.

```sh
fetch --grpc --proto-file service.proto \
-j '{"query": "search term"}' \
https://localhost:50051/search.SearchService/StreamResults
```

For streaming responses, messages are separated by blank lines in the output. Formatting and flushing happen incrementally, so results appear in real time.

### gRPC Status

`fetch` reports gRPC status errors from response trailers. If the server returns a non-OK gRPC status (e.g., `INTERNAL`, `NOT_FOUND`), the error is printed to stderr and the exit code is set to 1.

## Limitations

- **Unary calls only**: Streaming RPCs are not supported
- **Single message**: Cannot send multiple messages in one request
- **Client-side and bidirectional streaming are not supported**: Only unary and server-streaming RPCs are supported
- **Single request message**: Cannot send multiple messages in one request
- **gRPC-Web**: Standard gRPC protocol only, not gRPC-Web

## See Also
Expand Down
69 changes: 69 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,75 @@ func TestMain(t *testing.T) {
assertBufContains(t, res.stdout, "grpc test")
})

t.Run("grpc streaming response", func(t *testing.T) {
// Build 3 separate gRPC-framed protobuf messages.
makeFrame := func(fieldNum protowire.Number, value string) []byte {
var protoData []byte
protoData = protowire.AppendTag(protoData, fieldNum, protowire.BytesType)
protoData = protowire.AppendString(protoData, value)
framedData := make([]byte, 5+len(protoData))
framedData[0] = 0 // not compressed
binary.BigEndian.PutUint32(framedData[1:5], uint32(len(protoData)))
copy(framedData[5:], protoData)
return framedData
}

frame1 := makeFrame(1, "message one")
frame2 := makeFrame(1, "message two")
frame3 := makeFrame(1, "message three")

server := startServer(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/grpc+proto")
w.WriteHeader(200)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "no flusher", 500)
return
}
w.Write(frame1)
flusher.Flush()
w.Write(frame2)
flusher.Flush()
w.Write(frame3)
flusher.Flush()
})
defer server.Close()

res := runFetch(t, fetchPath, server.URL, "--format", "on")
assertExitCode(t, 0, res)
assertBufContains(t, res.stdout, "message one")
assertBufContains(t, res.stdout, "message two")
assertBufContains(t, res.stdout, "message three")
})

t.Run("grpc streaming error status", func(t *testing.T) {
// Build a single gRPC-framed protobuf message.
var protoData []byte
protoData = protowire.AppendTag(protoData, 1, protowire.BytesType)
protoData = protowire.AppendString(protoData, "partial data")
framedData := make([]byte, 5+len(protoData))
framedData[0] = 0
binary.BigEndian.PutUint32(framedData[1:5], uint32(len(protoData)))
copy(framedData[5:], protoData)

server := startServer(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/grpc+proto")
w.Header().Set("Trailer", "Grpc-Status, Grpc-Message")
w.WriteHeader(200)
w.Write(framedData)
w.(http.Flusher).Flush()
// Set trailers after body.
w.Header().Set("Grpc-Status", "13") // INTERNAL
w.Header().Set("Grpc-Message", "oh no!") // error message
})
defer server.Close()

res := runFetch(t, fetchPath, server.URL+"/pkg.Svc/Method", "--grpc", "--http", "1", "--format", "on")
assertExitCode(t, 1, res)
assertBufContains(t, res.stderr, "INTERNAL")
assertBufContains(t, res.stderr, "oh no!")
})

t.Run("proto flags mutual exclusivity", func(t *testing.T) {
// proto-file and proto-desc cannot be used together
// Create temp files so we get past file existence validation
Expand Down
2 changes: 1 addition & 1 deletion internal/fetch/clipboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newClipboardCopier(r *Request, resp *http.Response) *clipboardCopier {
}

contentType := getContentType(resp.Header)
if contentType == TypeSSE || contentType == TypeNDJSON {
if contentType == TypeSSE || contentType == TypeNDJSON || contentType == TypeGRPC {
p := r.PrinterHandle.Stderr()
core.WriteWarningMsg(p, "--copy is not supported for streaming responses")
return nil
Expand Down
24 changes: 8 additions & 16 deletions internal/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ryanfowler/fetch/internal/client"
"github.com/ryanfowler/fetch/internal/core"
"github.com/ryanfowler/fetch/internal/format"
fetchgrpc "github.com/ryanfowler/fetch/internal/grpc"
"github.com/ryanfowler/fetch/internal/image"
"github.com/ryanfowler/fetch/internal/multipart"
"github.com/ryanfowler/fetch/internal/proto"
Expand Down Expand Up @@ -338,6 +337,11 @@ func makeRequest(ctx context.Context, r *Request, c *client.Client, req *http.Re
// Copy captured bytes to clipboard.
cc.finish(r.PrinterHandle.Stderr())

// Check gRPC trailer status after the body has been fully consumed.
if r.GRPC {
exitCode = checkGRPCStatus(r, resp, exitCode)
}

return exitCode, nil
}

Expand Down Expand Up @@ -367,6 +371,9 @@ func formatResponse(ctx context.Context, r *Request, resp *http.Response) (io.Re
switch contentType {
case TypeUnknown:
return resp.Body, nil
case TypeGRPC:
// NOTE: This bypasses the isPrintable check for binary data.
return nil, format.FormatGRPCStream(resp.Body, r.responseDescriptor, p)
case TypeNDJSON:
// NOTE: This bypasses the isPrintable check for binary data.
return nil, format.FormatNDJSON(resp.Body, p)
Expand Down Expand Up @@ -412,21 +419,6 @@ func formatResponse(ctx context.Context, r *Request, resp *http.Response) (io.Re
if format.FormatMsgPack(buf, p) == nil {
buf = p.Bytes()
}
case TypeGRPC:
// Unframe gRPC response before processing.
unframedBuf, _, err := fetchgrpc.Unframe(buf)
if err != nil {
// If unframing fails, try to process as raw protobuf.
unframedBuf = buf
}
if r.responseDescriptor != nil {
err = format.FormatProtobufWithDescriptor(unframedBuf, r.responseDescriptor, p)
} else {
err = format.FormatProtobuf(unframedBuf, p)
}
if err == nil {
buf = p.Bytes()
}
case TypeProtobuf:
var err error
if r.responseDescriptor != nil {
Expand Down
27 changes: 27 additions & 0 deletions internal/fetch/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,33 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

// checkGRPCStatus checks gRPC status from trailers (or headers for
// trailers-only responses) and prints an error to stderr if the status
// is non-OK. Returns the updated exit code.
func checkGRPCStatus(r *Request, resp *http.Response, exitCode int) int {
grpcStatus := resp.Trailer.Get("Grpc-Status")
grpcMessage := resp.Trailer.Get("Grpc-Message")

// Fall back to headers for trailers-only error responses.
if grpcStatus == "" {
grpcStatus = resp.Header.Get("Grpc-Status")
grpcMessage = resp.Header.Get("Grpc-Message")
}

if grpcStatus == "" || grpcStatus == "0" {
return exitCode
}

status := fetchgrpc.ParseStatus(grpcStatus, grpcMessage)
p := r.PrinterHandle.Stderr()
core.WriteErrorMsg(p, status)

if exitCode == 0 {
return 1
}
return exitCode
}

// loadProtoSchema loads schema from files or descriptor set.
func loadProtoSchema(r *Request) (*proto.Schema, error) {
if len(r.ProtoFiles) > 0 {
Expand Down
46 changes: 46 additions & 0 deletions internal/format/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package format

import (
"errors"
"io"

"github.com/ryanfowler/fetch/internal/core"
"github.com/ryanfowler/fetch/internal/grpc"

"google.golang.org/protobuf/reflect/protoreflect"
)

// FormatGRPCStream formats a gRPC response stream by reading and formatting
// each length-prefixed frame as it arrives. This handles both unary (single
// frame) and server-streaming (multiple frames) responses.
func FormatGRPCStream(r io.Reader, md protoreflect.MessageDescriptor, p *core.Printer) error {
var written bool
for {
data, _, err := grpc.ReadFrame(r)
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}

if written {
p.WriteString("\n")
} else {
written = true
}

if md != nil {
err = FormatProtobufWithDescriptor(data, md, p)
} else {
err = FormatProtobuf(data, p)
}
if err != nil {
// If formatting fails, return the error.
p.Reset()
return err
}

p.Flush()
}
}
88 changes: 88 additions & 0 deletions internal/format/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package format

import (
"bytes"
"testing"

"github.com/ryanfowler/fetch/internal/core"
"github.com/ryanfowler/fetch/internal/grpc"
)

func TestFormatGRPCStream(t *testing.T) {
t.Run("single frame", func(t *testing.T) {
protoData := appendVarint(nil, 1, 42)
protoData = appendBytes(protoData, 2, []byte("hello"))
framed := grpc.Frame(protoData, false)

p := core.NewHandle(core.ColorOff).Stderr()
err := FormatGRPCStream(bytes.NewReader(framed), nil, p)
if err != nil {
t.Fatalf("FormatGRPCStream() error = %v", err)
}
})

t.Run("multiple frames", func(t *testing.T) {
frame1 := grpc.Frame(appendVarint(nil, 1, 100), false)
frame2 := grpc.Frame(appendVarint(nil, 1, 200), false)
frame3 := grpc.Frame(appendVarint(nil, 1, 300), false)

var buf bytes.Buffer
buf.Write(frame1)
buf.Write(frame2)
buf.Write(frame3)

p := core.NewHandle(core.ColorOff).Stderr()
err := FormatGRPCStream(&buf, nil, p)
if err != nil {
t.Fatalf("FormatGRPCStream() error = %v", err)
}
})

t.Run("empty stream", func(t *testing.T) {
p := core.NewHandle(core.ColorOff).Stderr()
err := FormatGRPCStream(bytes.NewReader(nil), nil, p)
if err != nil {
t.Fatalf("FormatGRPCStream() error = %v", err)
}
})

t.Run("empty message frame", func(t *testing.T) {
framed := grpc.Frame(nil, false)

p := core.NewHandle(core.ColorOff).Stderr()
err := FormatGRPCStream(bytes.NewReader(framed), nil, p)
if err != nil {
t.Fatalf("FormatGRPCStream() error = %v", err)
}
})

t.Run("error mid-stream", func(t *testing.T) {
// First frame is valid, then stream is truncated mid-header.
frame1 := grpc.Frame(appendVarint(nil, 1, 42), false)
truncated := append(frame1, 0x00, 0x00) // partial header

p := core.NewHandle(core.ColorOff).Stderr()
err := FormatGRPCStream(bytes.NewReader(truncated), nil, p)
if err == nil {
t.Error("expected error for truncated stream")
}
})

t.Run("multiple frames with multi-field messages", func(t *testing.T) {
msg1 := appendVarint(nil, 1, 10)
msg1 = appendBytes(msg1, 2, []byte("first"))

msg2 := appendVarint(nil, 1, 20)
msg2 = appendBytes(msg2, 2, []byte("second"))

var buf bytes.Buffer
buf.Write(grpc.Frame(msg1, false))
buf.Write(grpc.Frame(msg2, false))

p := core.NewHandle(core.ColorOff).Stderr()
err := FormatGRPCStream(&buf, nil, p)
if err != nil {
t.Fatalf("FormatGRPCStream() error = %v", err)
}
})
}
40 changes: 38 additions & 2 deletions internal/grpc/framing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"encoding/binary"
"fmt"
"io"
)

// Frame wraps message in gRPC length-prefixed format.
Expand All @@ -19,6 +20,43 @@ func Frame(data []byte, compressed bool) []byte {
return buf
}

// maxMessageSize is the maximum allowed gRPC message size.
const maxMessageSize = 64 * 1024 * 1024 // 64MB

// ReadFrame reads a single gRPC length-prefixed frame from the reader.
// Returns the message data, whether it was compressed, and any error.
// Returns io.EOF when the reader has no more data.
func ReadFrame(r io.Reader) ([]byte, bool, error) {
var header [5]byte
_, err := io.ReadFull(r, header[:])
if err != nil {
if err == io.ErrUnexpectedEOF {
return nil, false, fmt.Errorf("failed to read gRPC frame header: incomplete header")
}
return nil, false, err
}

compressed := header[0] != 0
length := binary.BigEndian.Uint32(header[1:5])

if length > maxMessageSize {
return nil, false, fmt.Errorf("gRPC message too large: %d bytes", length)
}

data := make([]byte, length)
if length > 0 {
_, err = io.ReadFull(r, data)
if err != nil {
if err == io.ErrUnexpectedEOF {
return nil, false, fmt.Errorf("failed to read gRPC message: incomplete data")
}
return nil, false, err
}
}

return data, compressed, nil
}

// Unframe extracts a gRPC length-prefixed message from the data.
// Returns the message data and whether it was compressed.
func Unframe(data []byte) ([]byte, bool, error) {
Expand All @@ -29,8 +67,6 @@ func Unframe(data []byte) ([]byte, bool, error) {
compressed := data[0] != 0
length := binary.BigEndian.Uint32(data[1:5])

// Sanity check on length.
const maxMessageSize = 64 * 1024 * 1024 // 64MB
if length > maxMessageSize {
return nil, false, fmt.Errorf("gRPC message too large: %d bytes", length)
}
Expand Down
Loading