Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions docs/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,53 @@ For streaming responses, messages are separated by blank lines in the output. Fo

`fetch` reports gRPC status errors from response trailers. If the server returns a non-OK gRPC status (e.g., `INTERNAL`, `NOT_FOUND`), the error is printed to stderr and the exit code is set to 1.

## Client Streaming

`fetch` supports client-side streaming gRPC calls. When the proto schema indicates a method is client-streaming, multiple JSON objects in the request body are each converted to a separate protobuf message and sent as individual gRPC frames.

Detection is automatic via the method descriptor in the proto schema — no additional flags are needed.

### Inline Data

Provide multiple JSON objects separated by whitespace:

```sh
fetch --grpc --proto-file service.proto \
-d '{"value":"one"}{"value":"two"}{"value":"three"}' \
https://localhost:50051/pkg.Service/ClientStream
```

### NDJSON from File

```sh
fetch --grpc --proto-file service.proto \
-d @messages.ndjson \
https://localhost:50051/pkg.Service/ClientStream
```

### Streaming from Stdin

Pipe data from stdin for real-time streaming — each JSON object is sent as soon as it is parsed:

```sh
cat messages.ndjson | fetch --grpc --proto-file service.proto \
-d @- https://localhost:50051/pkg.Service/ClientStream
```

## Bidirectional Streaming

Bidirectional streaming is supported with the same mechanism as client streaming. When piping from stdin, request frames are sent incrementally while response frames are received and displayed concurrently:

```sh
cat messages.ndjson | fetch --grpc --proto-file service.proto \
-d @- https://localhost:50051/pkg.Service/BidiStream
```

Both directions flow on the same HTTP/2 stream. The response is formatted and displayed as messages arrive, just like server streaming.

## Limitations

- **Client-side and bidirectional streaming are not supported**: Only unary and server-streaming RPCs are supported
- **Single request message**: Cannot send multiple messages in one request
- **Client/bidi streaming requires a proto schema**: The `--proto-file` or `--proto-desc` flag must be provided so `fetch` can detect that a method is client-streaming
- **gRPC-Web**: Standard gRPC protocol only, not gRPC-Web

## See Also
Expand Down
135 changes: 135 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"google.golang.org/protobuf/encoding/protowire"
protoMarshal "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
)

func TestMain(t *testing.T) {
Expand Down Expand Up @@ -1103,6 +1105,136 @@ func TestMain(t *testing.T) {
assertBufContains(t, res.stderr, "oh no!")
})

t.Run("grpc client streaming", func(t *testing.T) {
// Build a FileDescriptorSet with a client-streaming method.
boolTrue := true
strType := descriptorpb.FieldDescriptorProto_TYPE_STRING
int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64
fds := &descriptorpb.FileDescriptorSet{
File: []*descriptorpb.FileDescriptorProto{
{
Name: strPtr("stream.proto"),
Package: strPtr("streampkg"),
Syntax: strPtr("proto3"),
MessageType: []*descriptorpb.DescriptorProto{
{
Name: strPtr("StreamRequest"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: strPtr("value"),
Number: int32Ptr(1),
Type: &strType,
},
},
},
{
Name: strPtr("StreamResponse"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: strPtr("count"),
Number: int32Ptr(1),
Type: &int64Type,
},
},
},
},
Service: []*descriptorpb.ServiceDescriptorProto{
{
Name: strPtr("StreamService"),
Method: []*descriptorpb.MethodDescriptorProto{
{
Name: strPtr("ClientStream"),
InputType: strPtr(".streampkg.StreamRequest"),
OutputType: strPtr(".streampkg.StreamResponse"),
ClientStreaming: &boolTrue,
},
},
},
},
},
},
}

// Serialize the descriptor set to a temp file.
descData, err := protoMarshal.Marshal(fds)
if err != nil {
t.Fatalf("failed to marshal descriptor set: %v", err)
}
descFile := filepath.Join(t.TempDir(), "stream.pb")
if err := os.WriteFile(descFile, descData, 0644); err != nil {
t.Fatalf("failed to write descriptor file: %v", err)
}

