Skip to content

feat: runtime-independent background event transport#145

Merged
cat-ph merged 30 commits into
mainfrom
ci/rust-event-transport
Jun 24, 2026
Merged

feat: runtime-independent background event transport#145
cat-ph merged 30 commits into
mainfrom
ci/rust-event-transport

Conversation

@cat-ph

@cat-ph cat-ph commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

What

Replaces the per-capture direct HTTP POST with a runtime-independent background transport: a single std::thread worker draining a bounded channel with blocking reqwest (never a tokio task), so it works for the async client, the blocking client, and a future panic hook with no runtime present.

  • capture() / capture_batch() become non-blocking enqueues.
  • New: flush() (awaited on the async client, blocking on the blocking client), shutdown() (flush + stop + join; idempotent; drops further captures), flush-on-Drop, pending_events(), and global flush() / shutdown().
  • Batching knobs: flush_at, max_batch_size, flush_interval_ms; a bounded max_queue_size queue that drops with a single warning when full.
  • Worker retry: keep-on-transient (5xx/408/429/network) with exponential backoff honoring Retry-After, drop-on-terminal — reusing the existing sans-IO retry.rs / v1_capture decision logic.
  • before_send now runs on the worker, so it applies to every queued event (including future panic captures).
  • v0 single capture now batches through /batch/.
  • An injectable Clock (now + now_utc) makes interval-flush, retry backoff, and v1 wire timestamps deterministic in tests — a ManualClock plus a test-only Tick drive virtual time, with no real sleeps.
  • The compliance adapter delegates batching/flush/shutdown to the SDK (adds a /shutdown route).

Breaking change (0.x)

capture() / capture_batch() return Ok(()) as soon as the event is queued instead of awaiting delivery, and HTTP failures surface as logged warnings, not Err. Call flush() or shutdown() before process exit to ensure queued events are delivered. The global client lives in a static, whose Drop never runs, so global users must call posthog_rs::shutdown() / posthog_rs::flush() explicitly.

Specs

Implements the event-batcher, retry-queue, http-client, flush, and shutdown contracts. The flush/shutdown acceptance scenarios run through the compliance adapter; the internal-behavior specs are covered by SDK tests (tests/test_transport.rs plus virtual-clock unit tests in transport.rs).

Out of scope / deferred

  • Panic autocapture is handled separately (main has no panic hook to migrate); the worker already exposes the blocking flush a future panic path needs.
  • 413 batch-shrinking (spec-optional) is not implemented, matching prior behavior.

Validation

TL;DR: green across all four feature configs (fmt, clippy, unit + integration tests), and verified end-to-end against a local PostHog dev stack — events actually ingest on both clients and both wire versions.

  • cargo fmt --check clean; cargo clippy --all-targets clean (async v0 + v1).
  • Tests green across all four feature configs: async-v0 229, blocking-v0 220, async-v1 274, blocking-v1 260.
  • api/public-api.txt regenerated with the pinned nightly (+11 lines: flush / shutdown / pending_events / Drop + four builder setters + global flush / shutdown).
Local end-to-end run against a dev PostHog stack

A throwaway harness (capture ×5 with a small flush_at, then capture_exception, a final capture, and shutdown) ran on each transport config; every event was tagged with a unique marker and queried back from ClickHouse. All ingested — 7/7 per config (5 capture + 1 shutdown-drain + 1 exception):

Config Ingested
async + v0 7 / 7
async + v1 7 / 7
blocking + v0 7 / 7

Confirms end-to-end:

  • the std::thread + blocking-reqwest worker delivers both from inside a tokio runtime (async client) and from a blocking main;
  • both /batch/ (v0) and /i/v1/analytics/events (v1) ingest, with v1 returning per-event ok;
  • batching split as designed — a threshold flush, an explicit flush(), and a shutdown() drain;
  • flush() and shutdown() both drain (the event captured just before shutdown() landed);
  • the manual exception path ingests;
  • pending_events() read 0 after both flush and shutdown.

blocking + v1 was not run e2e (every dimension is covered by the other three) but passes the full test suite.

@greptile-apps

greptile-apps Bot commented Jun 15, 2026

Copy link
Copy Markdown
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
src/client/transport.rs:101-103
**Queue-full warning fires only once per client lifetime**

`full_warned` is set to `true` on the first overflow and is never reset, even after the queue drains back below capacity. Once the first overflow fires, all subsequent overflows produce no log output at all. In a production service where the worker falls behind (slow network, long backoff), the queue could fill, drain, fill, drain, … with events silently dropped on every refill after the very first warning. A caller seeing the single warning might assume it was a one-time spike and ignore it.

### Issue 2 of 3
compliance/adapter/src/main.rs:280-282
**`flushed` count is likely always 0 after a real flush**

