chore(agent-data-plane): update health registry worker to allow being restarted #1176
Conversation
Pull request overview
Adds a shutdown-aware, restartable health registry runner so the registry can be spawned again after the runner stops.
Changes:
- Update `HealthRegistry::spawn` to accept a `shutdown` future and allow respawning after the runner ends (a rough sketch of this shape is included below).
- Introduce `RunnerGuard` to return the liveness response receiver back into registry state on shutdown.
- Add tests for "duplicate spawn while running" and "respawn after shutdown".
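As a reading aid, here is a minimal, hypothetical sketch of the restartable-spawn shape described above. The names `RegistryState` and `responses_rx`, and the put-back step at the end of the runner loop, mirror the PR's terminology (where `RunnerGuard` performs the hand-back), but the bodies, signatures, and channel types are illustrative assumptions rather than the actual implementation:

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};

use tokio::sync::mpsc;

/// Shared registry state; `responses_rx` is `None` while a runner is active.
struct RegistryState {
    responses_rx: Option<mpsc::Receiver<()>>,
}

#[derive(Clone)]
struct HealthRegistry {
    inner: Arc<Mutex<RegistryState>>,
    /// Components would clone this sender to report liveness responses.
    responses_tx: mpsc::Sender<()>,
}

impl HealthRegistry {
    fn new() -> Self {
        let (responses_tx, responses_rx) = mpsc::channel(16);
        Self {
            inner: Arc::new(Mutex::new(RegistryState { responses_rx: Some(responses_rx) })),
            responses_tx,
        }
    }

    /// Spawns the runner, failing if one is already running (the receiver has been taken).
    fn spawn<F>(&self, shutdown: F) -> Result<tokio::task::JoinHandle<()>, &'static str>
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let mut responses_rx = {
            let mut inner = self.inner.lock().expect("registry mutex poisoned");
            inner
                .responses_rx
                .take()
                .ok_or("health registry runner already spawned")?
        };

        let registry = Arc::clone(&self.inner);
        Ok(tokio::spawn(async move {
            tokio::pin!(shutdown);
            loop {
                tokio::select! {
                    _ = &mut shutdown => break,
                    maybe_response = responses_rx.recv() => {
                        if maybe_response.is_none() {
                            break;
                        }
                        // ... handle the liveness/registration response ...
                    }
                }
            }

            // On the way out, hand the receiver back so a later `spawn()` can succeed.
            // In the real PR this hand-off is the job of `RunnerGuard`.
            let mut inner = registry.lock().expect("registry mutex poisoned");
            inner.responses_rx = Some(responses_rx);
        }))
    }
}
```

The key property is that `spawn()` fails while the receiver is checked out and succeeds again once it has been returned.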
lib/saluki-health/src/lib.rs
Outdated
```rust
let mut inner = self.registry.lock().unwrap();
inner.responses_rx = Some(rx);
debug!("Returned response receiver to registry state.");
```
Drop should avoid panicking. Using lock().unwrap() inside drop() can panic if the mutex is poisoned, and a panic during drop can lead to aborts (especially if another panic is already in flight). Prefer handling poisoning explicitly (e.g., recover the inner value) and avoid unwrap() in drop().
```diff
-let mut inner = self.registry.lock().unwrap();
-inner.responses_rx = Some(rx);
-debug!("Returned response receiver to registry state.");
+match self.registry.lock() {
+    Ok(mut inner) => {
+        inner.responses_rx = Some(rx);
+        debug!("Returned response receiver to registry state.");
+    }
+    Err(poisoned) => {
+        let mut inner = poisoned.into_inner();
+        inner.responses_rx = Some(rx);
+        debug!("Returned response receiver to registry state after mutex poisoning.");
+    }
+}
```
Seems like a valid point, though I would use error!, and... I'm not sure what should be done to correct the system in this case. Perhaps an abort is the right thing in which case:
```rust
// The system is screwed if the lock is poisoned so we want this to panic in that case.
let mut inner = self.registry.lock().expect("A thread panic occurred and we cannot recover the system. \
    See earlier logs to find the true source of this problem.");
```
Yeah, agreed. If we're here, things are bad enough that we should generally just abort.
I'll rework this more in another PR further up the stack, but for now, I'll just swap unwrap for expect to be explicit.
```rust
// Take the response receiver out of the guard so we can use it in the select loop.
// It will be put back when the guard is dropped.
let mut responses_rx = self
    .guard
    .responses_rx
    .take()
    .expect("responses_rx should always be Some when Runner is created");
```
Taking responses_rx out of RunnerGuard means the receiver will not be returned to the registry if the task is cancelled/aborted or unwinds before reaching the “put it back” code path. That breaks the documented goal of being restartable “after shutdown or an error”. Consider an RAII pattern that guarantees the receiver is put back even on early-exit (e.g., a small local guard whose Drop moves responses_rx back), or keep the receiver inside RunnerGuard and only borrow it mutably for recv().
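One way to realize the RAII suggestion is a small drop guard, local to `run()`, that owns the receiver and returns it in `Drop`. This is only an illustrative sketch that reuses the hypothetical `RegistryState` from the earlier snippet; it is not the PR's actual code:

```rust
use std::sync::{Arc, Mutex};

use tokio::sync::mpsc;

// Reuses the hypothetical `RegistryState { responses_rx: Option<mpsc::Receiver<()>> }`
// from the earlier sketch.
struct ReceiverReturner {
    registry: Arc<Mutex<RegistryState>>,
    responses_rx: Option<mpsc::Receiver<()>>,
}

impl Drop for ReceiverReturner {
    fn drop(&mut self) {
        if let Some(rx) = self.responses_rx.take() {
            // Recover from poisoning rather than panicking inside `drop`.
            let mut inner = self.registry.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            inner.responses_rx = Some(rx);
        }
    }
}
```

With this shape the receiver is restored even if the runner future is dropped (cancelled) mid-loop. The other alternative mentioned above is to leave the receiver inside `RunnerGuard` and have the loop call `guard.responses_rx.as_mut().expect(...).recv()`, so ownership never leaves the guard in the first place.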
There was a problem hiding this comment.
From Claude, which I agree with:
Looking at the actual usage in `control_plane.rs:62-74`, the supervisor awaits the `JoinHandle` -- it doesn't abort it. The shutdown path is cooperative via `process_shutdown`. Tokio task cancellation only happens if someone explicitly calls `handle.abort()` or drops the `JoinHandle`, neither of which happens here. And if a panic occurs inside the select loop, the mutex will also be poisoned, so the guard's `Drop` wouldn't help anyway (it would hit the `unwrap`/`expect` and double-panic).
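For context, the cooperative shutdown path being described has roughly this shape; the function and channel here are placeholders standing in for `process_shutdown` and the supervisor logic in `control_plane.rs`, which are not part of this diff:

```rust
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

/// Cooperative shutdown: signal the worker, then *await* it -- never `abort()` it.
async fn stop_health_worker(shutdown_tx: oneshot::Sender<()>, handle: JoinHandle<()>) {
    // Ask the worker to wind down...
    let _ = shutdown_tx.send(());

    // ...and wait for it to finish its cleanup (e.g. handing the receiver back).
    // Because the handle is awaited rather than aborted, the task is never
    // cancelled in the middle of its select loop.
    if let Err(e) = handle.await {
        eprintln!("health registry worker failed: {e}");
    }
}
```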
Binary Size Analysis (Agent Data Plane)

Target: 7d03456 (baseline) vs ae24fcd (comparison) diff
| Module | File Size | Symbols |
|---|---|---|
| saluki_core::runtime::supervisor | +67.73 KiB | 61 |
| core | +44.21 KiB | 244 |
| agent_data_plane::internal::initialize_and_launch_runtime | -18.37 KiB | 1 |
| std | -10.85 KiB | 49 |
| anyhow | +8.25 KiB | 28 |
| agent_data_plane::internal::control_plane | -7.80 KiB | 26 |
| [sections] | +7.54 KiB | 10 |
| saluki_core::runtime::process | +6.71 KiB | 6 |
| saluki_core::topology::running | +5.50 KiB | 2 |
| saluki_app::metrics::collect_runtime_metrics | -4.75 KiB | 1 |
| agent_data_plane::internal::observability | +4.70 KiB | 16 |
| tokio | -3.65 KiB | 105 |
| saluki_core::runtime::restart | +3.50 KiB | 7 |
| saluki_health::Runner::run | +2.92 KiB | 8 |
| tracing_core | +2.24 KiB | 16 |
| agent_data_plane::cli::run | -2.02 KiB | 8 |
| hashbrown | +1.81 KiB | 8 |
| saluki_health::RunnerGuard | +1.73 KiB | 3 |
| saluki_core::runtime::shutdown | +1.65 KiB | 3 |
| agent_data_plane::config::DataPlaneConfiguration | +1.62 KiB | 1 |
Detailed Symbol Changes
```text
FILE SIZE        VM SIZE
--------------   --------------
 +3.7% +81.8Ki    +3.2% +56.6Ki  [1065 Others]
 [NEW] +63.1Ki    [NEW] +62.8Ki  _<agent_data_plane::internal::control_plane::PrivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::h3f7ccf32c810e351
 [NEW] +21.3Ki    [NEW] +21.1Ki  _<agent_data_plane::internal::control_plane::UnprivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::hd6cb87566364a1bb
 [NEW] +18.6Ki    [NEW] +18.5Ki  saluki_app::api::APIBuilder::serve::_{{closure}}::hab3e986182ae0c58
 [NEW] +16.1Ki    [NEW] +16.0Ki  saluki_core::runtime::supervisor::WorkerState::add_worker::h515f709e3bc97a0b
 [NEW] +15.1Ki    [NEW] +15.0Ki  _<core::pin::Pin<P> as core::future::future::Future>::poll::h638d45fb40fe1764
 [NEW] +11.6Ki    [NEW] +11.5Ki  saluki_health::Runner::run::_{{closure}}::hdd1e38dbfe1be93a
 [NEW] +11.0Ki    [NEW] +10.9Ki  std::sys::backtrace::__rust_begin_short_backtrace::h20fc2d2820f7e70a
 [NEW] +10.6Ki    [NEW] +10.5Ki  <saluki_core::data_model::event::Event as core::clone::Clone>::clone.9842
 [NEW] +9.47Ki    [NEW] +9.34Ki  saluki_core::runtime::supervisor::Supervisor::run_inner::_{{closure}}::ha8c7c6df0c8c7703
 [NEW] +6.80Ki    [NEW] +6.66Ki  saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::ha25f5f301fb536ee
 [NEW] +6.24Ki    [NEW] +6.10Ki  saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::hcbbea4392b2e770f
 [NEW] +5.67Ki    [NEW] +5.53Ki  saluki_core::topology::running::RunningTopology::shutdown_with_timeout::_{{closure}}::h55f16f5e28371ac8
 [NEW] +5.52Ki    [NEW] +5.38Ki  <hickory_proto::rr::record_data::RData as core::clone::Clone>::clone.8293
 [DEL] -8.28Ki    [DEL] -8.19Ki  std::sys::backtrace::__rust_begin_short_backtrace::h1af6f2970282067d
 [DEL] -9.20Ki    [DEL] -9.10Ki  saluki_health::Runner::run::_{{closure}}::hc683b62b20c4d3c8
 [DEL] -10.6Ki    [DEL] -10.5Ki  <saluki_core::data_model::event::Event as core::clone::Clone>::clone.10072
 [DEL] -17.3Ki    [DEL] -17.1Ki  agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::_{{closure}}::h0a06453c7c0a3090
 [DEL] -18.4Ki    [DEL] -18.2Ki  agent_data_plane::internal::initialize_and_launch_runtime::_{{closure}}::h24807518510e5f4d
 [DEL] -18.7Ki    [DEL] -18.6Ki  saluki_app::api::APIBuilder::serve::_{{closure}}::h80a1a8822223b886
 [DEL] -84.6Ki    [DEL] -84.5Ki  agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::hbc654c75007186eb
 +0.4%  +115Ki    +0.4% +89.7Ki  TOTAL
```
Regression Detector (Agent Data Plane)

Regression Detector Results

Run ID: 68e6c2cb-864f-406d-9209-3655d62ea7f5
Baseline: 7d03456
Optimization Goals: ✅ No significant changes detected
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | +0.19 | [-4.74, +5.12] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | +0.01 | [-0.12, +0.14] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_5mb_memory | memory utilization | -8.01 | [-8.54, -7.47] | 1 | (metrics) (profiles) (logs) |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_traces_5mb_cpu | % cpu utilization | +2.52 | [+0.32, +4.71] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_memory | memory utilization | +2.26 | [+2.02, +2.51] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_idle | memory utilization | +1.05 | [+1.02, +1.09] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_throughput | ingress throughput | +0.75 | [+0.62, +0.88] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_cpu | % cpu utilization | +0.52 | [-0.96, +2.00] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_cpu | % cpu utilization | +0.49 | [-30.66, +31.65] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_cpu | % cpu utilization | +0.47 | [-1.85, +2.80] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_low | memory utilization | +0.45 | [+0.25, +0.65] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_memory | memory utilization | +0.45 | [+0.26, +0.64] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_memory | memory utilization | +0.38 | [+0.13, +0.63] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_medium | memory utilization | +0.24 | [+0.04, +0.43] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | +0.19 | [-4.74, +5.12] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_memory | memory utilization | +0.17 | [+0.00, +0.35] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_cpu | % cpu utilization | +0.14 | [-5.82, +6.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_memory | memory utilization | +0.14 | [-0.03, +0.31] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_memory | memory utilization | +0.01 | [-0.23, +0.26] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_throughput | ingress throughput | +0.01 | [-0.13, +0.16] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | +0.01 | [-0.12, +0.14] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.04, +0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.06, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_throughput | ingress throughput | +0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_throughput | ingress throughput | -0.01 | [-0.14, +0.13] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_throughput | ingress throughput | -0.01 | [-0.06, +0.04] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_ultraheavy | memory utilization | -0.04 | [-0.16, +0.09] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_cpu | % cpu utilization | -0.23 | [-2.42, +1.97] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_heavy | memory utilization | -0.27 | [-0.42, -0.12] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_memory | memory utilization | -0.32 | [-0.49, -0.15] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_memory | memory utilization | -0.35 | [-0.53, -0.18] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_memory | memory utilization | -0.55 | [-0.88, -0.21] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_cpu | % cpu utilization | -3.44 | [-59.92, +53.04] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_cpu | % cpu utilization | -3.77 | [-11.09, +3.55] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_5mb_memory | memory utilization | -8.01 | [-8.54, -7.47] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_cpu | % cpu utilization | -10.75 | [-61.25, +39.76] | 1 | (metrics) (profiles) (logs) |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | quality_gates_rss_dsd_heavy | memory_usage | 10/10 | 114.10MiB ≤ 140MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_low | memory_usage | 10/10 | 34.04MiB ≤ 50MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_medium | memory_usage | 10/10 | 53.98MiB ≤ 75MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | 168.32MiB ≤ 200MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_idle | memory_usage | 10/10 | 21.23MiB ≤ 40MiB | (metrics) (profiles) (logs) |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true (a minimal sketch of this decision rule follows the list):

- Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
- Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
- Its configuration does not mark it "erratic".
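To make those three criteria concrete, here is a small, hypothetical sketch of the decision rule. The struct fields, the 5.00% tolerance parameter, and the example values come from the explanation and tables above; this is not the regression detector's actual code:

```rust
/// Result of one A/B experiment, expressed as "comparison minus baseline".
struct ExperimentResult {
    delta_mean_pct: f64,  // estimated Δ mean %
    ci_low_pct: f64,      // lower bound of the 90% CI for Δ mean %
    ci_high_pct: f64,     // upper bound of the 90% CI for Δ mean %
    marked_erratic: bool, // configuration flag excluding known-noisy experiments
}

/// A change is flagged as a regression only if all three criteria hold.
fn is_regression(result: &ExperimentResult, effect_size_tolerance_pct: f64) -> bool {
    let big_enough = result.delta_mean_pct.abs() >= effect_size_tolerance_pct; // e.g. 5.00%
    let ci_excludes_zero = result.ci_low_pct > 0.0 || result.ci_high_pct < 0.0;
    big_enough && ci_excludes_zero && !result.marked_erratic
}

fn main() {
    // Example: quality_gates_rss_idle from the table above (+1.05%, CI [+1.02, +1.09]).
    let idle = ExperimentResult {
        delta_mean_pct: 1.05,
        ci_low_pct: 1.02,
        ci_high_pct: 1.09,
        marked_erratic: false,
    };
    // The CI excludes zero, but |Δ mean %| < 5.00%, so it is not flagged.
    assert!(!is_regression(&idle, 5.0));
}
```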
```rust
/// finishes and this guard is dropped (for example, after a shutdown or task cancellation), the
/// receiver is returned to the registry state so that a subsequent call to `spawn()` can succeed.
```
RunnerGuard’s docstring claims the receiver is returned after “task cancellation”, but the current implementation takes responses_rx out of the guard during run(). If the task is aborted/cancelled before the end-of-loop assignment that puts it back, responses_rx will be dropped and not returned to RegistryState, so the doc is inaccurate. Either adjust the docs to only promise restartability after graceful shutdown/completion, or change the implementation to ensure the receiver is always restored even on cancellation.
```diff
-/// finishes and this guard is dropped (for example, after a shutdown or task cancellation), the
-/// receiver is returned to the registry state so that a subsequent call to `spawn()` can succeed.
+/// finishes cleanly and this guard is dropped (for example, after a coordinated shutdown or
+/// other graceful completion), the receiver is returned to the registry state so that a
+/// subsequent call to `spawn()` can succeed.
```

Summary
This PR slightly refactors `HealthWorker` and the underlying health registry runner code to support restarting the health registry worker.

Prior to this PR, spawning the health registry worker would fail on subsequent attempts, since the receiver used to register new components into the registry was already consumed by the first call to spawn the worker. We've simply added the ability to return the receiver and reset the state so that subsequent attempts to spawn the worker can take the receiver. We're still limited to a single health registry worker at a time, but at least we can now cleanly recover from it being restarted (a test-style sketch of this flow is shown below).
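As a rough illustration of the "respawn after shutdown" behavior, a test could look like the following sketch. The `HealthRegistry::new()` constructor and the `spawn` signature (returning a `Result` containing a `JoinHandle`) are assumptions carried over from the earlier sketch, not necessarily the crate's real API:

```rust
#[tokio::test]
async fn respawn_after_shutdown() {
    let registry = HealthRegistry::new();

    // First spawn succeeds and takes ownership of the response receiver.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let handle = registry
        .spawn(async move {
            let _ = shutdown_rx.await;
        })
        .expect("first spawn should succeed");

    // A duplicate spawn while the runner is alive is rejected.
    assert!(registry.spawn(std::future::pending::<()>()).is_err());

    // Shut the runner down cooperatively and wait for it to finish, which
    // returns the receiver to the registry state.
    let _ = shutdown_tx.send(());
    handle.await.expect("runner task should not panic");

    // The registry can now be spawned again.
    assert!(registry.spawn(std::future::pending::<()>()).is_ok());
}
```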
Change Type
How did you test this PR?
Existing and new unit tests.
References
AGTMETRICS-393