Skip to content

Commit d592b38

Browse files
authored
refactor: better interface and simpler implementation for GRPC.Stream.run (#453)
* fix: GRPC.Stream.run only consumes the first reply and returns :ok * fix: simplify noreply workflow
1 parent ab28076 commit d592b38

File tree

5 files changed

+122
-34
lines changed

5 files changed

+122
-34
lines changed

examples/helloworld/mix.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
%{
22
"cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"},
33
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
4+
"flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"},
5+
"gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"},
46
"google_protos": {:hex, :google_protos, "0.3.0", "15faf44dce678ac028c289668ff56548806e313e4959a3aaf4f6e1ebe8db83f4", [:mix], [{:protobuf, "~> 0.10", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "1f6b7fb20371f72f418b98e5e48dae3e022a9a6de1858d4b254ac5a5d0b4035f"},
7+
"googleapis": {:hex, :googleapis, "0.1.0", "13770f3f75f5b863fb9acf41633c7bc71bad788f3f553b66481a096d083ee20e", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "1989a7244fd17d3eb5f3de311a022b656c3736b39740db46506157c4604bd212"},
58
"gun": {:hex, :gun, "2.1.0", "b4e4cbbf3026d21981c447e9e7ca856766046eff693720ba43114d7f5de36e87", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "52fc7fc246bfc3b00e01aea1c2854c70a366348574ab50c57dfe796d24a0101d"},
69
"hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"},
710
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},

lib/grpc/server/adapters/cowboy/handler.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,9 +561,11 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
561561
end
562562

563563
defp do_call_rpc(server, path, %{http_method: http_method} = stream) do
564-
result = server.__call_rpc__(path, http_method, stream)
564+
case server.__call_rpc__(path, http_method, stream) do
565+
{:ok, stream, :noreply} ->
566+
GRPC.Server.send_trailers(stream, @default_trailers)
567+
{:ok, stream}
565568

566-
case result do
567569
{:ok, stream, response} ->
568570
stream
569571
|> GRPC.Server.send_reply(response)

lib/grpc/stream.ex

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ defmodule GRPC.Stream do
7171
- `:dispatcher` — Specifies the `Flow` dispatcher (defaults to `GenStage.DemandDispatcher`).
7272
- `:propagate_context` - If `true`, the context from the `materializer` is propagated to the `Flow`.
7373
- `:materializer` - The `%GRPC.Server.Stream{}` struct representing the current gRPC stream context.
74-
74+
7575
And any other options supported by `Flow`.
7676
7777
## Returns
@@ -134,30 +134,33 @@ defmodule GRPC.Stream do
134134
def to_flow(%__MODULE__{flow: flow}), do: flow
135135

136136
@doc """
137-
Executes the underlying `Flow` for unary streams and emits responses into the provided gRPC server stream.
138-
139-
## Parameters
140-
141-
- `flow`: A `GRPC.Stream` struct containing the flow to be executed.
142-
- `stream`: A `GRPC.Server.Stream` to which responses are sent.
143-
- `:dry_run` — If `true`, responses are not sent (used for testing or inspection).
137+
Executes the underlying `Flow` for a unary stream.
144138
145-
## Example
139+
The response will be emitted automatically to the provided
140+
`:materializer` (set to a `GRPC.Server.Stream`) for the single resulting
141+
item in the materialized enumerable.
146142
147-
GRPC.Stream.run(request)
143+
The `stream` argument must be initialized as a `:unary` stream with
144+
a `:materializer` set.
148145
"""
149-
@spec run(t()) :: any()
146+
@spec run(stream :: t()) :: :noreply
150147
def run(%__MODULE__{flow: flow, options: opts}) do
151-
if !Keyword.get(opts, :unary, false) do
152-
raise ArgumentError, "run/2 is not supported for non-unary streams"
148+
opts = Keyword.take(opts, [:unary, :materializer])
149+
150+
if opts[:unary] != true do
151+
raise ArgumentError, "GRPC.Stream.run/1 only supports unary streams"
153152
end
154153

155-
# We have to call `Enum.to_list` because we want to actually run and materialize the full stream.
156-
# List.flatten and List.first are used so that we can obtain the first result of the materialized list.
157-
flow
158-
|> Enum.to_list()
159-
|> List.flatten()
160-
|> List.first()
154+
materializer = opts[:materializer]
155+
156+
if is_nil(materializer) do
157+
raise ArgumentError,
158+
"GRPC.Stream.run/1 requires a materializer to be set in the GRPC.Stream"
159+
end
160+
161+
send_response(materializer, Enum.at(flow, 0))
162+
163+
:noreply
161164
end
162165

163166
@doc """
@@ -335,6 +338,20 @@ defmodule GRPC.Stream do
335338
opts = Keyword.merge(opts, metadata: metadata)
336339
dispatcher = Keyword.get(opts, :default_dispatcher, GenStage.DemandDispatcher)
337340

341+
if opts[:unary] do
342+
case opts[:materializer] do
343+
%GRPC.Server.Stream{grpc_type: :unary} ->
344+
:ok
345+
346+
%GRPC.Server.Stream{} ->
347+
raise ArgumentError,
348+
"materializer must be set to a unary GRPC.Server.Stream when unary: true is passed"
349+
350+
_ ->
351+
raise ArgumentError, "materializer is required when unary: true is passed"
352+
end
353+
end
354+
338355
flow =
339356
case Keyword.get(opts, :join_with) do
340357
pid when is_pid(pid) ->

test/grpc/stream_test.exs

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
defmodule GRPC.StreamTest do
2-
use ExUnit.Case
2+
use GRPC.Integration.TestCase
33
doctest GRPC.Stream
44

55
describe "simple test" do
@@ -9,22 +9,42 @@ defmodule GRPC.StreamTest do
99

1010
defmodule FakeAdapter do
1111
def get_headers(_), do: %{"content-type" => "application/grpc"}
12-
end
1312

14-
test "unary/2 creates a flow from a unary input" do
15-
input = %TestInput{message: 1}
13+
def send_reply(%{test_pid: test_pid, ref: ref}, item, _opts) do
14+
send(test_pid, {:send_reply, ref, item})
15+
end
1616

17-
result =
18-
GRPC.Stream.unary(input)
19-
|> GRPC.Stream.map(& &1)
20-
|> GRPC.Stream.run()
17+
def send_trailers(%{test_pid: test_pid, ref: ref}, trailers) do
18+
send(test_pid, {:send_trailers, ref, trailers})
19+
end
20+
end
2121

22-
assert result == input
22+
test "unary/2 creates a flow from a unary input" do
23+
test_pid = self()
24+
ref = make_ref()
25+
26+
input = %Routeguide.Point{latitude: 1, longitude: 2}
27+
28+
materializer = %GRPC.Server.Stream{
29+
adapter: FakeAdapter,
30+
payload: %{test_pid: test_pid, ref: ref},
31+
grpc_type: :unary
32+
}
33+
34+
assert :noreply =
35+
GRPC.Stream.unary(input, materializer: materializer)
36+
|> GRPC.Stream.map(fn item ->
37+
item
38+
end)
39+
|> GRPC.Stream.run()
40+
41+
assert_receive {:send_reply, ^ref, response}
42+
assert IO.iodata_to_binary(response) == Protobuf.encode(input)
2343
end
2444

2545
test "unary/2 creates a flow with metadata" do
2646
input = %TestInput{message: 1}
27-
materializer = %GRPC.Server.Stream{adapter: FakeAdapter}
47+
materializer = %GRPC.Server.Stream{adapter: FakeAdapter, grpc_type: :unary}
2848

2949
flow =
3050
GRPC.Stream.unary(input, materializer: materializer, propagate_context: true)
@@ -51,7 +71,7 @@ defmodule GRPC.StreamTest do
5171

5272
test "from_as_ctx/3 creates a flow from enumerable input" do
5373
input = [%{message: "a"}, %{message: "b"}]
54-
materializer = %GRPC.Server.Stream{adapter: FakeAdapter}
74+
materializer = %GRPC.Server.Stream{adapter: FakeAdapter, grpc_type: :unary}
5575

5676
flow =
5777
GRPC.Stream.from(input, propagate_context: true, materializer: materializer)
@@ -257,6 +277,38 @@ defmodule GRPC.StreamTest do
257277
end
258278
end
259279

280+
defmodule MyGRPCService do
281+
use GRPC.Server, service: Routeguide.RouteGuide.Service
282+
283+
def get_feature(input, materializer) do
284+
GRPC.Stream.unary(input, materializer: materializer)
285+
|> GRPC.Stream.map(fn point ->
286+
%Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
287+
end)
288+
|> GRPC.Stream.run()
289+
end
290+
end
291+
292+
describe "run/1" do
293+
test "runs a unary stream" do
294+
run_server([MyGRPCService], fn port ->
295+
point = %Routeguide.Point{latitude: 409_146_138, longitude: -746_188_906}
296+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
297+
298+
expected_response = %Routeguide.Feature{
299+
location: point,
300+
name: "#{point.latitude},#{point.longitude}"
301+
}
302+
303+
assert {:ok, response, %{trailers: trailers}} =
304+
Routeguide.RouteGuide.Stub.get_feature(channel, point, return_headers: true)
305+
306+
assert response == expected_response
307+
assert trailers == GRPC.Transport.HTTP2.server_trailers()
308+
end)
309+
end
310+
end
311+
260312
defp receive_loop do
261313
receive do
262314
{:request, item, from} ->

test/support/integration_test_case.ex

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,14 @@ defmodule GRPC.Integration.TestCase do
1010
end
1111

1212
def run_server(servers, func, port \\ 0, opts \\ []) do
13-
{:ok, _pid, port} = GRPC.Server.start(servers, port, opts)
13+
{:ok, _pid, port} =
14+
start_supervised(%{
15+
id: {GRPC.Server, System.unique_integer([:positive])},
16+
start: {GRPC.Server, :start, [servers, port, opts]},
17+
type: :worker,
18+
restart: :permanent,
19+
shutdown: 500
20+
})
1421

1522
try do
1623
func.(port)
@@ -20,7 +27,14 @@ defmodule GRPC.Integration.TestCase do
2027
end
2128

2229
def run_endpoint(endpoint, func, port \\ 0) do
23-
{:ok, _pid, port} = GRPC.Server.start_endpoint(endpoint, port)
30+
{:ok, _pid, port} =
31+
start_supervised(%{
32+
id: {GRPC.Server, System.unique_integer([:positive])},
33+
start: {GRPC.Server, :start_endpoint, [endpoint, port]},
34+
type: :worker,
35+
restart: :permanent,
36+
shutdown: 500
37+
})
2438

2539
try do
2640
func.(port)

0 commit comments

Comments
 (0)