`pending_events()` returns only the depth of the mpsc channel (events not yet consumed by the worker thread). By the time this HTTP handler is called, the worker has almost certainly already read the event off the channel and placed it in its internal `buffer: Vec<Event>` — so `before = 0` and `flushed = 0 - 0 = 0`, even though `flush()` just delivered that event. Events already in the worker buffer or held for retry are not visible to `pending_events()`. The reported `flushed` field is effectively always 0 for the typical "capture then flush" test pattern.

### Issue 3 of 3
compliance/adapter/src/main.rs:331
**`total_events_sent` can over-count when the SDK queue is full**

`total_events_captured` is incremented in the `/capture` handler before calling `capture_batch`, so if the SDK queue is full and the event is silently dropped, `total_events_captured` still reflects the dropped event. The formula `(total_events_captured - pending).max(0)` then counts that dropped event as "sent" once `pending` drains to 0. This is a semantic shift from the previous adapter, which only incremented `total_events_sent` on confirmed network delivery.

Reviews (1): Last reviewed commit: "feat: runtime-independent background eve..." | Re-trigger Greptile

Comment thread src/client/transport.rs
Comment thread compliance/adapter/src/main.rs
Comment thread compliance/adapter/src/main.rs
Replace per-capture direct HTTP POST with a single background std::thread worker (blocking reqwest, no async runtime) that batches events, retries transient failures with backoff honoring Retry-After, and is driven by an injectable clock.

capture/capture_batch are non-blocking enqueues. Adds flush, shutdown (flush + stop + join, idempotent), flush-on-Drop, and pending_events on both clients, plus global flush/shutdown (the global client lives in a static, whose Drop never runs). The bounded queue drops with a single warning when full; before_send now runs on the worker so it applies to every queued event. An injectable Clock (now + now_utc) makes interval flush, retry backoff, and v1 wire timestamps deterministic in tests (ManualClock + a test-only Tick; no real sleeps). v0 single capture now batches through /batch/. The compliance adapter delegates batching/flush/shutdown to the SDK.

BREAKING CHANGE: capture/capture_batch return Ok as soon as the event is queued instead of awaiting delivery, and HTTP failures surface as logged warnings rather than Err. Call flush or shutdown before process exit to ensure queued events are delivered.
@cat-ph cat-ph force-pushed the ci/rust-event-transport branch from f0072db to 2c64114 Compare June 15, 2026 18:56
@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

posthog-rs-v0 Compliance Report

Date: 2026-06-24 10:29:02 UTC
Duration: 15561ms

⚠️ Some Tests Failed

31/45 tests passed, 14 failed


Capture Tests

29/29 tests passed

View Details
Test Status Duration
Format Validation.Event Has Required Fields 146ms
Format Validation.Event Has Uuid 155ms
Format Validation.Event Has Lib Properties 150ms
Format Validation.Distinct Id Is String 153ms
Format Validation.Token Is Present 151ms
Format Validation.Custom Properties Preserved 152ms
Format Validation.Event Has Timestamp 146ms
Retry Behavior.Retries On 503 5151ms
Retry Behavior.Does Not Retry On 400 2156ms
Retry Behavior.Does Not Retry On 401 2155ms
Retry Behavior.Respects Retry After Header 5105ms
Retry Behavior.Implements Backoff 15103ms
Retry Behavior.Retries On 500 5106ms
Retry Behavior.Retries On 502 5099ms
Retry Behavior.Retries On 504 5102ms
Retry Behavior.Max Retries Respected 15102ms
Deduplication.Generates Unique Uuids 106ms
Deduplication.Preserves Uuid On Retry 5024ms
Deduplication.Preserves Uuid And Timestamp On Retry 10036ms
Deduplication.Preserves Uuid And Timestamp On Batch Retry 5031ms
Deduplication.No Duplicate Events In Batch 25ms
Deduplication.Different Events Have Different Uuids 23ms
Compression.Sends Gzip When Enabled 21ms
Batch Format.Uses Proper Batch Structure 22ms
Batch Format.Flush With No Events Sends Nothing 24ms
Batch Format.Multiple Events Batched Together 100ms
Error Handling.Does Not Retry On 403 2069ms
Error Handling.Does Not Retry On 413 2078ms
Error Handling.Retries On 408 5079ms

Feature_Flags Tests

⚠️ 2/16 tests passed, 14 failed

View Details
Test Status Duration
Request Payload.Request With Person Properties Device Id 48ms
Request Payload.Flags Request Uses V2 Query Param 48ms
Request Payload.Flags Request Hits Flags Path Not Decide 29ms
Request Payload.Flags Request Omits Authorization Header 36ms
Request Payload.Token In Flags Body Matches Init 24ms
Request Payload.Groups Round Trip 31ms
Request Payload.Groups Default To Empty Object 23ms
Request Payload.Person Properties Distinct Id Auto Populated When Caller Omits It 44ms
Request Payload.Disable Geoip False Propagates As Geoip Disable False 45ms
Request Payload.Disable Geoip Omitted Defaults To False 23ms
Request Payload.Flag Keys To Evaluate Contains Only Requested Key 23ms
Request Lifecycle.No Flags Request On Init Alone 30ms
Request Lifecycle.No Flags Request On Normal Capture 68ms
Request Lifecycle.Two Flag Calls Produce Two Remote Requests 34ms
Request Lifecycle.Mock Response Value Is Returned To Caller 37ms
Side Effect Events.Get Feature Flag Captures Feature Flag Called Event 21ms

