
feat(aggregate): add AggregatorHandle for on-demand flush #1309

Draft
duncanista wants to merge 2 commits into main from
jordan.gonzalez/aggregate/on-demand-flush

Conversation

@duncanista (Contributor) commented Apr 3, 2026

Summary

Add an AggregatorHandle to the Aggregate transform that allows triggering on-demand flushes from outside the topology. This is essential for serverless environments like AWS Lambda where flushing must happen at invocation boundaries rather than solely on a fixed timer.

The handle communicates with the transform via a bounded channel (mpsc::channel(64)). When a flush is requested, the transform immediately flushes all closed aggregation windows and dispatches the results downstream through the normal pipeline (encoder → forwarder).

The receiver is wrapped in Arc<tokio::sync::Mutex<...>> with OwnedMutexGuard acquisition, so the lock is held per-instance and released on component respawn.

Usage

```rust
let mut config = AggregateConfiguration::with_defaults();
let handle = config.create_handle();
// ... build and spawn topology with config ...

// Later, trigger flush at invocation boundary:
handle.flush().await?;
```

Timer-based flushing continues to work alongside on-demand flushes. If `create_handle()` is never called, behavior is unchanged.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

How did you test this PR?

  • `cargo check -p saluki-components` passes
  • Used in the datadog-lambda-extension integration branch (`jordan.gonzalez/serverless/saluki-dogstatsd-integration`) — verified on-demand flush triggers metric dispatch through encoder and forwarder on a real Lambda function
  • Logs confirm: "On-demand flush of aggregated metrics..." → "Dispatched on-demand flushed events." → "Request succeeded."

@dd-octo-sts dd-octo-sts bot added area/components Sources, transforms, and destinations. transform/aggregate Aggregate transform. labels Apr 3, 2026
@tobz (Member) left a comment
I'm still thinking through the change in behavior to the flushed metrics themselves, but left some feedback on the implementation of exposing on-demand flushing in the meantime.

```rust
/// Receiver for on-demand flush requests. Created via `create_handle()`.
/// Wrapped in a Mutex to allow `take()` from the `&self` `build()` method.
#[serde(skip)]
flush_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<oneshot::Sender<()>>>>,
```
@tobz (Member) commented

Realistically, this should be Arc<tokio::sync::Mutex<mpsc::Receiver<...>>>.

The design of component configuration is such that we eventually want to be able to respawn components. Components that expose external control mechanisms like this (a handle to externally control behavior) have to do so in a way where another instance of this component could conceivably be spawned and then take over. What we have here currently would consume the receiver and make it unavailable to the next instance of the component. By wrapping it in Arc<tokio::sync::Mutex<...>>, we allow for reusing it in future instances, and also gracefully awaiting lock acquisition in an async-friendly way.

The change from UnboundedReceiver to Receiver is because unbounded channels are bad. We never use them in Saluki.

Per tobz review:
- Use Arc<tokio::sync::Mutex<Receiver>> instead of Mutex<Option<UnboundedReceiver>>
  to support component respawning and async-friendly lock acquisition
- Switch from unbounded to bounded channel (capacity 64)
- Import generic_error! macro directly instead of path-qualifying
- Use OwnedMutexGuard so the lock is held per-instance and released on respawn
pr-commenter bot commented Apr 3, 2026

Binary Size Analysis (Agent Data Plane)

Target: b7aedc3 (baseline) vs 8972e62 (comparison) diff
Analysis Type: Stripped binaries (debug symbols excluded)
Baseline Size: 26.35 MiB
Comparison Size: 26.36 MiB
Size Change: +15.90 KiB (+0.06%)
Pass/Fail Threshold: +5%
Result: PASSED ✅

Changes by Module

| Module | File Size | Symbols |
| --- | --- | --- |
| saluki_components::transforms::aggregate | +7.65 KiB | 99 |
| core | +2.97 KiB | 43 |
| [Unmapped] | +1.56 KiB | 1 |
| alloc | +1.19 KiB | 68 |
| tokio | +1.16 KiB | 2 |
| [sections] | +594 B | 7 |
| hashbrown | +492 B | 5 |
| saluki_components::common::datadog | +252 B | 1 |
| hyper | +90 B | 3 |
| saluki_config::secrets::resolver | -90 B | 1 |
| unicode_segmentation | +56 B | 1 |
| saluki_core::topology::blueprint | +6 B | 2 |
| hickory_proto | +0 B | 10 |
| rustls | +0 B | 6 |
| webpki | +0 B | 2 |
| figment | +0 B | 8 |
| saluki_context::tags::tagset | +0 B | 2 |
| tonic | +0 B | 4 |
| serde_yaml | +0 B | 4 |
| crossbeam_channel | +0 B | 8 |

Detailed Symbol Changes

