From 28446b1e2c31deffb53ecf690a82f4e2940934b9 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 10:17:43 +0200 Subject: [PATCH 1/6] feat: add flush interval batching --- .changeset/gentle-wolves-flush.md | 5 ++ lib/posthog/client.rb | 6 ++ lib/posthog/defaults.rb | 1 + lib/posthog/noop_worker.rb | 10 ++++ lib/posthog/send_worker.rb | 96 +++++++++++++++++++++++++++---- spec/posthog/send_worker_spec.rb | 78 +++++++++++++++++++++++-- 6 files changed, 179 insertions(+), 17 deletions(-) create mode 100644 .changeset/gentle-wolves-flush.md diff --git a/.changeset/gentle-wolves-flush.md b/.changeset/gentle-wolves-flush.md new file mode 100644 index 0000000..e0a3ecf --- /dev/null +++ b/.changeset/gentle-wolves-flush.md @@ -0,0 +1,5 @@ +--- +'posthog-ruby': minor +--- + +Add configurable flush interval for async event batching. diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index 219a0ae..9a03c2d 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -57,6 +57,8 @@ def _decrement_instance_count(api_key) # @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://us.i.posthog.com`. # @option opts [Integer] :max_queue_size Maximum number of calls to remain queued. Defaults to 10_000. # @option opts [Integer] :batch_size Maximum number of events to send in one async batch. + # @option opts [Numeric] :flush_interval Maximum seconds to wait for an async batch to fill before sending. + # Defaults to 5. # @option opts [Boolean] :test_mode +true+ if messages should remain queued for testing. Defaults to +false+. # @option opts [Boolean] :sync_mode +true+ to send events synchronously on the calling thread. Useful in # forking environments like Sidekiq and Resque. Defaults to +false+. @@ -161,8 +163,11 @@ def flush return end + @worker.request_flush + while !@queue.empty? || @worker.is_requesting? ensure_worker_running + @worker.request_flush sleep(0.1) end end @@ -929,6 +934,7 @@ def enqueue(action) if @queue.length < @max_queue_size @queue << action ensure_worker_running + @worker.notify true else diff --git a/lib/posthog/defaults.rb b/lib/posthog/defaults.rb index 2fdcef2..6be5bc7 100644 --- a/lib/posthog/defaults.rb +++ b/lib/posthog/defaults.rb @@ -32,6 +32,7 @@ module Message module MessageBatch MAX_BYTES = 512_000 # 500Kb MAX_SIZE = 100 + FLUSH_INTERVAL = 5.0 # seconds end module BackoffPolicy diff --git a/lib/posthog/noop_worker.rb b/lib/posthog/noop_worker.rb index 10ce72e..5186f44 100644 --- a/lib/posthog/noop_worker.rb +++ b/lib/posthog/noop_worker.rb @@ -21,6 +21,16 @@ def is_requesting? # rubocop:disable Naming/PredicateName false end + # @return [void] + def request_flush + # Does nothing + end + + # @return [void] + def notify + # Does nothing + end + # @return [void] def shutdown # Does nothing diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index ac44a4d..3715538 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -23,6 +23,7 @@ class SendWorker # @param api_key [String] Project API key. # @param options [Hash] Worker options. # @option options [Integer] :batch_size How many items to send in a batch. + # @option options [Numeric] :flush_interval Maximum seconds to wait for a batch to fill before sending. # @option options [Proc] :on_error Callback invoked as `on_error.call(status, error)`. # @option options [String] :host PostHog API host URL. # @option options [Boolean] :skip_ssl_verification Disable SSL certificate verification. @@ -32,8 +33,14 @@ def initialize(queue, api_key, options = {}) @api_key = api_key @on_error = options[:on_error] || proc { |status, error| } batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE + flush_interval = options[:flush_interval] || Defaults::MessageBatch::FLUSH_INTERVAL + @flush_interval = flush_interval.to_f @batch = MessageBatch.new(batch_size) @lock = Mutex.new + @state_lock = Mutex.new + @condition = ConditionVariable.new + @flush_requested = false + @shutdown_requested = false @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] end @@ -41,26 +48,45 @@ def initialize(queue, api_key, options = {}) # # @return [void] def run - until Thread.current[:should_exit] - return if @queue.empty? - - @lock.synchronize do - consume_message_from_queue! until @batch.full? || @queue.empty? - end - - unless @batch.empty? - res = @transport.send @api_key, @batch - @on_error.call(res.status, res.error) unless res.status == 200 + until Thread.current[:should_exit] || shutdown_requested? + if @queue.empty? + clear_flush_request + return end + build_batch + send_batch unless @batch.empty? @lock.synchronize { @batch.clear } end ensure @transport.shutdown end + # Request the worker to send any pending events without waiting for the + # configured flush interval. Used by Client#flush and shutdown paths. + # + # @return [void] + def request_flush + @state_lock.synchronize do + @flush_requested = true + @condition.broadcast + end + end + + # Wake the worker when producers enqueue new messages. + # + # @return [void] + def notify + @state_lock.synchronize { @condition.signal } + end + # @return [void] def shutdown + @state_lock.synchronize do + @shutdown_requested = true + @flush_requested = true + @condition.broadcast + end @transport.shutdown end @@ -74,10 +100,58 @@ def is_requesting? # rubocop:disable Naming/PredicateName private + def build_batch + deadline = monotonic_time + @flush_interval + + loop do + @lock.synchronize do + consume_message_from_queue! until @batch.full? || @queue.empty? + end + + break if @batch.full? || @batch.empty? || flush_requested? + + remaining = deadline - monotonic_time + break unless remaining.positive? + + wait_for_more_messages(remaining) + end + end + + def send_batch + res = @transport.send @api_key, @batch + @on_error.call(res.status, res.error) unless res.status == 200 + end + def consume_message_from_queue! - @batch << @queue.pop + @batch << @queue.pop(true) + rescue ThreadError + # Queue was emptied by another thread between #empty? and #pop. rescue MessageBatch::JSONGenerationError => e @on_error.call(-1, e.to_s) end + + def wait_for_more_messages(timeout) + @state_lock.synchronize do + return if @flush_requested || @shutdown_requested || !@queue.empty? + + @condition.wait(@state_lock, timeout) + end + end + + def flush_requested? + @state_lock.synchronize { @flush_requested } + end + + def shutdown_requested? + @state_lock.synchronize { @shutdown_requested } + end + + def clear_flush_request + @state_lock.synchronize { @flush_requested = false } + end + + def monotonic_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end end end diff --git a/spec/posthog/send_worker_spec.rb b/spec/posthog/send_worker_spec.rb index c3851e5..e613fe2 100644 --- a/spec/posthog/send_worker_spec.rb +++ b/spec/posthog/send_worker_spec.rb @@ -13,9 +13,17 @@ module PostHog describe '#init' do it 'accepts string keys' do queue = Queue.new - worker = described_class.new(queue, 'secret', 'batch_size' => 100) + worker = described_class.new(queue, 'secret', 'batch_size' => 100, 'flush_interval' => 2) batch = worker.instance_variable_get(:@batch) expect(batch.instance_variable_get(:@max_message_count)).to eq(100) + expect(worker.instance_variable_get(:@flush_interval)).to eq(2.0) + end + + it 'defaults flush_interval to 5 seconds' do + queue = Queue.new + worker = described_class.new(queue, 'secret') + + expect(worker.instance_variable_get(:@flush_interval)).to eq(5.0) end end @@ -37,7 +45,7 @@ module PostHog queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret') + worker = described_class.new(queue, 'secret', flush_interval: 0) worker.run expect(queue).to be_empty @@ -59,7 +67,7 @@ module PostHog queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret', on_error: on_error) + worker = described_class.new(queue, 'secret', on_error: on_error, flush_interval: 0) # This is to ensure that Client#flush doesn't finish before calling # the error handler. @@ -79,7 +87,7 @@ module PostHog queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', on_error: on_error) + worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval: 0) worker.run expect(queue).to be_empty @@ -101,10 +109,68 @@ def bad_obj.to_json(*_args) queue << good_message queue << bad_message - worker = described_class.new(queue, 'testsecret', on_error: on_error) + worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval: 0) worker.run expect(queue).to be_empty end + + it 'waits for flush_interval before sending a partial batch' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval: 0.05) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker.run + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(sends).to eq([1]) + expect(elapsed).to be >= 0.05 + end + + it 'sends immediately when the batch size is reached' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + queue << Requested::IDENTIFY + worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval: 60) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker.run + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(sends).to eq([2]) + expect(elapsed).to be < 1 + end + + it 'flushes immediately when requested' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval: 60) + + worker_thread = Thread.new { worker.run } + eventually { expect(worker.is_requesting?).to eq(true) } + worker.request_flush + + expect(worker_thread.join(1)).to eq(worker_thread) + expect(sends).to eq([1]) + end end describe '#is_requesting?' do @@ -123,7 +189,7 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret') + worker = described_class.new(queue, 'testsecret', flush_interval: 0) worker_thread = Thread.new { worker.run } eventually { expect(worker.is_requesting?).to eq(true) } From 8479aee1d4712071fff1174bdb852fe55d844998 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 21:30:30 +0200 Subject: [PATCH 2/6] test: cover worker notify batching --- spec/posthog/send_worker_spec.rb | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/spec/posthog/send_worker_spec.rb b/spec/posthog/send_worker_spec.rb index 3f726b7..a1e02ca 100644 --- a/spec/posthog/send_worker_spec.rb +++ b/spec/posthog/send_worker_spec.rb @@ -169,6 +169,31 @@ def bad_obj.to_json(*_args) expect(elapsed).to be < 1 end + it 'wakes and sends messages enqueued while waiting' do + sent_batches = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sent_batches << JSON.parse(batch.to_json).map { |message| message['event'] } + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval: 60) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker_thread = Thread.new { worker.run } + eventually { expect(worker.is_requesting?).to eq(true) } + + queue << Requested::CAPTURE.merge(event: 'Second event') + worker.notify + + expect(worker_thread.join(1)).to eq(worker_thread) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(sent_batches).to eq([[Requested::CAPTURE[:event], 'Second event']]) + expect(elapsed).to be < 1 + end + it 'flushes immediately when requested' do sends = [] allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| From eed482bffd2d9d261e60573c128e0c880218beaf Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 21:32:40 +0200 Subject: [PATCH 3/6] chore: update public api snapshot --- public_api_snapshot.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/public_api_snapshot.txt b/public_api_snapshot.txt index b238c61..43a1784 100644 --- a/public_api_snapshot.txt +++ b/public_api_snapshot.txt @@ -37,6 +37,7 @@ constant PostHog::Defaults::MAX_HASH_SIZE: Integer module PostHog::Defaults::Message constant PostHog::Defaults::Message::MAX_BYTES: Integer module PostHog::Defaults::MessageBatch +constant PostHog::Defaults::MessageBatch::FLUSH_INTERVAL: Float constant PostHog::Defaults::MessageBatch::MAX_BYTES: Integer constant PostHog::Defaults::MessageBatch::MAX_SIZE: Integer module PostHog::Defaults::Queue From a81477ffe020e3c9420e4a92f7d2eb9be5a9179c Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 21:37:25 +0200 Subject: [PATCH 4/6] refactor: rename flush interval option --- lib/posthog/client.rb | 2 +- lib/posthog/defaults.rb | 2 +- lib/posthog/send_worker.rb | 8 ++++---- public_api_snapshot.txt | 2 +- spec/posthog/send_worker_spec.rb | 28 ++++++++++++++-------------- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index cccd4f6..8e86a96 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -57,7 +57,7 @@ def _decrement_instance_count(api_key) # @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://us.i.posthog.com`. # @option opts [Integer] :max_queue_size Maximum number of calls to remain queued. Defaults to 10_000. # @option opts [Integer] :batch_size Maximum number of events to send in one async batch. - # @option opts [Numeric] :flush_interval Maximum seconds to wait for an async batch to fill before sending. + # @option opts [Numeric] :flush_interval_seconds Maximum seconds to wait for an async batch to fill before sending. # Defaults to 5. # @option opts [Boolean] :test_mode +true+ if messages should remain queued for testing. Defaults to +false+. # @option opts [Boolean] :sync_mode +true+ to send events synchronously on the calling thread. Useful in diff --git a/lib/posthog/defaults.rb b/lib/posthog/defaults.rb index 6be5bc7..db1e624 100644 --- a/lib/posthog/defaults.rb +++ b/lib/posthog/defaults.rb @@ -32,7 +32,7 @@ module Message module MessageBatch MAX_BYTES = 512_000 # 500Kb MAX_SIZE = 100 - FLUSH_INTERVAL = 5.0 # seconds + FLUSH_INTERVAL_SECONDS = 5.0 # seconds end module BackoffPolicy diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index 72dbfca..8eeba9f 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -23,7 +23,7 @@ class SendWorker # @param api_key [String] Project API key. # @param options [Hash] Worker options. # @option options [Integer] :batch_size How many items to send in a batch. - # @option options [Numeric] :flush_interval Maximum seconds to wait for a batch to fill before sending. + # @option options [Numeric] :flush_interval_seconds Maximum seconds to wait for a batch to fill before sending. # @option options [Proc] :on_error Callback invoked as `on_error.call(status, error)`. # @option options [String] :host PostHog API host URL. # @option options [Boolean] :skip_ssl_verification Disable SSL certificate verification. @@ -33,8 +33,8 @@ def initialize(queue, api_key, options = {}) @api_key = api_key @on_error = options[:on_error] || proc { |status, error| } batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE - flush_interval = options[:flush_interval] || Defaults::MessageBatch::FLUSH_INTERVAL - @flush_interval = flush_interval.to_f + flush_interval_seconds = options[:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS + @flush_interval_seconds = flush_interval_seconds.to_f @batch = MessageBatch.new(batch_size) @lock = Mutex.new @state_lock = Mutex.new @@ -108,7 +108,7 @@ def is_requesting? # rubocop:disable Naming/PredicateName private def build_batch - deadline = monotonic_time + @flush_interval + deadline = monotonic_time + @flush_interval_seconds loop do @lock.synchronize do diff --git a/public_api_snapshot.txt b/public_api_snapshot.txt index 43a1784..2a9bff7 100644 --- a/public_api_snapshot.txt +++ b/public_api_snapshot.txt @@ -37,7 +37,7 @@ constant PostHog::Defaults::MAX_HASH_SIZE: Integer module PostHog::Defaults::Message constant PostHog::Defaults::Message::MAX_BYTES: Integer module PostHog::Defaults::MessageBatch -constant PostHog::Defaults::MessageBatch::FLUSH_INTERVAL: Float +constant PostHog::Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS: Float constant PostHog::Defaults::MessageBatch::MAX_BYTES: Integer constant PostHog::Defaults::MessageBatch::MAX_SIZE: Integer module PostHog::Defaults::Queue diff --git a/spec/posthog/send_worker_spec.rb b/spec/posthog/send_worker_spec.rb index a1e02ca..792e9e6 100644 --- a/spec/posthog/send_worker_spec.rb +++ b/spec/posthog/send_worker_spec.rb @@ -13,17 +13,17 @@ module PostHog describe '#init' do it 'accepts string keys' do queue = Queue.new - worker = described_class.new(queue, 'secret', 'batch_size' => 100, 'flush_interval' => 2) + worker = described_class.new(queue, 'secret', 'batch_size' => 100, 'flush_interval_seconds' => 2) batch = worker.instance_variable_get(:@batch) expect(batch.instance_variable_get(:@max_message_count)).to eq(100) - expect(worker.instance_variable_get(:@flush_interval)).to eq(2.0) + expect(worker.instance_variable_get(:@flush_interval_seconds)).to eq(2.0) end - it 'defaults flush_interval to 5 seconds' do + it 'defaults flush_interval_seconds to 5 seconds' do queue = Queue.new worker = described_class.new(queue, 'secret') - expect(worker.instance_variable_get(:@flush_interval)).to eq(5.0) + expect(worker.instance_variable_get(:@flush_interval_seconds)).to eq(5.0) end end @@ -45,7 +45,7 @@ module PostHog queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret', flush_interval: 0) + worker = described_class.new(queue, 'secret', flush_interval_seconds: 0) worker.run expect(queue).to be_empty @@ -67,7 +67,7 @@ module PostHog queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret', on_error: on_error, flush_interval: 0) + worker = described_class.new(queue, 'secret', on_error: on_error, flush_interval_seconds: 0) # This is to ensure that Client#flush doesn't finish before calling # the error handler. @@ -103,7 +103,7 @@ module PostHog queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval: 0) + worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval_seconds: 0) worker.run expect(queue).to be_empty @@ -125,12 +125,12 @@ def bad_obj.to_json(*_args) queue << good_message queue << bad_message - worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval: 0) + worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval_seconds: 0) worker.run expect(queue).to be_empty end - it 'waits for flush_interval before sending a partial batch' do + it 'waits for flush_interval_seconds before sending a partial batch' do sends = [] allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| sends << batch.length @@ -139,7 +139,7 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval: 0.05) + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 0.05) started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) worker.run @@ -159,7 +159,7 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE queue << Requested::IDENTIFY - worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval: 60) + worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval_seconds: 60) started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) worker.run @@ -178,7 +178,7 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval: 60) + worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval_seconds: 60) started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) worker_thread = Thread.new { worker.run } @@ -203,7 +203,7 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval: 60) + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 60) worker_thread = Thread.new { worker.run } eventually { expect(worker.is_requesting?).to eq(true) } @@ -230,7 +230,7 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', flush_interval: 0) + worker = described_class.new(queue, 'testsecret', flush_interval_seconds: 0) worker_thread = Thread.new { worker.run } eventually { expect(worker.is_requesting?).to eq(true) } From 6ae174d868f7aaf268ec7d3c3397584f7c3856b2 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 21:47:53 +0200 Subject: [PATCH 5/6] fix: harden flush interval worker lifecycle --- lib/posthog/client.rb | 2 - lib/posthog/send_worker.rb | 31 ++++++--- spec/posthog/send_worker_spec.rb | 104 ++++++++++++++++++++++++++----- 3 files changed, 112 insertions(+), 25 deletions(-) diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index 8e86a96..c2f667f 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -167,8 +167,6 @@ def flush return end - @worker.request_flush - while !@queue.empty? || @worker.is_requesting? ensure_worker_running @worker.request_flush diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index 8eeba9f..a0fb6db 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -49,10 +49,9 @@ def initialize(queue, api_key, options = {}) # @return [void] def run until shutdown? - if @queue.empty? - clear_flush_request - return - end + wait_for_work + break if shutdown? + next if @queue.empty? build_batch @@ -60,12 +59,10 @@ def run send_batch unless @batch.empty? ensure @lock.synchronize { @batch.clear } + clear_flush_request_if_idle end end ensure - # Worker threads exit when the queue is drained and are restarted for the - # next burst of events. Close the persistent connection on each exit and - # let Transport reconnect lazily when a future worker sends another batch. @transport.shutdown end @@ -94,7 +91,6 @@ def shutdown @flush_requested = true @condition.broadcast end - @transport.shutdown end # public: Check whether we have outstanding requests. @@ -137,6 +133,15 @@ def consume_message_from_queue! handle_error(-1, e.to_s) end + def wait_for_work + @state_lock.synchronize do + while @queue.empty? && !@shutdown + clear_flush_request_without_lock + @condition.wait(@state_lock) + end + end + end + def wait_for_more_messages(timeout) @state_lock.synchronize do return if @flush_requested || @shutdown || !@queue.empty? @@ -154,7 +159,15 @@ def shutdown? end def clear_flush_request - @state_lock.synchronize { @flush_requested = false } + @state_lock.synchronize { clear_flush_request_without_lock } + end + + def clear_flush_request_if_idle + @state_lock.synchronize { clear_flush_request_without_lock if @queue.empty? } + end + + def clear_flush_request_without_lock + @flush_requested = false end def handle_error(status, error) diff --git a/spec/posthog/send_worker_spec.rb b/spec/posthog/send_worker_spec.rb index 792e9e6..bfd9f2a 100644 --- a/spec/posthog/send_worker_spec.rb +++ b/spec/posthog/send_worker_spec.rb @@ -10,6 +10,14 @@ module PostHog PostHog::Transport.stub = false end + def run_worker_until_idle(worker, queue) + worker_thread = Thread.new { worker.run } + eventually { expect(queue).to be_empty } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + expect(worker.is_requesting?).to eq(false) + end + describe '#init' do it 'accepts string keys' do queue = Queue.new @@ -46,7 +54,7 @@ module PostHog queue = Queue.new queue << {} worker = described_class.new(queue, 'secret', flush_interval_seconds: 0) - worker.run + run_worker_until_idle(worker, queue) expect(queue).to be_empty end.to_not raise_error @@ -71,9 +79,11 @@ module PostHog # This is to ensure that Client#flush doesn't finish before calling # the error handler. - Thread.new { worker.run } + worker_thread = Thread.new { worker.run } sleep 0.1 # First give thread time to spin-up. sleep 0.01 while worker.is_requesting? + worker.shutdown + worker_thread.join(1) expect(queue).to be_empty expect(status).to eq(400) @@ -83,7 +93,12 @@ module PostHog it 'clears the in-flight batch if the error handler raises' do queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret', on_error: proc { raise 'handler failed' }) + worker = described_class.new( + queue, + 'secret', + on_error: proc { raise 'handler failed' }, + flush_interval_seconds: 0 + ) transport = instance_double( PostHog::Transport, send: PostHog::Response.new(400, 'Some error'), @@ -91,7 +106,14 @@ module PostHog ) worker.instance_variable_set(:@transport, transport) - expect { worker.run }.to_not raise_error + worker_thread = Thread.new { worker.run } + eventually do + expect(queue).to be_empty + expect(worker.is_requesting?).to eq(false) + end + worker.shutdown + + expect(worker_thread.join(1)).to eq(worker_thread) expect(queue).to be_empty expect(worker.is_requesting?).to eq(false) end @@ -104,29 +126,25 @@ module PostHog queue = Queue.new queue << Requested::CAPTURE worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval_seconds: 0) - worker.run + run_worker_until_idle(worker, queue) expect(queue).to be_empty end it 'calls on_error for bad json' do - bad_obj = Object.new - def bad_obj.to_json(*_args) + bad_message = Requested::CAPTURE.dup + def bad_message.to_json(*_args) raise "can't serialize to json" end on_error = proc {} expect(on_error).to receive(:call).once.with(-1, /serialize to json/) - good_message = Requested::CAPTURE - bad_message = Requested::CAPTURE.merge({ 'bad_obj' => bad_obj }) - queue = Queue.new - queue << good_message queue << bad_message worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval_seconds: 0) - worker.run + run_worker_until_idle(worker, queue) expect(queue).to be_empty end @@ -142,7 +160,10 @@ def bad_obj.to_json(*_args) worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 0.05) started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) - worker.run + worker_thread = Thread.new { worker.run } + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at expect(sends).to eq([1]) @@ -162,7 +183,10 @@ def bad_obj.to_json(*_args) worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval_seconds: 60) started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) - worker.run + worker_thread = Thread.new { worker.run } + eventually { expect(sends).to eq([2]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at expect(sends).to eq([2]) @@ -187,6 +211,8 @@ def bad_obj.to_json(*_args) queue << Requested::CAPTURE.merge(event: 'Second event') worker.notify + eventually { expect(sent_batches).to eq([[Requested::CAPTURE[:event], 'Second event']]) } + worker.shutdown expect(worker_thread.join(1)).to eq(worker_thread) elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at @@ -194,6 +220,52 @@ def bad_obj.to_json(*_args) expect(elapsed).to be < 1 end + it 'stays alive while idle and handles a later enqueue' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + worker = described_class.new(queue, 'testsecret', batch_size: 1, flush_interval_seconds: 60) + worker_thread = Thread.new { worker.run } + + eventually { expect(worker_thread).to be_alive } + queue << Requested::CAPTURE + worker.notify + + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + end + + it 'does not keep a stale flush request while idle' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 0.05) + worker_thread = Thread.new { worker.run } + eventually { expect(worker_thread).to be_alive } + + worker.request_flush + sleep 0.01 + queue << Requested::CAPTURE + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker.notify + + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(elapsed).to be >= 0.05 + end + it 'flushes immediately when requested' do sends = [] allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| @@ -209,6 +281,8 @@ def bad_obj.to_json(*_args) eventually { expect(worker.is_requesting?).to eq(true) } worker.request_flush + eventually { expect(sends).to eq([1]) } + worker.shutdown expect(worker_thread.join(1)).to eq(worker_thread) expect(sends).to eq([1]) end @@ -235,6 +309,8 @@ def bad_obj.to_json(*_args) worker_thread = Thread.new { worker.run } eventually { expect(worker.is_requesting?).to eq(true) } + eventually { expect(worker.is_requesting?).to eq(false) } + worker.shutdown worker_thread.join expect(worker.is_requesting?).to eq(false) end From 90da6947bdc48f2de367bfab92234e6cfdd3793d Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Thu, 18 Jun 2026 19:13:25 +0200 Subject: [PATCH 6/6] fix: reset async worker state after fork --- lib/posthog/send_worker.rb | 36 ++++++++++++++++++++++++++++++++++-- spec/posthog/client_spec.rb | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index a0fb6db..3218d37 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -41,13 +41,17 @@ def initialize(queue, api_key, options = {}) @condition = ConditionVariable.new @flush_requested = false @shutdown = false - @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] + @pid = Process.pid + @transport_options = { api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] } + @transport = Transport.new(@transport_options) end # Continuously runs the loop to check for new events. # # @return [void] def run + ensure_current_process! + until shutdown? wait_for_work break if shutdown? @@ -63,7 +67,7 @@ def run end end ensure - @transport.shutdown + shutdown_transport end # Request the worker to send any pending events without waiting for the @@ -71,6 +75,8 @@ def run # # @return [void] def request_flush + ensure_current_process! + @state_lock.synchronize do @flush_requested = true @condition.broadcast @@ -81,11 +87,15 @@ def request_flush # # @return [void] def notify + ensure_current_process! + @state_lock.synchronize { @condition.signal } end # @return [void] def shutdown + ensure_current_process! + @state_lock.synchronize do @shutdown = true @flush_requested = true @@ -98,11 +108,27 @@ def shutdown # @return [Boolean] Whether the worker has outstanding requests. # TODO: Rename to `requesting?` in future version def is_requesting? # rubocop:disable Naming/PredicateName + ensure_current_process! + @lock.synchronize { !@batch.empty? } end private + def ensure_current_process! + return if @pid == Process.pid + + @lock.synchronize { @batch.clear } + @state_lock.synchronize do + @pid = Process.pid + @shutdown = false + @flush_requested = false + @condition.broadcast + end + shutdown_transport + @transport = Transport.new(@transport_options) + end + def build_batch deadline = monotonic_time + @flush_interval_seconds @@ -176,6 +202,12 @@ def handle_error(status, error) logger.error("Error in on_error callback: #{e.message}") end + def shutdown_transport + @transport.shutdown + rescue StandardError => e + logger.error("Error shutting down transport: #{e.message}") + end + def monotonic_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end diff --git a/spec/posthog/client_spec.rb b/spec/posthog/client_spec.rb index a28d6cf..3a4a609 100644 --- a/spec/posthog/client_spec.rb +++ b/spec/posthog/client_spec.rb @@ -1496,6 +1496,40 @@ def run Process.wait end + + it 'completes after fork when the async worker has a partial batch' do + PostHog::Transport.stub = true + async_client = Client.new(api_key: API_KEY, batch_size: 10, flush_interval_seconds: 60) + worker = async_client.instance_variable_get(:@worker) + queue = async_client.instance_variable_get(:@queue) + + async_client.capture(Queued::CAPTURE) + eventually do + expect(queue).to be_empty + expect(worker.is_requesting?).to eq(true) + end + + reader, writer = IO.pipe + pid = Process.fork do + reader.close + flush_thread = Thread.new { async_client.flush } + result = flush_thread.join(0.5) == flush_thread ? 'returned' : 'hung' + flush_thread.kill if flush_thread.alive? + writer.write(result) + writer.close + exit! 0 + end + + writer.close + result = reader.read + Process.wait(pid) + reader.close + + expect(result).to eq('returned') + ensure + async_client&.shutdown + PostHog::Transport.stub = false + end end end