Skip to content

perf: sender pool for the event transport (removes head-of-line blocking)#147

Draft
cat-ph wants to merge 15 commits into
mainfrom
ci/rust-event-transport-sender-pool
Draft

perf: sender pool for the event transport (removes head-of-line blocking)#147
cat-ph wants to merge 15 commits into
mainfrom
ci/rust-event-transport-sender-pool

Conversation

@cat-ph

@cat-ph cat-ph commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Stacked on #145 (base branch ci/rust-event-transport) — review/merge that first.

What

Addresses the head-of-line blocking raised in review: the single transport worker did the blocking POST inline, so one slow or stalled endpoint blocked draining and could fill the bounded queue.

Splits the worker into:

  • a dispatcher thread that owns the buffer, retry queue, and schedule (single-owner, no lock), and
  • a small sender pool (min(cores, 4)) that runs the blocking POSTs concurrently and reports each outcome back.

A slow endpoint now stalls at most one sender instead of the whole pipeline. flush()/shutdown() stay barriers that complete once the sends they dispatched report back. Cross-batch ordering becomes best-effort, which is fine now that event timestamps are stamped at capture time (#145).

Adds crossbeam-channel for the MPMC job queue plus a select over the control/outcome channels — std mpsc is single-consumer, so a pool would otherwise serialize on a mutex-wrapped receiver. Deliberately not a tokio task: the worker stays runtime-independent (blocking client + a future panic hook).

Review (autoreview)

Ran a structured review over the diff across several rounds; it caught 5 real bugs in the shutdown/barrier semantics, all fixed with regression tests:

  • queued sender jobs kept POSTing past shutdown_timeout → abandon flag
  • a pre-shutdown in-flight batch that failed transiently was dropped instead of getting its final attempt → re-dispatch as final
  • flush() could wait on captures that arrived after the call → defer + replay
  • abandoned in-flight retries leaked the pending_events count → accounted on send failure
  • v0 sent_at was stale on retries → stamped per attempt

Known limitation (deferred, documented in code): flush() under concurrent multi-threaded calls can under-wait — a pre-existing global-barrier limitation; a precise fix needs per-flush generation tracking. Tracked as a follow-up.

Tests

Full gate green: fmt, clippy (-D warnings, v0 + v1), public-api snapshot, publish --dry-run, and every unit-test feature config (default, --workspace, capture-v1 async + blocking, error-tracking blocking, error-tracking + capture-v1, e2e). Added deterministic regression tests for the pool.

cat-ph added 15 commits June 15, 2026 21:53
Replace per-capture direct HTTP POST with a single background std::thread worker (blocking reqwest, no async runtime) that batches events, retries transient failures with backoff honoring Retry-After, and is driven by an injectable clock.

capture/capture_batch are non-blocking enqueues. Adds flush, shutdown (flush + stop + join, idempotent), flush-on-Drop, and pending_events on both clients, plus global flush/shutdown (the global client lives in a static, whose Drop never runs). The bounded queue drops with a single warning when full; before_send now runs on the worker so it applies to every queued event. An injectable Clock (now + now_utc) makes interval flush, retry backoff, and v1 wire timestamps deterministic in tests (ManualClock + a test-only Tick; no real sleeps). v0 single capture now batches through /batch/. The compliance adapter delegates batching/flush/shutdown to the SDK.

BREAKING CHANGE: capture/capture_batch return Ok as soon as the event is queued instead of awaiting delivery, and HTTP failures surface as logged warnings rather than Err. Call flush or shutdown before process exit to ensure queued events are delivered.
…t work

Addresses review on PR #145.

- transport: re-arm the bounded-queue "full" warning once the queue drains to empty, so a service that repeatedly fills then drains warns once per episode instead of only on the very first overflow.

- transport: pending_events() now counts events accepted but not yet delivered (channel + worker buffer + retry queue) rather than only channel depth, by decrementing the in-flight counter when a batch reaches a terminal outcome (delivered/dropped) in both the v0 and v1 pipelines. This also makes the bounded queue cap total backlog.

- adapter: document that total_events_sent can over-count only when the SDK queue fills (capture is fire-and-forget); the acceptance suite never fills it.
pending_events now counts channel + worker buffer + retries; the public doc on both clients still described the old channel-only meaning (flagged by autoreview).
# Conflicts:
#	src/client/async_client.rs
#	src/client/blocking.rs
#	tests/test_async.rs
#	tests/test_blocking.rs
pending_events exposed internal queue depth only for the compliance adapter (the harness /state contract requires the field); it is not meaningful public API. Gate it, plus the internal TransportHandle::pending it calls, behind test-harness so normal builds no longer expose it.

The adapter enables test-harness so it still compiles; the --all-features public-API snapshot is unchanged (test-harness items appear there, like extra_capture_headers).
The helper used flush_interval_ms=50 and retry_initial_backoff_ms=1, so the background worker could fire an interval flush or a scheduled retry within milliseconds and race the exact assert_hits() checks (e.g. flush_keeps_events_retryable asserts hits==1 while the held batch auto-retried). The --workspace CI job lost that race.

Use long interval/backoff (600s) so every send is driven by the flush_at threshold or an explicit flush()/shutdown(); no test_transport test relies on autonomous timers (those are covered deterministically by the ManualClock unit tests in transport.rs). Verified stable across 35 runs in all three configs that run this suite.
Events were timestamped when the worker built the batch (publish time),
so batching or a retry pushed an event's `timestamp` to when it was
finally sent rather than when it occurred. Stamp the capture (enqueue)
time on the producer side when the caller set none, for both the v0 and
v1 wire formats; v1 `created_at` and the request timestamp header already
recorded publish time and are unchanged.

Also add a v0 batch `sent_at` (publish time) so ingestion can do
clock-skew correction, matching what v1 already sends.

From @eli-r-ph's review on the event-transport PR.
shutdown() and Drop attempted one final send per pending batch with no
overall deadline, so a slow or unreachable endpoint could block process
teardown for up to (batches x request_timeout). Add a `shutdown_timeout_ms`
option (default 30000): the worker stamps a drain deadline when it handles
Shutdown (and on channel disconnect) and, once it passes, drops the
remaining buffered/retrying events instead of attempting them, so exit
can't hang. Drop takes the same path, so it is bounded too.

flush() stays a blocking, bounded delivery barrier (one attempt per batch,
returns even when the server is down) -- only teardown is time-bounded.
The drain paths now take an `Option<Instant>` deadline instead of a
`final_attempt` bool (Some => final attempt + deadline-gated).

From @eli-r-ph's review on the event-transport PR.
Regenerated api/public-api.txt to record the new
ClientOptionsBuilder::shutdown_timeout_ms builder method added alongside the
shutdown drain timeout.
The single worker ran the blocking POST inline, so one slow or stalled
endpoint blocked draining and could fill the bounded queue. Split it into a
dispatcher thread that owns the buffer, retry queue, and schedule, plus a
small pool of sender threads (~min(cores, 4)) that run the blocking POST
concurrently and report each outcome back. The dispatcher stays the sole
owner of the retry queue (no shared-state lock); `flush`/`shutdown` remain
barriers that complete once the sends they dispatched report back.

A slow endpoint now stalls at most one sender, not the whole pipeline.
Cross-batch ordering becomes best-effort, which is fine now that event
timestamps are stamped at capture time.

Adds crossbeam-channel: std mpsc is single-consumer, so a worker pool would
otherwise serialize on a mutex-wrapped receiver; crossbeam gives an MPMC job
queue and a select over the control/outcome channels plus a timer.

From @eli-r-ph's review on the event-transport PR.
The shutdown path dispatched every pending batch into the unbounded sender
job channel up front, so on a slow endpoint the deadline could fire while
many jobs were still queued. The dispatcher exited, but the detached senders
kept draining and POSTing those queued jobs, so a closed client could keep
doing network work well past shutdown_timeout.

Add a shared `abandon` flag the senders check before each POST: when the
deadline fires the dispatcher sets it and senders drop their remaining queued
jobs (accounting for them) instead of sending. An already in-flight POST
still finishes, bounded by the request timeout. Found by autoreview.
A batch dispatched before shutdown (non-final) that failed transiently while
shutdown was being processed was dropped, instead of getting the final attempt
it would have received under the old single-worker transport (which could not
process Shutdown until the in-flight attempt completed). A final attempt never
returns Retry, so any Retry seen mid-teardown is such a batch: re-dispatch it
as a final attempt while the shutdown deadline allows, and drop it only once
past the deadline. Found by autoreview.
…tries

Two teardown/barrier correctness fixes from autoreview on the sender pool:

- Defer captures that arrive while a flush/shutdown barrier is pending and
  replay them once it clears, so flush() only waits for work queued before the
  call. The historical_migration mismatch path could otherwise dispatch a
  post-flush capture and make flush wait on it, up to a request timeout.

- When the dispatcher has already exited (shutdown deadline) and a non-final
  in-flight attempt then fails transiently, the sender can't report the Retry;
  drop the batch so pending_events stays accounted instead of leaking.
v0 baked sent_at into the serialized batch body at build time and reused those
bytes for every retry, so a retried batch reported its original build time
rather than when it was actually sent -- skewing server-side clock-skew
correction by the retry/queue delay. Keep the v0 batch structured (like v1
keeps its events) and re-stamp sent_at + re-serialize per attempt. Found by
autoreview.
flush() waits on the global in-flight count, so concurrent flush() calls from
multiple threads can complete based on the shared drain rather than strictly
each call's pre-call events. Single-threaded use is exact. Documented here as a
known, deliberate scope choice; a fully precise fix (per-flush generation
tracking) is tracked as a follow-up.
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown

Comments Outside Diff (1)

  1. src/client/transport.rs, line 1057-1163 (link)

    P2 Tests could be parameterised per the repo convention

    The three new regression tests (flush_waits_for_every_batch_across_the_pool, sender_abandons_queued_jobs_without_posting, sender_accounts_retry_when_dispatcher_is_gone) each exercise one specific scenario. The repo prefers parameterised tests where applicable. While these scenarios have meaningfully different setups, a parameterised approach for, for example, the abandon-flag test (varying event counts, or abandon = true/false) and the dispatcher-gone test (varying max_capture_attempts) would make it easier to extend coverage later without adding boilerplate.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: src/client/transport.rs
    Line: 1057-1163
    
    Comment:
    **Tests could be parameterised per the repo convention**
    
    The three new regression tests (`flush_waits_for_every_batch_across_the_pool`, `sender_abandons_queued_jobs_without_posting`, `sender_accounts_retry_when_dispatcher_is_gone`) each exercise one specific scenario. The repo prefers parameterised tests where applicable. While these scenarios have meaningfully different setups, a parameterised approach for, for example, the abandon-flag test (varying event counts, or `abandon = true/false`) and the dispatcher-gone test (varying `max_capture_attempts`) would make it easier to extend coverage later without adding boilerplate.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
src/client/transport.rs:510-517
`drop_deferred` silently discards events without any log message. `drop_buffer` logs a warning in the same codepath (zero-grace shutdown), so a user debugging missing events on the `drop_buffer` path would see a message, but an equivalent loss from the deferred queue would be invisible. Given the deferred queue can hold events that arrived during an active flush (a common concurrent-capture scenario), this asymmetry makes the zero-grace shutdown harder to reason about.

```suggestion
    /// Drop held captures (zero-grace shutdown), accounting for them in `len`.
    fn drop_deferred(&mut self) {
        let n = self.deferred.len();
        if n > 0 {
            warn!(
                "posthog-rs: shutdown timeout reached; dropping {} deferred event(s)",
                n
            );
            self.deferred.clear();
            dec_len(&self.pipeline.len, n);
        }
    }
```

### Issue 2 of 3
src/client/transport.rs:728-731
**`pending_events` leak if all senders panic**

If every sender thread panics (terminates without sending an outcome), `outcome_rx` closes while `in_flight > 0`. The disconnected-drain path's `Err` arm sets `done = true` and exits the loop without decrementing `len` for the in-flight batches — `pending_events()` would then report stale counts indefinitely. This is an exceptional path and the PR already acknowledges a future panic-hook; noting it here so the follow-up work has a concrete surface area to address (`dec_len` the still-in-flight count, or add a panic-hook on sender spawn).

### Issue 3 of 3
src/client/transport.rs:1057-1163
**Tests could be parameterised per the repo convention**

The three new regression tests (`flush_waits_for_every_batch_across_the_pool`, `sender_abandons_queued_jobs_without_posting`, `sender_accounts_retry_when_dispatcher_is_gone`) each exercise one specific scenario. The repo prefers parameterised tests where applicable. While these scenarios have meaningfully different setups, a parameterised approach for, for example, the abandon-flag test (varying event counts, or `abandon = true/false`) and the dispatcher-gone test (varying `max_capture_attempts`) would make it easier to extend coverage later without adding boilerplate.

Reviews (1): Last reviewed commit: "docs: note the concurrent-flush barrier ..." | Re-trigger Greptile

Comment thread src/client/transport.rs
Comment on lines +510 to +517
/// Drop held captures (zero-grace shutdown), accounting for them in `len`.
fn drop_deferred(&mut self) {
let n = self.deferred.len();
if n > 0 {
self.deferred.clear();
dec_len(&self.pipeline.len, n);
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 drop_deferred silently discards events without any log message. drop_buffer logs a warning in the same codepath (zero-grace shutdown), so a user debugging missing events on the drop_buffer path would see a message, but an equivalent loss from the deferred queue would be invisible. Given the deferred queue can hold events that arrived during an active flush (a common concurrent-capture scenario), this asymmetry makes the zero-grace shutdown harder to reason about.

Suggested change
/// Drop held captures (zero-grace shutdown), accounting for them in `len`.
fn drop_deferred(&mut self) {
let n = self.deferred.len();
if n > 0 {
self.deferred.clear();
dec_len(&self.pipeline.len, n);
}
}
/// Drop held captures (zero-grace shutdown), accounting for them in `len`.
fn drop_deferred(&mut self) {
let n = self.deferred.len();
if n > 0 {
warn!(
"posthog-rs: shutdown timeout reached; dropping {} deferred event(s)",
n
);
self.deferred.clear();
dec_len(&self.pipeline.len, n);
}
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/client/transport.rs
Line: 510-517

Comment:
`drop_deferred` silently discards events without any log message. `drop_buffer` logs a warning in the same codepath (zero-grace shutdown), so a user debugging missing events on the `drop_buffer` path would see a message, but an equivalent loss from the deferred queue would be invisible. Given the deferred queue can hold events that arrived during an active flush (a common concurrent-capture scenario), this asymmetry makes the zero-grace shutdown harder to reason about.

```suggestion
    /// Drop held captures (zero-grace shutdown), accounting for them in `len`.
    fn drop_deferred(&mut self) {
        let n = self.deferred.len();
        if n > 0 {
            warn!(
                "posthog-rs: shutdown timeout reached; dropping {} deferred event(s)",
                n
            );
            self.deferred.clear();
            dec_len(&self.pipeline.len, n);
        }
    }
```

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment thread src/client/transport.rs
Comment on lines +728 to +731
None => match outcome_rx.recv() {
Ok(r) => dispatcher.on_outcome(Ok(r)),
Err(_) => dispatcher.done = true,
},

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 pending_events leak if all senders panic

If every sender thread panics (terminates without sending an outcome), outcome_rx closes while in_flight > 0. The disconnected-drain path's Err arm sets done = true and exits the loop without decrementing len for the in-flight batches — pending_events() would then report stale counts indefinitely. This is an exceptional path and the PR already acknowledges a future panic-hook; noting it here so the follow-up work has a concrete surface area to address (dec_len the still-in-flight count, or add a panic-hook on sender spawn).

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/client/transport.rs
Line: 728-731

Comment:
**`pending_events` leak if all senders panic**

If every sender thread panics (terminates without sending an outcome), `outcome_rx` closes while `in_flight > 0`. The disconnected-drain path's `Err` arm sets `done = true` and exits the loop without decrementing `len` for the in-flight batches — `pending_events()` would then report stale counts indefinitely. This is an exceptional path and the PR already acknowledges a future panic-hook; noting it here so the follow-up work has a concrete surface area to address (`dec_len` the still-in-flight count, or add a panic-hook on sender spawn).

How can I resolve this? If you propose a fix, please make it concise.

Base automatically changed from ci/rust-event-transport to main June 24, 2026 14:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant