Pull request overview
This PR enhances the agent data plane by implementing a supervisor scheme for internal observability and control plane management. The change adds a `Future` trait implementation to `ProcessShutdown` so that it can be awaited directly in async contexts.
Changes:
- Implements the `Future` trait for `ProcessShutdown` to allow direct polling of shutdown signals
Binary Size Analysis (Agent Data Plane)
Target: 7d03456 (baseline) vs c695d2f (comparison) diff

| Module | File Size | Symbols |
|---|---|---|
| saluki_core::runtime::supervisor | +67.73 KiB | 61 |
| core | +43.65 KiB | 228 |
| agent_data_plane::internal::initialize_and_launch_runtime | -18.37 KiB | 1 |
| std | -10.85 KiB | 50 |
| anyhow | +8.25 KiB | 28 |
| [sections] | +7.60 KiB | 10 |
| agent_data_plane::internal::control_plane | -6.87 KiB | 26 |
| saluki_core::runtime::process | +6.71 KiB | 6 |
| saluki_core::topology::running | +5.50 KiB | 2 |
| saluki_app::metrics::collect_runtime_metrics | -4.75 KiB | 1 |
| agent_data_plane::internal::observability | +4.66 KiB | 16 |
| tokio | -3.79 KiB | 73 |
| saluki_core::runtime::restart | +3.50 KiB | 7 |
| tracing_core | +2.24 KiB | 16 |
| agent_data_plane::cli::run | -2.05 KiB | 8 |
| hashbrown | +1.81 KiB | 8 |
| saluki_core::runtime::shutdown | +1.65 KiB | 3 |
| agent_data_plane::config::DataPlaneConfiguration | +1.62 KiB | 1 |
| indexmap | +1.61 KiB | 2 |
| saluki_core::runtime::dedicated | +972 B | 2 |

Detailed Symbol Changes
FILE SIZE VM SIZE
-------------- --------------
+3.6% +79.2Ki +3.1% +54.6Ki [973 Others]
[NEW] +63.0Ki [NEW] +62.8Ki _<agent_data_plane::internal::control_plane::PrivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::h3f7ccf32c810e351
[NEW] +21.3Ki [NEW] +21.1Ki _<agent_data_plane::internal::control_plane::UnprivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::hd6cb87566364a1bb
[NEW] +18.6Ki [NEW] +18.5Ki saluki_app::api::APIBuilder::serve::_{{closure}}::hab3e986182ae0c58
[NEW] +16.1Ki [NEW] +16.0Ki saluki_core::runtime::supervisor::WorkerState::add_worker::h515f709e3bc97a0b
[NEW] +15.1Ki [NEW] +15.0Ki _<core::pin::Pin<P> as core::future::future::Future>::poll::h638d45fb40fe1764
[NEW] +11.0Ki [NEW] +10.9Ki std::sys::backtrace::__rust_begin_short_backtrace::h20fc2d2820f7e70a
[NEW] +10.6Ki [NEW] +10.5Ki <saluki_core::data_model::event::Event as core::clone::Clone>::clone.9840
[NEW] +9.47Ki [NEW] +9.34Ki saluki_core::runtime::supervisor::Supervisor::run_inner::_{{closure}}::ha8c7c6df0c8c7703
[NEW] +6.80Ki [NEW] +6.66Ki saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::ha25f5f301fb536ee
[NEW] +6.24Ki [NEW] +6.10Ki saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::hcbbea4392b2e770f
[NEW] +5.67Ki [NEW] +5.53Ki saluki_core::topology::running::RunningTopology::shutdown_with_timeout::_{{closure}}::h55f16f5e28371ac8
[NEW] +5.52Ki [NEW] +5.38Ki <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.8291
[NEW] +5.13Ki [NEW] +5.00Ki <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.11054
[DEL] -5.52Ki [DEL] -5.38Ki <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.8525
[DEL] -8.28Ki [DEL] -8.19Ki std::sys::backtrace::__rust_begin_short_backtrace::h1af6f2970282067d
[DEL] -10.6Ki [DEL] -10.5Ki <saluki_core::data_model::event::Event as core::clone::Clone>::clone.10072
[DEL] -17.3Ki [DEL] -17.1Ki agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::_{{closure}}::h0a06453c7c0a3090
[DEL] -18.4Ki [DEL] -18.2Ki agent_data_plane::internal::initialize_and_launch_runtime::_{{closure}}::h24807518510e5f4d
[DEL] -18.7Ki [DEL] -18.6Ki saluki_app::api::APIBuilder::serve::_{{closure}}::h80a1a8822223b886
[DEL] -84.6Ki [DEL] -84.5Ki agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::hbc654c75007186eb
+0.4% +110Ki +0.4% +84.9Ki TOTAL
Regression Detector (Agent Data Plane)
Regression Detector Results
Run ID: 9c7673e5-1c1b-4cca-84b5-13baaab2d049
Baseline: 7d03456
Optimization Goals: ✅ No significant changes detected
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_logs_5mb_memory | memory utilization | +4.54 | [+4.03, +5.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | -0.03 | [-0.16, +0.11] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -1.16 | [-6.07, +3.74] | 1 | (metrics) (profiles) (logs) |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | dsd_uds_1mb_3k_contexts_cpu | % cpu utilization | +8.47 | [-47.95, +64.88] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_memory | memory utilization | +4.54 | [+4.03, +5.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_cpu | % cpu utilization | +3.66 | [-1.78, +9.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_memory | memory utilization | +0.82 | [+0.63, +1.00] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_memory | memory utilization | +0.77 | [+0.60, +0.94] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_low | memory utilization | +0.55 | [+0.35, +0.75] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_medium | memory utilization | +0.54 | [+0.35, +0.74] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_memory | memory utilization | +0.24 | [+0.06, +0.41] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_ultraheavy | memory utilization | +0.09 | [-0.03, +0.21] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_throughput | ingress throughput | +0.01 | [-0.13, +0.14] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_throughput | ingress throughput | +0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.06, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_throughput | ingress throughput | -0.00 | [-0.06, +0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_throughput | ingress throughput | -0.01 | [-0.04, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_memory | memory utilization | -0.02 | [-0.19, +0.16] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_throughput | ingress throughput | -0.02 | [-0.15, +0.11] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | -0.03 | [-0.16, +0.11] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_idle | memory utilization | -0.19 | [-0.22, -0.15] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_memory | memory utilization | -0.19 | [-0.37, -0.01] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_cpu | % cpu utilization | -0.37 | [-1.78, +1.03] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_memory | memory utilization | -0.39 | [-0.64, -0.14] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_heavy | memory utilization | -0.41 | [-0.55, -0.28] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_throughput | ingress throughput | -0.42 | [-0.54, -0.30] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_memory | memory utilization | -0.47 | [-0.72, -0.22] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_cpu | % cpu utilization | -0.71 | [-57.06, +55.64] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_memory | memory utilization | -0.79 | [-1.13, -0.46] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_cpu | % cpu utilization | -0.90 | [-6.59, +4.80] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -1.16 | [-6.07, +3.74] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_memory | memory utilization | -1.87 | [-2.09, -1.66] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_cpu | % cpu utilization | -2.12 | [-4.23, -0.00] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_cpu | % cpu utilization | -2.21 | [-4.38, -0.04] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_cpu | % cpu utilization | -2.56 | [-5.13, +0.01] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_cpu | % cpu utilization | -8.80 | [-36.21, +18.61] | 1 | (metrics) (profiles) (logs) |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | quality_gates_rss_dsd_heavy | memory_usage | 10/10 | 113.70MiB ≤ 140MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_low | memory_usage | 10/10 | 34.02MiB ≤ 50MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_medium | memory_usage | 10/10 | 53.43MiB ≤ 75MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | 166.09MiB ≤ 200MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_idle | memory_usage | 10/10 | 21.14MiB ≤ 40MiB | (metrics) (profiles) (logs) |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
- Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
- Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
- Its configuration does not mark it "erratic".
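The three criteria above can be sketched as a small predicate. This is only an illustration of the decision rule as described, not the detector's actual code: the `Experiment` struct, its field names, and `is_regression` are hypothetical, and the sketch ignores whether a given goal treats an increase as better or worse.

```rust
/// Hypothetical summary of one experiment row (names are assumptions, not the detector's API).
struct Experiment {
    delta_mean_pct: f64,
    ci_low: f64,
    ci_high: f64,
    erratic: bool,
}

/// Effect size tolerance quoted in the report: |Δ mean %| ≥ 5.00%.
const EFFECT_SIZE_TOLERANCE_PCT: f64 = 5.0;

/// Returns true only when all three criteria from the explanation hold.
fn is_regression(e: &Experiment) -> bool {
    // 1. The change is big enough to merit a closer look.
    let big_enough = e.delta_mean_pct.abs() >= EFFECT_SIZE_TOLERANCE_PCT;
    // 2. The confidence interval lies entirely on one side of zero.
    let ci_excludes_zero = e.ci_low > 0.0 || e.ci_high < 0.0;
    // 3. The experiment is not marked "erratic".
    big_enough && ci_excludes_zero && !e.erratic
}
```

Applied to the rows in the tables above, `otlp_ingest_logs_5mb_memory` (+4.54, CI [+4.03, +5.05]) fails the first criterion and `dsd_uds_1mb_3k_contexts_cpu` (+8.47, CI [-47.95, +64.88]) fails the second, which is consistent with the "no significant changes" verdict.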
Copilot reviewed 1 out of 6 changed files in this pull request and generated no new comments.
Copilot reviewed 1 out of 5 changed files in this pull request and generated 1 comment.
    impl Future for ProcessShutdown {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if let Some(mut shutdown_rx) = self.shutdown.take() {
                match shutdown_rx.poll_unpin(cx) {
                    Poll::Pending => {
                        self.shutdown = Some(shutdown_rx);
                        Poll::Pending
                    }
                    Poll::Ready(()) => Poll::Ready(()),
                }
            } else {
                Poll::Ready(())
            }
        }
    }

`tokio::sync::oneshot::Receiver<T>` polls to `Poll<Result<T, RecvError>>`, not `Poll<T>`. As written, `Poll::Ready(())` won’t compile (and it also doesn’t address the case where the sender is dropped). Update the match to handle `Poll::Ready(Ok(()))` and treat `Err(_)` as shutdown (or otherwise decide how cancellation should be surfaced).
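A minimal, std-only sketch of the shape this suggestion describes, assuming the inner receiver resolves to a `Result`. Everything here is a hypothetical stand-in: `FakeReceiver`, `SenderDropped`, `ShutdownSketch`, and `poll_once` are not types from tokio or from this PR, and the real `ProcessShutdown` may wrap a different kind of receiver.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical error: the sending half went away without signalling.
#[derive(Debug, PartialEq)]
struct SenderDropped;

/// Hypothetical receiver that replays a scripted sequence of poll results.
struct FakeReceiver {
    script: Vec<Poll<Result<(), SenderDropped>>>,
}

impl Future for FakeReceiver {
    type Output = Result<(), SenderDropped>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.script.is_empty() {
            Poll::Pending
        } else {
            self.script.remove(0)
        }
    }
}

/// Sketch of a shutdown future that treats both outcomes as "shut down now".
struct ShutdownSketch {
    shutdown: Option<FakeReceiver>,
}

impl Future for ShutdownSketch {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let polled = match self.shutdown.as_mut() {
            Some(rx) => Pin::new(rx).poll(cx),
            // Already resolved earlier: stay resolved.
            None => return Poll::Ready(()),
        };
        match polled {
            Poll::Pending => Poll::Pending,
            // Ok(()) is an explicit signal; Err(_) means the sender was dropped.
            Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => {
                self.shutdown = None;
                Poll::Ready(())
            }
        }
    }
}

/// Waker that does nothing, so the sketch can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with the no-op waker.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Treating a dropped sender as shutdown is one possible policy; surfacing it as a distinct outcome would require changing `Output` to something other than `()`.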
Copilot reviewed 1 out of 5 changed files in this pull request and generated no new comments.
Copilot reviewed 1 out of 5 changed files in this pull request and generated no new comments.
    // If the supervisor already exited (i.e., the select! above matched its branch), both the send
    // and await resolve immediately: the send is a no-op and the future is already complete.
    let _ = internal_shutdown_tx.send(());
    let _ = internal_supervisor_fut.await;

On line 217, we (potentially) repeatedly poll the future until completion:

    result = &mut internal_supervisor_fut => {

and here we poll it again. Does this cause a panic?
Mmmm, good call, yeah.
We should fuse this future so that it becomes a no-op when we poll it again after triggering internal shutdown. I'll make that change.
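For context on the question: a future produced by an `async` block or `async fn` panics if it is polled again after returning `Poll::Ready`. Below is a std-only sketch of a wrapper that makes the second poll harmless. It is an assumption about what "fusing" could look like here, not the change that was actually made: `CompleteOnce`, `NoopWake`, and `poll_once` are hypothetical names. Note that `futures::future::Fuse` behaves differently, returning `Poll::Pending` after completion, so awaiting an already-completed `Fuse` would not resolve; this sketch returns `Poll::Ready(())` instead.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: drops the inner future once it completes, so any
/// later poll resolves immediately instead of touching the finished future.
struct CompleteOnce<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future<Output = ()>> CompleteOnce<F> {
    fn new(fut: F) -> Self {
        Self { inner: Some(Box::pin(fut)) }
    }

    /// True once the inner future has completed and been dropped.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future<Output = ()>> Future for CompleteOnce<F> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let polled = match self.inner.as_mut() {
            Some(fut) => fut.as_mut().poll(cx),
            // Inner future already finished: resolve without polling it again.
            None => return Poll::Ready(()),
        };
        if polled.is_ready() {
            self.inner = None;
        }
        polled
    }
}

/// Waker that does nothing, so the sketch can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with the no-op waker.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

With a wrapper like this, the same value can be polled by reference inside a `select!` branch and awaited again afterwards without hitting the resumed-after-completion panic.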
    // TODO: just make the API handler for `ComponentRegistry` cloneable so we can create/hold on to it in
    // `UnprivilegedApiWorker` without having to create a scoped one here just to maintain the ownership necessary
    let scoped_registry = component_registry.get_or_create("control-plane");

Does the memory API now use control-plane instead of the root?
Here's a rough trace:

    let scoped_registry = component_registry.get_or_create("control-plane");
    supervisor.add_worker(UnprivilegedApiWorker::new(
        dp_config.clone(),
        health_registry,
        scoped_registry,
    ));

control_plane.rs, line 117:

    let api_builder = APIBuilder::new()
        .with_handler(self.health_registry.api_handler())
        .with_handler(self.component_registry.api_handler());

registry.rs, line 217:

    pub fn api_handler(&self) -> MemoryAPIHandler {
        MemoryAPIHandler::from_state(Arc::clone(&self.inner))
    }

And I think `inner` gets set here:

    fn default() -> Self {
        Self {
            inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
        }
    }

Nice analysis. Should this block the PR? Are there expected to be components running at the root and would they be unbounded?
None of this changes any behavior with declaring bounds or reporting memory usage.
The comment is maybe not as clear as it could be, but essentially: we need a ComponentRegistry that we can own in UnprivilegedApiWorker such that we can create the necessary API handlers during initialization.
We explicitly avoided making ComponentRegistry implement Clone in order to avoid people doing that to satisfy things that need a registered component without actually taking the time to properly register said component... however, in this case, it becomes confusing because we're knowingly just holding on to ComponentRegistry for the API handler, not because we're trying to represent a registered component.
In future PRs, this particular code will go away because we'll have an alternate mechanism for registering API handlers dynamically that won't require keeping around a handle to the component registry.
webern left a comment
Nice trait. I'm impressed by @andrewqian2001datadog's feedback.
    /// same runtime that is used for running the process is used for initialization. The resulting future is expected
    /// to complete as soon as reasonably possible after `process_shutdown` resolves.
    ///
    /// **Important:** The `process_shutdown` signal must be moved into the returned [`SupervisorFuture`] so the

I tried to think of a way to enforce this but didn't come up with one. 🤔
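To make the requirement concrete, here is a toy, std-only sketch of what "moving the signal into the returned future" can look like with an `async move` block. All names are hypothetical stand-ins (`ToyShutdown`, `BoxedWorkerFuture`, `initialize`, `poll_once`); the real `Supervisable` trait and `SupervisorFuture` type are not reproduced here, and the toy signal never registers a waker, so it only works when polled by hand.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy shutdown signal (hypothetical): ready once the shared flag is set.
struct ToyShutdown(Arc<AtomicBool>);

impl Future for ToyShutdown {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0.load(Ordering::SeqCst) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical stand-in for the boxed future a worker returns.
type BoxedWorkerFuture = Pin<Box<dyn Future<Output = &'static str>>>;

/// The signal is captured by `async move`, so the returned future owns it
/// and completes only after the signal resolves.
fn initialize(process_shutdown: ToyShutdown) -> BoxedWorkerFuture {
    Box::pin(async move {
        process_shutdown.await;
        "stopped after shutdown"
    })
}

/// Waker that does nothing, so the sketch can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the worker future exactly once with the no-op waker.
fn poll_once(fut: &mut BoxedWorkerFuture) -> Poll<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

Nothing in a signature like this prevents an implementation from ignoring or dropping `process_shutdown` instead of awaiting it, which matches the observation that the requirement is hard to enforce at the type level.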
    @@ -1,8 +1,8 @@
    use std::{future::Future, pin::Pin, sync::Arc, time::Duration};

General note as I'm learning the codebase: I'm running into some cognitive load around the concept of Worker vs. Supervisor.
- `ChildSpecification::Worker` means "leaf process" (as opposed to `::Supervisor`).
- `WorkerState`, `WorkerFuture`, `WorkerError`, `add_worker`, etc. operate on any child, including nested supervisors. Calling `add_worker` to add a `Supervisor` is the tell. The doc on `add_worker` says "A Supervisor can also be added as a worker".

supervisor.rs might benefit from module-level overview documentation.
    },
    // We've exceeded the shutdown timeout, so we need to abort the process.
    _ = &mut shutdown_deadline => {
        debug!(worker_id, "Shutdown timeout expired, forcefully aborting process.");

Just a suggestion since the Supervisable trait does not have a way to enforce the use of the ProcessShutdown signal.

    warn!(worker_id, "Shutdown timeout expired, forcefully aborting process. Possible missing ProcessShutdown handling.");

…ane to supervisor scheme
Summary
This PR starts the process of migrating tasks over to management via a supervision tree, starting with the "internal" bits that are specific to the ADP binary: the control plane and the internal observability pipeline.
Overall, this change mostly revolves around creating new structs for different task types -- internal observability pipeline, health registry worker, (un)privileged API worker, etc. -- and then moving the relevant logic to that new type, and eventually building and wiring up the supervision tree to run alongside the topology.
I don't love how verbose some of the construction feels, but I'm leaving the cleanup phase (and this PR contains a number of TODO comments along those lines) as a future step once things are integrated and shown to work properly/well.
Change Type
How did you test this PR?
Existing tests: unit, integration, and correctness.
References
AGTMETRICS-393