Skip to content

Add drain on shutdown option#5

Open
avenderossa wants to merge 3 commits into
masterfrom
ross/add-prepare-for-draining
Open

Add drain on shutdown option#5
avenderossa wants to merge 3 commits into
masterfrom
ross/add-prepare-for-draining

Conversation

@avenderossa

Copy link
Copy Markdown

No description provided.

Comment thread test/producer_test.exs

assert Queue.pop(queue, 1) == []
end

@louisvisser louisvisser Jun 19, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
test "flushes a backlog that is not a multiple of the drain chunk size", %{queue: queue} do
state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true}
backlog = Enum.to_list(1..2_501)
:ok = Queue.push(queue, backlog)
assert {:noreply, ^backlog, %Producer{demand: 0, timer: nil}} =
Producer.prepare_for_draining(state)
assert Queue.pop(queue, 1) == []
end
test "returns an empty list when enabled but the queue is already empty", %{queue: queue} do
state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true}
assert {:noreply, [], %Producer{demand: 0, timer: nil}} =
Producer.prepare_for_draining(state)
end

Comment thread lib/producer.ex
* `:queue` - the queue to pull from (required)
* `:check_interval` - ms between dispatch attempts when demand is unmet (default 500)
* `:drain_on_shutdown` - flush the queue into the pipeline on graceful shutdown (default false)
* `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In code, draining-default is infinity.

Also they can change the GenStage default at any time. Below comments revert logic so it leaves this value unset so the real GenStage default will be used, rather than apply a hardcoded 10000:

Suggested change
* `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default)
* `:buffer_size` - GenStage producer buffer (default `:infinity` when
draining; otherwise unset, so GenStage's own default applies)

Comment thread lib/producer.ex
drain_on_shutdown: drain?
}

{:producer, state, buffer_size: buffer_size(opts, drain?)}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{:producer, state, buffer_size: buffer_size(opts, drain?)}
producer(state, opts, drain?)

Comment thread lib/producer.ex
Comment on lines +123 to +124
defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity)
defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity)
defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000)
defp producer(state, opts, true) do
{:producer, state, buffer_size: Keyword.get(opts, :buffer_size, :infinity)}
end
defp producer(state, opts, false) do
case Keyword.fetch(opts, :buffer_size) do
{:ok, size} -> {:producer, state, buffer_size: size}
:error -> {:producer, state}
end
end

Comment thread test/producer_test.exs
Comment on lines +82 to +86
timer = Process.send_after(self(), :dispatch_events, 60_000)
state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true}

assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state)
assert Process.cancel_timer(timer) == false

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nifty way of doing a much stronger assertion in a scenario like this (if the timer fails to cancel, we'd see the event within around 20ms - so refute_receive for 50ms is a strong test)

Suggested change
timer = Process.send_after(self(), :dispatch_events, 60_000)
state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true}
assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state)
assert Process.cancel_timer(timer) == false
timer = Process.send_after(self(), :dispatch_events, 20)
state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true}
assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state)
refute_receive :dispatch_events, 50

Comment thread lib/producer.ex
the producer only ever emits exactly the demand it was handed, so the buffer
sits empty regardless of the cap - the cap is a limit, not an allocation.
Draining consumers should override `:buffer_size` to comfortably exceed their
worst-case queue depth at shutdown.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth considering:

Undersized :buffer_size silently drops drained events

On graceful shutdown prepare_for_draining/1 emits the whole queue as events → GenStage buffers them, capped by :buffer_size.

  • Draining default :infinity → safe.
  • Caller overrides :buffer_size to a finite N < queue depth at shutdown → GenStage drops the overflow, logs an error only → silent data loss — the exact failure the feature exists to prevent.

Doc says "size the buffer to exceed worst-case depth" but never warns that undersizing loses data. Fix = add that warning to the moduledoc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants