From 7c3b7a10da0d34c2b72f2cc012dc641d0ead11fd Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Sun, 1 Feb 2026 13:35:14 -0800 Subject: [PATCH] Add gRPC client-side and bidirectional streaming support Stream multiple JSON objects as individual gRPC frames using io.Pipe for incremental delivery. Detection is automatic via the proto schema's MethodDescriptor.IsStreamingClient(), requiring no new CLI flags. --- docs/grpc.md | 47 +++++++++- integration/integration_test.go | 135 +++++++++++++++++++++++++++ internal/fetch/fetch.go | 50 +++++----- internal/fetch/proto.go | 45 ++++++++- internal/fetch/proto_test.go | 159 ++++++++++++++++++++++++++++++++ 5 files changed, 407 insertions(+), 29 deletions(-) create mode 100644 internal/fetch/proto_test.go diff --git a/docs/grpc.md b/docs/grpc.md index ac4386e..3f14c40 100644 --- a/docs/grpc.md +++ b/docs/grpc.md @@ -369,10 +369,53 @@ For streaming responses, messages are separated by blank lines in the output. Fo `fetch` reports gRPC status errors from response trailers. If the server returns a non-OK gRPC status (e.g., `INTERNAL`, `NOT_FOUND`), the error is printed to stderr and the exit code is set to 1. +## Client Streaming + +`fetch` supports client-side streaming gRPC calls. When the proto schema indicates a method is client-streaming, multiple JSON objects in the request body are each converted to a separate protobuf message and sent as individual gRPC frames. + +Detection is automatic via the method descriptor in the proto schema — no additional flags are needed. + +### Inline Data + +Provide multiple JSON objects separated by whitespace: + +```sh +fetch --grpc --proto-file service.proto \ + -d '{"value":"one"}{"value":"two"}{"value":"three"}' \ + https://localhost:50051/pkg.Service/ClientStream +``` + +### NDJSON from File + +```sh +fetch --grpc --proto-file service.proto \ + -d @messages.ndjson \ + https://localhost:50051/pkg.Service/ClientStream +``` + +### Streaming from Stdin + +Pipe data from stdin for real-time streaming — each JSON object is sent as soon as it is parsed: + +```sh +cat messages.ndjson | fetch --grpc --proto-file service.proto \ + -d @- https://localhost:50051/pkg.Service/ClientStream +``` + +## Bidirectional Streaming + +Bidirectional streaming is supported with the same mechanism as client streaming. When piping from stdin, request frames are sent incrementally while response frames are received and displayed concurrently: + +```sh +cat messages.ndjson | fetch --grpc --proto-file service.proto \ + -d @- https://localhost:50051/pkg.Service/BidiStream +``` + +Both directions flow on the same HTTP/2 stream. The response is formatted and displayed as messages arrive, just like server streaming. + ## Limitations -- **Client-side and bidirectional streaming are not supported**: Only unary and server-streaming RPCs are supported -- **Single request message**: Cannot send multiple messages in one request +- **Client/bidi streaming requires a proto schema**: The `--proto-file` or `--proto-desc` flag must be provided so `fetch` can detect that a method is client-streaming - **gRPC-Web**: Standard gRPC protocol only, not gRPC-Web ## See Also diff --git a/integration/integration_test.go b/integration/integration_test.go index f544c1b..81f1d7c 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -37,6 +37,8 @@ import ( "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" "google.golang.org/protobuf/encoding/protowire" + protoMarshal "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" ) func TestMain(t *testing.T) { @@ -1103,6 +1105,136 @@ func TestMain(t *testing.T) { assertBufContains(t, res.stderr, "oh no!") }) + t.Run("grpc client streaming", func(t *testing.T) { + // Build a FileDescriptorSet with a client-streaming method. + boolTrue := true + strType := descriptorpb.FieldDescriptorProto_TYPE_STRING + int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64 + fds := &descriptorpb.FileDescriptorSet{ + File: []*descriptorpb.FileDescriptorProto{ + { + Name: strPtr("stream.proto"), + Package: strPtr("streampkg"), + Syntax: strPtr("proto3"), + MessageType: []*descriptorpb.DescriptorProto{ + { + Name: strPtr("StreamRequest"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: strPtr("value"), + Number: int32Ptr(1), + Type: &strType, + }, + }, + }, + { + Name: strPtr("StreamResponse"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: strPtr("count"), + Number: int32Ptr(1), + Type: &int64Type, + }, + }, + }, + }, + Service: []*descriptorpb.ServiceDescriptorProto{ + { + Name: strPtr("StreamService"), + Method: []*descriptorpb.MethodDescriptorProto{ + { + Name: strPtr("ClientStream"), + InputType: strPtr(".streampkg.StreamRequest"), + OutputType: strPtr(".streampkg.StreamResponse"), + ClientStreaming: &boolTrue, + }, + }, + }, + }, + }, + }, + } + + // Serialize the descriptor set to a temp file. + descData, err := protoMarshal.Marshal(fds) + if err != nil { + t.Fatalf("failed to marshal descriptor set: %v", err) + } + descFile := filepath.Join(t.TempDir(), "stream.pb") + if err := os.WriteFile(descFile, descData, 0644); err != nil { + t.Fatalf("failed to write descriptor file: %v", err) + } + + // Server reads gRPC frames from request body, counts them, + // and returns count as a gRPC-framed protobuf response. + server := startServer(func(w http.ResponseWriter, r *http.Request) { + // Count incoming gRPC frames. + var count int + for { + var header [5]byte + _, err := io.ReadFull(r.Body, header[:]) + if err != nil { + break + } + length := binary.BigEndian.Uint32(header[1:5]) + if length > 0 { + buf := make([]byte, length) + _, err = io.ReadFull(r.Body, buf) + if err != nil { + break + } + } + count++ + } + + // Build response: field 1 (count) as varint. + var respData []byte + respData = protowire.AppendTag(respData, 1, protowire.VarintType) + respData = protowire.AppendVarint(respData, uint64(count)) + + // Frame the response. + framedResp := make([]byte, 5+len(respData)) + framedResp[0] = 0 + binary.BigEndian.PutUint32(framedResp[1:5], uint32(len(respData))) + copy(framedResp[5:], respData) + + w.Header().Set("Content-Type", "application/grpc+proto") + w.WriteHeader(200) + w.Write(framedResp) + }) + defer server.Close() + + t.Run("multiple messages", func(t *testing.T) { + data := `{"value":"one"}{"value":"two"}{"value":"three"}` + res := runFetch(t, fetchPath, + server.URL+"/streampkg.StreamService/ClientStream", + "--grpc", "--proto-desc", descFile, + "-d", data, + "--http", "1", "--format", "on") + assertExitCode(t, 0, res) + assertBufContains(t, res.stdout, "3") + }) + + t.Run("single message", func(t *testing.T) { + data := `{"value":"only"}` + res := runFetch(t, fetchPath, + server.URL+"/streampkg.StreamService/ClientStream", + "--grpc", "--proto-desc", descFile, + "-d", data, + "--http", "1", "--format", "on") + assertExitCode(t, 0, res) + assertBufContains(t, res.stdout, "1") + }) + + t.Run("empty stream", func(t *testing.T) { + res := runFetch(t, fetchPath, + server.URL+"/streampkg.StreamService/ClientStream", + "--grpc", "--proto-desc", descFile, + "--http", "1", "--format", "on") + assertExitCode(t, 0, res) + }) + }) + t.Run("proto flags mutual exclusivity", func(t *testing.T) { // proto-file and proto-desc cannot be used together // Create temp files so we get past file existence validation @@ -1933,3 +2065,6 @@ func startMTLSServer(t *testing.T, certPath, keyPath, caCertPath string) *httpte server.StartTLS() return server } + +func strPtr(s string) *string { return &s } +func int32Ptr(i int32) *int32 { return &i } diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index 40466b7..88f24a5 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -129,9 +129,10 @@ func fetch(ctx context.Context, r *Request) (int, error) { // 2. Setup gRPC (adds headers, sets HTTP version, finds descriptors). var requestDesc protoreflect.MessageDescriptor + var isClientStreaming bool if r.GRPC { var err error - requestDesc, r.responseDescriptor, err = setupGRPC(r, schema) + requestDesc, r.responseDescriptor, isClientStreaming, err = setupGRPC(r, schema) if err != nil { return 0, err } @@ -200,30 +201,35 @@ func fetch(ctx context.Context, r *Request) (int, error) { } } - // 5. Convert JSON to protobuf AFTER edit. - if requestDesc != nil && req.Body != nil && req.Body != http.NoBody { - // Read the body and convert. - converted, err := convertJSONToProtobuf(req.Body, requestDesc) - if err != nil { - return 0, err - } - req.Body = io.NopCloser(converted) - if req.Header.Get("Content-Type") == "" { - req.Header.Set("Content-Type", "application/protobuf") - } - } - - // 6. Frame gRPC request AFTER conversion. - // gRPC requires framing even for empty messages. + // 5. Convert and frame gRPC request AFTER edit. if r.GRPC { - framed, err := frameGRPCRequest(req.Body) - if err != nil { - return 0, err + if isClientStreaming && requestDesc != nil { + // Client/bidi streaming: stream multiple JSON objects as gRPC frames. + if req.Body != nil && req.Body != http.NoBody { + req.Body = streamGRPCRequest(req.Body, requestDesc) + req.ContentLength = -1 // Unknown length; use chunked encoding. + } else { + // Empty client stream: no frames, just close immediately. + req.Body = http.NoBody + } + } else { + // Unary / server-streaming: existing single-message path. + if requestDesc != nil && req.Body != nil && req.Body != http.NoBody { + converted, err := convertJSONToProtobuf(req.Body, requestDesc) + if err != nil { + return 0, err + } + req.Body = io.NopCloser(converted) + } + framed, err := frameGRPCRequest(req.Body) + if err != nil { + return 0, err + } + req.Body = io.NopCloser(framed) } - req.Body = io.NopCloser(framed) } - // 7. Print request metadata / dry-run. + // 6. Print request metadata / dry-run. if r.Verbosity >= core.VExtraVerbose || r.DryRun { errPrinter := r.PrinterHandle.Stderr() printRequestMetadata(errPrinter, req, r.HTTP) @@ -263,7 +269,7 @@ func fetch(ctx context.Context, r *Request) (int, error) { req = req.WithContext(ctx) } - // 8. Make request. + // 7. Make request. code, err := makeRequest(ctx, r, c, req) // Save session cookies after request completes. diff --git a/internal/fetch/proto.go b/internal/fetch/proto.go index b067ead..b5666dd 100644 --- a/internal/fetch/proto.go +++ b/internal/fetch/proto.go @@ -2,6 +2,7 @@ package fetch import ( "bytes" + "encoding/json" "fmt" "io" "net/http" @@ -73,22 +74,24 @@ func parseGRPCPath(urlPath string) (serviceName, methodName string, err error) { } // setupGRPC configures request for gRPC protocol. -// Returns headers to add, HTTP version, and request/response descriptors. -func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor, protoreflect.MessageDescriptor, error) { +// Returns request/response descriptors, whether the method is client-streaming, and any error. +func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor, protoreflect.MessageDescriptor, bool, error) { var requestDesc, responseDesc protoreflect.MessageDescriptor + var isClientStreaming bool if schema != nil && r.URL != nil { serviceName, methodName, err := parseGRPCPath(r.URL.Path) if err != nil { - return nil, nil, err + return nil, nil, false, err } fullMethod := serviceName + "/" + methodName method, err := schema.FindMethod(fullMethod) if err != nil { - return nil, nil, err + return nil, nil, false, err } requestDesc = method.Input() responseDesc = method.Output() + isClientStreaming = method.IsStreamingClient() } if r.HTTP == core.HTTPDefault { @@ -100,7 +103,7 @@ func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor r.Headers = append(r.Headers, fetchgrpc.Headers()...) r.Headers = append(r.Headers, fetchgrpc.AcceptHeader()) - return requestDesc, responseDesc, nil + return requestDesc, responseDesc, isClientStreaming, nil } // convertJSONToProtobuf converts JSON body to protobuf. @@ -136,3 +139,35 @@ func frameGRPCRequest(data io.Reader) (io.Reader, error) { framedData := fetchgrpc.Frame(rawData, false) return bytes.NewReader(framedData), nil } + +// streamGRPCRequest reads JSON objects from data, converts each to protobuf, +// frames each as a gRPC message, and streams them through an io.Pipe. +// Returns an io.ReadCloser to use as the request body. +func streamGRPCRequest(data io.Reader, desc protoreflect.MessageDescriptor) io.ReadCloser { + pr, pw := io.Pipe() + go func() { + defer pw.Close() + decoder := json.NewDecoder(data) + for { + var raw json.RawMessage + err := decoder.Decode(&raw) + if err == io.EOF { + return + } + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to decode JSON message: %w", err)) + return + } + protoData, err := proto.JSONToProtobuf(raw, desc) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to convert JSON to protobuf: %w", err)) + return + } + frame := fetchgrpc.Frame(protoData, false) + if _, err := pw.Write(frame); err != nil { + return // pipe closed by reader + } + } + }() + return pr +} diff --git a/internal/fetch/proto_test.go b/internal/fetch/proto_test.go new file mode 100644 index 0000000..32fc489 --- /dev/null +++ b/internal/fetch/proto_test.go @@ -0,0 +1,159 @@ +package fetch + +import ( + "bytes" + "io" + "strings" + "testing" + + fetchgrpc "github.com/ryanfowler/fetch/internal/grpc" + "github.com/ryanfowler/fetch/internal/proto" + + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" +) + +func TestStreamGRPCRequest(t *testing.T) { + desc := testMessageDescriptor(t) + + t.Run("single message", func(t *testing.T) { + input := `{"name":"hello"}` + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 1 { + t.Fatalf("expected 1 frame, got %d", len(frames)) + } + }) + + t.Run("multiple messages", func(t *testing.T) { + input := `{"name":"one"}{"name":"two"}{"name":"three"}` + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 3 { + t.Fatalf("expected 3 frames, got %d", len(frames)) + } + }) + + t.Run("ndjson style", func(t *testing.T) { + input := "{\"name\":\"one\"}\n{\"name\":\"two\"}\n{\"name\":\"three\"}\n" + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 3 { + t.Fatalf("expected 3 frames, got %d", len(frames)) + } + }) + + t.Run("empty input", func(t *testing.T) { + rc := streamGRPCRequest(strings.NewReader(""), desc) + defer rc.Close() + + data, err := io.ReadAll(rc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(data) != 0 { + t.Fatalf("expected empty output, got %d bytes", len(data)) + } + }) + + t.Run("invalid json", func(t *testing.T) { + rc := streamGRPCRequest(strings.NewReader("{invalid"), desc) + defer rc.Close() + + _, err := io.ReadAll(rc) + if err == nil { + t.Fatal("expected error for invalid JSON") + } + if !strings.Contains(err.Error(), "failed to decode JSON message") { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("whitespace between objects", func(t *testing.T) { + input := " {\"name\":\"one\"} \n\n {\"name\":\"two\"} " + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 2 { + t.Fatalf("expected 2 frames, got %d", len(frames)) + } + }) +} + +// testMessageDescriptor builds a simple protobuf message descriptor for testing. +func testMessageDescriptor(t *testing.T) protoreflect.MessageDescriptor { + t.Helper() + + strType := descriptorpb.FieldDescriptorProto_TYPE_STRING + int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64 + fds := &descriptorpb.FileDescriptorSet{ + File: []*descriptorpb.FileDescriptorProto{ + { + Name: strp("test.proto"), + Package: strp("testpkg"), + Syntax: strp("proto3"), + MessageType: []*descriptorpb.DescriptorProto{ + { + Name: strp("TestMessage"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: strp("id"), + Number: int32p(1), + Type: &int64Type, + }, + { + Name: strp("name"), + Number: int32p(2), + Type: &strType, + }, + }, + }, + }, + }, + }, + } + + schema, err := proto.LoadFromDescriptorSet(fds) + if err != nil { + t.Fatalf("failed to load descriptor set: %v", err) + } + md, err := schema.FindMessage("testpkg.TestMessage") + if err != nil { + t.Fatalf("failed to find message: %v", err) + } + return md +} + +// readAllFrames reads all gRPC frames from a reader. +func readAllFrames(t *testing.T, r io.Reader) [][]byte { + t.Helper() + + data, err := io.ReadAll(r) + if err != nil { + t.Fatalf("failed to read all data: %v", err) + } + + var frames [][]byte + reader := bytes.NewReader(data) + for { + frame, _, err := fetchgrpc.ReadFrame(reader) + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("failed to read frame: %v", err) + } + frames = append(frames, frame) + } + return frames +} + +func strp(s string) *string { return &s } +func int32p(i int32) *int32 { return &i }