Failures

request_payload.request_with_person_properties_device_id

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7109cf2c-4091-43b8-825d-9fd2bedf3bc9'

request_payload.flags_request_uses_v2_query_param

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-99844c7b-4225-4fb6-a911-cfc75a606d5b'

request_payload.flags_request_hits_flags_path_not_decide

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-9d12bde6-2bb8-4c25-b286-309daf9f2d7a'

request_payload.flags_request_omits_authorization_header

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7095e157-897f-4564-b7fe-5324c1389774'

request_payload.token_in_flags_body_matches_init

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-6ad994af-5061-4f6f-adae-3ff9940f3533'

request_payload.groups_round_trip

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7c7c7b05-6fa8-4041-9fe7-97b2ba7dbcdc'

request_payload.groups_default_to_empty_object

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-95bf3c81-85ab-46a3-8b71-6ce22b294e07'

request_payload.person_properties_distinct_id_auto_populated_when_caller_omits_it

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-411753fc-6c3d-4959-a474-f6651c5e7449'

request_payload.disable_geoip_false_propagates_as_geoip_disable_false

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-6b25b506-f35f-4a1b-9b35-087f835a332b'

request_payload.disable_geoip_omitted_defaults_to_false

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-ae74c11d-39f7-47f3-b79a-a8d0324bab66'

request_payload.flag_keys_to_evaluate_contains_only_requested_key

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-73247f60-d0de-47b9-91d7-2da5ff479987'

request_lifecycle.two_flag_calls_produce_two_remote_requests

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-56b1c7c5-d7d6-49c6-b97f-81f33003f81b'

request_lifecycle.mock_response_value_is_returned_to_caller

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-0fa5b1ca-124d-418d-9717-27f7392e33b8'

side_effect_events.get_feature_flag_captures_feature_flag_called_event

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-382a429a-5e2c-422f-b155-98def1ddde22'

@github-actions

github-actions Bot commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

posthog-rs-v1 Compliance Report

Date: 2026-06-24 10:29:12 UTC
Duration: 23003ms

⚠️ Some Tests Failed

96/110 tests passed, 14 failed


Capture_V1 Tests

94/94 tests passed

