feat: runtime-independent background event transport #145
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
src/client/transport.rs:101-103
**Queue-full warning fires only once per client lifetime**
`full_warned` is set to `true` on the first overflow and is never reset, even after the queue drains back below capacity. Once the first overflow fires, all subsequent overflows produce no log output at all. In a production service where the worker falls behind (slow network, long backoff), the queue could fill, drain, fill, drain, … with events silently dropped on every refill after the very first warning. A caller seeing the single warning might assume it was a one-time spike and ignore it.
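One possible shape for a fix is to re-arm the flag when the queue drains, so each fill/drain episode warns once. The sketch below is a minimal illustration under assumptions: `FullWarning`, `on_overflow`, and `on_drained` are hypothetical names, not the transport's actual API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the transport's overflow-warning state.
pub struct FullWarning {
    warned: AtomicBool,
}

impl FullWarning {
    pub fn new() -> Self {
        Self { warned: AtomicBool::new(false) }
    }

    /// Call when an enqueue is rejected because the queue is full.
    /// Returns true only for the first overflow of an episode, i.e. when
    /// the caller should actually emit the log line.
    pub fn on_overflow(&self) -> bool {
        // `swap` returns the previous value, so only one caller sees `false`.
        !self.warned.swap(true, Ordering::Relaxed)
    }

    /// Call when the worker observes that the queue has drained.
    /// Re-arms the warning for the next overflow episode.
    pub fn on_drained(&self) {
        self.warned.store(false, Ordering::Relaxed);
    }
}
```

With this shape, a queue that fills, drains, and fills again logs twice rather than once per client lifetime.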
### Issue 2 of 3
compliance/adapter/src/main.rs:280-282
**`flushed` count is likely always 0 after a real flush**
`pending_events()` returns only the depth of the mpsc channel (events not yet consumed by the worker thread). By the time this HTTP handler is called, the worker has almost certainly already read the event off the channel and placed it in its internal `buffer: Vec<Event>` — so `before = 0` and `flushed = 0 - 0 = 0`, even though `flush()` just delivered that event. Events already in the worker buffer or held for retry are not visible to `pending_events()`. The reported `flushed` field is effectively always 0 for the typical "capture then flush" test pattern.
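A count that stays meaningful after the worker has pulled events off the channel could be kept in a shared counter that is incremented on enqueue and decremented only at a terminal outcome. The following is a sketch under assumptions; `InFlight`, `try_reserve`, and `settle` are hypothetical names chosen for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shared counter: events accepted but not yet delivered or dropped.
#[derive(Clone, Default)]
pub struct InFlight(Arc<AtomicUsize>);

impl InFlight {
    /// Producer side: reserve a slot if the total backlog is below `cap`.
    pub fn try_reserve(&self, cap: usize) -> bool {
        self.0
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < cap { Some(n + 1) } else { None }
            })
            .is_ok()
    }

    /// Worker side: `n` events reached a terminal outcome (delivered or dropped).
    pub fn settle(&self, n: usize) {
        // Saturating, so an accounting bug cannot wrap the counter around.
        let _ = self.0.fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
            Some(cur.saturating_sub(n))
        });
    }

    /// Channel + worker buffer + retry queue, as one number.
    pub fn pending(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }
}
```

Because the same counter gates `try_reserve`, a design like this also makes the queue bound apply to the whole backlog rather than only to the channel.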
### Issue 3 of 3
compliance/adapter/src/main.rs:331
**`total_events_sent` can over-count when the SDK queue is full**
`total_events_captured` is incremented in the `/capture` handler before calling `capture_batch`, so if the SDK queue is full and the event is silently dropped, `total_events_captured` still reflects the dropped event. The formula `(total_events_captured - pending).max(0)` then counts that dropped event as "sent" once `pending` drains to 0. This is a semantic shift from the previous adapter, which only incremented `total_events_sent` on confirmed network delivery.
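The over-count can be seen by evaluating the quoted formula directly. In this small sketch the function name and the signed integer type are assumptions (the type is implied by the `.max(0)` clamp); only the expression itself comes from the review.

```rust
/// The adapter's derived "sent" count, as quoted in the review.
/// `i64` is an assumption; the real field types are not shown in the thread.
pub fn total_events_sent(total_events_captured: i64, pending: i64) -> i64 {
    (total_events_captured - pending).max(0)
}
```

If three events are captured, one of them is dropped at enqueue because the queue is full, and the other two are delivered, `pending` settles at 0 and the formula reports 3 sent even though only 2 reached the network.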
Reviews (1): Last reviewed commit: "feat: runtime-independent background eve..."
Replace per-capture direct HTTP POST with a single background std::thread worker (blocking reqwest, no async runtime) that batches events, retries transient failures with backoff honoring Retry-After, and is driven by an injectable clock. capture/capture_batch are non-blocking enqueues.

Adds flush, shutdown (flush + stop + join, idempotent), flush-on-Drop, and pending_events on both clients, plus global flush/shutdown (the global client lives in a static, whose Drop never runs). The bounded queue drops with a single warning when full; before_send now runs on the worker so it applies to every queued event.

An injectable Clock (now + now_utc) makes interval flush, retry backoff, and v1 wire timestamps deterministic in tests (ManualClock + a test-only Tick; no real sleeps). v0 single capture now batches through /batch/. The compliance adapter delegates batching/flush/shutdown to the SDK.

BREAKING CHANGE: capture/capture_batch return Ok as soon as the event is queued instead of awaiting delivery, and HTTP failures surface as logged warnings rather than Err. Call flush or shutdown before process exit to ensure queued events are delivered.
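To make the enqueue/worker split concrete, here is a heavily simplified sketch using only the standard library. Everything in it is an assumption for illustration: `Transport`, `Msg`, and `run_worker` are hypothetical, a `Vec` of batches stands in for HTTP POSTs, and retries, the clock, and interval flush are left out.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread::{self, JoinHandle};

/// Hypothetical message type; the real transport has more controls.
pub enum Msg {
    Capture(String),
    Shutdown,
}

pub struct Transport {
    tx: SyncSender<Msg>,
    worker: Option<JoinHandle<Vec<Vec<String>>>>,
}

impl Transport {
    /// Spawns the worker thread. `flush_at` is the batch-size trigger.
    pub fn new(max_queue_size: usize, flush_at: usize) -> Self {
        let (tx, rx) = sync_channel(max_queue_size);
        let worker = thread::spawn(move || run_worker(rx, flush_at));
        Self { tx, worker: Some(worker) }
    }

    /// Non-blocking enqueue: returns false when the event was dropped.
    pub fn capture(&self, event: &str) -> bool {
        match self.tx.try_send(Msg::Capture(event.to_owned())) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
        }
    }

    /// Flush + stop + join; returns the batches the worker "sent".
    pub fn shutdown(mut self) -> Vec<Vec<String>> {
        // A blocking send is acceptable here: the control must reach the worker.
        let _ = self.tx.send(Msg::Shutdown);
        self.worker
            .take()
            .map(|h| h.join().unwrap())
            .unwrap_or_default()
    }
}

fn run_worker(rx: Receiver<Msg>, flush_at: usize) -> Vec<Vec<String>> {
    let mut sent = Vec::new(); // stands in for HTTP POSTs
    let mut buffer = Vec::new();
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::Capture(event) => {
                buffer.push(event);
                if buffer.len() >= flush_at {
                    sent.push(std::mem::take(&mut buffer));
                }
            }
            Msg::Shutdown => break,
        }
    }
    // Final flush of whatever is still buffered.
    if !buffer.is_empty() {
        sent.push(buffer);
    }
    sent
}
```

Capturing three events with `flush_at = 2` and then calling `shutdown()` yields one batch of two and a final batch of one, which mirrors the "call flush or shutdown before process exit" guidance in the breaking-change note.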
Force-pushed from f0072db to 2c64114.
posthog-rs-v0 Compliance Report
Date: 2026-06-24 10:29:02 UTC
| Test | Status | Duration |
|---|---|---|
| Format Validation.Event Has Required Fields | ✅ | 146ms |
| Format Validation.Event Has Uuid | ✅ | 155ms |
| Format Validation.Event Has Lib Properties | ✅ | 150ms |
| Format Validation.Distinct Id Is String | ✅ | 153ms |
| Format Validation.Token Is Present | ✅ | 151ms |
| Format Validation.Custom Properties Preserved | ✅ | 152ms |
| Format Validation.Event Has Timestamp | ✅ | 146ms |
| Retry Behavior.Retries On 503 | ✅ | 5151ms |
| Retry Behavior.Does Not Retry On 400 | ✅ | 2156ms |
| Retry Behavior.Does Not Retry On 401 | ✅ | 2155ms |
| Retry Behavior.Respects Retry After Header | ✅ | 5105ms |
| Retry Behavior.Implements Backoff | ✅ | 15103ms |
| Retry Behavior.Retries On 500 | ✅ | 5106ms |
| Retry Behavior.Retries On 502 | ✅ | 5099ms |
| Retry Behavior.Retries On 504 | ✅ | 5102ms |
| Retry Behavior.Max Retries Respected | ✅ | 15102ms |
| Deduplication.Generates Unique Uuids | ✅ | 106ms |
| Deduplication.Preserves Uuid On Retry | ✅ | 5024ms |
| Deduplication.Preserves Uuid And Timestamp On Retry | ✅ | 10036ms |
| Deduplication.Preserves Uuid And Timestamp On Batch Retry | ✅ | 5031ms |
| Deduplication.No Duplicate Events In Batch | ✅ | 25ms |
| Deduplication.Different Events Have Different Uuids | ✅ | 23ms |
| Compression.Sends Gzip When Enabled | ✅ | 21ms |
| Batch Format.Uses Proper Batch Structure | ✅ | 22ms |
| Batch Format.Flush With No Events Sends Nothing | ✅ | 24ms |
| Batch Format.Multiple Events Batched Together | ✅ | 100ms |
| Error Handling.Does Not Retry On 403 | ✅ | 2069ms |
| Error Handling.Does Not Retry On 413 | ✅ | 2078ms |
| Error Handling.Retries On 408 | ✅ | 5079ms |
Feature_Flags Tests
| Test | Status | Duration |
|---|---|---|
| Request Payload.Request With Person Properties Device Id | ❌ | 48ms |
| Request Payload.Flags Request Uses V2 Query Param | ❌ | 48ms |
| Request Payload.Flags Request Hits Flags Path Not Decide | ❌ | 29ms |
| Request Payload.Flags Request Omits Authorization Header | ❌ | 36ms |
| Request Payload.Token In Flags Body Matches Init | ❌ | 24ms |
| Request Payload.Groups Round Trip | ❌ | 31ms |
| Request Payload.Groups Default To Empty Object | ❌ | 23ms |
| Request Payload.Person Properties Distinct Id Auto Populated When Caller Omits It | ❌ | 44ms |
| Request Payload.Disable Geoip False Propagates As Geoip Disable False | ❌ | 45ms |
| Request Payload.Disable Geoip Omitted Defaults To False | ❌ | 23ms |
| Request Payload.Flag Keys To Evaluate Contains Only Requested Key | ❌ | 23ms |
| Request Lifecycle.No Flags Request On Init Alone | ✅ | 30ms |
| Request Lifecycle.No Flags Request On Normal Capture | ✅ | 68ms |
| Request Lifecycle.Two Flag Calls Produce Two Remote Requests | ❌ | 34ms |
| Request Lifecycle.Mock Response Value Is Returned To Caller | ❌ | 37ms |
| Side Effect Events.Get Feature Flag Captures Feature Flag Called Event | ❌ | 21ms |
Failures
request_payload.request_with_person_properties_device_id
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7109cf2c-4091-43b8-825d-9fd2bedf3bc9'
request_payload.flags_request_uses_v2_query_param
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-99844c7b-4225-4fb6-a911-cfc75a606d5b'
request_payload.flags_request_hits_flags_path_not_decide
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-9d12bde6-2bb8-4c25-b286-309daf9f2d7a'
request_payload.flags_request_omits_authorization_header
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7095e157-897f-4564-b7fe-5324c1389774'
request_payload.token_in_flags_body_matches_init
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-6ad994af-5061-4f6f-adae-3ff9940f3533'
request_payload.groups_round_trip
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7c7c7b05-6fa8-4041-9fe7-97b2ba7dbcdc'
request_payload.groups_default_to_empty_object
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-95bf3c81-85ab-46a3-8b71-6ce22b294e07'
request_payload.person_properties_distinct_id_auto_populated_when_caller_omits_it
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-411753fc-6c3d-4959-a474-f6651c5e7449'
request_payload.disable_geoip_false_propagates_as_geoip_disable_false
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-6b25b506-f35f-4a1b-9b35-087f835a332b'
request_payload.disable_geoip_omitted_defaults_to_false
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-ae74c11d-39f7-47f3-b79a-a8d0324bab66'
request_payload.flag_keys_to_evaluate_contains_only_requested_key
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-73247f60-d0de-47b9-91d7-2da5ff479987'
request_lifecycle.two_flag_calls_produce_two_remote_requests
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-56b1c7c5-d7d6-49c6-b97f-81f33003f81b'
request_lifecycle.mock_response_value_is_returned_to_caller
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-0fa5b1ca-124d-418d-9717-27f7392e33b8'
side_effect_events.get_feature_flag_captures_feature_flag_called_event
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-382a429a-5e2c-422f-b155-98def1ddde22'
posthog-rs-v1 Compliance Report
Date: 2026-06-24 10:29:12 UTC
| Test | Status | Duration |
|---|---|---|
| Endpoint And Method.Targets V1 Endpoint | ✅ | 154ms |
| Endpoint And Method.Does Not Use Legacy Endpoints | ✅ | 153ms |
| Required Headers.Has Authorization Bearer Header | ✅ | 154ms |
| Required Headers.Has Content Type Json | ✅ | 152ms |
| Required Headers.Has Posthog Sdk Info Format | ✅ | 152ms |
| Required Headers.Has Posthog Attempt Header | ✅ | 153ms |
| Required Headers.Has Posthog Request Id | ✅ | 152ms |
| Required Headers.Has Posthog Request Timestamp | ✅ | 155ms |
| Required Headers.Has User Agent | ✅ | 152ms |
| Body Format.Body Has Created At And Batch | ✅ | 150ms |
| Body Format.No Api Key In Body | ✅ | 145ms |
| Body Format.No Sent At In Body | ✅ | 150ms |
| Event Format.Event Has Required Root Fields | ✅ | 145ms |
| Event Format.Event Uuid Is Valid | ✅ | 147ms |
| Event Format.Event Timestamp Is Rfc3339 | ✅ | 147ms |
| Event Format.Distinct Id Is String | ✅ | 146ms |
| Event Format.Distinct Id At Root Not Properties | ✅ | 146ms |
| Event Format.Custom Properties Preserved | ✅ | 145ms |
| Event Format.Set Properties Preserved | ✅ | 146ms |
| Event Format.Set Once Properties Preserved | ✅ | 143ms |
| Event Format.Groups Properties Preserved | ✅ | 130ms |
| Event Format.Sdk Generates Uuid If Not Provided | ✅ | 131ms |
| Event Format.Event Has Required Root Fields Batch | ✅ | 152ms |
| Event Format.Event Uuid Is Valid Batch | ✅ | 160ms |
| Event Format.Event Timestamp Is Rfc3339 Batch | ✅ | 175ms |
| Event Format.Distinct Id Is String Batch | ✅ | 151ms |
| Event Format.Distinct Id At Root Not Properties Batch | ✅ | 174ms |
| Event Format.Custom Properties Preserved Batch | ✅ | 161ms |
| Event Format.Set Properties Preserved Batch | ✅ | 175ms |
| Event Format.Set Once Properties Preserved Batch | ✅ | 171ms |
| Event Format.Groups Properties Preserved Batch | ✅ | 146ms |
| Event Format.Sdk Generates Uuid If Not Provided Batch | ✅ | 145ms |
| Batch Behavior.Multiple Events In Single Batch | ✅ | 189ms |
| Batch Behavior.Batch Envelope Smoke | ✅ | 173ms |
| Batch Behavior.Flush With No Events Sends Nothing | ✅ | 77ms |
| Batch Behavior.Flush At Triggers Batch | ✅ | 1135ms |
| Batch Behavior.Created At Reflects Batch Creation Time | ✅ | 121ms |
| Deduplication.Generates Unique Uuids | ✅ | 186ms |
| Deduplication.Different Events Same Content Different Uuids | ✅ | 131ms |
| Deduplication.Preserves Uuid On Retry | ✅ | 5136ms |
| Deduplication.Preserves Timestamp On Retry | ✅ | 5091ms |
| Deduplication.Preserves Uuid And Timestamp On Batch Retry | ✅ | 5104ms |
| Deduplication.No Duplicate Events In Batch | ✅ | 132ms |
| Header Behavior On Retry.Attempt Header Starts At One | ✅ | 64ms |
| Header Behavior On Retry.Attempt Header Increments On Retry | ✅ | 10074ms |
| Header Behavior On Retry.Request Id Preserved On Retry | ✅ | 5055ms |
| Header Behavior On Retry.Different Requests Have Different Request Ids | ✅ | 2066ms |
| Header Behavior On Retry.Request Timestamp Changes On Retry | ✅ | 5047ms |
| Response Format Validation.Success Response Has Uuid Keyed Results | ✅ | 54ms |
| Response Format Validation.Success Response Has Ok For Each Event | ✅ | 29ms |
| Response Format Validation.Success No Retry After When All Ok | ✅ | 31ms |
| Response Format Validation.Success Retry After Present When Retry Events | ✅ | 32ms |
| Response Format Validation.Success No Retry After When Drop Only | ✅ | 30ms |
| Response Format Validation.Response Echoes Request Id | ✅ | 26ms |
| Retry Behavior.Retries On 408 | ✅ | 5035ms |
| Retry Behavior.Retries On 500 | ✅ | 5025ms |
| Retry Behavior.Retries On 503 | ✅ | 5026ms |
| Retry Behavior.Retries On 504 | ✅ | 5024ms |
| Retry Behavior.Retryable Errors Have Retry After | ✅ | 2027ms |
| Retry Behavior.Respects Retry After On Retryable Error | ✅ | 8025ms |
| Retry Behavior.Does Not Retry On 400 | ✅ | 2039ms |
| Retry Behavior.Does Not Retry On 401 | ✅ | 2032ms |
| Retry Behavior.Does Not Retry On 402 | ✅ | 2027ms |
| Retry Behavior.Does Not Retry On 413 | ✅ | 2033ms |
| Retry Behavior.Does Not Retry On 415 | ✅ | 2032ms |
| Retry Behavior.Non Retryable Errors Have No Retry After | ✅ | 2026ms |
| Retry Behavior.Implements Backoff | ✅ | 15031ms |
| Retry Behavior.Max Retries Respected | ✅ | 15033ms |
| Partial Batch Handling.Handles 200 Full Success | ✅ | 2048ms |
| Partial Batch Handling.Handles 200 With All Ok | ✅ | 3044ms |
| Partial Batch Handling.Does Not Retry Dropped Events | ✅ | 3042ms |
| Partial Batch Handling.Does Not Retry Limited Events | ✅ | 3029ms |
| Partial Batch Handling.Prunes Ok Events On Partial Retry | ✅ | 5032ms |
| Partial Batch Handling.Prunes Dropped Events On Partial Retry | ✅ | 5026ms |
| Partial Batch Handling.Retries Only Retry Events From Partial | ✅ | 5027ms |
| Partial Batch Handling.Partial Retry Preserves Uuids | ✅ | 5027ms |
| Partial Batch Handling.Partial Retry Attempt Header Increments | ✅ | 5031ms |
| Partial Batch Handling.Partial Retry Request Id Preserved | ✅ | 5036ms |
| Partial Batch Handling.Respects Retry After On Partial | ✅ | 5025ms |
| Partial Batch Handling.Unknown Result Treated As Terminal | ✅ | 3028ms |
| Partial Batch Handling.Mixed Ok Drop Limited No Retry | ✅ | 3028ms |
| Compression.Sends Gzip Content Encoding | ✅ | 25ms |
| Compression.No Content Encoding When Disabled | ✅ | 23ms |
| Compression.Compressed Body Is Decompressible | ✅ | 23ms |
| Error Handling.Does Not Retry On Unknown 4Xx | ✅ | 2023ms |
| Event Options.Cookieless Mode Override | ✅ | 26ms |
| Event Options.Disable Skew Correction Override | ✅ | 23ms |
| Event Options.Process Person Profile Override | ✅ | 24ms |
| Event Options.Product Tour Id Override | ✅ | 23ms |
| Event Options.Unset Options Omitted | ✅ | 23ms |
| Event Options.Options Override In Batch | ✅ | 23ms |
| Geoip And Historical Migration.Geoip Disable Injected Into Properties | ✅ | 23ms |
| Geoip And Historical Migration.Historical Migration Set In Body | ✅ | 23ms |
| Geoip And Historical Migration.Historical Migration Absent By Default | ✅ | 22ms |
Feature_Flags Tests
| Test | Status | Duration |
|---|---|---|
| Request Payload.Request With Person Properties Device Id | ❌ | 13ms |
| Request Payload.Flags Request Uses V2 Query Param | ❌ | 12ms |
| Request Payload.Flags Request Hits Flags Path Not Decide | ❌ | 13ms |
| Request Payload.Flags Request Omits Authorization Header | ❌ | 13ms |
| Request Payload.Token In Flags Body Matches Init | ❌ | 15ms |
| Request Payload.Groups Round Trip | ❌ | 14ms |
| Request Payload.Groups Default To Empty Object | ❌ | 15ms |
| Request Payload.Person Properties Distinct Id Auto Populated When Caller Omits It | ❌ | 16ms |
| Request Payload.Disable Geoip False Propagates As Geoip Disable False | ❌ | 15ms |
| Request Payload.Disable Geoip Omitted Defaults To False | ❌ | 15ms |
| Request Payload.Flag Keys To Evaluate Contains Only Requested Key | ❌ | 15ms |
| Request Lifecycle.No Flags Request On Init Alone | ✅ | 12ms |
| Request Lifecycle.No Flags Request On Normal Capture | ✅ | 23ms |
| Request Lifecycle.Two Flag Calls Produce Two Remote Requests | ❌ | 13ms |
| Request Lifecycle.Mock Response Value Is Returned To Caller | ❌ | 14ms |
| Side Effect Events.Get Feature Flag Captures Feature Flag Called Event | ❌ | 15ms |
Failures
request_payload.request_with_person_properties_device_id
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-b43488d3-0a01-44d3-8873-0c2317965e44'
request_payload.flags_request_uses_v2_query_param
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-f68aa5ec-345f-494c-9b51-87e1ec2163a8'
request_payload.flags_request_hits_flags_path_not_decide
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7315e31f-1bf4-4b4c-91a8-3ec18adf70bf'
request_payload.flags_request_omits_authorization_header
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-92f06fa6-5bb6-43ff-94e2-0e919a9fa1b6'
request_payload.token_in_flags_body_matches_init
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-293b5386-5283-4edd-8f99-2794c61240e3'
request_payload.groups_round_trip
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-18a2c8cd-8e9a-4021-8422-8e5a353caf36'
request_payload.groups_default_to_empty_object
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-cd1932a7-ae91-4d00-b24f-9e7780447649'
request_payload.person_properties_distinct_id_auto_populated_when_caller_omits_it
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-653e10bb-2daa-401b-82b0-4a1b51a94268'
request_payload.disable_geoip_false_propagates_as_geoip_disable_false
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-f4e5d067-3ea0-478b-8227-72c1580a0c64'
request_payload.disable_geoip_omitted_defaults_to_false
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-4ccb1845-989a-4325-8c0d-6ac1e5c4fc40'
request_payload.flag_keys_to_evaluate_contains_only_requested_key
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-61e56d45-c9c9-45a0-99dc-70a335db0684'
request_lifecycle.two_flag_calls_produce_two_remote_requests
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-1a9127e3-6851-4e3b-ac36-8203aaaa5fdd'
request_lifecycle.mock_response_value_is_returned_to_caller
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-5750cab6-3c74-47d2-9e1b-c134240d3615'
side_effect_events.get_feature_flag_captures_feature_flag_called_event
404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-0db56348-0c5e-442a-804c-0ee1b92e6f76'
…t work

Addresses review on PR #145.

- transport: re-arm the bounded-queue "full" warning once the queue drains to empty, so a service that repeatedly fills then drains warns once per episode instead of only on the very first overflow.
- transport: pending_events() now counts events accepted but not yet delivered (channel + worker buffer + retry queue) rather than only channel depth, by decrementing the in-flight counter when a batch reaches a terminal outcome (delivered/dropped) in both the v0 and v1 pipelines. This also makes the bounded queue cap total backlog.
- adapter: document that total_events_sent can over-count only when the SDK queue fills (capture is fire-and-forget); the acceptance suite never fills it.
pending_events now counts channel + worker buffer + retries; the public doc on both clients still described the old channel-only meaning (flagged by autoreview).
# Conflicts:
#   src/client/async_client.rs
#   src/client/blocking.rs
#   tests/test_async.rs
#   tests/test_blocking.rs
/// Maximum number of events buffered before new events are dropped
/// (default: 10000). A single warning is logged while the queue is full.
#[builder(default = "10000")]
pub(crate) max_queue_size: usize,
those defaults are a kind of mix between py and go, now pretty much the same as py
Reviews (2): Last reviewed commit: "Merge remote-tracking branch 'origin/mai..."
maybe ask a Rust expert to take a look, as in someone who often contributes Rust code, e.g., Hughes, Ingestion folks, etc.?

thanks! yep, pinging @eli-r-ph too 🙏 since I think he's also a bit familiar with the logic here (and I'm also changing the capture v1 stuff 😁)
pending_events exposed internal queue depth only for the compliance adapter (the harness /state contract requires the field); it is not meaningful public API. Gate it, plus the internal TransportHandle::pending it calls, behind test-harness so normal builds no longer expose it. The adapter enables test-harness so it still compiles; the --all-features public-API snapshot is unchanged (test-harness items appear there, like extra_capture_headers).
The helper used flush_interval_ms=50 and retry_initial_backoff_ms=1, so the background worker could fire an interval flush or a scheduled retry within milliseconds and race the exact assert_hits() checks (e.g. flush_keeps_events_retryable asserts hits==1 while the held batch auto-retried). The --workspace CI job lost that race. Use long interval/backoff (600s) so every send is driven by the flush_at threshold or an explicit flush()/shutdown(); no test_transport test relies on autonomous timers (those are covered deterministically by the ManualClock unit tests in transport.rs). Verified stable across 35 runs in all three configs that run this suite.
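The deterministic timer coverage mentioned above relies on a clock that only moves when a test advances it. A minimal sketch of that idea follows; the trait here is a reduced, hypothetical version (the PR's `Clock` also has `now_utc`), and `flush_due` is an illustrative helper, not a function from the crate.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical minimal clock trait for illustration.
pub trait Clock: Send + Sync {
    fn now(&self) -> Instant;
}

/// Test clock that only moves when told to.
#[derive(Clone)]
pub struct ManualClock(Arc<Mutex<Instant>>);

impl ManualClock {
    pub fn new() -> Self {
        Self(Arc::new(Mutex::new(Instant::now())))
    }

    /// Moves the clock forward by `by`; no real time passes.
    pub fn advance(&self, by: Duration) {
        *self.0.lock().unwrap() += by;
    }
}

impl Clock for ManualClock {
    fn now(&self) -> Instant {
        *self.0.lock().unwrap()
    }
}

/// Interval-flush decision expressed purely in terms of the injected clock.
pub fn flush_due(clock: &dyn Clock, last_flush: Instant, interval: Duration) -> bool {
    clock.now().duration_since(last_flush) >= interval
}
```

With a 600s interval, the decision flips exactly when the test advances the clock past the interval, which is why no real sleeps are needed.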
👋 hey @cat-ph thanks for doing this! it's something I was hoping to circle back to soon myself ❤️ couple things to look into I think:

Async capture path: I don't think having a single thread consumer (no pool) is safe here, any retried or higher-latency request will cause head-of-line blocking and could fill up the queue. I'd suggest using

drop/flush/shutdown handling: I don't think this should block for very long as it could hold up app shutdown, especially if the queue is full or the active (head-of-line-blocked) event publish is busy. I think flush should be async and fire-and-forget for safety. The core idea to drop new events enqueued when the queue is full is sound 👍

Capture timestamp handling:

Other than that, this LGTM 👍
thanks for looking @eli-r-ph 💙

great catch with the timestamps, fixing it to be at capture time (& adding sent_at), the header/created_at were OK here but the timestamp wasn't 😬

I didn't go with tokio here to be able to use this in all 3 places that don't really share a runtime: the async client (w/ tokio), sync client (no async runtime) + std::panic in particular (arbitrary thread where runtime might not be present), I modeled it after go/py where both use a bg thread/goroutine

good point about the HOL block, I think for retries it's safe cause we defer it in the

for the drop/flush/shutdown handling I'm not very sure, I'm definitely adding a shutdown timeout (good point), but I think flush should still be blocking? 🤔 since capture() is fire-and-forget and flush here was the only place to signal it's flushed (and it also returns on a failure, since that's rescheduled/retried separately) so not unbounded but blocking by design
Events were timestamped when the worker built the batch (publish time), so batching or a retry pushed an event's `timestamp` to when it was finally sent rather than when it occurred. Stamp the capture (enqueue) time on the producer side when the caller set none, for both the v0 and v1 wire formats; v1 `created_at` and the request timestamp header already recorded publish time and are unchanged. Also add a v0 batch `sent_at` (publish time) so ingestion can do clock-skew correction, matching what v1 already sends. From @eli-r-ph's review on the event-transport PR.
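The producer-side stamping can be pictured as a small helper that fills the timestamp at enqueue time and leaves caller-provided values alone. This is a sketch under assumptions: the `Event` struct and `stamp_on_capture` are hypothetical and far simpler than the crate's real event type.

```rust
use std::time::SystemTime;

/// Hypothetical event shape; only the timestamp handling matters here.
pub struct Event {
    pub name: String,
    pub timestamp: Option<SystemTime>,
}

/// Producer side: stamp the capture (enqueue) time unless the caller set one.
/// `now` would come from the injected clock in a real implementation.
pub fn stamp_on_capture(mut event: Event, now: SystemTime) -> Event {
    if event.timestamp.is_none() {
        event.timestamp = Some(now);
    }
    event
}
```

Because the value is fixed before the event enters the queue, batching delays and retries no longer shift it.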
shutdown() and Drop attempted one final send per pending batch with no overall deadline, so a slow or unreachable endpoint could block process teardown for up to (batches x request_timeout). Add a `shutdown_timeout_ms` option (default 30000): the worker stamps a drain deadline when it handles Shutdown (and on channel disconnect) and, once it passes, drops the remaining buffered/retrying events instead of attempting them, so exit can't hang. Drop takes the same path, so it is bounded too. flush() stays a blocking, bounded delivery barrier (one attempt per batch, returns even when the server is down) -- only teardown is time-bounded. The drain paths now take an `Option<Instant>` deadline instead of a `final_attempt` bool (Some => final attempt + deadline-gated). From @eli-r-ph's review on the event-transport PR.
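The deadline-gated drain can be sketched as a loop that attempts each pending batch once until the deadline passes and abandons the rest. The function below is illustrative only: `drain_until` is a hypothetical name, batches are plain vectors, and both the clock and the send step are injected closures so no real time or HTTP is involved.

```rust
use std::time::Instant;

/// Attempts each batch once while the deadline has not passed; abandons the
/// rest. Returns (events_attempted, events_abandoned).
pub fn drain_until<N, S>(
    batches: Vec<Vec<u32>>,
    deadline: Instant,
    mut now: N,
    mut send: S,
) -> (usize, usize)
where
    N: FnMut() -> Instant,
    S: FnMut(&[u32]),
{
    let (mut attempted, mut abandoned) = (0, 0);
    for batch in batches {
        if now() >= deadline {
            // Past the drain deadline: drop instead of attempting.
            abandoned += batch.len();
        } else {
            send(&batch);
            attempted += batch.len();
        }
    }
    (attempted, abandoned)
}
```

A deadline equal to the current time (the analogue of a zero timeout) abandons everything, while a generous one degrades to a plain final flush.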
Regenerated api/public-api.txt to record the new ClientOptionsBuilder::shutdown_timeout_ms builder method added alongside the shutdown drain timeout.
The separate historical path sent batches inline in the HistoricalBatch handler, so a Shutdown queued behind them was observed too late: historical events were POSTed outside the shutdown deadline (e.g. shutdown_timeout_ms(0) dropped buffered events but still sent historical ones). Queue historical chunks and drain them on the flush interval like the live buffer, so the worker is blocked on recv when Shutdown arrives and drains/abandons them under the deadline. Found by autoreview.
The Timeout/Tick branches drained queued historical batches on any wakeup, but a wakeup can be caused by the live-buffer deadline or a due retry -- so historical was sent before its own interval elapsed, and a large drain could delay the retry that triggered the wake. Guard the drain with the historical interval (like the live buffer), keeping the batch queued until it's due. Found by autoreview.
Pre-refactor, historical events flowed through the live buffer and were auto-flushed at flush_at; the separate path only flushed on the interval, so under a high flush_interval they'd sit (holding queue slots) until the interval or an explicit flush. Mirror the live buffer: once flush_at events have queued, send them right away. Below the threshold they stay queued, so a pending shutdown still bounds them. Found by autoreview.
Drop `async` and `Result` from `Client::capture` / `Client::capture_batch` on both clients and from the global `capture`. Enqueueing is non-blocking and delivery failures are handled by the background worker, not surfaced to the caller, so there was nothing meaningful to `await` or return. `capture_exception` keeps its `Result` and async signature: building the exception event is still fallible. Collapse the two cfg-gated global `capture` definitions into a single fire-and-forget fn now that the async and blocking signatures match, and regenerate the public-API snapshot.
V0 `send_batch` captured the event count before `build_batch_payload` ran `before_send`, which can drop events. The in-flight `len` accounting stayed correct, but a batch held for retry counted the dropped events as in-flight — inflating the bounded-queue depth and over-reporting the "dropping N event(s)" logs — until that batch reached a terminal outcome. `build_batch_payload` now returns the surviving event count; `send_batch` decrements the before_send-dropped events immediately (they are terminal) and tracks only the survivors, matching the V1 pipeline. Adds a regression test asserting a before_send-dropped event in a retry-held batch is not counted as in-flight.
`shutdown_timeout_ms` was only checked between batches: a single in-flight blocking send still used the full `request_timeout_seconds` (30s default), so an endpoint that accepted the request and then stalled could push `shutdown()` / `Drop` well past the configured bound. Thread the shutdown deadline into the send path (replacing the `final_attempt` bool) and cap each shutdown/disconnect request's reqwest timeout at the time left before the deadline, never exceeding the normal request timeout. Adds a regression test that a stalled endpoint no longer blocks teardown past `shutdown_timeout_ms`.
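The per-request cap described here reduces to taking the smaller of the normal request timeout and the time left before the deadline. A minimal sketch, with `send_timeout` as a hypothetical helper name:

```rust
use std::time::{Duration, Instant};

/// Timeout for one send: the normal request timeout, capped at the time left
/// before the shutdown deadline (if any). Zero means the deadline has passed.
pub fn send_timeout(
    request_timeout: Duration,
    deadline: Option<Instant>,
    now: Instant,
) -> Duration {
    match deadline {
        // saturating_duration_since returns zero when `now` is past `d`.
        Some(d) => request_timeout.min(d.saturating_duration_since(now)),
        None => request_timeout,
    }
}
```

With a 30s request timeout and 5s left before the deadline, the send gets 5s; with no deadline, or with more than 30s left, it keeps the normal 30s.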
A `flush()` whose `Control::Flush` raced into the channel behind a `Shutdown` (reachable with a shared `Arc<Client>` when one thread flushes while another shuts down or drops the last handle) was never signaled: the worker processed `Shutdown`, signaled only that completion, and returned, leaving the flush caller's wait (`rx.await` / `rx.recv`) hanging forever. Drain the channel when the worker exits on `Shutdown` and signal any queued flush/shutdown completions so their callers return instead of blocking. Queued captures are dropped — the client is closing. Adds a unit test for the helper.
`shutdown_timeout_ms` bounds the shutdown drain, but the single background
worker performs one blocking send at a time: a send already in flight when
shutdown is requested first runs to `request_timeout_seconds` before the drain
begins. The previous wording ("bounds how long teardown can block on a slow or
unreachable endpoint") overclaimed for that case. Clarify the option doc and the
worker comment; the full total-teardown bound comes with the sender pool.
Scheduled retries and the interval flush were only serviced on the idle `Timeout` wake. Under sustained capture traffic the worker's `recv` always returns a `Capture` before the timeout elapses, so due retries were never re-attempted (and the interval flush was deferred to the `flush_at` size trigger), leaving transient-failure batches pending indefinitely and holding queue slots. Move that servicing into a shared block that runs after every non-terminal wake (Capture/HistoricalBatch/Flush/Tick/Timeout); `Shutdown` and the disconnect path still `return` first so their drain stays deadline-bounded. The test-only `Tick` defers its completion signal until after the block so clock-driven tests still observe the serviced work.
`send_control` does not check `closed`; it returns `false` only once the worker has exited (the channel is disconnected). Document that accurately, and why a flush racing shutdown cannot hang: an orphaned control is either signaled by the worker on its way out, or dropped with the channel — which wakes the caller's wait with a recv error.
Expand the `shutdown_timeout_ms` doc (and the worker comment): the timeout bounds the shutdown drain, but the single worker runs one blocking send at a time, so an automatic flush/drain already in progress when shutdown is requested runs to completion first — up to `request_timeout_seconds` per in-flight batch. A large auto-drain can therefore delay teardown by several request timeouts. The sender pool removes this; this just makes the single-worker contract accurate.
`shutdown()` and `Drop` gated both sending the `Shutdown` control and the worker `join()` behind `begin_close()`. A non-winning caller returned immediately without waiting; worse, if an async `shutdown().await` was cancelled (a timeout or select) while awaiting the worker, `join()` was skipped, and a later `Drop` saw `closed == true` and also skipped `join()` — leaving the worker draining on a detached thread that the process could outlive, losing the flush guarantee. Move `join()` outside the `begin_close()` guard on all four paths (async/blocking x shutdown/drop) so every caller waits for the worker. The winner sends the `Shutdown` synchronously before any await, so the worker always exits and `join()` cannot hang.
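The change in guard placement can be illustrated with a stripped-down handle. This is a sketch under assumptions, not the crate's code: `Handle` is hypothetical, a unit message stands in for the `Shutdown` control, and only the blocking path is shown.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

pub struct Handle {
    closed: AtomicBool,
    tx: Sender<()>, // hypothetical: a unit message stands in for `Shutdown`
    worker: Mutex<Option<JoinHandle<()>>>,
}

impl Handle {
    pub fn spawn() -> Self {
        let (tx, rx) = channel::<()>();
        let worker = thread::spawn(move || {
            // Exit on the first message, or when every sender is gone.
            let _ = rx.recv();
        });
        Self { closed: AtomicBool::new(false), tx, worker: Mutex::new(Some(worker)) }
    }

    /// True for exactly one caller: the one that wins the close.
    fn begin_close(&self) -> bool {
        !self.closed.swap(true, Ordering::AcqRel)
    }

    pub fn shutdown(&self) {
        if self.begin_close() {
            // Only the winner sends the control message.
            let _ = self.tx.send(());
        }
        // Outside the guard: every caller waits for the worker. The lock is
        // held across the join so a concurrent caller blocks until it is done.
        let mut guard = self.worker.lock().unwrap();
        if let Some(handle) = guard.take() {
            let _ = handle.join();
        }
    }

    pub fn is_joined(&self) -> bool {
        self.worker.lock().unwrap().is_none()
    }
}
```

A second `shutdown()` call finds `closed` already set, skips the send, and still passes through the join step, so it returns only once the worker has exited.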
`drain_pending_completions` (added to unblock a flush racing shutdown) dropped `Capture`/`HistoricalBatch` messages still queued when the worker exits, but did not decrement the in-flight `len` counter for them. A capture that reserved a queue slot just before shutdown left `pending_events()` permanently nonzero after the worker exited, skewing the compliance adapter's state/total-sent. Pass `len` into `drain_pending_completions` and decrement by the dropped event count. Extends the unit test to assert the counter settles to 0.
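A simplified picture of such a drain helper: on exit, the worker empties the channel, wakes any queued flush or shutdown waiters, and releases the queue slots of captures that will never be sent. The message enum and the use of `mpsc::Sender<()>` as a completion signal are assumptions for illustration; only the function name and the `len` counter come from the commit message.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical message set; completions are signaled through a sender.
pub enum Msg {
    Capture(String),
    Flush(Sender<()>),
    Shutdown(Sender<()>),
}

/// Run by the worker as it exits.
pub fn drain_pending_completions(rx: &Receiver<Msg>, len: &AtomicUsize) {
    while let Ok(msg) = rx.try_recv() {
        match msg {
            Msg::Capture(_) => {
                // The event is dropped; give its reserved slot back.
                len.fetch_sub(1, Ordering::AcqRel);
            }
            Msg::Flush(done) | Msg::Shutdown(done) => {
                // The caller may have stopped waiting; a failed send is fine.
                let _ = done.send(());
            }
        }
    }
}
```

After the drain, the in-flight counter reflects only events that are genuinely still owned by someone, so it can settle to 0 once the worker is gone.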
`backoff_duration` honored `Retry-After` as an unbounded `Duration::from_secs`. A hostile or buggy `Retry-After` (or an extreme `retry_max_backoff_ms`) could then overflow the worker's `now + delay` `Instant` addition, panicking the worker thread — after which all later captures are silently dropped because sends hit a disconnected channel. Cap the scheduled backoff at one day (far beyond any sane retry delay) so the addition can't overflow. Covers both V0 and V1 (shared `backoff_duration`). Adds a unit test for an absurd `Retry-After`.
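The cap can be sketched as a clamp applied after choosing between the server hint and the computed backoff. The signature below is an assumption (the real `backoff_duration` parameters are not shown in the thread), as is the rule that a present `Retry-After` takes precedence; `next_attempt` is a hypothetical helper showing the non-panicking `Instant` addition.

```rust
use std::time::{Duration, Instant};

/// One day, matching the cap described in the commit message.
pub const RETRY_BACKOFF_CAP: Duration = Duration::from_secs(24 * 60 * 60);

/// Assumed shape: a `Retry-After` value (in seconds) wins over the computed
/// backoff when present; either way the result is capped.
pub fn backoff_duration(retry_after_secs: Option<u64>, computed: Duration) -> Duration {
    retry_after_secs
        .map(Duration::from_secs)
        .unwrap_or(computed)
        .min(RETRY_BACKOFF_CAP)
}

/// Schedules the next attempt without risking a panic on `Instant + Duration`.
pub fn next_attempt(now: Instant, delay: Duration) -> Option<Instant> {
    now.checked_add(delay)
}
```

Even a `Retry-After` of `u64::MAX` seconds collapses to one day, which keeps the later `now + delay` addition well within range.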
…tant The worker computes `now + shutdown_timeout` on the shutdown/disconnect drain; an absurd `shutdown_timeout_ms` could overflow that `Instant + Duration` and panic the worker — the same failure mode `RETRY_BACKOFF_CAP` guards for retry backoff. Clamp the timeout to one day at worker startup. Operator config (not server-controlled), so low severity, but cheap and consistent.
The async `Client`'s `Drop` does a blocking drain, so dropping a `Client` inside an async task stalls that executor thread until the drain finishes (up to `shutdown_timeout_ms` plus any in-flight request). Make the caveat explicit and steer callers to `shutdown().await` first.
`capture_batch` enqueues per event, so a full bounded queue can drop the tail of a logical batch — unlike the prior model, which sent one atomic request. Document the non-atomic behavior on both clients.
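To make the non-atomic behavior concrete, here is a small sketch that models the bounded queue with `std::sync::mpsc::sync_channel`. The `capture_batch` below is a hypothetical stand-in with simplified string events, not the SDK method.

```rust
use std::sync::mpsc::SyncSender;

/// Enqueues each event individually and returns how many were dropped
/// because the bounded queue was full (or the receiver was gone).
fn capture_batch(tx: &SyncSender<String>, events: Vec<String>) -> usize {
    let mut dropped = 0;
    for event in events {
        // try_send never blocks: a full queue rejects the event immediately.
        if tx.try_send(event).is_err() {
            dropped += 1;
        }
    }
    dropped
}
```

With a queue bound of 2 and a batch of 5, the first two events are queued and the last three are dropped, so the receiver sees only the head of the logical batch.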
^ bunch of updates above, but mostly kept in separate commits, tl;dr:
Brings in v0.13.2 / v0.13.3: #148 (remove the duplicate v0 batch-event api key, via `InnerEvent::new_for_batch`) and #153 (internalize `EventOptions` on the capture-v1 path). Only one conflict: compliance/adapter/src/main.rs. The branch had moved the adapter's event-building before the instances lock, while #153 replaced `Event::set_option` (no longer public to the adapter) with a wire-option -> magic-property mapping through a new `HARNESS_OPTION_TO_PROP` table. Kept the branch's structure and applied #153's mapping to the pre-lock build; dropped the now-redundant in-lock build. Everything else auto-merged. Regenerated api/public-api.txt (matched the auto-merge). Full gate green across all six feature configs; fmt + clippy clean.
The changeset still said `capture` / `capture_batch` "return `Ok(())`", but the fire-and-forget change dropped `async` + `Result` — they now enqueue and return `()` infallibly (no longer `async` on the async client). Also drop `pending_events()` from the listed public API (it is `test-harness`-gated, not part of the normal public surface) and add the new `shutdown_timeout_ms` option.
`now_utc` is used in the v0/default build too — `enqueue`/`enqueue_historical` stamp each event's capture timestamp with it, and the v0 `send_batch` uses it for the batch `sent_at`. So the "Unused by the v0 pipeline" doc was backwards and the `#[cfg_attr(not(feature = "capture-v1"), allow(dead_code))]` was dead (clippy is clean without it in every config). Reword the doc to its actual uses and remove the allow.
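The timestamp-stamping use of `now_utc` might look roughly like this. The trait here has only `now_utc`, uses `SystemTime` rather than whatever time type the SDK uses, and the `Event` and `enqueue` shapes are simplified assumptions.

```rust
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

// Simplified clock abstraction; only the wall-clock half is sketched here.
trait Clock {
    fn now_utc(&self) -> SystemTime;
}

/// Test clock whose wall time only moves when `advance` is called.
struct ManualClock {
    utc: Mutex<SystemTime>,
}

impl ManualClock {
    fn starting_at(utc: SystemTime) -> Self {
        ManualClock { utc: Mutex::new(utc) }
    }

    fn advance(&self, by: Duration) {
        *self.utc.lock().unwrap() += by;
    }
}

impl Clock for ManualClock {
    fn now_utc(&self) -> SystemTime {
        *self.utc.lock().unwrap()
    }
}

// Hypothetical event carrying its capture timestamp.
struct Event {
    name: String,
    timestamp: SystemTime,
}

/// Stamps the event at enqueue time, not at send time.
fn enqueue(clock: &dyn Clock, name: &str) -> Event {
    Event { name: name.to_string(), timestamp: clock.now_utc() }
}
```

Because the clock is injected, a test can advance it by exactly five seconds between two enqueues and assert the timestamps differ by exactly that much, with no real sleep.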
What
Replaces the per-`capture` direct HTTP POST with a runtime-independent background transport: a single `std::thread` worker draining a bounded channel with blocking reqwest (never a tokio task), so it works for the async client, the blocking client, and a future panic hook with no runtime present.

- `capture()` / `capture_batch()` become non-blocking enqueues.
- `flush()` (awaited on the async client, blocking on the blocking client), `shutdown()` (flush + stop + join; idempotent; drops further captures), flush-on-`Drop`, `pending_events()`, and global `flush()` / `shutdown()`.
- `flush_at`, `max_batch_size`, `flush_interval_ms`; a bounded `max_queue_size` queue that drops with a single warning when full.
- … `Retry-After`, drop-on-terminal, reusing the existing sans-IO `retry.rs` / `v1_capture` decision logic.
- `before_send` now runs on the worker, so it applies to every queued event (including future panic captures).
- … `/batch/`.
- `Clock` (`now` + `now_utc`) makes interval-flush, retry backoff, and v1 wire timestamps deterministic in tests: a `ManualClock` plus a test-only `Tick` drive virtual time, with no real sleeps.
- … `/shutdown` route).

Breaking change (0.x)
`capture()` / `capture_batch()` return `Ok(())` as soon as the event is queued instead of awaiting delivery, and HTTP failures surface as logged warnings, not `Err`. Call `flush()` or `shutdown()` before process exit to ensure queued events are delivered. The global client lives in a `static`, whose `Drop` never runs, so global users must call `posthog_rs::shutdown()` / `posthog_rs::flush()` explicitly.

Specs
Implements the `event-batcher`, `retry-queue`, `http-client`, `flush`, and `shutdown` contracts. The flush/shutdown acceptance scenarios run through the compliance adapter; the internal-behavior specs are covered by SDK tests (`tests/test_transport.rs` plus virtual-clock unit tests in `transport.rs`).

Out of scope / deferred
- … `main` has no panic hook to migrate); the worker already exposes the blocking flush a future panic path needs.
- `413` batch-shrinking (spec-optional) is not implemented, matching prior behavior.

Validation
TL;DR: green across all four feature configs (fmt, clippy, unit + integration tests), and verified end-to-end against a local PostHog dev stack — events actually ingest on both clients and both wire versions.
- `cargo fmt --check` clean; `cargo clippy --all-targets` clean (async v0 + v1).
- `api/public-api.txt` regenerated with the pinned nightly (+11 lines: `flush` / `shutdown` / `pending_events` / `Drop` + four builder setters + global `flush` / `shutdown`).

Local end-to-end run against a dev PostHog stack
A throwaway harness (`capture` ×5 with a small `flush_at`, then `capture_exception`, a final `capture`, and `shutdown`) ran on each transport config; every event was tagged with a unique marker and queried back from ClickHouse. All ingested: 7/7 per config (5 capture + 1 shutdown-drain + 1 exception).

Confirms end-to-end:
- `std::thread` + blocking-reqwest worker delivers both from inside a tokio runtime (async client) and from a blocking `main`;
- `/batch/` (v0) and `/i/v1/analytics/events` (v1) ingest, with v1 returning per-event `ok`;
- … `flush()`, and a `shutdown()` drain;
- `flush()` and `shutdown()` both drain (the event captured just before `shutdown()` landed);
- `pending_events()` read `0` after both flush and shutdown.

`blocking + v1` was not run e2e (every dimension is covered by the other three) but passes the full test suite.