
Commit ab7207b

w32-blaster authored and robbavey committed
Make kafka client unique across multiple pipelines
1 parent 4adfec6 commit ab7207b

File tree

4 files changed: +16 additions, -4 deletions

lib/logstash/inputs/kafka.rb

Lines changed: 3 additions & 1 deletion
@@ -131,6 +131,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   # The class name of the partition assignment strategy that the client will use to distribute
   # partition ownership amongst consumer instances
   config :partition_assignment_strategy, :validate => :string
+  # ID of the pipeline whose events you want to read from.
+  config :pipeline_id, :validate => :string, :default => "main"
   # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
   config :receive_buffer_bytes, :validate => :string
   # The amount of time to wait before attempting to reconnect to a given host.
@@ -221,7 +223,7 @@ def register

   public
   def run(logstash_queue)
-    @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
+    @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}-#{pipeline_id}") }
     @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
     @runner_threads.each { |t| t.join }
   end # def run
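With this change each consumer thread's Kafka client.id carries both the thread index and the pipeline id, so two pipelines that share the same client_id setting no longer produce identical client ids. A minimal sketch of the resulting naming scheme, using a hypothetical client_ids_for helper that is not part of the plugin and only mirrors the string interpolation in run:

    # Hypothetical helper, not part of the plugin; it just reproduces the
    # interpolation used in #run above.
    def client_ids_for(client_id, consumer_threads, pipeline_id)
      consumer_threads.times.map { |i| "#{client_id}-#{i}-#{pipeline_id}" }
    end

    client_ids_for("logstash", 2, "main")    # => ["logstash-0-main", "logstash-1-main"]
    client_ids_for("logstash", 2, "metrics") # => ["logstash-0-metrics", "logstash-1-metrics"]

Before the change, both pipelines in this sketch would have produced the same pair of ids, "logstash-0" and "logstash-1".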

logstash-input-kafka.iml

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module external.linked.project.id="logstash-input-kafka" external.linked.project.path="$MODULE_DIR$" external.root.project.path="$MODULE_DIR$" external.system.id="GRADLE" type="JAVA_MODULE" version="4">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
+    <exclude-output />
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

spec/integration/inputs/kafka_spec.rb

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,7 @@
   let(:group_id_4) {rand(36**8).to_s(36)}
   let(:group_id_5) {rand(36**8).to_s(36)}
   let(:plain_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) }
+  let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3, "pipeline_id" => "spec_pipeline"}) }
   let(:snappy_config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
   let(:lz4_config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
   let(:pattern_config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
@@ -82,7 +82,7 @@ def thread_it(kafka_input, queue)
       wait(timeout_seconds).for {queue.length}.to eq(num_events)
       expect(queue.length).to eq(num_events)
       kafka_input.kafka_consumers.each_with_index do |consumer, i|
-        expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}")
+        expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-spec_pipeline")
       end
     ensure
       t.kill
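For the multi_consumer_config used above (client_id "spec", consumer_threads 3, pipeline_id "spec_pipeline"), the client-id tags the spec expects to find in the consumer metrics can be derived directly; a quick sketch of that derivation:

    # Same interpolation as the plugin's run method, applied to the spec config.
    3.times.map { |i| "spec-#{i}-spec_pipeline" }
    # => ["spec-0-spec_pipeline", "spec-1-spec_pipeline", "spec-2-spec_pipeline"]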

spec/unit/inputs/kafka_spec.rb

Lines changed: 2 additions & 1 deletion
@@ -30,7 +30,8 @@ def wakeup
 end

 describe LogStash::Inputs::Kafka do
-  let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4 } }
+  let(:pipeline_id) { SecureRandom.hex(8)}
+  let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4, 'pipeline_id' => pipeline_id } }
   subject { LogStash::Inputs::Kafka.new(config) }

   it "should register" do

0 commit comments