View Details
Test Status Duration
Endpoint And Method.Targets V1 Endpoint 154ms
Endpoint And Method.Does Not Use Legacy Endpoints 153ms
Required Headers.Has Authorization Bearer Header 154ms
Required Headers.Has Content Type Json 152ms
Required Headers.Has Posthog Sdk Info Format 152ms
Required Headers.Has Posthog Attempt Header 153ms
Required Headers.Has Posthog Request Id 152ms
Required Headers.Has Posthog Request Timestamp 155ms
Required Headers.Has User Agent 152ms
Body Format.Body Has Created At And Batch 150ms
Body Format.No Api Key In Body 145ms
Body Format.No Sent At In Body 150ms
Event Format.Event Has Required Root Fields 145ms
Event Format.Event Uuid Is Valid 147ms
Event Format.Event Timestamp Is Rfc3339 147ms
Event Format.Distinct Id Is String 146ms
Event Format.Distinct Id At Root Not Properties 146ms
Event Format.Custom Properties Preserved 145ms
Event Format.Set Properties Preserved 146ms
Event Format.Set Once Properties Preserved 143ms
Event Format.Groups Properties Preserved 130ms
Event Format.Sdk Generates Uuid If Not Provided 131ms
Event Format.Event Has Required Root Fields Batch 152ms
Event Format.Event Uuid Is Valid Batch 160ms
Event Format.Event Timestamp Is Rfc3339 Batch 175ms
Event Format.Distinct Id Is String Batch 151ms
Event Format.Distinct Id At Root Not Properties Batch 174ms
Event Format.Custom Properties Preserved Batch 161ms
Event Format.Set Properties Preserved Batch 175ms
Event Format.Set Once Properties Preserved Batch 171ms
Event Format.Groups Properties Preserved Batch 146ms
Event Format.Sdk Generates Uuid If Not Provided Batch 145ms
Batch Behavior.Multiple Events In Single Batch 189ms
Batch Behavior.Batch Envelope Smoke 173ms
Batch Behavior.Flush With No Events Sends Nothing 77ms
Batch Behavior.Flush At Triggers Batch 1135ms
Batch Behavior.Created At Reflects Batch Creation Time 121ms
Deduplication.Generates Unique Uuids 186ms
Deduplication.Different Events Same Content Different Uuids 131ms
Deduplication.Preserves Uuid On Retry 5136ms
Deduplication.Preserves Timestamp On Retry 5091ms
Deduplication.Preserves Uuid And Timestamp On Batch Retry 5104ms
Deduplication.No Duplicate Events In Batch 132ms
Header Behavior On Retry.Attempt Header Starts At One 64ms
Header Behavior On Retry.Attempt Header Increments On Retry 10074ms
Header Behavior On Retry.Request Id Preserved On Retry 5055ms
Header Behavior On Retry.Different Requests Have Different Request Ids 2066ms
Header Behavior On Retry.Request Timestamp Changes On Retry 5047ms
Response Format Validation.Success Response Has Uuid Keyed Results 54ms
Response Format Validation.Success Response Has Ok For Each Event 29ms
Response Format Validation.Success No Retry After When All Ok 31ms
Response Format Validation.Success Retry After Present When Retry Events 32ms
Response Format Validation.Success No Retry After When Drop Only 30ms
Response Format Validation.Response Echoes Request Id 26ms
Retry Behavior.Retries On 408 5035ms
Retry Behavior.Retries On 500 5025ms
Retry Behavior.Retries On 503 5026ms
Retry Behavior.Retries On 504 5024ms
Retry Behavior.Retryable Errors Have Retry After 2027ms
Retry Behavior.Respects Retry After On Retryable Error 8025ms
Retry Behavior.Does Not Retry On 400 2039ms
Retry Behavior.Does Not Retry On 401 2032ms
Retry Behavior.Does Not Retry On 402 2027ms
Retry Behavior.Does Not Retry On 413 2033ms
Retry Behavior.Does Not Retry On 415 2032ms
Retry Behavior.Non Retryable Errors Have No Retry After 2026ms
Retry Behavior.Implements Backoff 15031ms
Retry Behavior.Max Retries Respected 15033ms
Partial Batch Handling.Handles 200 Full Success 2048ms
Partial Batch Handling.Handles 200 With All Ok 3044ms
Partial Batch Handling.Does Not Retry Dropped Events 3042ms
Partial Batch Handling.Does Not Retry Limited Events 3029ms
Partial Batch Handling.Prunes Ok Events On Partial Retry 5032ms
Partial Batch Handling.Prunes Dropped Events On Partial Retry 5026ms
Partial Batch Handling.Retries Only Retry Events From Partial 5027ms
Partial Batch Handling.Partial Retry Preserves Uuids 5027ms
Partial Batch Handling.Partial Retry Attempt Header Increments 5031ms
Partial Batch Handling.Partial Retry Request Id Preserved 5036ms
Partial Batch Handling.Respects Retry After On Partial 5025ms
Partial Batch Handling.Unknown Result Treated As Terminal 3028ms
Partial Batch Handling.Mixed Ok Drop Limited No Retry 3028ms
Compression.Sends Gzip Content Encoding 25ms
Compression.No Content Encoding When Disabled 23ms
Compression.Compressed Body Is Decompressible 23ms
Error Handling.Does Not Retry On Unknown 4Xx 2023ms
Event Options.Cookieless Mode Override 26ms
Event Options.Disable Skew Correction Override 23ms
Event Options.Process Person Profile Override 24ms
Event Options.Product Tour Id Override 23ms
Event Options.Unset Options Omitted 23ms
Event Options.Options Override In Batch 23ms
Geoip And Historical Migration.Geoip Disable Injected Into Properties 23ms
Geoip And Historical Migration.Historical Migration Set In Body 23ms
Geoip And Historical Migration.Historical Migration Absent By Default 22ms

Feature_Flags Tests

⚠️ 2/16 tests passed, 14 failed

View Details
Test Status Duration
Request Payload.Request With Person Properties Device Id 13ms
Request Payload.Flags Request Uses V2 Query Param 12ms
Request Payload.Flags Request Hits Flags Path Not Decide 13ms
Request Payload.Flags Request Omits Authorization Header 13ms
Request Payload.Token In Flags Body Matches Init 15ms
Request Payload.Groups Round Trip 14ms
Request Payload.Groups Default To Empty Object 15ms
Request Payload.Person Properties Distinct Id Auto Populated When Caller Omits It 16ms
Request Payload.Disable Geoip False Propagates As Geoip Disable False 15ms
Request Payload.Disable Geoip Omitted Defaults To False 15ms
Request Payload.Flag Keys To Evaluate Contains Only Requested Key 15ms
Request Lifecycle.No Flags Request On Init Alone 12ms
Request Lifecycle.No Flags Request On Normal Capture 23ms
Request Lifecycle.Two Flag Calls Produce Two Remote Requests 13ms
Request Lifecycle.Mock Response Value Is Returned To Caller 14ms
Side Effect Events.Get Feature Flag Captures Feature Flag Called Event 15ms

