From 695f86d75a9a92d80dcd7ee2d2c84104d0280e3b Mon Sep 17 00:00:00 2001 From: Ross Lampard <122778195+avenderossa@users.noreply.github.com> Date: Thu, 18 Jun 2026 13:49:35 +0200 Subject: [PATCH 1/2] add drain on shutdown opt --- lib/producer.ex | 103 +++++++++++++++++++++++++++++++++-------- test/producer_test.exs | 50 +++++++++++++++++--- 2 files changed, 128 insertions(+), 25 deletions(-) diff --git a/lib/producer.ex b/lib/producer.ex index faf015f..e558bba 100644 --- a/lib/producer.ex +++ b/lib/producer.ex @@ -1,47 +1,99 @@ defmodule ProducerQueue.Producer do @moduledoc """ - A simple implementation of a GenStage producer + A simple implementation of a GenStage producer backed by a `ProducerQueue.Queue`. + + ## Draining on shutdown + + Pass `drain_on_shutdown: true` to opt in to flushing the linked queue into the + pipeline when the producer is asked to drain. Under Broadway this happens on + graceful shutdown (SIGTERM/releases): Broadway invokes `prepare_for_draining/1` + before stopping the producer, giving us the chance to emit whatever is still + buffered in the queue so it is processed instead of lost. + + Note this only protects against *graceful* shutdown - a SIGKILL/OOM still drops + the in-memory queue. + + Broadway detects `prepare_for_draining/1` via `function_exported?/3`, so no + `Broadway.Producer` behaviour (or compile-time Broadway dependency) is required. + + When draining is enabled the GenStage `:buffer_size` defaults to `:infinity`. In steady state + the producer only ever emits exactly the demand it was handed, so the buffer + sits empty regardless of the cap - the cap is a limit, not an allocation. + Draining consumers should override `:buffer_size` to comfortably exceed their + worst-case queue depth at shutdown. """ use GenStage - @typedoc """ - {demand_count, queue_module, check_interval_in_ms, timer} - """ - @type producer_state :: {pos_integer(), atom(), pos_integer(), nil | reference()} + defstruct demand: 0, queue: nil, check_interval: 500, timer: nil, drain_on_shutdown: false + + @type t :: %__MODULE__{ + demand: non_neg_integer(), + queue: atom() | pid() | nil, + check_interval: pos_integer(), + timer: nil | reference(), + drain_on_shutdown: boolean() + } + + @drain_chunk_size 500 @doc """ - Start a `ProducerQueue.Producer` linked to a `ProducerQueue.Queue` + Start a `ProducerQueue.Producer` linked to a `ProducerQueue.Queue`. + + Options: + * `:queue` - the queue to pull from (required) + * `:check_interval` - ms between dispatch attempts when demand is unmet (default 500) + * `:drain_on_shutdown` - flush the queue into the pipeline on graceful shutdown (default false) + * `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default) """ def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, opts) @impl true - @spec init(opts :: []) :: {:producer, producer_state()} + @spec init(opts :: keyword()) :: {:producer, t(), keyword()} def init(opts) do - state = {0, Keyword.get(opts, :queue), Keyword.get(opts, :check_interval, 500), nil} - {:producer, state} + drain? = Keyword.get(opts, :drain_on_shutdown, false) + + state = %__MODULE__{ + queue: Keyword.get(opts, :queue), + check_interval: Keyword.get(opts, :check_interval, 500), + drain_on_shutdown: drain? + } + + {:producer, state, buffer_size: buffer_size(opts, drain?)} end @impl true - def handle_info(:dispatch_events, {_, _, _, nil} = state) do + def handle_info(:dispatch_events, %__MODULE__{timer: nil} = state) do {:noreply, [], state} end - def handle_info(:dispatch_events, {demand, queue, check_interval, _}) do - dispatch_events({demand, queue, check_interval, nil}) + def handle_info(:dispatch_events, state) do + dispatch_events(%{state | timer: nil}) end @impl true - def handle_demand(new_demand, {demand, queue, check_interval, timer}) do - dispatch_events({demand + new_demand, queue, check_interval, timer}) + def handle_demand(new_demand, %__MODULE__{demand: demand} = state) do + dispatch_events(%{state | demand: demand + new_demand}) + end + + @doc """ + Invoked by Broadway right before draining on graceful shutdown. When + `drain_on_shutdown` is enabled, flush everything still in the queue into the + pipeline so it is processed before the producer stops. Otherwise it is a no-op. + """ + def prepare_for_draining(%__MODULE__{drain_on_shutdown: true} = state) do + if state.timer, do: Process.cancel_timer(state.timer) + {:noreply, drain_queue(state.queue), %{state | demand: 0, timer: nil}} end - defp dispatch_events({demand, queue, check_interval, nil}) do - events = ProducerQueue.Queue.pop(queue, demand) - demand = demand - length(events) - timer = requeue_dispatch(events, demand, check_interval) + def prepare_for_draining(state), do: {:noreply, [], state} - {:noreply, events, {demand, queue, check_interval, timer}} + defp dispatch_events(%__MODULE__{timer: nil} = state) do + events = ProducerQueue.Queue.pop(state.queue, state.demand) + demand = state.demand - length(events) + timer = requeue_dispatch(events, demand, state.check_interval) + + {:noreply, events, %{state | demand: demand, timer: timer}} end # this prevents dispatch requeue until the previous dispatch_events message is received @@ -57,4 +109,17 @@ defmodule ProducerQueue.Producer do # demand not satisfied and events available - try to satisfy demand immediately defp requeue_dispatch(_, _, _), do: Process.send_after(self(), :dispatch_events, 0) + + # Pop the whole queue in FIFO order, in bounded chunks. + defp drain_queue(queue), do: queue |> drain_chunks([]) |> Enum.concat() + + defp drain_chunks(queue, acc) do + case ProducerQueue.Queue.pop(queue, @drain_chunk_size) do + [] -> Enum.reverse(acc) + events -> drain_chunks(queue, [events | acc]) + end + end + + defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity) + defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000) end diff --git a/test/producer_test.exs b/test/producer_test.exs index 5c54eb8..e8edad1 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -7,7 +7,7 @@ defmodule ProducerQueue.ProducerTest do setup do {:ok, queue} = Queue.start_link() - [state: {0, queue, 10, nil}, queue: queue] + [state: %Producer{queue: queue, check_interval: 10}, queue: queue] end test "handle zero demand with zero backlog", %{state: state} do @@ -15,19 +15,26 @@ defmodule ProducerQueue.ProducerTest do refute_receive :dispatch_events end - test "handle demand with zero backlog", %{state: {_, queue, check_interval, _} = state} do + test "handle demand with zero backlog", %{ + state: %Producer{check_interval: check_interval} = state, + queue: queue + } do :ok = Queue.push(queue, '123') - expected_state = {0, queue, check_interval, nil} + expected_state = %Producer{queue: queue, check_interval: check_interval} assert {:noreply, '123', ^expected_state} = Producer.handle_demand(3, state) assert Queue.pop(queue) == [] refute_receive :dispatch_events end - test "handle demand with backlog - basic", %{state: {_, queue, check_interval, _} = state} do + test "handle demand with backlog - basic", %{ + state: %Producer{check_interval: check_interval} = state, + queue: queue + } do :ok = Queue.push(queue, '12') - assert {:noreply, '12', {1, ^queue, ^check_interval, timer}} = + assert {:noreply, '12', + %Producer{demand: 1, queue: ^queue, check_interval: ^check_interval, timer: timer}} = Producer.handle_demand(3, state) assert is_reference(timer) @@ -35,7 +42,10 @@ defmodule ProducerQueue.ProducerTest do assert_receive :dispatch_events end - test "handle demand with backlog", %{state: {_, queue, check_interval, _}} do + test "handle demand with backlog", %{ + state: %Producer{check_interval: check_interval}, + queue: queue + } do :ok = Queue.push(queue, '12') {:ok, producer} = Producer.start_link(check_interval: 10, queue: queue) {:ok, consumer} = TestConsumer.start_link(producer) @@ -48,6 +58,34 @@ defmodule ProducerQueue.ProducerTest do assert TestConsumer.get_events_count(consumer) == 3 end + + describe "prepare_for_draining/1" do + test "flushes the entire queue in FIFO order when enabled", %{queue: queue} do + state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true} + backlog = Enum.to_list(1..2_500) + :ok = Queue.push(queue, backlog) + + assert {:noreply, ^backlog, %Producer{demand: 0, timer: nil, drain_on_shutdown: true}} = + Producer.prepare_for_draining(state) + + assert Queue.pop(queue, 1) == [] + end + + test "is a no-op when not enabled, leaving the queue intact", %{state: state, queue: queue} do + :ok = Queue.push(queue, [1, 2, 3]) + + assert {:noreply, [], ^state} = Producer.prepare_for_draining(state) + assert Queue.pop(queue, 3) == [1, 2, 3] + end + + test "cancels a pending dispatch timer while draining", %{queue: queue} do + timer = Process.send_after(self(), :dispatch_events, 60_000) + state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true} + + assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state) + assert Process.cancel_timer(timer) == false + end + end end defmodule ProducerQueue.TestConsumer do From c8d50384a6006739715559a01fac9c2a8fb2cdd4 Mon Sep 17 00:00:00 2001 From: Ross Lampard <122778195+avenderossa@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:58:42 +0200 Subject: [PATCH 2/2] charlists --- test/producer_test.exs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/producer_test.exs b/test/producer_test.exs index efb87a3..373ebd1 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -19,7 +19,7 @@ defmodule ProducerQueue.ProducerTest do state: %Producer{check_interval: check_interval} = state, queue: queue } do - :ok = Queue.push(queue, '123') + :ok = Queue.push(queue, ~c"123") expected_state = %Producer{queue: queue, check_interval: check_interval} assert {:noreply, ~c"123", ^expected_state} = Producer.handle_demand(3, state) @@ -31,9 +31,9 @@ defmodule ProducerQueue.ProducerTest do state: %Producer{check_interval: check_interval} = state, queue: queue } do - :ok = Queue.push(queue, '12') + :ok = Queue.push(queue, ~c"12") - assert {:noreply, '12', + assert {:noreply, ~c"12", %Producer{demand: 1, queue: ^queue, check_interval: ^check_interval, timer: timer}} = Producer.handle_demand(3, state) @@ -46,7 +46,7 @@ defmodule ProducerQueue.ProducerTest do state: %Producer{check_interval: check_interval}, queue: queue } do - :ok = Queue.push(queue, '12') + :ok = Queue.push(queue, ~c"12") {:ok, producer} = Producer.start_link(check_interval: 10, queue: queue) {:ok, consumer} = TestConsumer.start_link(producer)