From fb3c976a4bba49ba0d032aba0bb7179c6cb208b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Wojtasik?= Date: Wed, 7 Jan 2026 19:21:21 +0100 Subject: [PATCH 1/4] Add support for sleep_timeout opt in consumer config --- lib/kaffe/config/consumer.ex | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index dec402f..7687dfc 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -52,6 +52,9 @@ defmodule Kaffe.Config.Consumer do * `:max_wait_time` Sets the maximum number of milliseconds that the broker is allowed to collect min_bytes of messages in a batch of messages. + * `:sleep_timeout` Allows consumer process to sleep this amount of ms if kafka replied 'empty' message set. + The default is 1s. + * `:subscriber_retries` The number of times a subscriber will retry subscribing to a topic. Defaults to 5. * `:subscriber_retry_delay_ms` The ms a subscriber will delay connecting to a topic after a failure. Defaults to 5000. @@ -98,6 +101,7 @@ defmodule Kaffe.Config.Consumer do max_bytes: max_bytes(config_key), min_bytes: min_bytes(config_key), max_wait_time: max_wait_time(config_key), + sleep_timeout: sleep_timeout(config_key), subscriber_retries: subscriber_retries(config_key), subscriber_retry_delay_ms: subscriber_retry_delay_ms(config_key), offset_reset_policy: offset_reset_policy(config_key), @@ -148,6 +152,10 @@ defmodule Kaffe.Config.Consumer do config_get(config_key, :max_wait_time, 10_000) end + def sleep_timeout(config_key) do + config_get(config_key, :sleep_timeout, 1_000) + end + def subscriber_retries(config_key) do config_get(config_key, :subscriber_retries, 5) end From dead3cd0fed7567f81dcbf2c43684d1bb200e90c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Wojtasik?= Date: Thu, 8 Jan 2026 14:02:43 +0100 Subject: [PATCH 2/4] Add support for prefetch_count and prefetch_bytes consumer option --- lib/kaffe/config/consumer.ex | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index 7687dfc..6eb01be 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -55,6 +55,12 @@ defmodule Kaffe.Config.Consumer do * `:sleep_timeout` Allows consumer process to sleep this amount of ms if kafka replied 'empty' message set. The default is 1s. + * `:prefetch_count` Sets the window size (number of messages) allowed to fetch-ahead. The default is 10 messages. + + * `:prefetch_bytes` Sets the total number of bytes allowed to fetch-ahead. `brod_consumer` is greed, it only stops + fetching more messages in when number of unacked messages has exceeded `prefetch_count` AND the unacked total + volume has exceeded `prefetch_bytes`. The default is 100KB. + * `:subscriber_retries` The number of times a subscriber will retry subscribing to a topic. Defaults to 5. * `:subscriber_retry_delay_ms` The ms a subscriber will delay connecting to a topic after a failure. Defaults to 5000. @@ -102,6 +108,8 @@ defmodule Kaffe.Config.Consumer do min_bytes: min_bytes(config_key), max_wait_time: max_wait_time(config_key), sleep_timeout: sleep_timeout(config_key), + prefetch_count: prefetch_count(config_key), + prefetch_bytes: prefetch_bytes(config_key), subscriber_retries: subscriber_retries(config_key), subscriber_retry_delay_ms: subscriber_retry_delay_ms(config_key), offset_reset_policy: offset_reset_policy(config_key), @@ -156,6 +164,14 @@ defmodule Kaffe.Config.Consumer do config_get(config_key, :sleep_timeout, 1_000) end + def prefetch_count(config_key) do + config_get(config_key, :prefetch_count, 10) + end + + def prefetch_bytes(config_key) do + config_get(config_key, :prefetch_bytes, 102_400) + end + def subscriber_retries(config_key) do config_get(config_key, :subscriber_retries, 5) end From 023f1e1d9add774fb9979436192f196b79c1c5f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Wojtasik?= Date: Thu, 8 Jan 2026 14:17:40 +0100 Subject: [PATCH 3/4] Use added consumer opts in Kaffe.Subscriber --- lib/kaffe/consumer_group/subscriber/subscriber.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/kaffe/consumer_group/subscriber/subscriber.ex b/lib/kaffe/consumer_group/subscriber/subscriber.ex index 68593b3..93615eb 100644 --- a/lib/kaffe/consumer_group/subscriber/subscriber.ex +++ b/lib/kaffe/consumer_group/subscriber/subscriber.ex @@ -235,6 +235,9 @@ defmodule Kaffe.Subscriber do defp subscriber_ops(config) do [ + sleep_timeout: config.sleep_timeout, + prefetch_count: config.prefetch_count, + prefetch_bytes: config.prefetch_bytes, max_bytes: config.max_bytes, min_bytes: config.min_bytes, max_wait_time: config.max_wait_time, From 967fe8788a105a3bf060d3354de5482dfd7842ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Wojtasik?= Date: Mon, 19 Jan 2026 11:05:32 +0100 Subject: [PATCH 4/4] Fix tests --- test/kaffe/config/consumer_test.exs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/test/kaffe/config/consumer_test.exs b/test/kaffe/config/consumer_test.exs index 022da2b..f541c8b 100644 --- a/test/kaffe/config/consumer_test.exs +++ b/test/kaffe/config/consumer_test.exs @@ -48,7 +48,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn -> @@ -91,7 +94,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn -> @@ -136,7 +142,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn -> @@ -180,7 +189,10 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, worker_allocation_strategy: :worker_per_partition, - client_down_retry_expire: 15_000 + client_down_retry_expire: 15_000, + prefetch_bytes: 102_400, + prefetch_count: 10, + sleep_timeout: 1000 } on_exit(fn ->