Failures

request_payload.request_with_person_properties_device_id

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-b43488d3-0a01-44d3-8873-0c2317965e44'

request_payload.flags_request_uses_v2_query_param

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-f68aa5ec-345f-494c-9b51-87e1ec2163a8'

request_payload.flags_request_hits_flags_path_not_decide

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-7315e31f-1bf4-4b4c-91a8-3ec18adf70bf'

request_payload.flags_request_omits_authorization_header

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-92f06fa6-5bb6-43ff-94e2-0e919a9fa1b6'

request_payload.token_in_flags_body_matches_init

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-293b5386-5283-4edd-8f99-2794c61240e3'

request_payload.groups_round_trip

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-18a2c8cd-8e9a-4021-8422-8e5a353caf36'

request_payload.groups_default_to_empty_object

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-cd1932a7-ae91-4d00-b24f-9e7780447649'

request_payload.person_properties_distinct_id_auto_populated_when_caller_omits_it

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-653e10bb-2daa-401b-82b0-4a1b51a94268'

request_payload.disable_geoip_false_propagates_as_geoip_disable_false

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-f4e5d067-3ea0-478b-8227-72c1580a0c64'

request_payload.disable_geoip_omitted_defaults_to_false

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-4ccb1845-989a-4325-8c0d-6ac1e5c4fc40'

request_payload.flag_keys_to_evaluate_contains_only_requested_key

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-61e56d45-c9c9-45a0-99dc-70a335db0684'

request_lifecycle.two_flag_calls_produce_two_remote_requests

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-1a9127e3-6851-4e3b-ac36-8203aaaa5fdd'

request_lifecycle.mock_response_value_is_returned_to_caller

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-5750cab6-3c74-47d2-9e1b-c134240d3615'

side_effect_events.get_feature_flag_captures_feature_flag_called_event

404, message='Not Found', url='http://sdk-adapter:8080/get_feature_flag?test_id=t-0db56348-0c5e-442a-804c-0ee1b92e6f76'

cat-ph added 3 commits June 16, 2026 00:12
…t work

Addresses review on PR #145.

- transport: re-arm the bounded-queue "full" warning once the queue drains to empty, so a service that repeatedly fills then drains warns once per episode instead of only on the very first overflow.

- transport: pending_events() now counts events accepted but not yet delivered (channel + worker buffer + retry queue) rather than only channel depth, by decrementing the in-flight counter when a batch reaches a terminal outcome (delivered/dropped) in both the v0 and v1 pipelines. This also makes the bounded queue cap total backlog.

- adapter: document that total_events_sent can over-count only when the SDK queue fills (capture is fire-and-forget); the acceptance suite never fills it.
pending_events now counts channel + worker buffer + retries; the public doc on both clients still described the old channel-only meaning (flagged by autoreview).
# Conflicts:
#	src/client/async_client.rs
#	src/client/blocking.rs
#	tests/test_async.rs
#	tests/test_blocking.rs
Comment thread src/client/mod.rs
/// Maximum number of events buffered before new events are dropped
/// (default: 10000). A single warning is logged while the queue is full.
#[builder(default = "10000")]
pub(crate) max_queue_size: usize,

@cat-ph cat-ph Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those defaults are a kind of mix between py and go now pretty much the same with py

@cat-ph cat-ph marked this pull request as ready for review June 16, 2026 00:13
@cat-ph cat-ph requested a review from a team as a code owner June 16, 2026 00:13
@greptile-apps

greptile-apps Bot commented Jun 16, 2026

Copy link
Copy Markdown

Reviews (2): Last reviewed commit: "Merge remote-tracking branch 'origin/mai..." | Re-trigger Greptile

Comment thread api/public-api.txt
Comment thread src/client/mod.rs
@marandaneto marandaneto requested a review from a team June 16, 2026 06:59
@marandaneto

Copy link
Copy Markdown
Member

maybe ask a Rust expert to take a look, as in someone who often contributes Rust code, e.g., Hughes, Ingestion folks, etc.?

@marandaneto marandaneto requested a review from hpouillot June 16, 2026 07:01
@cat-ph cat-ph requested a review from eli-r-ph June 16, 2026 12:46
@cat-ph

cat-ph commented Jun 16, 2026

Copy link
Copy Markdown
Contributor Author

thanks! yep, pinging @eli-r-ph too 🙏 since I think he's also a bit familiar with the logic here (and I'm also changing the capture v1 stuff 😁)

cat-ph added 2 commits June 16, 2026 16:10
pending_events exposed internal queue depth only for the compliance adapter (the harness /state contract requires the field); it is not meaningful public API. Gate it, plus the internal TransportHandle::pending it calls, behind test-harness so normal builds no longer expose it.

