Skip to content

Commit addcb4b

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 4a9299f + 9c58f9c commit addcb4b

File tree

5 files changed

+197
-12
lines changed

5 files changed

+197
-12
lines changed

lib/grpc/client/supervisor.ex

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,16 @@ defmodule GRPC.Client.Supervisor do
4444
use DynamicSupervisor
4545

4646
def start_link(opts) do
47-
DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
47+
case DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) do
48+
{:ok, _pid} = started ->
49+
started
50+
51+
{:error, {:already_started, pid}} ->
52+
{:ok, pid}
53+
54+
other ->
55+
other
56+
end
4857
end
4958

5059
@impl true

lib/grpc/server/adapters/cowboy/handler.ex

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
2929
pid: server_rpc_pid :: pid,
3030
handling_timer: timeout_timer_ref :: reference,
3131
pending_reader: nil | pending_reader,
32-
access_mode: GRPC.Server.Stream.access_mode()
32+
access_mode: GRPC.Server.Stream.access_mode(),
33+
exception_log_filter: exception_log_filter()
3334
}
3435
@type init_result ::
3536
{:cowboy_loop, :cowboy_req.req(), stream_state} | {:ok, :cowboy_req.req(), init_state}
@@ -40,6 +41,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
4041

4142
@type headers :: %{binary() => binary()}
4243

