diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index dec402f..6eb01be 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -52,6 +52,15 @@ defmodule Kaffe.Config.Consumer do * `:max_wait_time` Sets the maximum number of milliseconds that the broker is allowed to collect min_bytes of messages in a batch of messages. + * `:sleep_timeout` Allows consumer process to sleep this amount of ms if kafka replied 'empty' message set. + The default is 1s. + + * `:prefetch_count` Sets the window size (number of messages) allowed to fetch-ahead. The default is 10 messages. + + * `:prefetch_bytes` Sets the total number of bytes allowed to fetch-ahead. `brod_consumer` is greed, it only stops + fetching more messages in when number of unacked messages has exceeded `prefetch_count` AND the unacked total + volume has exceeded `prefetch_bytes`. The default is 100KB. + * `:subscriber_retries` The number of times a subscriber will retry subscribing to a topic. Defaults to 5. * `:subscriber_retry_delay_ms` The ms a subscriber will delay connecting to a topic after a failure. Defaults to 5000. @@ -98,6 +107,9 @@ defmodule Kaffe.Config.Consumer do max_bytes: max_bytes(config_key), min_bytes: min_bytes(config_key), max_wait_time: max_wait_time(config_key), + sleep_timeout: sleep_timeout(config_key), + prefetch_count: prefetch_count(config_key), + prefetch_bytes: prefetch_bytes(config_key), subscriber_retries: subscriber_retries(config_key), subscriber_retry_delay_ms: subscriber_retry_delay_ms(config_key), offset_reset_policy: offset_reset_policy(config_key), @@ -148,6 +160,18 @@ defmodule Kaffe.Config.Consumer do config_get(config_key, :max_wait_time, 10_000) end + def sleep_timeout(config_key) do + config_get(config_key, :sleep_timeout, 1_000) + end + + def prefetch_count(config_key) do + config_get(config_key, :prefetch_count, 10) + end + + def prefetch_bytes(config_key) do + config_get(config_key, :prefetch_bytes, 102_400) + end + def subscriber_retries(config_key) do config_get(config_key, :subscriber_retries, 5) end diff --git a/lib/kaffe/consumer_group/subscriber/subscriber.ex b/lib/kaffe/consumer_group/subscriber/subscriber.ex index 68593b3..93615eb 100644 --- a/lib/kaffe/consumer_group/subscriber/subscriber.ex +++ b/lib/kaffe/consumer_group/subscriber/subscriber.ex @@ -235,6 +235,9 @@ defmodule Kaffe.Subscriber do defp subscriber_ops(config) do [ + sleep_timeout: config.sleep_timeout, + prefetch_count: config.prefetch_count, + prefetch_bytes: config.prefetch_bytes, max_bytes: config.max_bytes, min_bytes: config.min_bytes, max_wait_time: config.max_wait_time, diff --git a/test/kaffe/config/consumer_test.exs b/test/kaffe/config/consumer_test.exs index 022da2b..f541c8b 100644 --- a/test/kaffe/config/consumer_test.exs +++ b/test/kaffe/config/consumer_test.exs @@ -48,7 +48,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn -> @@ -91,7 +94,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn -> @@ -136,7 +142,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn -> @@ -180,7 +189,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn ->