diff --git a/.changeset/gentle-wolves-flush.md b/.changeset/gentle-wolves-flush.md new file mode 100644 index 0000000..e0a3ecf --- /dev/null +++ b/.changeset/gentle-wolves-flush.md @@ -0,0 +1,5 @@ +--- +'posthog-ruby': minor +--- + +Add configurable flush interval for async event batching. diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index 7dd7161..5f80feb 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -57,6 +57,8 @@ def _decrement_instance_count(api_key) # @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://us.i.posthog.com`. # @option opts [Integer] :max_queue_size Maximum number of calls to remain queued. Defaults to 10_000. # @option opts [Integer] :batch_size Maximum number of events to send in one async batch. + # @option opts [Numeric] :flush_interval_seconds Maximum seconds to wait for an async batch to fill before sending. + # Defaults to 5. # @option opts [Boolean] :test_mode +true+ if messages should remain queued for testing. Defaults to +false+. # @option opts [Boolean] :sync_mode +true+ to send events synchronously on the calling thread. Useful in # forking environments like Sidekiq and Resque. Defaults to +false+. @@ -172,6 +174,7 @@ def flush while !@queue.empty? || @worker.is_requesting? ensure_worker_running + @worker.request_flush sleep(0.1) end end @@ -972,6 +975,7 @@ def enqueue(action) if queued ensure_worker_running + @worker.notify true else logger.warn( diff --git a/lib/posthog/defaults.rb b/lib/posthog/defaults.rb index 2fdcef2..db1e624 100644 --- a/lib/posthog/defaults.rb +++ b/lib/posthog/defaults.rb @@ -32,6 +32,7 @@ module Message module MessageBatch MAX_BYTES = 512_000 # 500Kb MAX_SIZE = 100 + FLUSH_INTERVAL_SECONDS = 5.0 # seconds end module BackoffPolicy diff --git a/lib/posthog/noop_worker.rb b/lib/posthog/noop_worker.rb index 10ce72e..5186f44 100644 --- a/lib/posthog/noop_worker.rb +++ b/lib/posthog/noop_worker.rb @@ -21,6 +21,16 @@ def is_requesting? # rubocop:disable Naming/PredicateName false end + # @return [void] + def request_flush + # Does nothing + end + + # @return [void] + def notify + # Does nothing + end + # @return [void] def shutdown # Does nothing diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index b7b40de..3218d37 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -23,6 +23,7 @@ class SendWorker # @param api_key [String] Project API key. # @param options [Hash] Worker options. # @option options [Integer] :batch_size How many items to send in a batch. + # @option options [Numeric] :flush_interval_seconds Maximum seconds to wait for a batch to fill before sending. # @option options [Proc] :on_error Callback invoked as `on_error.call(status, error)`. # @option options [String] :host PostHog API host URL. # @option options [Boolean] :skip_ssl_verification Disable SSL certificate verification. @@ -32,44 +33,74 @@ def initialize(queue, api_key, options = {}) @api_key = api_key @on_error = options[:on_error] || proc { |status, error| } batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE + flush_interval_seconds = options[:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS + @flush_interval_seconds = flush_interval_seconds.to_f @batch = MessageBatch.new(batch_size) @lock = Mutex.new - @shutdown_mutex = Mutex.new + @state_lock = Mutex.new + @condition = ConditionVariable.new + @flush_requested = false @shutdown = false - @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] + @pid = Process.pid + @transport_options = { api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] } + @transport = Transport.new(@transport_options) end # Continuously runs the loop to check for new events. # # @return [void] def run + ensure_current_process! + until shutdown? - return if @queue.empty? + wait_for_work + break if shutdown? + next if @queue.empty? - @lock.synchronize do - consume_message_from_queue! until @batch.full? || @queue.empty? - end + build_batch begin - unless @batch.empty? - res = @transport.send @api_key, @batch - handle_error(res.status, res.error) unless res.status == 200 - end + send_batch unless @batch.empty? ensure @lock.synchronize { @batch.clear } + clear_flush_request_if_idle end end ensure - # Worker threads exit when the queue is drained and are restarted for the - # next burst of events. Close the persistent connection on each exit and - # let Transport reconnect lazily when a future worker sends another batch. - @transport.shutdown + shutdown_transport + end + + # Request the worker to send any pending events without waiting for the + # configured flush interval. Used by Client#flush and shutdown paths. + # + # @return [void] + def request_flush + ensure_current_process! + + @state_lock.synchronize do + @flush_requested = true + @condition.broadcast + end + end + + # Wake the worker when producers enqueue new messages. + # + # @return [void] + def notify + ensure_current_process! + + @state_lock.synchronize { @condition.signal } end # @return [void] def shutdown - @shutdown_mutex.synchronize { @shutdown = true } - @transport.shutdown + ensure_current_process! + + @state_lock.synchronize do + @shutdown = true + @flush_requested = true + @condition.broadcast + end end # public: Check whether we have outstanding requests. @@ -77,25 +108,108 @@ def shutdown # @return [Boolean] Whether the worker has outstanding requests. # TODO: Rename to `requesting?` in future version def is_requesting? # rubocop:disable Naming/PredicateName + ensure_current_process! + @lock.synchronize { !@batch.empty? } end private - def shutdown? - @shutdown_mutex.synchronize { @shutdown } + def ensure_current_process! + return if @pid == Process.pid + + @lock.synchronize { @batch.clear } + @state_lock.synchronize do + @pid = Process.pid + @shutdown = false + @flush_requested = false + @condition.broadcast + end + shutdown_transport + @transport = Transport.new(@transport_options) + end + + def build_batch + deadline = monotonic_time + @flush_interval_seconds + + loop do + @lock.synchronize do + consume_message_from_queue! until @batch.full? || @queue.empty? + end + + break if @batch.full? || @batch.empty? || flush_requested? + + remaining = deadline - monotonic_time + break unless remaining.positive? + + wait_for_more_messages(remaining) + end + end + + def send_batch + res = @transport.send @api_key, @batch + handle_error(res.status, res.error) unless res.status == 200 end def consume_message_from_queue! - @batch << @queue.pop + @batch << @queue.pop(true) + rescue ThreadError + # Queue was emptied by another thread between #empty? and #pop. rescue MessageBatch::JSONGenerationError => e handle_error(-1, e.to_s) end + def wait_for_work + @state_lock.synchronize do + while @queue.empty? && !@shutdown + clear_flush_request_without_lock + @condition.wait(@state_lock) + end + end + end + + def wait_for_more_messages(timeout) + @state_lock.synchronize do + return if @flush_requested || @shutdown || !@queue.empty? + + @condition.wait(@state_lock, timeout) + end + end + + def flush_requested? + @state_lock.synchronize { @flush_requested } + end + + def shutdown? + @state_lock.synchronize { @shutdown } + end + + def clear_flush_request + @state_lock.synchronize { clear_flush_request_without_lock } + end + + def clear_flush_request_if_idle + @state_lock.synchronize { clear_flush_request_without_lock if @queue.empty? } + end + + def clear_flush_request_without_lock + @flush_requested = false + end + def handle_error(status, error) @on_error.call(status, error) rescue StandardError => e logger.error("Error in on_error callback: #{e.message}") end + + def shutdown_transport + @transport.shutdown + rescue StandardError => e + logger.error("Error shutting down transport: #{e.message}") + end + + def monotonic_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end end end diff --git a/public_api_snapshot.txt b/public_api_snapshot.txt index ba9eebd..b91aa93 100644 --- a/public_api_snapshot.txt +++ b/public_api_snapshot.txt @@ -39,6 +39,7 @@ constant PostHog::Defaults::MAX_HASH_SIZE: Integer module PostHog::Defaults::Message constant PostHog::Defaults::Message::MAX_BYTES: Integer module PostHog::Defaults::MessageBatch +constant PostHog::Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS: Float constant PostHog::Defaults::MessageBatch::MAX_BYTES: Integer constant PostHog::Defaults::MessageBatch::MAX_SIZE: Integer module PostHog::Defaults::Queue diff --git a/spec/posthog/client_spec.rb b/spec/posthog/client_spec.rb index a28d6cf..3a4a609 100644 --- a/spec/posthog/client_spec.rb +++ b/spec/posthog/client_spec.rb @@ -1496,6 +1496,40 @@ def run Process.wait end + + it 'completes after fork when the async worker has a partial batch' do + PostHog::Transport.stub = true + async_client = Client.new(api_key: API_KEY, batch_size: 10, flush_interval_seconds: 60) + worker = async_client.instance_variable_get(:@worker) + queue = async_client.instance_variable_get(:@queue) + + async_client.capture(Queued::CAPTURE) + eventually do + expect(queue).to be_empty + expect(worker.is_requesting?).to eq(true) + end + + reader, writer = IO.pipe + pid = Process.fork do + reader.close + flush_thread = Thread.new { async_client.flush } + result = flush_thread.join(0.5) == flush_thread ? 'returned' : 'hung' + flush_thread.kill if flush_thread.alive? + writer.write(result) + writer.close + exit! 0 + end + + writer.close + result = reader.read + Process.wait(pid) + reader.close + + expect(result).to eq('returned') + ensure + async_client&.shutdown + PostHog::Transport.stub = false + end end end diff --git a/spec/posthog/send_worker_spec.rb b/spec/posthog/send_worker_spec.rb index 8604d3f..bfd9f2a 100644 --- a/spec/posthog/send_worker_spec.rb +++ b/spec/posthog/send_worker_spec.rb @@ -10,12 +10,28 @@ module PostHog PostHog::Transport.stub = false end + def run_worker_until_idle(worker, queue) + worker_thread = Thread.new { worker.run } + eventually { expect(queue).to be_empty } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + expect(worker.is_requesting?).to eq(false) + end + describe '#init' do it 'accepts string keys' do queue = Queue.new - worker = described_class.new(queue, 'secret', 'batch_size' => 100) + worker = described_class.new(queue, 'secret', 'batch_size' => 100, 'flush_interval_seconds' => 2) batch = worker.instance_variable_get(:@batch) expect(batch.instance_variable_get(:@max_message_count)).to eq(100) + expect(worker.instance_variable_get(:@flush_interval_seconds)).to eq(2.0) + end + + it 'defaults flush_interval_seconds to 5 seconds' do + queue = Queue.new + worker = described_class.new(queue, 'secret') + + expect(worker.instance_variable_get(:@flush_interval_seconds)).to eq(5.0) end end @@ -37,8 +53,8 @@ module PostHog queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret') - worker.run + worker = described_class.new(queue, 'secret', flush_interval_seconds: 0) + run_worker_until_idle(worker, queue) expect(queue).to be_empty end.to_not raise_error @@ -59,13 +75,15 @@ module PostHog queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret', on_error: on_error) + worker = described_class.new(queue, 'secret', on_error: on_error, flush_interval_seconds: 0) # This is to ensure that Client#flush doesn't finish before calling # the error handler. - Thread.new { worker.run } + worker_thread = Thread.new { worker.run } sleep 0.1 # First give thread time to spin-up. sleep 0.01 while worker.is_requesting? + worker.shutdown + worker_thread.join(1) expect(queue).to be_empty expect(status).to eq(400) @@ -75,7 +93,12 @@ module PostHog it 'clears the in-flight batch if the error handler raises' do queue = Queue.new queue << {} - worker = described_class.new(queue, 'secret', on_error: proc { raise 'handler failed' }) + worker = described_class.new( + queue, + 'secret', + on_error: proc { raise 'handler failed' }, + flush_interval_seconds: 0 + ) transport = instance_double( PostHog::Transport, send: PostHog::Response.new(400, 'Some error'), @@ -83,7 +106,14 @@ module PostHog ) worker.instance_variable_set(:@transport, transport) - expect { worker.run }.to_not raise_error + worker_thread = Thread.new { worker.run } + eventually do + expect(queue).to be_empty + expect(worker.is_requesting?).to eq(false) + end + worker.shutdown + + expect(worker_thread.join(1)).to eq(worker_thread) expect(queue).to be_empty expect(worker.is_requesting?).to eq(false) end @@ -95,32 +125,167 @@ module PostHog queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret', on_error: on_error) - worker.run + worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval_seconds: 0) + run_worker_until_idle(worker, queue) expect(queue).to be_empty end it 'calls on_error for bad json' do - bad_obj = Object.new - def bad_obj.to_json(*_args) + bad_message = Requested::CAPTURE.dup + def bad_message.to_json(*_args) raise "can't serialize to json" end on_error = proc {} expect(on_error).to receive(:call).once.with(-1, /serialize to json/) - good_message = Requested::CAPTURE - bad_message = Requested::CAPTURE.merge({ 'bad_obj' => bad_obj }) - queue = Queue.new - queue << good_message queue << bad_message - worker = described_class.new(queue, 'testsecret', on_error: on_error) - worker.run + worker = described_class.new(queue, 'testsecret', on_error: on_error, flush_interval_seconds: 0) + run_worker_until_idle(worker, queue) expect(queue).to be_empty end + + it 'waits for flush_interval_seconds before sending a partial batch' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 0.05) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker_thread = Thread.new { worker.run } + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(sends).to eq([1]) + expect(elapsed).to be >= 0.05 + end + + it 'sends immediately when the batch size is reached' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + queue << Requested::IDENTIFY + worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval_seconds: 60) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker_thread = Thread.new { worker.run } + eventually { expect(sends).to eq([2]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(sends).to eq([2]) + expect(elapsed).to be < 1 + end + + it 'wakes and sends messages enqueued while waiting' do + sent_batches = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sent_batches << JSON.parse(batch.to_json).map { |message| message['event'] } + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + worker = described_class.new(queue, 'testsecret', batch_size: 2, flush_interval_seconds: 60) + + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker_thread = Thread.new { worker.run } + eventually { expect(worker.is_requesting?).to eq(true) } + + queue << Requested::CAPTURE.merge(event: 'Second event') + worker.notify + + eventually { expect(sent_batches).to eq([[Requested::CAPTURE[:event], 'Second event']]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(sent_batches).to eq([[Requested::CAPTURE[:event], 'Second event']]) + expect(elapsed).to be < 1 + end + + it 'stays alive while idle and handles a later enqueue' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + worker = described_class.new(queue, 'testsecret', batch_size: 1, flush_interval_seconds: 60) + worker_thread = Thread.new { worker.run } + + eventually { expect(worker_thread).to be_alive } + queue << Requested::CAPTURE + worker.notify + + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + end + + it 'does not keep a stale flush request while idle' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 0.05) + worker_thread = Thread.new { worker.run } + eventually { expect(worker_thread).to be_alive } + + worker.request_flush + sleep 0.01 + queue << Requested::CAPTURE + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + worker.notify + + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + expect(elapsed).to be >= 0.05 + end + + it 'flushes immediately when requested' do + sends = [] + allow_any_instance_of(PostHog::Transport).to receive(:send) do |_transport, _api_key, batch| + sends << batch.length + PostHog::Response.new(200, 'Success') + end + + queue = Queue.new + queue << Requested::CAPTURE + worker = described_class.new(queue, 'testsecret', batch_size: 10, flush_interval_seconds: 60) + + worker_thread = Thread.new { worker.run } + eventually { expect(worker.is_requesting?).to eq(true) } + worker.request_flush + + eventually { expect(sends).to eq([1]) } + worker.shutdown + expect(worker_thread.join(1)).to eq(worker_thread) + expect(sends).to eq([1]) + end end describe '#is_requesting?' do @@ -139,11 +304,13 @@ def bad_obj.to_json(*_args) queue = Queue.new queue << Requested::CAPTURE - worker = described_class.new(queue, 'testsecret') + worker = described_class.new(queue, 'testsecret', flush_interval_seconds: 0) worker_thread = Thread.new { worker.run } eventually { expect(worker.is_requesting?).to eq(true) } + eventually { expect(worker.is_requesting?).to eq(false) } + worker.shutdown worker_thread.join expect(worker.is_requesting?).to eq(false) end