feat(aggregate): add AggregatorHandle for on-demand flush#1309
feat(aggregate): add AggregatorHandle for on-demand flush#1309duncanista wants to merge 2 commits intomainfrom
Conversation
Add an AggregatorHandle that allows triggering flushes of the aggregation state from outside the topology. This is essential for serverless environments like AWS Lambda where flushing must happen at invocation boundaries rather than on a fixed timer. Usage: let mut config = AggregateConfiguration::with_defaults(); let handle = config.create_handle(); // ... build and spawn topology with config ... // Later, trigger flush: handle.flush().await?; The handle communicates with the aggregate transform via an unbounded channel. When a flush is requested, the transform immediately flushes all closed aggregation windows and dispatches the results downstream. The existing timer-based flushing continues to work alongside on-demand flushes.
tobz
left a comment
There was a problem hiding this comment.
I'm still thinking through the change in behavior to the flushed metrics themselves, but left some feedback on the implementation of exposing on-demand flushing in the meantime.
| /// Receiver for on-demand flush requests. Created via `create_handle()`. | ||
| /// Wrapped in a Mutex to allow `take()` from the `&self` `build()` method. | ||
| #[serde(skip)] | ||
| flush_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<oneshot::Sender<()>>>>, |
There was a problem hiding this comment.
Realistically, this should be Arc<tokio::sync::Mutex<mpsc::Receiver<...>>>.
The design of component configuration is such that we eventually want to be able to respawn components. Components that expose external control mechanisms like this (a handle to externally control behavior) have to do so in a way where another instance of this component could conceivably be spawned and then take over. What we have here currently would consume the receiver and make it unavailable to the next instance of the component. By wrapping it in Arc<tokio::sync::Mutex<...>>, we allow for reusing it in future instances, and also gracefully awaiting lock acquisition in an async-friendly way.
The change from UnboundedReceiver to Receiver is because unbounded channels are bad. We never use them in Saluki.
Per tobz review: - Use Arc<tokio::sync::Mutex<Receiver>> instead of Mutex<Option<UnboundedReceiver>> to support component respawning and async-friendly lock acquisition - Switch from unbounded to bounded channel (capacity 64) - Import generic_error! macro directly instead of path-qualifying - Use OwnedMutexGuard so the lock is held per-instance and released on respawn
Binary Size Analysis (Agent Data Plane)Target: b7aedc3 (baseline) vs 8972e62 (comparison) diff
|
| Module | File Size | Symbols |
|---|---|---|
saluki_components::transforms::aggregate |
+7.65 KiB | 99 |
core |
+2.97 KiB | 43 |
[Unmapped] |
+1.56 KiB | 1 |
alloc |
+1.19 KiB | 68 |
tokio |
+1.16 KiB | 2 |
[sections] |
+594 B | 7 |
hashbrown |
+492 B | 5 |
saluki_components::common::datadog |
+252 B | 1 |
hyper |
+90 B | 3 |
saluki_config::secrets::resolver |
-90 B | 1 |
unicode_segmentation |
+56 B | 1 |
saluki_core::topology::blueprint |
+6 B | 2 |
hickory_proto |
+0 B | 10 |
rustls |
+0 B | 6 |
webpki |
+0 B | 2 |
figment |
+0 B | 8 |
saluki_context::tags::tagset |
+0 B | 2 |
tonic |
+0 B | 4 |
serde_yaml |
+0 B | 4 |
crossbeam_channel |
+0 B | 8 |
Detailed Symbol Changes
FILE SIZE VM SIZE
-------------- --------------
[NEW] +29.9Ki [NEW] +29.7Ki _<saluki_components::transforms::aggregate::Aggregate as saluki_core::components::transforms::Transform>::run::_{{closure}}::he85970d968cba8e1
[NEW] +21.2Ki [NEW] +21.0Ki saluki_components::transforms::aggregate::AggregateConfiguration::from_configuration::h9d87a648c9790d41
[NEW] +18.8Ki [NEW] +18.6Ki saluki_components::transforms::aggregate::AggregationState::flush::_{{closure}}::h8c923b65048695f1
+0.7% +8.99Ki +0.4% +4.02Ki [330 Others]
[NEW] +5.13Ki [NEW] +5.00Ki <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.11065
[NEW] +5.00Ki [NEW] +4.77Ki _<saluki_components::transforms::aggregate::AggregateConfiguration as saluki_core::components::transforms::builder::TransformBuilder>::build::_{{closure}}::he42a78a7686f6660
[NEW] +4.71Ki [NEW] +4.59Ki <rustls::error::Error as core::clone::Clone>::clone.11080
[NEW] +3.83Ki [NEW] +3.71Ki <webpki::error::Error as core::fmt::Debug>::fmt.11258
[NEW] +3.57Ki [NEW] +3.45Ki <rustls::error::Error as core::fmt::Debug>::fmt.11081
[NEW] +2.33Ki [NEW] +2.22Ki <figment::error::Kind as core::fmt::Debug>::fmt.10069
[NEW] +2.20Ki [NEW] +2.08Ki <figment::error::Actual as core::fmt::Debug>::fmt.10070
[NEW] +2.20Ki [NEW] +1.97Ki core::ptr::drop_in_place<<saluki_components::transforms::aggregate::Aggregate as saluki_core::components::transforms::Transform>::run::{{closure}}>::h262a5e37a51564b9
[DEL] -2.20Ki [DEL] -2.08Ki <figment::error::Actual as core::fmt::Debug>::fmt.10068
[DEL] -2.33Ki [DEL] -2.22Ki <figment::error::Kind as core::fmt::Debug>::fmt.10067
[DEL] -3.57Ki [DEL] -3.45Ki <rustls::error::Error as core::fmt::Debug>::fmt.11079
[DEL] -3.83Ki [DEL] -3.71Ki <webpki::error::Error as core::fmt::Debug>::fmt.11256
[DEL] -4.71Ki [DEL] -4.59Ki <rustls::error::Error as core::clone::Clone>::clone.11078
[DEL] -4.87Ki [DEL] -4.64Ki _<saluki_components::transforms::aggregate::AggregateConfiguration as saluki_core::components::transforms::builder::TransformBuilder>::build::_{{closure}}::ha15a79160e9766d2
[DEL] -5.13Ki [DEL] -5.00Ki <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.11063
[DEL] -21.2Ki [DEL] -21.0Ki saluki_components::transforms::aggregate::AggregateConfiguration::from_configuration::h13c50565b2fff687
[DEL] -44.1Ki [DEL] -43.9Ki _<saluki_components::transforms::aggregate::Aggregate as saluki_core::components::transforms::Transform>::run::_{{closure}}::h02e7a06583114c5e
+0.1% +15.9Ki +0.0% +10.6Ki TOTAL
Regression Detector (Agent Data Plane)Regression Detector ResultsRun ID: db0a06a0-daca-47ef-885a-328a092bffa6 Baseline: b7aedc3 Optimization Goals: ✅ No significant changes detected
|
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | -0.00 | [-0.14, +0.13] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -3.57 | [-8.27, +1.12] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_5mb_memory | memory utilization | -8.92 | [-9.35, -8.49] | 1 | (metrics) (profiles) (logs) |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_traces_5mb_cpu | % cpu utilization | +2.50 | [+0.36, +4.65] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_cpu | % cpu utilization | +1.64 | [-5.64, +8.92] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_cpu | % cpu utilization | +0.89 | [-5.18, +6.96] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_memory | memory utilization | +0.58 | [+0.33, +0.83] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_heavy | memory utilization | +0.23 | [+0.10, +0.37] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_cpu | % cpu utilization | +0.18 | [-1.32, +1.68] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_idle | memory utilization | +0.18 | [+0.16, +0.20] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_memory | memory utilization | +0.14 | [-0.04, +0.32] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_memory | memory utilization | +0.11 | [-0.07, +0.30] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_medium | memory utilization | +0.08 | [-0.11, +0.27] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_memory | memory utilization | +0.07 | [-0.11, +0.24] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_throughput | ingress throughput | +0.02 | [-0.12, +0.15] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_memory | memory utilization | +0.01 | [-0.17, +0.18] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.03, +0.04] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.05, +0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_throughput | ingress throughput | +0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.06, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | -0.00 | [-0.14, +0.13] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_ultraheavy | memory utilization | -0.02 | [-0.14, +0.11] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_throughput | ingress throughput | -0.03 | [-0.16, +0.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_memory | memory utilization | -0.22 | [-0.38, -0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_memory | memory utilization | -0.38 | [-0.62, -0.13] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_low | memory utilization | -0.40 | [-0.59, -0.21] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_memory | memory utilization | -0.42 | [-0.76, -0.08] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_throughput | ingress throughput | -1.49 | [-1.62, -1.36] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_cpu | % cpu utilization | -1.68 | [-32.42, +29.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_memory | memory utilization | -1.69 | [-1.96, -1.42] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_cpu | % cpu utilization | -1.99 | [-4.39, +0.41] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_cpu | % cpu utilization | -2.31 | [-56.36, +51.74] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_cpu | % cpu utilization | -2.51 | [-4.59, -0.44] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -3.57 | [-8.27, +1.12] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_cpu | % cpu utilization | -6.64 | [-61.85, +48.58] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_5mb_memory | memory utilization | -8.92 | [-9.35, -8.49] | 1 | (metrics) (profiles) (logs) |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | quality_gates_rss_dsd_heavy | memory_usage | 10/10 | 113.81MiB ≤ 140MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_low | memory_usage | 10/10 | 33.66MiB ≤ 50MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_medium | memory_usage | 10/10 | 53.06MiB ≤ 75MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | 168.52MiB ≤ 200MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_idle | memory_usage | 10/10 | 21.03MiB ≤ 40MiB | (metrics) (profiles) (logs) |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
| /// Wrapped in `Arc<tokio::sync::Mutex<...>>` to allow sharing across component respawns | ||
| /// and to enable async-friendly lock acquisition. | ||
| #[serde(skip)] | ||
| flush_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<oneshot::Sender<()>>>>>, |
There was a problem hiding this comment.
Sorry, I should have been more clear.
We should always create the flush channel, basically: make this into (Arc<Mutex<Receiver<...>>>, Sender<...>). From there, we would also make flush_rx in Aggregate simply be Arc<Mutex<Receiver<...>>>, and then we get to avoid the code around checking if it's set or not: we just always acquire the lock/receiver. This would also mean that AggregateConfiguration::create_handle just ends up cloning instead of creating anything new.
It's a little less efficient than the pending when not in use, but I prefer the clarity of just assuming we always need to check/care about the flush channel.
Summary
Add an
AggregatorHandleto the Aggregate transform that allows triggering on-demand flushes from outside the topology. This is essential for serverless environments like AWS Lambda where flushing must happen at invocation boundaries rather than solely on a fixed timer.The handle communicates with the transform via a bounded channel (
mpsc::channel(64)). When a flush is requested, the transform immediately flushes all closed aggregation windows and dispatches the results downstream through the normal pipeline (encoder → forwarder).The receiver is wrapped in
Arc<tokio::sync::Mutex<...>>withOwnedMutexGuardacquisition, so the lock is held per-instance and released on component respawn.Usage
```rust
let mut config = AggregateConfiguration::with_defaults();
let handle = config.create_handle();
// ... build and spawn topology with config ...
// Later, trigger flush at invocation boundary:
handle.flush().await?;
```
Timer-based flushing continues to work alongside on-demand flushes. If `create_handle()` is never called, behavior is unchanged.
Change Type
How did you test this PR?
References