44+
@type exception_log_filter :: {module(), atom()} | nil
45+
4346
@doc """
4447
This function is meant to be called whenever a new request arrives to an existing connection.
4548
This handler works mainly with two linked processes.
@@ -52,6 +55,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
5255
@spec init(:cowboy_req.req(), state :: init_state) :: init_result
5356
def init(req, {endpoint, {_name, server}, route, opts} = state) do
5457
http_method = extract_http_method(req) |> String.to_existing_atom()
58+
exception_log_filter = extract_exception_log_filter_opt(opts)
5559

5660
with {:ok, access_mode, sub_type, content_type} <- find_content_type_subtype(req),
5761
{:ok, codec} <- find_codec(sub_type, content_type, server),
@@ -98,7 +102,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
98102
pid: server_rpc_pid,
99103
handling_timer: timer_ref,
100104
pending_reader: nil,
101-
access_mode: access_mode
105+
access_mode: access_mode,
106+
exception_log_filter: exception_log_filter
102107
}
103108
}
104109
else
@@ -110,6 +115,19 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
110115
end
111116
end
112117

118+
defp extract_exception_log_filter_opt(opts) do
119+
case opts[:exception_log_filter] do
120+
{module, func_name} when is_atom(module) and is_atom(func_name) ->
121+
{module, func_name}
122+
123+
nil ->
124+
nil
125+
126+
invalid ->
127+
raise ArgumentError, "invalid exception log filter: #{inspect(invalid)}"
128+
end
129+
end
130+
113131
defp find_codec(subtype, content_type, server) do
114132
if codec = Enum.find(server.__meta__(:codecs), nil, fn c -> c.name() == subtype end) do
115133
{:ok, codec}
@@ -466,7 +484,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
466484

467485
[req: req]
468486
|> ReportException.new(error)
469-
|> log_error()
487+
|> maybe_log_error(state.exception_log_filter)
470488

471489
{:stop, req, state}
472490
end
@@ -488,12 +506,12 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
488506
end
489507

490508
# expected error raised from user to return error immediately
491-
def info({:EXIT, pid, {%RPCError{} = error, stacktrace}}, req, state = %{pid: pid}) do
509+
def info({:EXIT, pid, {:shutdown, {%RPCError{} = error, stacktrace}}}, req, state = %{pid: pid}) do
492510
req = send_error(req, error, state, :rpc_error)
493511

494512
[req: req]
495513
|> ReportException.new(error, stacktrace)
496-
|> log_error(stacktrace)
514+
|> maybe_log_error(state.exception_log_filter, stacktrace)
497515

498516
{:stop, req, state}
499517
end
@@ -506,7 +524,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
506524

507525
[req: req]
508526
|> ReportException.new(reason, stack, kind)
509-
|> log_error(stack)
527+
|> maybe_log_error(state.exception_log_filter, stack)
510528

511529
{:stop, req, state}
512530
end
@@ -517,7 +535,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
517535

518536
[req: req]
519537
|> ReportException.new(reason, stacktrace)
520-
|> log_error(stacktrace)
538+
|> maybe_log_error(state.exception_log_filter, stacktrace)
521539

522540
{:stop, req, state}
523541
end
@@ -550,7 +568,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
550568

551569
case result do
552570
{:error, %GRPC.RPCError{} = e} ->
553-
exit({e, _stacktrace = []})
571+
exit({:shutdown, {e, _stacktrace = []}})
554572

555573
{:error, %{kind: _kind, reason: _reason, stack: _stack} = e} ->
556574
exit({:handle_error, e})
@@ -705,11 +723,31 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
705723
{:wait, ref}
706724
end
707725

708-
defp log_error(%ReportException{kind: kind} = exception, stacktrace \\ []) do
726+
defp maybe_log_error(exception, filter, stacktrace \\ [])
727+
728+
defp maybe_log_error(
729+
%ReportException{} = exception,
730+
{module, func_name},
731+
stacktrace
732+
) do
733+
if apply(module, func_name, [exception]) do
734+
log_error(exception, stacktrace)
735+
else
736+
:ok
737+
end
738+
end
739+
740+
defp maybe_log_error(exception, nil, stacktrace) do
741+
log_error(exception, stacktrace)
742+
end
743+
744+
defp log_error(%ReportException{kind: kind} = exception, stacktrace) do
709745
crash_reason = GRPC.Logger.crash_reason(kind, exception, stacktrace)
710746

711747
kind
712748
|> Exception.format(exception, stacktrace)
713749
|> Logger.error(crash_reason: crash_reason)
750+
751+
:ok
714752
end
715753
end

lib/grpc/server/supervisor.ex

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ defmodule GRPC.Server.Supervisor do
4141
* `:endpoint` - defines the endpoint module that will be started.
4242
* `:port` - the HTTP port for the endpoint.
4343
* `:servers` - the list of servers that will be be started.
44+
* `:exception_log_filter` - a `{module, function :: atom}` tuple that refers to a filter function of arity 1.
45+
This function will be called with a `GRPC.Server.Adapters.ReportException` struct and must return a boolean
46+
indicating whether or not a given exception should be logged or dropped. Defaults to `nil`, which means all exceptions will be logged.
4447
* `:adapter_opts` - options for the adapter.
4548
4649
Either `:endpoint` or `:servers` must be present, but not both.
@@ -62,13 +65,20 @@ defmodule GRPC.Server.Supervisor do
6265
end
6366

6467
opts =
65-
case Keyword.validate(opts, [:endpoint, :servers, :start_server, :port, :adapter_opts]) do
68+
case Keyword.validate(opts, [
69+
:endpoint,
70+
:servers,
71+
:start_server,
72+
:port,
73+
:adapter_opts,
74+
:exception_log_filter
75+
]) do
6676
{:ok, _opts} ->
6777
opts
6878

6979
{:error, _} ->
7080
raise ArgumentError,
71-
"just [:endpoint, :servers, :start_server, :port, :adapter_opts] are accepted as arguments, and any other keys for adapters should be passed as adapter_opts!"
81+
"just [:endpoint, :servers, :start_server, :port, :adapter_opts, :exception_log_filter] are accepted as arguments, and any other keys for adapters should be passed as adapter_opts!"
7282
end
7383

7484
case validate_cred(opts) do
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
defmodule GRPC.Client.SupervisorTest do
2+
use ExUnit.Case, async: false
3+
4+
alias GRPC.Client
5+
6+
describe "start_link/1" do
7+
test "allows multiple start_links" do
8+
{:ok, second_pid} = Client.Supervisor.start_link([])
9+
{:ok, third_pid} = Client.Supervisor.start_link([])
10+
11+
assert second_pid == third_pid
12+
end
13+
end
14+
end

test/grpc/integration/server_test.exs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,16 @@ defmodule GRPC.Integration.ServerTest do
208208
end
209209
end
210210

211+
defmodule ExceptionLogFilter do
212+
def always_allow(_exception) do
213+
true
214+
end
215+
216+
def never_allow(_exception) do
217+
false
218+
end
219+
end
220+
211221
test "multiple servers works" do
212222
run_server([FeatureServer, HelloServer], fn port ->
213223
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
@@ -277,6 +287,110 @@ defmodule GRPC.Integration.ServerTest do
277287
assert logs =~ "Exception raised while handling /helloworld.Greeter/SayHello"
278288
end
279289

290+
test "logs error if exception_log_filter returns true" do
291+
logs =
292+
ExUnit.CaptureLog.capture_log(fn ->
293+
run_server(
294+
[HelloErrorServer],
295+
fn port ->
296+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
297+
req = %Helloworld.HelloRequest{name: "unknown error"}
298+
Helloworld.Greeter.Stub.say_hello(channel, req)
299+
end,
300+
0,
301+
exception_log_filter: {ExceptionLogFilter, :always_allow}
302+
)
303+
end)
304+
305+
assert logs =~ "Exception raised while handling /helloworld.Greeter/SayHello"
306+
end
307+
308+
test "does not log error if exception_log_filter returns false" do
309+
logs =
310+
ExUnit.CaptureLog.capture_log(fn ->
311+
run_server(
312+
[HelloErrorServer],
313+
fn port ->
314+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
315+
req = %Helloworld.HelloRequest{name: "unknown error"}
316+
Helloworld.Greeter.Stub.say_hello(channel, req)
317+
end,
318+
0,
319+
exception_log_filter: {TestFalseFilter, :never_allow}
320+
)
321+
end)
322+
323+
refute logs =~ "Exception raised while handling /helloworld.Greeter/SayHello"
324+
end
325+
326+
defmodule ExceptionFilterMustBeRPCError do
327+
def filter(exception) do
328+
data = exception.adapter_extra[:req][:headers]["test-data"]
329+
330+
{pid, ref} = :erlang.binary_to_term(data)
331+
send(pid, {:exception_log_filter, ref, exception})
332+
333+
true
334+
end
335+
end
336+
337+
test "passes RPCErrors to `exception_log_filter" do
338+
test_pid = self()
339+
ref = make_ref()
340+
341+
run_server(
342+
[HelloErrorServer],
343+
fn port ->
344+
{:ok, channel} =
345+
GRPC.Stub.connect("localhost:#{port}",
346+
headers: [{"test-data", :erlang.term_to_binary({test_pid, ref})}]
347+
)
348+
349+
req = %Helloworld.HelloRequest{name: "world"}
350+
Helloworld.Greeter.Stub.say_hello(channel, req)
351+
end,
352+
0,
353+
exception_log_filter: {ExceptionFilterMustBeRPCError, :filter}
354+
)
355+
356+
assert_receive {:exception_log_filter, ^ref,
357+
%GRPC.Server.Adapters.ReportException{reason: %GRPC.RPCError{}}}
358+
end
359+
360+
defmodule ExceptionFilterMustBeRaisedError do
361+
def filter(exception) do
362+
data = exception.adapter_extra[:req][:headers]["test-data"]
363+
364+
{pid, ref} = :erlang.binary_to_term(data)
365+
send(pid, {:exception_log_filter, ref, exception})
366+
367+
true
368+
end
369+
end
370+
371+
test "passes thrown exceptions to `exception_log_filter" do
372+
test_pid = self()
373+
ref = make_ref()
374+
375+
run_server(
376+
[HelloErrorServer],
377+
fn port ->
378+
{:ok, channel} =
379+
GRPC.Stub.connect("localhost:#{port}",
380+
headers: [{"test-data", :erlang.term_to_binary({test_pid, ref})}]
381+
)
382+
383+
req = %Helloworld.HelloRequest{name: "unknown error", duration: 0}
384+
Helloworld.Greeter.Stub.say_hello(channel, req)
385+
end,
386+
0,
387+
exception_log_filter: {ExceptionFilterMustBeRaisedError, :filter}
388+
)
389+
390+
assert_receive {:exception_log_filter, ^ref,
391+
%GRPC.Server.Adapters.ReportException{reason: %RuntimeError{}}}
392+
end
393+
280394
test "returns appropriate error for stream requests" do
281395
run_server([FeatureErrorServer], fn port ->
282396
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")

0 commit comments

Comments
 (0)