103 changes: 84 additions & 19 deletions lib/producer.ex
@@ -1,47 +1,99 @@
defmodule ProducerQueue.Producer do
@moduledoc """
A simple implementation of a GenStage producer
A simple implementation of a GenStage producer backed by a `ProducerQueue.Queue`.

## Draining on shutdown

Pass `drain_on_shutdown: true` to opt in to flushing the linked queue into the
pipeline when the producer is asked to drain. Under Broadway this happens on
graceful shutdown (SIGTERM/releases): Broadway invokes `prepare_for_draining/1`
before stopping the producer, giving us the chance to emit whatever is still
buffered in the queue so it is processed instead of lost.

Note this only protects against *graceful* shutdown - a SIGKILL/OOM still drops
the in-memory queue.

Broadway detects `prepare_for_draining/1` via `function_exported?/3`, so no
`Broadway.Producer` behaviour (or compile-time Broadway dependency) is required.

When draining is enabled the GenStage `:buffer_size` defaults to `:infinity`. In steady state
the producer only ever emits exactly the demand it was handed, so the buffer
sits empty regardless of the cap - the cap is a limit, not an allocation.
Draining consumers should override `:buffer_size` to comfortably exceed their
worst-case queue depth at shutdown.

Review comment (Contributor):
Worth considering:

Undersized :buffer_size silently drops drained events

On graceful shutdown prepare_for_draining/1 emits the whole queue as events → GenStage buffers them, capped by :buffer_size.

  • Draining default :infinity → safe.
  • Caller overrides :buffer_size to a finite N < queue depth at shutdown → GenStage drops the overflow, logs an error only → silent data loss, which is the exact failure the feature exists to prevent.

Doc says "size the buffer to exceed worst-case depth" but never warns that undersizing loses data. Fix = add that warning to the moduledoc.
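
To make the failure mode concrete, here is a toy model of a capped buffer. It is not GenStage's implementation: `BufferSketch` and `store/2` are hypothetical names, and the sketch assumes an overflow policy that keeps the newest entries (GenStage documents `:buffer_keep` as defaulting to `:last`).

```elixir
defmodule BufferSketch do
  # Toy model only: keep at most `cap` events, discarding the oldest overflow.
  # Returns {kept_events, dropped_count}.
  def store(events, :infinity), do: {events, 0}

  def store(events, cap) when is_integer(cap) and cap >= 0 do
    dropped = max(length(events) - cap, 0)
    {Enum.drop(events, dropped), dropped}
  end
end

# A 2_500-event backlog drained into a buffer capped at 1_000.
backlog = Enum.to_list(1..2_500)
{kept, dropped} = BufferSketch.store(backlog, 1_000)
IO.inspect({length(kept), dropped})
# prints {1000, 1500}
```

In this model the 1_500 oldest events never reach a consumer, which is why a finite `:buffer_size` below the shutdown queue depth defeats the point of draining.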

"""

use GenStage

@typedoc """
{demand_count, queue_module, check_interval_in_ms, timer}
"""
@type producer_state :: {pos_integer(), atom(), pos_integer(), nil | reference()}
defstruct demand: 0, queue: nil, check_interval: 500, timer: nil, drain_on_shutdown: false

@type t :: %__MODULE__{
demand: non_neg_integer(),
queue: atom() | pid() | nil,
check_interval: pos_integer(),
timer: nil | reference(),
drain_on_shutdown: boolean()
}

@drain_chunk_size 500

@doc """
Start a `ProducerQueue.Producer` linked to a `ProducerQueue.Queue`
Start a `ProducerQueue.Producer` linked to a `ProducerQueue.Queue`.

Options:
* `:queue` - the queue to pull from (required)
* `:check_interval` - ms between dispatch attempts when demand is unmet (default 500)
* `:drain_on_shutdown` - flush the queue into the pipeline on graceful shutdown (default false)
* `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default)

Review comment (Contributor):
In the code, the draining default is `:infinity`, not 100_000.

Also, GenStage can change its own default at any time. The suggestions below rework the logic so that this value is left unset and the real GenStage default is used, rather than applying a hardcoded 10_000:

Suggested change
- * `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default)
+ * `:buffer_size` - GenStage producer buffer (default `:infinity` when
+   draining; otherwise unset, so GenStage's own default applies)

"""
def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, opts)

@impl true
@spec init(opts :: []) :: {:producer, producer_state()}
@spec init(opts :: keyword()) :: {:producer, t(), keyword()}
def init(opts) do
state = {0, Keyword.get(opts, :queue), Keyword.get(opts, :check_interval, 500), nil}
{:producer, state}
drain? = Keyword.get(opts, :drain_on_shutdown, false)

state = %__MODULE__{
queue: Keyword.get(opts, :queue),
check_interval: Keyword.get(opts, :check_interval, 500),
drain_on_shutdown: drain?
}

{:producer, state, buffer_size: buffer_size(opts, drain?)}

Review comment (Contributor):
Suggested change
- {:producer, state, buffer_size: buffer_size(opts, drain?)}
+ producer(state, opts, drain?)

end

@impl true
def handle_info(:dispatch_events, {_, _, _, nil} = state) do
def handle_info(:dispatch_events, %__MODULE__{timer: nil} = state) do
{:noreply, [], state}
end

def handle_info(:dispatch_events, {demand, queue, check_interval, _}) do
dispatch_events({demand, queue, check_interval, nil})
def handle_info(:dispatch_events, state) do
dispatch_events(%{state | timer: nil})
end

@impl true
def handle_demand(new_demand, {demand, queue, check_interval, timer}) do
dispatch_events({demand + new_demand, queue, check_interval, timer})
def handle_demand(new_demand, %__MODULE__{demand: demand} = state) do
dispatch_events(%{state | demand: demand + new_demand})
end

@doc """
Invoked by Broadway right before draining on graceful shutdown. When
`drain_on_shutdown` is enabled, flush everything still in the queue into the
pipeline so it is processed before the producer stops. Otherwise it is a no-op.
"""
def prepare_for_draining(%__MODULE__{drain_on_shutdown: true} = state) do
if state.timer, do: Process.cancel_timer(state.timer)
{:noreply, drain_queue(state.queue), %{state | demand: 0, timer: nil}}
end

defp dispatch_events({demand, queue, check_interval, nil}) do
events = ProducerQueue.Queue.pop(queue, demand)
demand = demand - length(events)
timer = requeue_dispatch(events, demand, check_interval)
def prepare_for_draining(state), do: {:noreply, [], state}

{:noreply, events, {demand, queue, check_interval, timer}}
defp dispatch_events(%__MODULE__{timer: nil} = state) do
events = ProducerQueue.Queue.pop(state.queue, state.demand)
demand = state.demand - length(events)
timer = requeue_dispatch(events, demand, state.check_interval)

{:noreply, events, %{state | demand: demand, timer: timer}}
end

# this prevents dispatch requeue until the previous dispatch_events message is received
@@ -57,4 +109,17 @@ defmodule ProducerQueue.Producer do

# demand not satisfied and events available - try to satisfy demand immediately
defp requeue_dispatch(_, _, _), do: Process.send_after(self(), :dispatch_events, 0)

# Pop the whole queue in FIFO order, in bounded chunks.
defp drain_queue(queue), do: queue |> drain_chunks([]) |> Enum.concat()

defp drain_chunks(queue, acc) do
case ProducerQueue.Queue.pop(queue, @drain_chunk_size) do
[] -> Enum.reverse(acc)
events -> drain_chunks(queue, [events | acc])
end
end

defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity)
defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000)
Review comment (Contributor) on lines +123 to +124:
Suggested change
- defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity)
- defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000)
+ defp producer(state, opts, true) do
+   {:producer, state, buffer_size: Keyword.get(opts, :buffer_size, :infinity)}
+ end
+ defp producer(state, opts, false) do
+   case Keyword.fetch(opts, :buffer_size) do
+     {:ok, size} -> {:producer, state, buffer_size: size}
+     :error -> {:producer, state}
+   end
+ end
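
The return shapes of the suggested helper can be checked without starting a GenStage. The sketch below copies the two clauses into a standalone module; `InitSketch` is a hypothetical name used only for illustration, and the functions are public here so they can be called directly.

```elixir
defmodule InitSketch do
  # Draining: fall back to :infinity when the caller gives no :buffer_size.
  def producer(state, opts, true) do
    {:producer, state, buffer_size: Keyword.get(opts, :buffer_size, :infinity)}
  end

  # Not draining: pass :buffer_size through only if the caller set it,
  # otherwise return the two-element tuple so GenStage applies its own default.
  def producer(state, opts, false) do
    case Keyword.fetch(opts, :buffer_size) do
      {:ok, size} -> {:producer, state, buffer_size: size}
      :error -> {:producer, state}
    end
  end
end

IO.inspect(InitSketch.producer(:state, [], true))
IO.inspect(InitSketch.producer(:state, [], false))
IO.inspect(InitSketch.producer(:state, [buffer_size: 50], false))
```

If this suggestion is adopted, the `init/1` spec in the diff (`{:producer, t(), keyword()}`) would presumably also need to allow the two-element `{:producer, t()}` return.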

end
50 changes: 44 additions & 6 deletions test/producer_test.exs
@@ -7,35 +7,45 @@ defmodule ProducerQueue.ProducerTest do

setup do
{:ok, queue} = Queue.start_link()
[state: {0, queue, 10, nil}, queue: queue]
[state: %Producer{queue: queue, check_interval: 10}, queue: queue]
end

test "handle zero demand with zero backlog", %{state: state} do
assert {:noreply, [], ^state} = Producer.handle_demand(0, state)
refute_receive :dispatch_events
end

test "handle demand with zero backlog", %{state: {_, queue, check_interval, _} = state} do
test "handle demand with zero backlog", %{
state: %Producer{check_interval: check_interval} = state,
queue: queue
} do
:ok = Queue.push(queue, ~c"123")
expected_state = {0, queue, check_interval, nil}
expected_state = %Producer{queue: queue, check_interval: check_interval}

assert {:noreply, ~c"123", ^expected_state} = Producer.handle_demand(3, state)
assert Queue.pop(queue) == []
refute_receive :dispatch_events
end

test "handle demand with backlog - basic", %{state: {_, queue, check_interval, _} = state} do
test "handle demand with backlog - basic", %{
state: %Producer{check_interval: check_interval} = state,
queue: queue
} do
:ok = Queue.push(queue, ~c"12")

assert {:noreply, ~c"12", {1, ^queue, ^check_interval, timer}} =
assert {:noreply, ~c"12",
%Producer{demand: 1, queue: ^queue, check_interval: ^check_interval, timer: timer}} =
Producer.handle_demand(3, state)

assert is_reference(timer)
assert Queue.pop(queue) == []
assert_receive :dispatch_events
end

test "handle demand with backlog", %{state: {_, queue, check_interval, _}} do
test "handle demand with backlog", %{
state: %Producer{check_interval: check_interval},
queue: queue
} do
:ok = Queue.push(queue, ~c"12")
{:ok, producer} = Producer.start_link(check_interval: 10, queue: queue)
{:ok, consumer} = TestConsumer.start_link(producer)
@@ -48,6 +58,34 @@ defmodule ProducerQueue.ProducerTest do

assert TestConsumer.get_events_count(consumer) == 3
end

describe "prepare_for_draining/1" do
test "flushes the entire queue in FIFO order when enabled", %{queue: queue} do
state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true}
backlog = Enum.to_list(1..2_500)
:ok = Queue.push(queue, backlog)

assert {:noreply, ^backlog, %Producer{demand: 0, timer: nil, drain_on_shutdown: true}} =
Producer.prepare_for_draining(state)

assert Queue.pop(queue, 1) == []
end

Review comment by @louisvisser (Contributor), Jun 19, 2026:
Suggested change
+ test "flushes a backlog that is not a multiple of the drain chunk size", %{queue: queue} do
+   state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true}
+   backlog = Enum.to_list(1..2_501)
+   :ok = Queue.push(queue, backlog)
+   assert {:noreply, ^backlog, %Producer{demand: 0, timer: nil}} =
+            Producer.prepare_for_draining(state)
+   assert Queue.pop(queue, 1) == []
+ end
+
+ test "returns an empty list when enabled but the queue is already empty", %{queue: queue} do
+   state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true}
+   assert {:noreply, [], %Producer{demand: 0, timer: nil}} =
+            Producer.prepare_for_draining(state)
+ end
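
The two boundary cases above can be traced with a small standalone model of the chunked drain. This is a sketch under assumptions: `DrainSketch` is a hypothetical module, and its `pop/2` is an Agent-backed stand-in for `ProducerQueue.Queue.pop/2`, not the real queue. The recursion has the same shape as `drain_chunks/2` in the diff, with a pop counter added for illustration.

```elixir
defmodule DrainSketch do
  @chunk 500

  # Stand-in for ProducerQueue.Queue.pop/2: take up to `n` items from the
  # front of a list held in an Agent, leaving the rest as the new state.
  def pop(agent, n), do: Agent.get_and_update(agent, &Enum.split(&1, n))

  # Drain everything in FIFO order; also return how many pops were needed.
  def drain(agent), do: drain(agent, [], 0)

  defp drain(agent, acc, pops) do
    case pop(agent, @chunk) do
      [] -> {acc |> Enum.reverse() |> Enum.concat(), pops + 1}
      events -> drain(agent, [events | acc], pops + 1)
    end
  end
end

{:ok, agent} = Agent.start_link(fn -> Enum.to_list(1..2_501) end)
{events, pops} = DrainSketch.drain(agent)
IO.inspect({length(events), pops})
# prints {2501, 7}: five full chunks, one chunk of 1, one empty pop
```

A backlog of exactly 2_500 ends after six pops, so 2_501 exercises the partial final chunk and the empty queue exercises the immediate `[]` branch.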

test "is a no-op when not enabled, leaving the queue intact", %{state: state, queue: queue} do
:ok = Queue.push(queue, [1, 2, 3])

assert {:noreply, [], ^state} = Producer.prepare_for_draining(state)
assert Queue.pop(queue, 3) == [1, 2, 3]
end

test "cancels a pending dispatch timer while draining", %{queue: queue} do
timer = Process.send_after(self(), :dispatch_events, 60_000)
state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true}

assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state)
assert Process.cancel_timer(timer) == false
Review comment (Contributor) on lines +82 to +86:
Nifty way of doing a much stronger assertion in a scenario like this (if the timer fails to cancel, we'd see the event within around 20ms - so refute_receive for 50ms is a strong test)

Suggested change
- timer = Process.send_after(self(), :dispatch_events, 60_000)
- state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true}
- assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state)
- assert Process.cancel_timer(timer) == false
+ timer = Process.send_after(self(), :dispatch_events, 20)
+ state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true}
+ assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state)
+ refute_receive :dispatch_events, 50
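
The same idea can be tried outside ExUnit. The snippet below is a minimal sketch that uses a plain `receive ... after` in place of `refute_receive`, and cancels the timer directly rather than going through `prepare_for_draining/1`.

```elixir
# Schedule a message 20 ms out, cancel it straight away, then wait 50 ms
# to confirm that it never lands in the mailbox.
timer = Process.send_after(self(), :dispatch_events, 20)

# cancel_timer/1 returns the remaining milliseconds when the timer was still
# pending, or false when it had already fired or been cancelled.
remaining = Process.cancel_timer(timer)

arrived? =
  receive do
    :dispatch_events -> true
  after
    50 -> false
  end

IO.inspect({is_integer(remaining), arrived?})
```

With a 60_000 ms timer, checking the mailbox shortly afterwards could not tell a cancelled timer from a pending one, because the message would not have arrived yet in either case. The short timer plus a longer wait makes the absence of the message meaningful.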

end
end
end

defmodule ProducerQueue.TestConsumer do