```
    FILE SIZE        VM SIZE
 --------------  --------------
  [NEW] +29.9Ki  [NEW] +29.7Ki    <saluki_components::transforms::aggregate::Aggregate as saluki_core::components::transforms::Transform>::run::{{closure}}::he85970d968cba8e1
  [NEW] +21.2Ki  [NEW] +21.0Ki    saluki_components::transforms::aggregate::AggregateConfiguration::from_configuration::h9d87a648c9790d41
  [NEW] +18.8Ki  [NEW] +18.6Ki    saluki_components::transforms::aggregate::AggregationState::flush::{{closure}}::h8c923b65048695f1
  +0.7% +8.99Ki  +0.4% +4.02Ki    [330 Others]
  [NEW] +5.13Ki  [NEW] +5.00Ki    <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.11065
  [NEW] +5.00Ki  [NEW] +4.77Ki    <saluki_components::transforms::aggregate::AggregateConfiguration as saluki_core::components::transforms::builder::TransformBuilder>::build::{{closure}}::he42a78a7686f6660
  [NEW] +4.71Ki  [NEW] +4.59Ki    <rustls::error::Error as core::clone::Clone>::clone.11080
  [NEW] +3.83Ki  [NEW] +3.71Ki    <webpki::error::Error as core::fmt::Debug>::fmt.11258
  [NEW] +3.57Ki  [NEW] +3.45Ki    <rustls::error::Error as core::fmt::Debug>::fmt.11081
  [NEW] +2.33Ki  [NEW] +2.22Ki    <figment::error::Kind as core::fmt::Debug>::fmt.10069
  [NEW] +2.20Ki  [NEW] +2.08Ki    <figment::error::Actual as core::fmt::Debug>::fmt.10070
  [NEW] +2.20Ki  [NEW] +1.97Ki    core::ptr::drop_in_place<<saluki_components::transforms::aggregate::Aggregate as saluki_core::components::transforms::Transform>::run::{{closure}}>::h262a5e37a51564b9
  [DEL] -2.20Ki  [DEL] -2.08Ki    <figment::error::Actual as core::fmt::Debug>::fmt.10068
  [DEL] -2.33Ki  [DEL] -2.22Ki    <figment::error::Kind as core::fmt::Debug>::fmt.10067
  [DEL] -3.57Ki  [DEL] -3.45Ki    <rustls::error::Error as core::fmt::Debug>::fmt.11079
  [DEL] -3.83Ki  [DEL] -3.71Ki    <webpki::error::Error as core::fmt::Debug>::fmt.11256
  [DEL] -4.71Ki  [DEL] -4.59Ki    <rustls::error::Error as core::clone::Clone>::clone.11078
  [DEL] -4.87Ki  [DEL] -4.64Ki    <saluki_components::transforms::aggregate::AggregateConfiguration as saluki_core::components::transforms::builder::TransformBuilder>::build::{{closure}}::ha15a79160e9766d2
  [DEL] -5.13Ki  [DEL] -5.00Ki    <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.11063
  [DEL] -21.2Ki  [DEL] -21.0Ki    saluki_components::transforms::aggregate::AggregateConfiguration::from_configuration::h13c50565b2fff687
  [DEL] -44.1Ki  [DEL] -43.9Ki    <saluki_components::transforms::aggregate::Aggregate as saluki_core::components::transforms::Transform>::run::{{closure}}::h02e7a06583114c5e
  +0.1% +15.9Ki  +0.0% +10.6Ki    TOTAL
```

pr-commenter bot commented Apr 3, 2026

Regression Detector (Agent Data Plane)

Regression Detector Results

Run ID: db0a06a0-daca-47ef-885a-328a092bffa6

Baseline: b7aedc3
Comparison: 8972e62
Diff

Optimization Goals: ✅ No significant changes detected

Experiments ignored for regressions

Regressions in experiments with settings containing erratic: true are ignored.

| experiment | goal | Δ mean % | Δ mean % CI | trials | links |
| --- | --- | --- | --- | --- | --- |
| otlp_ingest_logs_5mb_throughput | ingress throughput | -0.00 | [-0.14, +0.13] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_logs_5mb_cpu | % cpu utilization | -3.57 | [-8.27, +1.12] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_logs_5mb_memory | memory utilization | -8.92 | [-9.35, -8.49] | 1 | (metrics) (profiles) (logs) |

Fine details of change detection per experiment

