Skip to content
5 changes: 5 additions & 0 deletions .changeset/gentle-wolves-flush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'posthog-ruby': minor
---

Add configurable flush interval for async event batching.
4 changes: 4 additions & 0 deletions lib/posthog/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def _decrement_instance_count(api_key)
# @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://us.i.posthog.com`.
# @option opts [Integer] :max_queue_size Maximum number of calls to remain queued. Defaults to 10_000.
# @option opts [Integer] :batch_size Maximum number of events to send in one async batch.
# @option opts [Numeric] :flush_interval_seconds Maximum seconds to wait for an async batch to fill before sending.
# Defaults to 5.
# @option opts [Boolean] :test_mode +true+ if messages should remain queued for testing. Defaults to +false+.
# @option opts [Boolean] :sync_mode +true+ to send events synchronously on the calling thread. Useful in
# forking environments like Sidekiq and Resque. Defaults to +false+.
Expand Down Expand Up @@ -172,6 +174,7 @@ def flush

while !@queue.empty? || @worker.is_requesting?
ensure_worker_running
@worker.request_flush
sleep(0.1)
end
end
Expand Down Expand Up @@ -972,6 +975,7 @@ def enqueue(action)

if queued
ensure_worker_running
@worker.notify
true
else
logger.warn(
Expand Down
1 change: 1 addition & 0 deletions lib/posthog/defaults.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ module Message
module MessageBatch
MAX_BYTES = 512_000 # 500Kb
MAX_SIZE = 100
FLUSH_INTERVAL_SECONDS = 5.0 # seconds
end

module BackoffPolicy
Expand Down
10 changes: 10 additions & 0 deletions lib/posthog/noop_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ def is_requesting? # rubocop:disable Naming/PredicateName
false
end

# @return [void]
def request_flush
# Does nothing
end

# @return [void]
def notify
# Does nothing
end

# @return [void]
def shutdown
# Does nothing
Expand Down
152 changes: 133 additions & 19 deletions lib/posthog/send_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class SendWorker
# @param api_key [String] Project API key.
# @param options [Hash] Worker options.
# @option options [Integer] :batch_size How many items to send in a batch.
# @option options [Numeric] :flush_interval_seconds Maximum seconds to wait for a batch to fill before sending.
# @option options [Proc] :on_error Callback invoked as `on_error.call(status, error)`.
# @option options [String] :host PostHog API host URL.
# @option options [Boolean] :skip_ssl_verification Disable SSL certificate verification.
Expand All @@ -32,70 +33,183 @@ def initialize(queue, api_key, options = {})
@api_key = api_key
@on_error = options[:on_error] || proc { |status, error| }
batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
flush_interval_seconds = options[:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS
@flush_interval_seconds = flush_interval_seconds.to_f
@batch = MessageBatch.new(batch_size)
@lock = Mutex.new
@shutdown_mutex = Mutex.new
@state_lock = Mutex.new
@condition = ConditionVariable.new
@flush_requested = false
@shutdown = false
@transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification]
@pid = Process.pid
@transport_options = { api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] }
@transport = Transport.new(@transport_options)
end

# Continuously runs the loop to check for new events.
#
# @return [void]
def run
ensure_current_process!

until shutdown?
return if @queue.empty?
wait_for_work
break if shutdown?
next if @queue.empty?

@lock.synchronize do
consume_message_from_queue! until @batch.full? || @queue.empty?
end
build_batch

begin
unless @batch.empty?
res = @transport.send @api_key, @batch
handle_error(res.status, res.error) unless res.status == 200
end
send_batch unless @batch.empty?
ensure
@lock.synchronize { @batch.clear }
clear_flush_request_if_idle
end
end
ensure
# Worker threads exit when the queue is drained and are restarted for the
# next burst of events. Close the persistent connection on each exit and
# let Transport reconnect lazily when a future worker sends another batch.
@transport.shutdown
shutdown_transport
end

# Request the worker to send any pending events without waiting for the
# configured flush interval. Used by Client#flush and shutdown paths.
#
# @return [void]
def request_flush
ensure_current_process!

@state_lock.synchronize do
@flush_requested = true
@condition.broadcast
end
end

# Wake the worker when producers enqueue new messages.
#
# @return [void]
def notify
ensure_current_process!

@state_lock.synchronize { @condition.signal }
end

# @return [void]
def shutdown
@shutdown_mutex.synchronize { @shutdown = true }
@transport.shutdown
ensure_current_process!

@state_lock.synchronize do
@shutdown = true
@flush_requested = true
@condition.broadcast
end
end

# public: Check whether we have outstanding requests.
#
# @return [Boolean] Whether the worker has outstanding requests.
# TODO: Rename to `requesting?` in future version
def is_requesting? # rubocop:disable Naming/PredicateName
ensure_current_process!

@lock.synchronize { !@batch.empty? }
end

private

def shutdown?
@shutdown_mutex.synchronize { @shutdown }
def ensure_current_process!
return if @pid == Process.pid

@lock.synchronize { @batch.clear }
@state_lock.synchronize do
@pid = Process.pid
@shutdown = false
@flush_requested = false
@condition.broadcast
end
shutdown_transport
@transport = Transport.new(@transport_options)
end

def build_batch
deadline = monotonic_time + @flush_interval_seconds

loop do
@lock.synchronize do
consume_message_from_queue! until @batch.full? || @queue.empty?
end

break if @batch.full? || @batch.empty? || flush_requested?

remaining = deadline - monotonic_time
break unless remaining.positive?

wait_for_more_messages(remaining)
end
end

def send_batch
res = @transport.send @api_key, @batch
handle_error(res.status, res.error) unless res.status == 200
end

def consume_message_from_queue!
@batch << @queue.pop
@batch << @queue.pop(true)
rescue ThreadError
# Queue was emptied by another thread between #empty? and #pop.
rescue MessageBatch::JSONGenerationError => e
handle_error(-1, e.to_s)
end

def wait_for_work
@state_lock.synchronize do
while @queue.empty? && !@shutdown
clear_flush_request_without_lock
@condition.wait(@state_lock)
end
end
end
Comment thread
marandaneto marked this conversation as resolved.

def wait_for_more_messages(timeout)
@state_lock.synchronize do
return if @flush_requested || @shutdown || !@queue.empty?

@condition.wait(@state_lock, timeout)
end
end

def flush_requested?
@state_lock.synchronize { @flush_requested }
end

def shutdown?
@state_lock.synchronize { @shutdown }
end

def clear_flush_request
@state_lock.synchronize { clear_flush_request_without_lock }
end

def clear_flush_request_if_idle
@state_lock.synchronize { clear_flush_request_without_lock if @queue.empty? }
end

def clear_flush_request_without_lock
@flush_requested = false
end

def handle_error(status, error)
@on_error.call(status, error)
rescue StandardError => e
logger.error("Error in on_error callback: #{e.message}")
end

def shutdown_transport
@transport.shutdown
rescue StandardError => e
logger.error("Error shutting down transport: #{e.message}")
end

def monotonic_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
1 change: 1 addition & 0 deletions public_api_snapshot.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ constant PostHog::Defaults::MAX_HASH_SIZE: Integer
module PostHog::Defaults::Message
constant PostHog::Defaults::Message::MAX_BYTES: Integer
module PostHog::Defaults::MessageBatch
constant PostHog::Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS: Float
constant PostHog::Defaults::MessageBatch::MAX_BYTES: Integer
constant PostHog::Defaults::MessageBatch::MAX_SIZE: Integer
module PostHog::Defaults::Queue
Expand Down
34 changes: 34 additions & 0 deletions spec/posthog/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,40 @@ def run

Process.wait
end

it 'completes after fork when the async worker has a partial batch' do
PostHog::Transport.stub = true
async_client = Client.new(api_key: API_KEY, batch_size: 10, flush_interval_seconds: 60)
worker = async_client.instance_variable_get(:@worker)
queue = async_client.instance_variable_get(:@queue)

async_client.capture(Queued::CAPTURE)
eventually do
expect(queue).to be_empty
expect(worker.is_requesting?).to eq(true)
end

reader, writer = IO.pipe
pid = Process.fork do
reader.close
flush_thread = Thread.new { async_client.flush }
result = flush_thread.join(0.5) == flush_thread ? 'returned' : 'hung'
flush_thread.kill if flush_thread.alive?
writer.write(result)
writer.close
exit! 0
end

writer.close
result = reader.read
Process.wait(pid)
reader.close

expect(result).to eq('returned')
ensure
async_client&.shutdown
PostHog::Transport.stub = false
end
end
end

Expand Down
Loading
Loading