The adapter enables test-harness so it still compiles; the --all-features public-API snapshot is unchanged (test-harness items appear there, like extra_capture_headers).
The helper used flush_interval_ms=50 and retry_initial_backoff_ms=1, so the background worker could fire an interval flush or a scheduled retry within milliseconds and race the exact assert_hits() checks (e.g. flush_keeps_events_retryable asserts hits==1 while the held batch auto-retried). The --workspace CI job lost that race.

Use long interval/backoff (600s) so every send is driven by the flush_at threshold or an explicit flush()/shutdown(); no test_transport test relies on autonomous timers (those are covered deterministically by the ManualClock unit tests in transport.rs). Verified stable across 35 runs in all three configs that run this suite.
@eli-r-ph

Copy link
Copy Markdown
Contributor

👋 hey @cat-ph thanks for doing this! it's something I was hoping to circle back to soon myself ❤️ couple things to look into I think:

Async capture path: I don't think having a single thread consumer (no pool) is safe here, any retried or higher-latency request will cause head-of-line blocking and could fill up the queue. I'd suggest using tokio::spawn (or runtime with a thread pool no bigger than num_cpus or smaller than (num_cpus / 2) + 1, if you're worried about starving the client app) to avoid this.

drop/flush/shutdown handling: I don't think this should block for very long as it could hold up app shutdown, especially if the queue is full or the active (head-of-line-blocked) event publish is busy. I think flush should be async and fire-and-forget for safety.

The core idea to drop new events enqueued when the queue is full is sound 👍

Capture timestamp handling:

  • capture v0:
    • event.timestamp if unset by the caller should be now at capture (enqueue) time
    • event.sent_at should be set at publish time
  • capture v1:
    • event.timestamp if unset by caller should be now at capture (enqueue) time also
    • Timestamp header and batch created_at should be set at publish time

Other than that, this LGTM 👍

@cat-ph

cat-ph commented Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

thanks for looking @eli-r-ph 💙

great catch with the timestamps, fixing it to be at capture time (& adding sent_at), the header/created_at were OK here but the timestamp wasn't 😬

I didn't go with tokio here to be able to use this in all 3 places that don't really share a runtime: the async client (w/ tokio), sync client (no async runtime) + std::panic in particular (arbitrary thread where runtime might not be present), I modeled it after go/py where both use a bg thread/goroutine

good point about the HOL block, I think for retries it's safe cause we defer it in the retries: VecDeque<RetryBatch> but the HTTP attempt itself still blocks, I think still worth doing a std-thread pool for this? might complicate ordering but I don't think we care since the timestamps are proper, just need a mutex on the retries VecDeque probably, taking a look! 👀

for the drop/flush/shutdown handling I'm not very sure, I'm definitely adding a shutdown timeout (good point), but I think flush should still be blocking? 🤔 since capture() is fire-and-forget and flush here was the only place to signal it's flushed (and it also returns on a failure, since that's rescheduled/retried separately) so not unbounded but blocking by design

cat-ph added 2 commits June 17, 2026 17:53
Events were timestamped when the worker built the batch (publish time),
so batching or a retry pushed an event's `timestamp` to when it was
finally sent rather than when it occurred. Stamp the capture (enqueue)
time on the producer side when the caller set none, for both the v0 and
v1 wire formats; v1 `created_at` and the request timestamp header already
recorded publish time and are unchanged.

Also add a v0 batch `sent_at` (publish time) so ingestion can do
clock-skew correction, matching what v1 already sends.

From @eli-r-ph's review on the event-transport PR.
shutdown() and Drop attempted one final send per pending batch with no
overall deadline, so a slow or unreachable endpoint could block process
teardown for up to (batches x request_timeout). Add a `shutdown_timeout_ms`
option (default 30000): the worker stamps a drain deadline when it handles
Shutdown (and on channel disconnect) and, once it passes, drops the
remaining buffered/retrying events instead of attempting them, so exit
can't hang. Drop takes the same path, so it is bounded too.

flush() stays a blocking, bounded delivery barrier (one attempt per batch,
returns even when the server is down) -- only teardown is time-bounded.
The drain paths now take an `Option<Instant>` deadline instead of a
`final_attempt` bool (Some => final attempt + deadline-gated).