| experiment | goal | Δ mean % | Δ mean % CI | trials | links |
| --- | --- | --- | --- | --- | --- |
| otlp_ingest_traces_5mb_cpu | % cpu utilization | +2.50 | [+0.36, +4.65] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_metrics_5mb_cpu | % cpu utilization | +1.64 | [-5.64, +8.92] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_100mb_3k_contexts_cpu | % cpu utilization | +0.89 | [-5.18, +6.96] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_ottl_transform_5mb_memory | memory utilization | +0.58 | [+0.33, +0.83] | 1 | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_heavy | memory utilization | +0.23 | [+0.10, +0.37] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_500mb_3k_contexts_cpu | % cpu utilization | +0.18 | [-1.32, +1.68] | 1 | (metrics) (profiles) (logs) |
| quality_gates_rss_idle | memory utilization | +0.18 | [+0.16, +0.20] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_100mb_3k_contexts_memory | memory utilization | +0.14 | [-0.04, +0.32] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_10mb_3k_contexts_memory | memory utilization | +0.11 | [-0.07, +0.30] | 1 | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_medium | memory utilization | +0.08 | [-0.11, +0.27] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_1mb_3k_contexts_memory | memory utilization | +0.07 | [-0.11, +0.24] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_10mb_3k_contexts_throughput | ingress throughput | +0.02 | [-0.12, +0.15] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_512kb_3k_contexts_memory | memory utilization | +0.01 | [-0.17, +0.18] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_100mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.03, +0.04] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_512kb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.05, +0.05] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_5mb_throughput | ingress throughput | +0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_1mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.06, +0.06] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_ottl_filtering_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_ottl_transform_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_logs_5mb_throughput | ingress throughput | -0.00 | [-0.14, +0.13] | 1 | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_ultraheavy | memory utilization | -0.02 | [-0.14, +0.11] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_metrics_5mb_throughput | ingress throughput | -0.03 | [-0.16, +0.10] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_500mb_3k_contexts_memory | memory utilization | -0.22 | [-0.38, -0.05] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_5mb_memory | memory utilization | -0.38 | [-0.62, -0.13] | 1 | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_low | memory utilization | -0.40 | [-0.59, -0.21] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_ottl_filtering_5mb_memory | memory utilization | -0.42 | [-0.76, -0.08] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_500mb_3k_contexts_throughput | ingress throughput | -1.49 | [-1.62, -1.36] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_10mb_3k_contexts_cpu | % cpu utilization | -1.68 | [-32.42, +29.06] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_metrics_5mb_memory | memory utilization | -1.69 | [-1.96, -1.42] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_ottl_filtering_5mb_cpu | % cpu utilization | -1.99 | [-4.39, +0.41] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_1mb_3k_contexts_cpu | % cpu utilization | -2.31 | [-56.36, +51.74] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_traces_ottl_transform_5mb_cpu | % cpu utilization | -2.51 | [-4.59, -0.44] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_logs_5mb_cpu | % cpu utilization | -3.57 | [-8.27, +1.12] | 1 | (metrics) (profiles) (logs) |
| dsd_uds_512kb_3k_contexts_cpu | % cpu utilization | -6.64 | [-61.85, +48.58] | 1 | (metrics) (profiles) (logs) |
| otlp_ingest_logs_5mb_memory | memory utilization | -8.92 | [-9.35, -8.49] | 1 | (metrics) (profiles) (logs) |

Bounds Checks: ✅ Passed

| experiment | bounds_check_name | replicates_passed | observed_value | links |
| --- | --- | --- | --- | --- |
| quality_gates_rss_dsd_heavy | memory_usage | 10/10 | 113.81MiB ≤ 140MiB | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_low | memory_usage | 10/10 | 33.66MiB ≤ 50MiB | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_medium | memory_usage | 10/10 | 53.06MiB ≤ 75MiB | (metrics) (profiles) (logs) |
| quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | 168.52MiB ≤ 200MiB | (metrics) (profiles) (logs) |
| quality_gates_rss_idle | memory_usage | 10/10 | 21.03MiB ≤ 40MiB | (metrics) (profiles) (logs) |

Explanation

Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%

Performance changes are noted in the perf column of each table:

  • ✅ = significantly better comparison variant performance
  • ❌ = significantly worse comparison variant performance
  • ➖ = no significant change in performance

A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".

For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:

  1. Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.

  2. Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.

  3. Its configuration does not mark it "erratic".

```rust
/// Wrapped in `Arc<tokio::sync::Mutex<...>>` to allow sharing across component respawns
/// and to enable async-friendly lock acquisition.
#[serde(skip)]
flush_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<oneshot::Sender<()>>>>>,
```
@tobz (Member) commented

Sorry, I should have been clearer.

We should always create the flush channel, basically: make this into (Arc<Mutex<Receiver<...>>>, Sender<...>). From there, we would also make flush_rx in Aggregate simply be Arc<Mutex<Receiver<...>>>, and then we get to avoid the code around checking if it's set or not: we just always acquire the lock/receiver. This would also mean that AggregateConfiguration::create_handle just ends up cloning instead of creating anything new.

It's a little less efficient than returning a pending future when the channel isn't in use, but I prefer the clarity of just assuming we always need to check/care about the flush channel.

@tobz tobz added the type/enhancement An enhancement in functionality or support. label Apr 6, 2026