// Server reads gRPC frames from request body, counts them,
// and returns count as a gRPC-framed protobuf response.
server := startServer(func(w http.ResponseWriter, r *http.Request) {
// Count incoming gRPC frames.
var count int
for {
var header [5]byte
_, err := io.ReadFull(r.Body, header[:])
if err != nil {
break
}
length := binary.BigEndian.Uint32(header[1:5])
if length > 0 {
buf := make([]byte, length)
_, err = io.ReadFull(r.Body, buf)
if err != nil {
break
}
}
count++
}

// Build response: field 1 (count) as varint.
var respData []byte
respData = protowire.AppendTag(respData, 1, protowire.VarintType)
respData = protowire.AppendVarint(respData, uint64(count))

// Frame the response.
framedResp := make([]byte, 5+len(respData))
framedResp[0] = 0
binary.BigEndian.PutUint32(framedResp[1:5], uint32(len(respData)))
copy(framedResp[5:], respData)

w.Header().Set("Content-Type", "application/grpc+proto")
w.WriteHeader(200)
w.Write(framedResp)
})
defer server.Close()

t.Run("multiple messages", func(t *testing.T) {
data := `{"value":"one"}{"value":"two"}{"value":"three"}`
res := runFetch(t, fetchPath,
server.URL+"/streampkg.StreamService/ClientStream",
"--grpc", "--proto-desc", descFile,
"-d", data,
"--http", "1", "--format", "on")
assertExitCode(t, 0, res)
assertBufContains(t, res.stdout, "3")
})

t.Run("single message", func(t *testing.T) {
data := `{"value":"only"}`
res := runFetch(t, fetchPath,
server.URL+"/streampkg.StreamService/ClientStream",
"--grpc", "--proto-desc", descFile,
"-d", data,
"--http", "1", "--format", "on")
assertExitCode(t, 0, res)
assertBufContains(t, res.stdout, "1")
})

t.Run("empty stream", func(t *testing.T) {
res := runFetch(t, fetchPath,
server.URL+"/streampkg.StreamService/ClientStream",
"--grpc", "--proto-desc", descFile,
"--http", "1", "--format", "on")
assertExitCode(t, 0, res)
})
})