From @eli-r-ph's review on the event-transport PR.
@hpouillot hpouillot requested a review from ablaszkiewicz June 17, 2026 15:49
Regenerated api/public-api.txt to record the new
ClientOptionsBuilder::shutdown_timeout_ms builder method added alongside the
shutdown drain timeout.
Comment thread src/client/async_client.rs Outdated
cat-ph added 17 commits June 22, 2026 16:11
The separate historical path sent batches inline in the HistoricalBatch
handler, so a Shutdown queued behind them was observed too late: historical
events were POSTed outside the shutdown deadline (e.g. shutdown_timeout_ms(0)
dropped buffered events but still sent historical ones). Queue historical chunks
and drain them on the flush interval like the live buffer, so the worker is
blocked on recv when Shutdown arrives and drains/abandons them under the
deadline. Found by autoreview.
The Timeout/Tick branches drained queued historical batches on any wakeup, but
a wakeup can be caused by the live-buffer deadline or a due retry -- so
historical was sent before its own interval elapsed, and a large drain could
delay the retry that triggered the wake. Guard the drain with the historical
interval (like the live buffer), keeping the batch queued until it's due. Found
by autoreview.
Pre-refactor, historical events flowed through the live buffer and were
auto-flushed at flush_at; the separate path only flushed on the interval, so
under a high flush_interval they'd sit (holding queue slots) until the interval
or an explicit flush. Mirror the live buffer: once flush_at events have queued,
send them right away. Below the threshold they stay queued, so a pending
shutdown still bounds them. Found by autoreview.
Drop `async` and `Result` from `Client::capture` / `Client::capture_batch`
on both clients and from the global `capture`. Enqueueing is non-blocking
and delivery failures are handled by the background worker, not surfaced to
the caller, so there was nothing meaningful to `await` or return.

`capture_exception` keeps its `Result` and async signature: building the
exception event is still fallible.

Collapse the two cfg-gated global `capture` definitions into a single
fire-and-forget fn now that the async and blocking signatures match, and
regenerate the public-API snapshot.
V0 `send_batch` captured the event count before `build_batch_payload` ran
`before_send`, which can drop events. The in-flight `len` accounting stayed
correct, but a batch held for retry counted the dropped events as in-flight —
inflating the bounded-queue depth and over-reporting the "dropping N event(s)"
logs — until that batch reached a terminal outcome.

`build_batch_payload` now returns the surviving event count; `send_batch`
decrements the before_send-dropped events immediately (they are terminal) and
tracks only the survivors, matching the V1 pipeline. Adds a regression test
asserting a before_send-dropped event in a retry-held batch is not counted as
in-flight.
`shutdown_timeout_ms` was only checked between batches: a single in-flight
blocking send still used the full `request_timeout_seconds` (30s default), so an
endpoint that accepted the request and then stalled could push `shutdown()` /
`Drop` well past the configured bound.

Thread the shutdown deadline into the send path (replacing the `final_attempt`
bool) and cap each shutdown/disconnect request's reqwest timeout at the time left
before the deadline, never exceeding the normal request timeout. Adds a
regression test that a stalled endpoint no longer blocks teardown past
`shutdown_timeout_ms`.
A `flush()` whose `Control::Flush` raced into the channel behind a `Shutdown`
(reachable with a shared `Arc<Client>` when one thread flushes while another
shuts down or drops the last handle) was never signaled: the worker processed
`Shutdown`, signaled only that completion, and returned, leaving the flush
caller's wait (`rx.await` / `rx.recv`) hanging forever.

