Add drain on shutdown option#5
Conversation
|
|
||
| assert Queue.pop(queue, 1) == [] | ||
| end | ||
|
|
There was a problem hiding this comment.
| test "flushes a backlog that is not a multiple of the drain chunk size", %{queue: queue} do | |
| state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true} | |
| backlog = Enum.to_list(1..2_501) | |
| :ok = Queue.push(queue, backlog) | |
| assert {:noreply, ^backlog, %Producer{demand: 0, timer: nil}} = | |
| Producer.prepare_for_draining(state) | |
| assert Queue.pop(queue, 1) == [] | |
| end | |
| test "returns an empty list when enabled but the queue is already empty", %{queue: queue} do | |
| state = %Producer{queue: queue, check_interval: 10, drain_on_shutdown: true} | |
| assert {:noreply, [], %Producer{demand: 0, timer: nil}} = | |
| Producer.prepare_for_draining(state) | |
| end | |
| * `:queue` - the queue to pull from (required) | ||
| * `:check_interval` - ms between dispatch attempts when demand is unmet (default 500) | ||
| * `:drain_on_shutdown` - flush the queue into the pipeline on graceful shutdown (default false) | ||
| * `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default) |
There was a problem hiding this comment.
In code, draining-default is infinity.
Also they can change the GenStage default at any time. Below comments revert logic so it leaves this value unset so the real GenStage default will be used, rather than apply a hardcoded 10000:
| * `:buffer_size` - GenStage producer buffer (default 100_000 when draining, else GenStage default) | |
| * `:buffer_size` - GenStage producer buffer (default `:infinity` when | |
| draining; otherwise unset, so GenStage's own default applies) |
| drain_on_shutdown: drain? | ||
| } | ||
|
|
||
| {:producer, state, buffer_size: buffer_size(opts, drain?)} |
There was a problem hiding this comment.
| {:producer, state, buffer_size: buffer_size(opts, drain?)} | |
| producer(state, opts, drain?) |
| defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity) | ||
| defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000) |
There was a problem hiding this comment.
| defp buffer_size(opts, true), do: Keyword.get(opts, :buffer_size, :infinity) | |
| defp buffer_size(opts, false), do: Keyword.get(opts, :buffer_size, 10_000) | |
| defp producer(state, opts, true) do | |
| {:producer, state, buffer_size: Keyword.get(opts, :buffer_size, :infinity)} | |
| end | |
| defp producer(state, opts, false) do | |
| case Keyword.fetch(opts, :buffer_size) do | |
| {:ok, size} -> {:producer, state, buffer_size: size} | |
| :error -> {:producer, state} | |
| end | |
| end |
| timer = Process.send_after(self(), :dispatch_events, 60_000) | ||
| state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true} | ||
|
|
||
| assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state) | ||
| assert Process.cancel_timer(timer) == false |
There was a problem hiding this comment.
Nifty way of doing a much stronger assertion in a scenario like this (if the timer fails to cancel, we'd see the event within around 20ms - so refute_receive for 50ms is a strong test)
| timer = Process.send_after(self(), :dispatch_events, 60_000) | |
| state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true} | |
| assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state) | |
| assert Process.cancel_timer(timer) == false | |
| timer = Process.send_after(self(), :dispatch_events, 20) | |
| state = %Producer{queue: queue, check_interval: 10, timer: timer, drain_on_shutdown: true} | |
| assert {:noreply, [], %Producer{timer: nil}} = Producer.prepare_for_draining(state) | |
| refute_receive :dispatch_events, 50 |
| the producer only ever emits exactly the demand it was handed, so the buffer | ||
| sits empty regardless of the cap - the cap is a limit, not an allocation. | ||
| Draining consumers should override `:buffer_size` to comfortably exceed their | ||
| worst-case queue depth at shutdown. |
There was a problem hiding this comment.
Worth considering:
Undersized :buffer_size silently drops drained events
On graceful shutdown prepare_for_draining/1 emits the whole queue as events → GenStage buffers them, capped by :buffer_size.
- Draining default
:infinity→ safe. - Caller overrides
:buffer_sizeto a finite N < queue depth at shutdown → GenStage drops the overflow, logs an error only → silent data loss — the exact failure the feature exists to prevent.
Doc says "size the buffer to exceed worst-case depth" but never warns that undersizing loses data. Fix = add that warning to the moduledoc.
No description provided.