perf: sender pool for the event transport (removes head-of-line blocking)#147
cat-ph wants to merge 15 commits into
Replace per-capture direct HTTP POST with a single background std::thread worker (blocking reqwest, no async runtime) that batches events, retries transient failures with backoff honoring Retry-After, and is driven by an injectable clock. capture/capture_batch are non-blocking enqueues. Adds flush, shutdown (flush + stop + join, idempotent), flush-on-Drop, and pending_events on both clients, plus global flush/shutdown (the global client lives in a static, whose Drop never runs). The bounded queue drops with a single warning when full; before_send now runs on the worker so it applies to every queued event. An injectable Clock (now + now_utc) makes interval flush, retry backoff, and v1 wire timestamps deterministic in tests (ManualClock + a test-only Tick; no real sleeps). v0 single capture now batches through /batch/. The compliance adapter delegates batching/flush/shutdown to the SDK. BREAKING CHANGE: capture/capture_batch return Ok as soon as the event is queued instead of awaiting delivery, and HTTP failures surface as logged warnings rather than Err. Call flush or shutdown before process exit to ensure queued events are delivered.
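The retry behaviour described above (backoff that honors `Retry-After`) can be sketched roughly as follows. This is a minimal illustration, not the SDK's code: `RetryPolicy`, its fields, and the choice to cap a server-supplied `Retry-After` at `max_backoff` are all assumptions made for the example.

```rust
use std::time::Duration;

/// Hypothetical retry policy; the names are illustrative, not the SDK's.
pub struct RetryPolicy {
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
}

impl RetryPolicy {
    /// Delay before retry number `attempt` (0-based).
    /// A server-supplied Retry-After replaces the computed backoff;
    /// capping it at `max_backoff` is an assumption of this sketch.
    pub fn delay(&self, attempt: u32, retry_after: Option<Duration>) -> Duration {
        // Exponential backoff: initial * 2^attempt, falling back to the cap on overflow.
        let exponential = self
            .initial_backoff
            .checked_mul(2u32.saturating_pow(attempt))
            .unwrap_or(self.max_backoff);
        retry_after.unwrap_or(exponential).min(self.max_backoff)
    }
}
```

Keeping the policy a pure function of `(attempt, retry_after)` is what lets an injected clock drive retries deterministically in tests, since nothing here reads the wall clock.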
…t work Addresses review on PR #145. - transport: re-arm the bounded-queue "full" warning once the queue drains to empty, so a service that repeatedly fills then drains warns once per episode instead of only on the very first overflow. - transport: pending_events() now counts events accepted but not yet delivered (channel + worker buffer + retry queue) rather than only channel depth, by decrementing the in-flight counter when a batch reaches a terminal outcome (delivered/dropped) in both the v0 and v1 pipelines. This also makes the bounded queue cap total backlog. - adapter: document that total_events_sent can over-count only when the SDK queue fills (capture is fire-and-forget); the acceptance suite never fills it.
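A minimal sketch of the "warn once per episode" behaviour, assuming a single-threaded queue for clarity (the real transport uses a channel plus an in-flight counter). `BoundedQueue` and its `warnings` counter, which stands in for the actual log call, are hypothetical.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded queue that warns once per "full" episode.
pub struct BoundedQueue<T> {
    items: VecDeque<T>,
    capacity: usize,
    warned: bool,
    pub warnings: usize, // stands in for a real warn!() call
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self { items: VecDeque::new(), capacity, warned: false, warnings: 0 }
    }

    /// Non-blocking enqueue: returns false (and drops the item) when full.
    pub fn push(&mut self, item: T) -> bool {
        if self.items.len() >= self.capacity {
            if !self.warned {
                self.warned = true;
                self.warnings += 1;
            }
            return false;
        }
        self.items.push_back(item);
        true
    }

    /// Pop one item; the warning is re-armed only once the queue is empty.
    pub fn pop(&mut self) -> Option<T> {
        let item = self.items.pop_front();
        if self.items.is_empty() {
            self.warned = false;
        }
        item
    }
}
```

Re-arming on "drained to empty" rather than on "no longer full" avoids a warning storm when the queue hovers right at capacity.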
pending_events now counts channel + worker buffer + retries; the public doc on both clients still described the old channel-only meaning (flagged by autoreview).
# Conflicts:
#   src/client/async_client.rs
#   src/client/blocking.rs
#   tests/test_async.rs
#   tests/test_blocking.rs
pending_events exposed internal queue depth only for the compliance adapter (the harness /state contract requires the field); it is not meaningful public API. Gate it, plus the internal TransportHandle::pending it calls, behind test-harness so normal builds no longer expose it. The adapter enables test-harness so it still compiles; the --all-features public-API snapshot is unchanged (test-harness items appear there, like extra_capture_headers).
The helper used flush_interval_ms=50 and retry_initial_backoff_ms=1, so the background worker could fire an interval flush or a scheduled retry within milliseconds and race the exact assert_hits() checks (e.g. flush_keeps_events_retryable asserts hits==1 while the held batch auto-retried). The --workspace CI job lost that race. Use long interval/backoff (600s) so every send is driven by the flush_at threshold or an explicit flush()/shutdown(); no test_transport test relies on autonomous timers (those are covered deterministically by the ManualClock unit tests in transport.rs). Verified stable across 35 runs in all three configs that run this suite.
Events were timestamped when the worker built the batch (publish time), so batching or a retry pushed an event's `timestamp` to when it was finally sent rather than when it occurred. Stamp the capture (enqueue) time on the producer side when the caller set none, for both the v0 and v1 wire formats; v1 `created_at` and the request timestamp header already recorded publish time and are unchanged. Also add a v0 batch `sent_at` (publish time) so ingestion can do clock-skew correction, matching what v1 already sends. From @eli-r-ph's review on the event-transport PR.
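The producer-side stamping could look something like the sketch below. It assumes a simplified `Clock` trait with only `now_utc` (the PR's `Clock` also has a monotonic `now`); the `ManualClock` body, `Event`, and `stamp_at_capture` are hypothetical stand-ins rather than the SDK's types.

```rust
use std::cell::Cell;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Simplified clock abstraction (assumption: wall-clock half only).
pub trait Clock {
    fn now_utc(&self) -> SystemTime;
}

/// Test clock that only moves when told to.
pub struct ManualClock(Cell<SystemTime>);

impl ManualClock {
    pub fn at_secs(secs: u64) -> Self {
        ManualClock(Cell::new(UNIX_EPOCH + Duration::from_secs(secs)))
    }
    pub fn advance(&self, by: Duration) {
        self.0.set(self.0.get() + by);
    }
}

impl Clock for ManualClock {
    fn now_utc(&self) -> SystemTime {
        self.0.get()
    }
}

/// Hypothetical event shape; only the timestamp matters here.
pub struct Event {
    pub name: String,
    pub timestamp: Option<SystemTime>,
}

/// Producer-side step: keep a caller-supplied timestamp, else stamp capture time.
pub fn stamp_at_capture(mut event: Event, clock: &dyn Clock) -> Event {
    if event.timestamp.is_none() {
        event.timestamp = Some(clock.now_utc());
    }
    event
}
```

Because the stamp is taken at enqueue, advancing the clock afterwards (batching delay, retries) no longer changes the event's `timestamp`.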
shutdown() and Drop attempted one final send per pending batch with no overall deadline, so a slow or unreachable endpoint could block process teardown for up to (batches x request_timeout). Add a `shutdown_timeout_ms` option (default 30000): the worker stamps a drain deadline when it handles Shutdown (and on channel disconnect) and, once it passes, drops the remaining buffered/retrying events instead of attempting them, so exit can't hang. Drop takes the same path, so it is bounded too. flush() stays a blocking, bounded delivery barrier (one attempt per batch, returns even when the server is down) -- only teardown is time-bounded. The drain paths now take an `Option<Instant>` deadline instead of a `final_attempt` bool (Some => final attempt + deadline-gated). From @eli-r-ph's review on the event-transport PR.
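A rough sketch of a deadline-gated drain, under simplifying assumptions: `drain`, `DrainReport`, and the injected `now`/`send` closures are hypothetical, and the retry bookkeeping of the real drain paths is left out. `Some(deadline)` models teardown; `None` models a drain with no time bound.

```rust
use std::time::Instant;

#[derive(Debug, PartialEq)]
pub struct DrainReport {
    pub attempted: usize,
    pub dropped: usize,
}

/// Drain `batches`, one attempt each. With `Some(deadline)`, any batch
/// reached after the deadline has passed is dropped instead of sent.
pub fn drain<B>(
    batches: Vec<B>,
    deadline: Option<Instant>,
    mut now: impl FnMut() -> Instant,
    mut send: impl FnMut(B),
) -> DrainReport {
    let mut report = DrainReport { attempted: 0, dropped: 0 };
    for batch in batches {
        // The deadline is checked before each attempt, never mid-request.
        if deadline.map_or(false, |d| now() >= d) {
            report.dropped += 1;
            continue;
        }
        send(batch);
        report.attempted += 1;
    }
    report
}
```

Note that a request already in progress is still bounded only by the request timeout, so worst-case teardown in this model is roughly the deadline plus one request timeout.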
Regenerated api/public-api.txt to record the new ClientOptionsBuilder::shutdown_timeout_ms builder method added alongside the shutdown drain timeout.
The single worker ran the blocking POST inline, so one slow or stalled endpoint blocked draining and could fill the bounded queue. Split it into a dispatcher thread that owns the buffer, retry queue, and schedule, plus a small pool of sender threads (~min(cores, 4)) that run the blocking POST concurrently and report each outcome back. The dispatcher stays the sole owner of the retry queue (no shared-state lock); `flush`/`shutdown` remain barriers that complete once the sends they dispatched report back. A slow endpoint now stalls at most one sender, not the whole pipeline. Cross-batch ordering becomes best-effort, which is fine now that event timestamps are stamped at capture time. Adds crossbeam-channel: std mpsc is single-consumer, so a worker pool would otherwise serialize on a mutex-wrapped receiver; crossbeam gives an MPMC job queue and a select over the control/outcome channels plus a timer. From @eli-r-ph's review on the event-transport PR.
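The dispatcher/sender split can be illustrated with a dependency-free sketch. Note the deliberate difference from the PR: this version shares a std `mpsc` receiver behind a mutex, which is exactly the arrangement the PR avoids by using `crossbeam-channel`; it is used here only so the example needs nothing beyond the standard library. `run_pool`, `Outcome`, and the `post` callback are hypothetical.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Outcome a sender reports back to the dispatcher.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Delivered(usize),
    Retry(usize),
}

/// Run `jobs` through `workers` sender threads and collect every outcome.
/// `post` stands in for the blocking HTTP POST; it returns true on success.
pub fn run_pool(jobs: Vec<usize>, workers: usize, post: fn(usize) -> bool) -> Vec<Outcome> {
    let (job_tx, job_rx) = mpsc::channel::<usize>();
    let (out_tx, out_rx) = mpsc::channel::<Outcome>();
    // std's receiver is single-consumer, so the pool shares it behind a mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let out_tx = out_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while receiving, not while posting.
                let job = match job_rx.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // job channel closed: no more work
                };
                let outcome = if post(job) {
                    Outcome::Delivered(job)
                } else {
                    Outcome::Retry(job)
                };
                if out_tx.send(outcome).is_err() {
                    break; // dispatcher went away
                }
            })
        })
        .collect();
    drop(out_tx); // only the senders hold outcome handles now

    let expected = jobs.len();
    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // closing the job channel lets idle senders exit

    // Dispatcher side: a barrier that waits for every dispatched job to report.
    let outcomes: Vec<Outcome> = out_rx.iter().take(expected).collect();
    for handle in handles {
        handle.join().unwrap();
    }
    outcomes
}
```

Only the dispatcher side decides what to do with a `Retry`, which mirrors the "sole owner of the retry queue" design: senders never touch shared retry state.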
The shutdown path dispatched every pending batch into the unbounded sender job channel up front, so on a slow endpoint the deadline could fire while many jobs were still queued. The dispatcher exited, but the detached senders kept draining and POSTing those queued jobs, so a closed client could keep doing network work well past shutdown_timeout. Add a shared `abandon` flag the senders check before each POST: when the deadline fires the dispatcher sets it and senders drop their remaining queued jobs (accounting for them) instead of sending. An already in-flight POST still finishes, bounded by the request timeout. Found by autoreview.
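A sender loop with an `abandon` check might be shaped like this. It is a sketch under assumptions: `run_sender` and its return value are hypothetical, and the real senders report outcomes over a channel rather than returning counts.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

/// Sender loop sketch: returns (posted, abandoned) job counts.
pub fn run_sender<J>(
    jobs: Receiver<J>,
    abandon: &AtomicBool,
    mut post: impl FnMut(J),
) -> (usize, usize) {
    let (mut posted, mut abandoned) = (0, 0);
    for job in jobs {
        // Checked before each POST, never during one: an in-flight
        // request still runs to completion (or its request timeout).
        if abandon.load(Ordering::Acquire) {
            abandoned += 1; // still accounted, so the backlog gauge stays correct
            continue;
        }
        post(job);
        posted += 1;
    }
    (posted, abandoned)
}
```

Draining the queue while skipping the POST, instead of simply exiting the thread, is what keeps the pending-event accounting consistent after the deadline fires.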
A batch dispatched before shutdown (non-final) that failed transiently while shutdown was being processed was dropped, instead of getting the final attempt it would have received under the old single-worker transport (which could not process Shutdown until the in-flight attempt completed). A final attempt never returns Retry, so any Retry seen mid-teardown is such a batch: re-dispatch it as a final attempt while the shutdown deadline allows, and drop it only once past the deadline. Found by autoreview.
…tries Two teardown/barrier correctness fixes from autoreview on the sender pool: - Defer captures that arrive while a flush/shutdown barrier is pending and replay them once it clears, so flush() only waits for work queued before the call. The historical_migration mismatch path could otherwise dispatch a post-flush capture and make flush wait on it, up to a request timeout. - When the dispatcher has already exited (shutdown deadline) and a non-final in-flight attempt then fails transiently, the sender can't report the Retry; drop the batch so pending_events stays accounted instead of leaking.
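The defer-and-replay idea from the first fix can be sketched as a small state machine. `Dispatcher` here is a hypothetical, heavily reduced stand-in (events are plain `u32`s, and sending is omitted); only the barrier bookkeeping is shown.

```rust
use std::collections::VecDeque;

/// Hypothetical dispatcher state; only the barrier-related fields are shown.
#[derive(Default)]
pub struct Dispatcher {
    pub buffer: Vec<u32>,        // events eligible for the next dispatch
    pub deferred: VecDeque<u32>, // captures held while a barrier is pending
    pub barrier_pending: bool,
}

impl Dispatcher {
    /// A capture that arrives mid-barrier is parked, not buffered.
    pub fn on_capture(&mut self, event: u32) {
        if self.barrier_pending {
            self.deferred.push_back(event);
        } else {
            self.buffer.push(event);
        }
    }

    /// flush()/shutdown() starts: take only what was queued before the call.
    pub fn begin_barrier(&mut self) -> Vec<u32> {
        self.barrier_pending = true;
        std::mem::take(&mut self.buffer)
    }

    /// Barrier cleared: replay held captures in arrival order.
    pub fn end_barrier(&mut self) {
        self.barrier_pending = false;
        self.buffer.extend(self.deferred.drain(..));
    }
}
```

With this shape, the set of events a barrier waits on is fixed at `begin_barrier`, so a late capture cannot extend the wait.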
v0 baked sent_at into the serialized batch body at build time and reused those bytes for every retry, so a retried batch reported its original build time rather than when it was actually sent -- skewing server-side clock-skew correction by the retry/queue delay. Keep the v0 batch structured (like v1 keeps its events) and re-stamp sent_at + re-serialize per attempt. Found by autoreview.
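To illustrate re-stamping per attempt, here is a minimal sketch that keeps the batch structured and serializes at send time. `Batch` and `body_for_attempt` are hypothetical, the JSON is hand-assembled to avoid dependencies, and `sent_at` is shown as integer milliseconds purely for brevity (the actual wire format is not specified here).

```rust
/// Hypothetical structured v0 batch: the body is not serialized until send time.
pub struct Batch {
    pub events: Vec<String>, // each entry is an already-encoded JSON object
}

impl Batch {
    /// Build the request body for one attempt.
    /// `sent_at_ms` is read from the clock at send time, not at batch-build time,
    /// so a retry reports when it was actually sent.
    pub fn body_for_attempt(&self, sent_at_ms: u64) -> String {
        format!(
            "{{\"sent_at\":{},\"batch\":[{}]}}",
            sent_at_ms,
            self.events.join(",")
        )
    }
}
```

The cost is one extra serialization per retry, traded for a `sent_at` that stays meaningful for server-side clock-skew correction.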
flush() waits on the global in-flight count, so concurrent flush() calls from multiple threads can complete based on the shared drain rather than strictly each call's pre-call events. Single-threaded use is exact. Documented here as a known, deliberate scope choice; a fully precise fix (per-flush generation tracking) is tracked as a follow-up.
```rust
/// Drop held captures (zero-grace shutdown), accounting for them in `len`.
fn drop_deferred(&mut self) {
    let n = self.deferred.len();
    if n > 0 {
        self.deferred.clear();
        dec_len(&self.pipeline.len, n);
    }
}
```
drop_deferred silently discards events without any log message. drop_buffer logs a warning in the same codepath (zero-grace shutdown), so a user debugging missing events on the drop_buffer path would see a message, but an equivalent loss from the deferred queue would be invisible. Given the deferred queue can hold events that arrived during an active flush (a common concurrent-capture scenario), this asymmetry makes the zero-grace shutdown harder to reason about.
Path: src/client/transport.rs
Line: 510-517
```suggestion
/// Drop held captures (zero-grace shutdown), accounting for them in `len`.
fn drop_deferred(&mut self) {
    let n = self.deferred.len();
    if n > 0 {
        warn!(
            "posthog-rs: shutdown timeout reached; dropping {} deferred event(s)",
            n
        );
        self.deferred.clear();
        dec_len(&self.pipeline.len, n);
    }
}
```
```rust
None => match outcome_rx.recv() {
    Ok(r) => dispatcher.on_outcome(Ok(r)),
    Err(_) => dispatcher.done = true,
},
```
pending_events leak if all senders panic
If every sender thread panics (terminates without sending an outcome), outcome_rx closes while in_flight > 0. The disconnected-drain path's Err arm sets done = true and exits the loop without decrementing len for the in-flight batches — pending_events() would then report stale counts indefinitely. This is an exceptional path and the PR already acknowledges a future panic-hook; noting it here so the follow-up work has a concrete surface area to address (dec_len the still-in-flight count, or add a panic-hook on sender spawn).
Path: src/client/transport.rs
Line: 728-731
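One of the two follow-up options named in this comment (decrementing `len` by the still-in-flight count when the outcome channel disconnects) might look like the sketch below. Everything here is an assumption: the body of `dec_len` is not shown in the diff, so this saturating version is a guess at its intent, and `on_outcomes_disconnected` is a hypothetical helper name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Saturating decrement (assumed behaviour), so a double-count can never wrap the gauge.
pub fn dec_len(len: &AtomicUsize, n: usize) {
    let _ = len.fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
        Some(cur.saturating_sub(n))
    });
}

/// Called when the outcome channel disconnects with work still in flight:
/// release those events from the gauge before the dispatcher exits.
pub fn on_outcomes_disconnected(len: &AtomicUsize, in_flight_events: &mut usize) {
    dec_len(len, *in_flight_events);
    *in_flight_events = 0;
}
```

This only repairs the accounting; the events themselves are still lost when every sender has panicked, which is the part a panic hook on sender spawn would address.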
Stacked on #145 (base branch `ci/rust-event-transport`); review/merge that first.

What
Addresses the head-of-line blocking raised in review: the single transport worker did the blocking POST inline, so one slow or stalled endpoint blocked draining and could fill the bounded queue.
Splits the worker into:
- a dispatcher thread that owns the buffer, retry queue, and schedule;
- a small pool of sender threads (~`min(cores, 4)`) that runs the blocking POSTs concurrently and reports each outcome back.

A slow endpoint now stalls at most one sender instead of the whole pipeline. `flush()`/`shutdown()` stay barriers that complete once the sends they dispatched report back. Cross-batch ordering becomes best-effort, which is fine now that event timestamps are stamped at capture time (#145).

Adds `crossbeam-channel` for the MPMC job queue plus a `select` over the control/outcome channels: std `mpsc` is single-consumer, so a pool would otherwise serialize on a mutex-wrapped receiver. Deliberately not a tokio task: the worker stays runtime-independent (blocking client + a future panic hook).

Review (autoreview)

Ran a structured review over the diff across several rounds; it caught 5 real bugs in the shutdown/barrier semantics, all fixed with regression tests:

- … `shutdown_timeout` → abandon flag
- `flush()` could wait on captures that arrived after the call → defer + replay
- … `pending_events` count → accounted on send failure
- `sent_at` was stale on retries → stamped per attempt

Known limitation (deferred, documented in code): `flush()` under concurrent multi-threaded calls can under-wait, a pre-existing global-barrier limitation; a precise fix needs per-flush generation tracking. Tracked as a follow-up.

Tests
Full gate green: fmt, clippy (`-D warnings`, v0 + v1), public-api snapshot, `publish --dry-run`, and every unit-test feature config (default, `--workspace`, capture-v1 async + blocking, error-tracking blocking, error-tracking + capture-v1, e2e). Added deterministic regression tests for the pool.