-
Notifications
You must be signed in to change notification settings - Fork 0
Add drain on shutdown option #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,47 +1,99 @@ | ||||||||||||||||||||||||||
| defmodule ProducerQueue.Producer do | ||||||||||||||||||||||||||
| @moduledoc """ | ||||||||||||||||||||||||||
| A simple implementation of a GenStage producer | ||||||||||||||||||||||||||
| A simple implementation of a GenStage producer backed by a `ProducerQueue.Queue`. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| ## Draining on shutdown | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Pass `drain_on_shutdown: true` to opt in to flushing the linked queue into the | ||||||||||||||||||||||||||
| pipeline when the producer is asked to drain. Under Broadway this happens on | ||||||||||||||||||||||||||
| graceful shutdown (SIGTERM/releases): Broadway invokes `prepare_for_draining/1` | ||||||||||||||||||||||||||
| before stopping the producer, giving us the chance to emit whatever is still | ||||||||||||||||||||||||||
| buffered in the queue so it is processed instead of lost. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Note this only protects against *graceful* shutdown - a SIGKILL/OOM still drops | ||||||||||||||||||||||||||
| the in-memory queue. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Broadway detects `prepare_for_draining/1` via `function_exported?/3`, so no | ||||||||||||||||||||||||||
| `Broadway.Producer` behaviour (or compile-time Broadway dependency) is required. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| When draining is enabled the GenStage `:buffer_size` defaults to `:infinity`. In steady state | ||||||||||||||||||||||||||
| the producer only ever emits exactly the demand it was handed, so the buffer | ||||||||||||||||||||||||||
| sits empty regardless of the cap - the cap is a limit, not an allocation. | ||||||||||||||||||||||||||
| Draining consumers should override `:buffer_size` to comfortably exceed their | ||||||||||||||||||||||||||
| worst-case queue depth at shutdown. | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| use GenStage | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @typedoc """ | ||||||||||||||||||||||||||
| {demand_count, queue_module, check_interval_in_ms, timer} | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| @type producer_state :: {pos_integer(), atom(), pos_integer(), nil | reference()} | ||||||||||||||||||||||||||
| defstruct demand: 0, queue: nil, check_interval: 500, timer: nil, drain_on_shutdown: false | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @type t :: %__MODULE__{ | ||||||||||||||||||||||||||
| demand: non_neg_integer(), | ||||||||||||||||||||||||||
| queue: atom() | pid() | nil, | ||||||||||||||||||||||||||
| check_interval: pos_integer(), | ||||||||||||||||||||||||||
| timer: nil | reference(), | ||||||||||||||||||||||||||
| drain_on_shutdown: boolean() | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @drain_chunk_size 500 | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @doc """ | ||||||||||||||||||||||||||
| Start a `ProducerQueue.Producer` linked to a `ProducerQueue.Queue` | ||||||||||||||||||||||||||
| Start a `ProducerQueue.Producer` linked to a `ProducerQueue.Queue`. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Options: | ||||||||||||||||||||||||||
| * `:queue` - the queue to pull from (required) | ||||||||||||||||||||||||||
| * `:check_interval` - ms between dispatch attempts when demand is unmet (default 500) | ||||||||||||||||||||||||||
| * `:drain_on_shutdown` - flush the queue into the pipeline on graceful shutdown (default false) | ||||||||||||||||||||||||||
| * `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default) | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In code, draining-default is infinity. Also they can change the GenStage default at any time. Below comments revert logic so it leaves this value unset so the real GenStage default will be used, rather than apply a hardcoded 10000:
Suggested change
|
||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, opts) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @impl true | ||||||||||||||||||||||||||
| @spec init(opts :: []) :: {:producer, producer_state()} | ||||||||||||||||||||||||||
| @spec init(opts :: keyword()) :: {:producer, t(), keyword()} | ||||||||||||||||||||||||||
| def init(opts) do | ||||||||||||||||||||||||||
| state = {0, Keyword.get(opts, :queue), Keyword.get(opts, :check_interval, 500), nil} | ||||||||||||||||||||||||||
| {:producer, state} | ||||||||||||||||||||||||||
| drain? = Keyword.get(opts, :drain_on_shutdown, false) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| state = %__MODULE__{ | ||||||||||||||||||||||||||
| queue: Keyword.get(opts, :queue), | ||||||||||||||||||||||||||
| check_interval: Keyword.get(opts, :check_interval, 500), | ||||||||||||||||||||||||||
| drain_on_shutdown: drain? | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| {:producer, state, buffer_size: buffer_size(opts, drain?)} | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @impl true | ||||||||||||||||||||||||||
| def handle_info(:dispatch_events, {_, _, _, nil} = state) do | ||||||||||||||||||||||||||
| def handle_info(:dispatch_events, %__MODULE__{timer: nil} = state) do | ||||||||||||||||||||||||||
| {:noreply, [], state} | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def handle_info(:dispatch_events, {demand, queue, check_interval, _}) do | ||||||||||||||||||||||||||
| dispatch_events({demand, queue, check_interval, nil}) | ||||||||||||||||||||||||||
| def handle_info(:dispatch_events, state) do | ||||||||||||||||||||||||||
| dispatch_events(%{state | timer: nil}) | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @impl true | ||||||||||||||||||||||||||
| def handle_demand(new_demand, {demand, queue, check_interval, timer}) do | ||||||||||||||||||||||||||
| dispatch_events({demand + new_demand, queue, check_interval, timer}) | ||||||||||||||||||||||||||
| def handle_demand(new_demand, %__MODULE__{demand: demand} = state) do | ||||||||||||||||||||||||||
| dispatch_events(%{state | demand: demand + new_demand}) | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @doc """ | ||||||||||||||||||||||||||
| Invoked by Broadway right before draining on graceful shutdown. When | ||||||||||||||||||||||||||
| `drain_on_shutdown` is enabled, flush everything still in the queue into the | ||||||||||||||||||||||||||
| pipeline so it is processed before the producer stops. Otherwise it is a no-op. | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| def prepare_for_draining(%__MODULE__{drain_on_shutdown: true} = state) do | ||||||||||||||||||||||||||
| if state.timer, do: Process.cancel_timer(state.timer) | ||||||||||||||||||||||||||
| {:noreply, drain_queue(state.queue), %{state | demand: 0, timer: nil}} | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| defp dispatch_events({demand, queue, check_interval, nil}) do | ||||||||||||||||||||||||||
| events = ProducerQueue.Queue.pop(queue, demand) | ||||||||||||||||||||||||||
| demand = demand - length(events) | ||||||||||||||||||||||||||
| timer = requeue_dispatch(events, demand, check_interval) | ||||||||||||||||||||||||||
| def prepare_for_draining(state), do: {:noreply, [], state} | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| {:noreply, events, {demand, queue, check_interval, timer}} | ||||||||||||||||||||||||||
| defp dispatch_events(%__MODULE__{timer: nil} = state) do | ||||||||||||||||||||||||||
| events = ProducerQueue.Queue.pop(state.queue, state.demand) | ||||||||||||||||||||||||||
| demand = state.demand - length(events) | ||||||||||||||||||||||||||
| timer = requeue_dispatch(events, demand, state.check_interval) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| {:noreply, events, %{state | demand: demand, timer: timer}} | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # this prevents dispatch requeue until the previous dispatch_events message is received | ||||||||||||||||||||||||||
|
|
@@ -57,4 +109,17 @@ defmodule ProducerQueue.Producer do | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # demand not satisfied and events available - try to satisfy demand immediately | ||||||||||||||||||||||||||
| defp requeue_dispatch(_, _, _), do: Process.send_after(self(), :dispatch_events, 0) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Pop the whole queue in FIFO order, in bounded chunks. | ||||||||||||||||||||||||||
| defp drain_queue(queue), do: queue |> drain_chunks([]) |> Enum.concat() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| defp drain_chunks(queue, acc) do | ||||||||||||||||||||||||||
| case ProducerQueue.Queue.pop(queue, @drain_chunk_size) do | ||||||||||||||||||||||||||
| [] -> Enum.reverse(acc) | ||||||||||||||||||||||||||
| events -> drain_chunks(queue, [events | acc]) | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity) | ||||||||||||||||||||||||||
| defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000) | ||||||||||||||||||||||||||
|
Comment on lines
+123
to
+124
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,35 +7,45 @@ defmodule ProducerQueue.ProducerTest do | |||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| setup do | ||||||||||||||||||||||||||||||||||||||||||
| {:ok, queue} = Queue.start_link() | ||||||||||||||||||||||||||||||||||||||||||
| [state: {0, queue, 10, nil}, queue: queue] | ||||||||||||||||||||||||||||||||||||||||||
| [state: %Producer{queue: queue, check_interval: 10}, queue: queue] | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| test "handle zero demand with zero backlog", %{state: state} do | ||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, [], ^state} = Producer.handle_demand(0, state) | ||||||||||||||||||||||||||||||||||||||||||
| refute_receive :dispatch_events | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| test "handle demand with zero backlog", %{state: {_, queue, check_interval, _} = state} do | ||||||||||||||||||||||||||||||||||||||||||
| test "handle demand with zero backlog", %{ | ||||||||||||||||||||||||||||||||||||||||||
| state: %Producer{check_interval: check_interval} = state, | ||||||||||||||||||||||||||||||||||||||||||
| queue: queue | ||||||||||||||||||||||||||||||||||||||||||
| } do | ||||||||||||||||||||||||||||||||||||||||||
| :ok = Queue.push(queue, ~c"123") | ||||||||||||||||||||||||||||||||||||||||||
| expected_state = {0, queue, check_interval, nil} | ||||||||||||||||||||||||||||||||||||||||||
| expected_state = %Producer{queue: queue, check_interval: check_interval} | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, ~c"123", ^expected_state} = Producer.handle_demand(3, state) | ||||||||||||||||||||||||||||||||||||||||||
| assert Queue.pop(queue) == [] | ||||||||||||||||||||||||||||||||||||||||||
| refute_receive :dispatch_events | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| test "handle demand with backlog - basic", %{state: {_, queue, check_interval, _} = state} do | ||||||||||||||||||||||||||||||||||||||||||
| test "handle demand with backlog - basic", %{ | ||||||||||||||||||||||||||||||||||||||||||
| state: %Producer{check_interval: check_interval} = state, | ||||||||||||||||||||||||||||||||||||||||||
| queue: queue | ||||||||||||||||||||||||||||||||||||||||||
| } do | ||||||||||||||||||||||||||||||||||||||||||
| :ok = Queue.push(queue, ~c"12") | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, ~c"12", {1, ^queue, ^check_interval, timer}} = | ||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, ~c"12", | ||||||||||||||||||||||||||||||||||||||||||
| %Producer{demand: 1, queue: ^queue, check_interval: ^check_interval, timer: timer}} = | ||||||||||||||||||||||||||||||||||||||||||
| Producer.handle_demand(3, state) | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert is_reference(timer) | ||||||||||||||||||||||||||||||||||||||||||
| assert Queue.pop(queue) == [] | ||||||||||||||||||||||||||||||||||||||||||
| assert_receive :dispatch_events | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| test "handle demand with backlog", %{state: {_, queue, check_interval, _}} do | ||||||||||||||||||||||||||||||||||||||||||
| test "handle demand with backlog", %{ | ||||||||||||||||||||||||||||||||||||||||||
| state: %Producer{check_interval: check_interval}, | ||||||||||||||||||||||||||||||||||||||||||
| queue: queue | ||||||||||||||||||||||||||||||||||||||||||
| } do | ||||||||||||||||||||||||||||||||||||||||||
| :ok = Queue.push(queue, ~c"12") | ||||||||||||||||||||||||||||||||||||||||||
| {:ok, producer} = Producer.start_link(check_interval: 10, queue: queue) | ||||||||||||||||||||||||||||||||||||||||||
| {:ok, consumer} = TestConsumer.start_link(producer) | ||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -48,6 +58,34 @@ defmodule ProducerQueue.ProducerTest do | |||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert TestConsumer.get_events_count(consumer) == 3 | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| describe "prepare_for_draining/1" do | ||||||||||||||||||||||||||||||||||||||||||
| test "flushes the entire queue in FIFO order when enabled", %{queue: queue} do | ||||||||||||||||||||||||||||||||||||||||||
| state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true} | ||||||||||||||||||||||||||||||||||||||||||
| backlog = Enum.to_list(1..2_500) | ||||||||||||||||||||||||||||||||||||||||||
| :ok = Queue.push(queue, backlog) | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, ^backlog, %Producer{demand: 0, timer: nil, drain_on_shutdown: true}} = | ||||||||||||||||||||||||||||||||||||||||||
| Producer.prepare_for_draining(state) | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert Queue.pop(queue, 1) == [] | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
| test "is a no-op when not enabled, leaving the queue intact", %{state: state, queue: queue} do | ||||||||||||||||||||||||||||||||||||||||||
| :ok = Queue.push(queue, [1, 2, 3]) | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, [], ^state} = Producer.prepare_for_draining(state) | ||||||||||||||||||||||||||||||||||||||||||
| assert Queue.pop(queue, 3) == [1, 2, 3] | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| test "cancels a pending dispatch timer while draining", %{queue: queue} do | ||||||||||||||||||||||||||||||||||||||||||
| timer = Process.send_after(self(), :dispatch_events, 60_000) | ||||||||||||||||||||||||||||||||||||||||||
| state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true} | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state) | ||||||||||||||||||||||||||||||||||||||||||
| assert Process.cancel_timer(timer) == false | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+82
to
+86
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nifty way of doing a much stronger assertion in a scenario like this (if the timer fails to cancel, we'd see the event within around 20ms - so refute_receive for 50ms is a strong test)
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| defmodule ProducerQueue.TestConsumer do | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth considering:
Undersized
:buffer_sizesilently drops drained eventsOn graceful shutdown
prepare_for_draining/1emits the whole queue as events → GenStage buffers them, capped by:buffer_size.:infinity→ safe.:buffer_sizeto a finite N < queue depth at shutdown → GenStage drops the overflow, logs an error only → silent data loss — the exact failure the feature exists to prevent.Doc says "size the buffer to exceed worst-case depth" but never warns that undersizing loses data. Fix = add that warning to the moduledoc.