Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ defmodule Kaffe.Config.Consumer do
* `:max_wait_time` Sets the maximum number of milliseconds that the broker is allowed to collect min_bytes of
messages in a batch of messages.

* `:sleep_timeout` Allows consumer process to sleep this amount of ms if kafka replied 'empty' message set.
The default is 1s.

* `:prefetch_count` Sets the window size (number of messages) allowed to fetch-ahead. The default is 10 messages.

* `:prefetch_bytes` Sets the total number of bytes allowed to fetch-ahead. `brod_consumer` is greed, it only stops
fetching more messages in when number of unacked messages has exceeded `prefetch_count` AND the unacked total
volume has exceeded `prefetch_bytes`. The default is 100KB.

* `:subscriber_retries` The number of times a subscriber will retry subscribing to a topic. Defaults to 5.

* `:subscriber_retry_delay_ms` The ms a subscriber will delay connecting to a topic after a failure. Defaults to 5000.
Expand Down Expand Up @@ -98,6 +107,9 @@ defmodule Kaffe.Config.Consumer do
max_bytes: max_bytes(config_key),
min_bytes: min_bytes(config_key),
max_wait_time: max_wait_time(config_key),
sleep_timeout: sleep_timeout(config_key),
prefetch_count: prefetch_count(config_key),
prefetch_bytes: prefetch_bytes(config_key),
subscriber_retries: subscriber_retries(config_key),
subscriber_retry_delay_ms: subscriber_retry_delay_ms(config_key),
offset_reset_policy: offset_reset_policy(config_key),
Expand Down Expand Up @@ -148,6 +160,18 @@ defmodule Kaffe.Config.Consumer do
config_get(config_key, :max_wait_time, 10_000)
end

def sleep_timeout(config_key) do
config_get(config_key, :sleep_timeout, 1_000)
end

def prefetch_count(config_key) do
config_get(config_key, :prefetch_count, 10)
end

def prefetch_bytes(config_key) do
config_get(config_key, :prefetch_bytes, 102_400)
end

def subscriber_retries(config_key) do
config_get(config_key, :subscriber_retries, 5)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/kaffe/consumer_group/subscriber/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ defmodule Kaffe.Subscriber do

defp subscriber_ops(config) do
[
sleep_timeout: config.sleep_timeout,
prefetch_count: config.prefetch_count,
prefetch_bytes: config.prefetch_bytes,
max_bytes: config.max_bytes,
min_bytes: config.min_bytes,
max_wait_time: config.max_wait_time,
Expand Down
20 changes: 16 additions & 4 deletions test/kaffe/config/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ defmodule Kaffe.Config.ConsumerTest do
subscriber_retry_delay_ms: 5,
offset_reset_policy: :reset_by_subscriber,
worker_allocation_strategy: :worker_per_partition,
client_down_retry_expire: 15_000
client_down_retry_expire: 15_000,
prefetch_bytes: 102_400,
prefetch_count: 10,
sleep_timeout: 1000
}

on_exit(fn ->
Expand Down Expand Up @@ -91,7 +94,10 @@ defmodule Kaffe.Config.ConsumerTest do
subscriber_retry_delay_ms: 5,
offset_reset_policy: :reset_by_subscriber,
worker_allocation_strategy: :worker_per_partition,
client_down_retry_expire: 15_000
client_down_retry_expire: 15_000,
prefetch_bytes: 102_400,
prefetch_count: 10,
sleep_timeout: 1000
}

on_exit(fn ->
Expand Down Expand Up @@ -136,7 +142,10 @@ defmodule Kaffe.Config.ConsumerTest do
subscriber_retry_delay_ms: 5,
offset_reset_policy: :reset_by_subscriber,
worker_allocation_strategy: :worker_per_partition,
client_down_retry_expire: 15_000
client_down_retry_expire: 15_000,
prefetch_bytes: 102_400,
prefetch_count: 10,
sleep_timeout: 1000
}

on_exit(fn ->
Expand Down Expand Up @@ -180,7 +189,10 @@ defmodule Kaffe.Config.ConsumerTest do
subscriber_retry_delay_ms: 5,
offset_reset_policy: :reset_by_subscriber,
worker_allocation_strategy: :worker_per_partition,
client_down_retry_expire: 15_000
client_down_retry_expire: 15_000,
prefetch_bytes: 102_400,
prefetch_count: 10,
sleep_timeout: 1000
}

on_exit(fn ->
Expand Down