t.Run("proto flags mutual exclusivity", func(t *testing.T) {
// proto-file and proto-desc cannot be used together
// Create temp files so we get past file existence validation
Expand Down Expand Up @@ -1933,3 +2065,6 @@ func startMTLSServer(t *testing.T, certPath, keyPath, caCertPath string) *httpte
server.StartTLS()
return server
}

func strPtr(s string) *string { return &s }
func int32Ptr(i int32) *int32 { return &i }
50 changes: 28 additions & 22 deletions internal/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ func fetch(ctx context.Context, r *Request) (int, error) {

// 2. Setup gRPC (adds headers, sets HTTP version, finds descriptors).
var requestDesc protoreflect.MessageDescriptor
var isClientStreaming bool
if r.GRPC {
var err error
requestDesc, r.responseDescriptor, err = setupGRPC(r, schema)
requestDesc, r.responseDescriptor, isClientStreaming, err = setupGRPC(r, schema)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -200,30 +201,35 @@ func fetch(ctx context.Context, r *Request) (int, error) {
}
}

// 5. Convert JSON to protobuf AFTER edit.
if requestDesc != nil && req.Body != nil && req.Body != http.NoBody {
// Read the body and convert.
converted, err := convertJSONToProtobuf(req.Body, requestDesc)
if err != nil {
return 0, err
}
req.Body = io.NopCloser(converted)
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", "application/protobuf")
}
}

// 6. Frame gRPC request AFTER conversion.
// gRPC requires framing even for empty messages.
// 5. Convert and frame gRPC request AFTER edit.
if r.GRPC {
framed, err := frameGRPCRequest(req.Body)
if err != nil {
return 0, err
if isClientStreaming && requestDesc != nil {
// Client/bidi streaming: stream multiple JSON objects as gRPC frames.
if req.Body != nil && req.Body != http.NoBody {
req.Body = streamGRPCRequest(req.Body, requestDesc)
req.ContentLength = -1 // Unknown length; use chunked encoding.
} else {
// Empty client stream: no frames, just close immediately.
req.Body = http.NoBody
}
} else {
// Unary / server-streaming: existing single-message path.
if requestDesc != nil && req.Body != nil && req.Body != http.NoBody {
converted, err := convertJSONToProtobuf(req.Body, requestDesc)
if err != nil {
return 0, err
}
req.Body = io.NopCloser(converted)
}
framed, err := frameGRPCRequest(req.Body)
if err != nil {
return 0, err
}
req.Body = io.NopCloser(framed)
}
req.Body = io.NopCloser(framed)
}

// 7. Print request metadata / dry-run.
// 6. Print request metadata / dry-run.
if r.Verbosity >= core.VExtraVerbose || r.DryRun {
errPrinter := r.PrinterHandle.Stderr()
printRequestMetadata(errPrinter, req, r.HTTP)
Expand Down Expand Up @@ -263,7 +269,7 @@ func fetch(ctx context.Context, r *Request) (int, error) {
req = req.WithContext(ctx)
}

// 8. Make request.
// 7. Make request.
code, err := makeRequest(ctx, r, c, req)

// Save session cookies after request completes.
Expand Down
45 changes: 40 additions & 5 deletions internal/fetch/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fetch

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -73,22 +74,24 @@ func parseGRPCPath(urlPath string) (serviceName, methodName string, err error) {
}

// setupGRPC configures request for gRPC protocol.
// Returns headers to add, HTTP version, and request/response descriptors.
func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor, protoreflect.MessageDescriptor, error) {
// Returns request/response descriptors, whether the method is client-streaming, and any error.
func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor, protoreflect.MessageDescriptor, bool, error) {
var requestDesc, responseDesc protoreflect.MessageDescriptor
var isClientStreaming bool
if schema != nil && r.URL != nil {
serviceName, methodName, err := parseGRPCPath(r.URL.Path)
if err != nil {
return nil, nil, err
return nil, nil, false, err
}

fullMethod := serviceName + "/" + methodName
method, err := schema.FindMethod(fullMethod)
if err != nil {
return nil, nil, err
return nil, nil, false, err
}
requestDesc = method.Input()
responseDesc = method.Output()
isClientStreaming = method.IsStreamingClient()
}

if r.HTTP == core.HTTPDefault {
Expand All @@ -100,7 +103,7 @@ func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor
r.Headers = append(r.Headers, fetchgrpc.Headers()...)
r.Headers = append(r.Headers, fetchgrpc.AcceptHeader())

return requestDesc, responseDesc, nil
return requestDesc, responseDesc, isClientStreaming, nil
}

// convertJSONToProtobuf converts JSON body to protobuf.
Expand Down Expand Up @@ -136,3 +139,35 @@ func frameGRPCRequest(data io.Reader) (io.Reader, error) {
framedData := fetchgrpc.Frame(rawData, false)
return bytes.NewReader(framedData), nil
}

// streamGRPCRequest reads JSON objects from data, converts each to protobuf,
// frames each as a gRPC message, and streams them through an io.Pipe.
// Returns an io.ReadCloser to use as the request body.
func streamGRPCRequest(data io.Reader, desc protoreflect.MessageDescriptor) io.ReadCloser {
pr, pw := io.Pipe()
go func() {
defer pw.Close()
decoder := json.NewDecoder(data)
for {
var raw json.RawMessage
err := decoder.Decode(&raw)
if err == io.EOF {
return
}
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to decode JSON message: %w", err))
return
}
protoData, err := proto.JSONToProtobuf(raw, desc)
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to convert JSON to protobuf: %w", err))
return
}
frame := fetchgrpc.Frame(protoData, false)
if _, err := pw.Write(frame); err != nil {
return // pipe closed by reader
}
}
}()
return pr
}
Loading