Drain the channel when the worker exits on `Shutdown` and signal any queued
flush/shutdown completions so their callers return instead of blocking. Queued
captures are dropped — the client is closing. Adds a unit test for the helper.
`shutdown_timeout_ms` bounds the shutdown drain, but the single background
worker performs one blocking send at a time: a send already in flight when
shutdown is requested first runs to `request_timeout_seconds` before the drain
begins. The previous wording ("bounds how long teardown can block on a slow or
unreachable endpoint") overclaimed for that case. Clarify the option doc and the
worker comment; the full total-teardown bound comes with the sender pool.
Scheduled retries and the interval flush were only serviced on the idle
`Timeout` wake. Under sustained capture traffic the worker's `recv` always
returns a `Capture` before the timeout elapses, so due retries were never
re-attempted (and the interval flush was deferred to the `flush_at` size
trigger), leaving transient-failure batches pending indefinitely and holding
queue slots.

Move that servicing into a shared block that runs after every non-terminal wake
(Capture/HistoricalBatch/Flush/Tick/Timeout); `Shutdown` and the disconnect path
still `return` first so their drain stays deadline-bounded. The test-only `Tick`
defers its completion signal until after the block so clock-driven tests still
observe the serviced work.
`send_control` does not check `closed`; it returns `false` only once the worker
has exited (the channel is disconnected). Document that accurately, and why a
flush racing shutdown cannot hang: an orphaned control is either signaled by the
worker on its way out, or dropped with the channel — which wakes the caller's
wait with a recv error.
Expand the `shutdown_timeout_ms` doc (and the worker comment): the timeout bounds
the shutdown drain, but the single worker runs one blocking send at a time, so an
automatic flush/drain already in progress when shutdown is requested runs to
completion first — up to `request_timeout_seconds` per in-flight batch. A large
auto-drain can therefore delay teardown by several request timeouts. The sender
pool removes this; this just makes the single-worker contract accurate.
`shutdown()` and `Drop` gated both sending the `Shutdown` control and the worker
`join()` behind `begin_close()`. A non-winning caller returned immediately
without waiting; worse, if an async `shutdown().await` was cancelled (a timeout
or select) while awaiting the worker, `join()` was skipped, and a later `Drop`
saw `closed == true` and also skipped `join()` — leaving the worker draining on
a detached thread that the process could outlive, losing the flush guarantee.

Move `join()` outside the `begin_close()` guard on all four paths (async/blocking
x shutdown/drop) so every caller waits for the worker. The winner sends the
`Shutdown` synchronously before any await, so the worker always exits and
`join()` cannot hang.
`drain_pending_completions` (added to unblock a flush racing shutdown) dropped
`Capture`/`HistoricalBatch` messages still queued when the worker exits, but did
not decrement the in-flight `len` counter for them. A capture that reserved a
queue slot just before shutdown left `pending_events()` permanently nonzero
after the worker exited, skewing the compliance adapter's state/total-sent.

Pass `len` into `drain_pending_completions` and decrement by the dropped event
count. Extends the unit test to assert the counter settles to 0.
`backoff_duration` honored `Retry-After` as an unbounded `Duration::from_secs`.
A hostile or buggy `Retry-After` (or an extreme `retry_max_backoff_ms`) could
then overflow the worker's `now + delay` `Instant` addition, panicking the
worker thread — after which all later captures are silently dropped because
sends hit a disconnected channel.

Cap the scheduled backoff at one day (far beyond any sane retry delay) so the
addition can't overflow. Covers both V0 and V1 (shared `backoff_duration`).
Adds a unit test for an absurd `Retry-After`.
…tant

The worker computes `now + shutdown_timeout` on the shutdown/disconnect drain;
an absurd `shutdown_timeout_ms` could overflow that `Instant + Duration` and
panic the worker — the same failure mode `RETRY_BACKOFF_CAP` guards for retry
backoff. Clamp the timeout to one day at worker startup. Operator config (not
server-controlled), so low severity, but cheap and consistent.
The async `Client`'s `Drop` does a blocking drain, so dropping a `Client` inside
an async task stalls that executor thread until the drain finishes (up to
`shutdown_timeout_ms` plus any in-flight request). Make the caveat explicit and
steer callers to `shutdown().await` first.
`capture_batch` enqueues per event, so a full bounded queue can drop the tail of
a logical batch — unlike the prior model, which sent one atomic request.
Document the non-atomic behavior on both clients.
@cat-ph

cat-ph commented Jun 22, 2026

Copy link
Copy Markdown
Contributor Author

^ bunch of updates above, but mostly kept in separate commits, tl;dr:

  • historical migration goes through separate path to simplify (Hugues' suggestion)
  • fixed event count after before_send (Dustin's catch)
  • a few small clamps for durations
  • the shutdown deadline should be properly handled now (passed through reqwest etc, after discussion with Eli above)
  • dropped async/result from the capture public API - should be simpler now since it's fire-and-forget (technically a breaking change 😅)

@hpouillot hpouillot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🦀 🦀 🦀

cat-ph added 3 commits June 24, 2026 12:59
Brings in v0.13.2 / v0.13.3: #148 (remove the duplicate v0 batch-event api key,
via `InnerEvent::new_for_batch`) and #153 (internalize `EventOptions` on the
capture-v1 path).

Only one conflict: compliance/adapter/src/main.rs. The branch had moved the
adapter's event-building before the instances lock, while #153 replaced
`Event::set_option` (no longer public to the adapter) with a wire-option ->
magic-property mapping through a new `HARNESS_OPTION_TO_PROP` table. Kept the
branch's structure and applied #153's mapping to the pre-lock build; dropped the
now-redundant in-lock build. Everything else auto-merged.

Regenerated api/public-api.txt (matched the auto-merge). Full gate green across
all six feature configs; fmt + clippy clean.
The changeset still said `capture` / `capture_batch` "return `Ok(())`", but the
fire-and-forget change dropped `async` + `Result` — they now enqueue and return
`()` infallibly (no longer `async` on the async client). Also drop
`pending_events()` from the listed public API (it is `test-harness`-gated, not
part of the normal public surface) and add the new `shutdown_timeout_ms` option.
`now_utc` is used in the v0/default build too — `enqueue`/`enqueue_historical`
stamp each event's capture timestamp with it, and the v0 `send_batch` uses it
for the batch `sent_at`. So the "Unused by the v0 pipeline" doc was backwards
and the `#[cfg_attr(not(feature = "capture-v1"), allow(dead_code))]` was dead
(clippy is clean without it in every config). Reword the doc to its actual uses
and remove the allow.
@cat-ph cat-ph merged commit 155d00a into main Jun 24, 2026
25 checks passed
@cat-ph cat-ph deleted the ci/rust-event-transport branch June 24, 2026 14:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants