From 3946acd27974aee1c7bef444e4f59c47a23ef2e6 Mon Sep 17 00:00:00 2001 From: Joseph Livesey Date: Sat, 6 Dec 2025 08:38:36 -0500 Subject: [PATCH 1/3] fix(tap-agent): clean up UNAGGREGATED_FEES_BY_VERSION gauge on allocation stop The UNAGGREGATED_FEES_BY_VERSION gauge was added in the Horizon V2 TAP support commit (8c8dd2b) but was missing cleanup when SenderAllocation actors stop. This caused stale gauge values to persist for closed allocations, affecting Prometheus metrics accuracy. Add remove_label_values call in ActorTerminated handler to clean up the metric with the correct version label (v1/v2) based on sender_type. --- crates/tap-agent/src/agent/sender_account.rs | 91 +++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 6ae8e545a..6e22c8995 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -1690,6 +1690,16 @@ impl Actor for SenderAccount { let _ = UNAGGREGATED_FEES .remove_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]); + let version = match state.sender_type { + crate::agent::sender_accounts_manager::SenderType::Legacy => TAP_V1, + crate::agent::sender_accounts_manager::SenderType::Horizon => TAP_V2, + }; + let _ = UNAGGREGATED_FEES_BY_VERSION.remove_label_values(&[ + &state.sender.to_string(), + &allocation_id.to_string(), + version, + ]); + // Check for deny conditions - look up correct allocation variant from state let allocation_enum = state .allocation_ids @@ -1835,7 +1845,10 @@ pub mod tests { Mock, MockServer, ResponseTemplate, }; - use super::{RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS}; + use super::{ + RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, TAP_V1, + UNAGGREGATED_FEES_BY_VERSION, + }; use crate::{ agent::{ sender_account::ReceiptFees, sender_accounts_manager::AllocationId, @@ -3100,4 +3113,80 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } + + /// Test that UNAGGREGATED_FEES_BY_VERSION metric is cleaned up when allocation stops + /// + /// This test verifies the fix for stale gauge metrics that were introduced in the + /// Horizon V2 TAP support commit. Previously, UNAGGREGATED_FEES_BY_VERSION was set + /// but never cleaned up when allocations closed, leaving stale values in Prometheus. + #[tokio::test] + async fn test_unaggregated_fees_by_version_cleanup_on_allocation_stop() { + // Use a unique allocation ID for this test to avoid interference from other tests + // (prometheus metrics are global/shared) + let unique_allocation = test_assets::ALLOCATION_ID_1; + + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let (sender_account, mut msg_receiver, prefix, _, _) = + create_sender_account().pgpool(pgpool).call().await; + + // Create a mock sender allocation and link it to the sender account + let (mock_sender_allocation, _, next_unaggregated_fees) = + MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone()); + + let name = format!("{}:{}:{}", prefix, SENDER.1, unique_allocation); + let (allocation, _) = MockSenderAllocation::spawn_linked( + Some(name), + mock_sender_allocation, + (), + sender_account.get_cell(), + ) + .await + .unwrap(); + + // Send unaggregated fees to trigger metric set + next_unaggregated_fees.send(1000).unwrap(); + + // Directly set the metric to simulate the value being recorded + // (We do this because the actual message flow is complex and depends on + // allocation state being properly set up) + let sender_label = SENDER.1.to_string(); + let allocation_label = unique_allocation.to_string(); + UNAGGREGATED_FEES_BY_VERSION + .with_label_values(&[&sender_label, &allocation_label, TAP_V1]) + .set(1000.0); + + // Verify metric was set + let metric_value = UNAGGREGATED_FEES_BY_VERSION + .get_metric_with_label_values(&[&sender_label, &allocation_label, TAP_V1]) + .expect("Metric should exist after being set") + .get(); + assert_eq!( + metric_value, 1000.0, + "Metric should have value 1000.0 after set, got {metric_value}" + ); + + // Stop the allocation - this should trigger ActorTerminated supervision event + // which in turn should clean up the metric + allocation.stop_and_wait(None, None).await.unwrap(); + + // Give time for supervision event to be processed + flush_messages(&mut msg_receiver).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify metric was cleaned up. After remove_label_values, get_metric_with_label_values + // creates a NEW metric with default value 0. So the value changing from 1000 to 0 + // proves the old metric was removed. + // See: https://docs.rs/prometheus/latest/prometheus/core/struct.MetricVec.html + let metric_value_after = UNAGGREGATED_FEES_BY_VERSION + .with_label_values(&[&sender_label, &allocation_label, TAP_V1]) + .get(); + assert_eq!( + metric_value_after, 0.0, + "Metric should be 0 after removal (old value was 1000), got {metric_value_after}" + ); + + sender_account.stop_and_wait(None, None).await.unwrap(); + } } From 8cb17e2fcbd6aaf0a34a3e373da19821966e4a73 Mon Sep 17 00:00:00 2001 From: Joseph Livesey Date: Sat, 6 Dec 2025 08:50:19 -0500 Subject: [PATCH 2/3] fix(tap-agent): clean up INVALID_RECEIPT_FEES gauge on allocation stop --- crates/tap-agent/src/agent/sender_account.rs | 63 +++++++++++++++++++- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 6e22c8995..bb5fe5ddb 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -1699,6 +1699,8 @@ impl Actor for SenderAccount { &allocation_id.to_string(), version, ]); + let _ = INVALID_RECEIPT_FEES + .remove_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]); // Check for deny conditions - look up correct allocation variant from state let allocation_enum = state @@ -1846,8 +1848,8 @@ pub mod tests { }; use super::{ - RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, TAP_V1, - UNAGGREGATED_FEES_BY_VERSION, + RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, INVALID_RECEIPT_FEES, + TAP_V1, UNAGGREGATED_FEES_BY_VERSION, }; use crate::{ agent::{ @@ -3189,4 +3191,61 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } + + /// Test that INVALID_RECEIPT_FEES metric is cleaned up when allocation stops + #[tokio::test] + async fn test_invalid_receipt_fees_cleanup_on_allocation_stop() { + let unique_allocation = test_assets::ALLOCATION_ID_1; + + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let (sender_account, mut msg_receiver, prefix, _, _) = + create_sender_account().pgpool(pgpool).call().await; + + let (mock_sender_allocation, _, next_unaggregated_fees) = + MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone()); + + let name = format!("{}:{}:{}", prefix, SENDER.1, unique_allocation); + let (allocation, _) = MockSenderAllocation::spawn_linked( + Some(name), + mock_sender_allocation, + (), + sender_account.get_cell(), + ) + .await + .unwrap(); + + next_unaggregated_fees.send(1000).unwrap(); + + let sender_label = SENDER.1.to_string(); + let allocation_label = unique_allocation.to_string(); + INVALID_RECEIPT_FEES + .with_label_values(&[&sender_label, &allocation_label]) + .set(500.0); + + let metric_value = INVALID_RECEIPT_FEES + .get_metric_with_label_values(&[&sender_label, &allocation_label]) + .expect("Metric should exist after being set") + .get(); + assert_eq!( + metric_value, 500.0, + "Metric should have value 500.0 after set, got {metric_value}" + ); + + allocation.stop_and_wait(None, None).await.unwrap(); + + flush_messages(&mut msg_receiver).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + let metric_value_after = INVALID_RECEIPT_FEES + .with_label_values(&[&sender_label, &allocation_label]) + .get(); + assert_eq!( + metric_value_after, 0.0, + "Metric should be 0 after removal (old value was 500), got {metric_value_after}" + ); + + sender_account.stop_and_wait(None, None).await.unwrap(); + } } From ee50ac86cfa0eae2960820cf94ad93e3571114c2 Mon Sep 17 00:00:00 2001 From: Joseph Livesey Date: Sat, 6 Dec 2025 08:55:54 -0500 Subject: [PATCH 3/3] fix(tap-agent): clean up sender-level gauges in SenderAccount post_stop --- crates/tap-agent/src/agent/sender_account.rs | 92 +++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index bb5fe5ddb..4841d82c3 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -1782,6 +1782,15 @@ impl Actor for SenderAccount { if let Some(handle) = state.reconciliation_handle.take() { handle.abort(); } + + // Clean up sender-level metrics to avoid stale gauge values + let sender_label = state.sender.to_string(); + let _ = SENDER_DENIED.remove_label_values(&[&sender_label]); + let _ = ESCROW_BALANCE.remove_label_values(&[&sender_label]); + let _ = SENDER_FEE_TRACKER.remove_label_values(&[&sender_label]); + let _ = MAX_FEE_PER_SENDER.remove_label_values(&[&sender_label]); + let _ = RAV_REQUEST_TRIGGER_VALUE.remove_label_values(&[&sender_label]); + Ok(()) } } @@ -1848,8 +1857,9 @@ pub mod tests { }; use super::{ - RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, INVALID_RECEIPT_FEES, - TAP_V1, UNAGGREGATED_FEES_BY_VERSION, + RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, ESCROW_BALANCE, + INVALID_RECEIPT_FEES, MAX_FEE_PER_SENDER, RAV_REQUEST_TRIGGER_VALUE, SENDER_DENIED, + SENDER_FEE_TRACKER, TAP_V1, UNAGGREGATED_FEES_BY_VERSION, }; use crate::{ agent::{ @@ -3248,4 +3258,82 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } + + /// Test that sender-level metrics are cleaned up when SenderAccount stops + #[tokio::test] + async fn test_sender_level_gauges_cleanup_on_post_stop() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let (sender_account, mut msg_receiver, _, _, _) = + create_sender_account().pgpool(pgpool).call().await; + + flush_messages(&mut msg_receiver).await; + + let sender_label = SENDER.1.to_string(); + + // Set all sender-level metrics to non-zero values + SENDER_DENIED.with_label_values(&[&sender_label]).set(1); + ESCROW_BALANCE + .with_label_values(&[&sender_label]) + .set(1000.0); + SENDER_FEE_TRACKER + .with_label_values(&[&sender_label]) + .set(500.0); + MAX_FEE_PER_SENDER + .with_label_values(&[&sender_label]) + .set(2000.0); + RAV_REQUEST_TRIGGER_VALUE + .with_label_values(&[&sender_label]) + .set(100.0); + + // Verify metrics were set + assert_eq!( + SENDER_DENIED + .get_metric_with_label_values(&[&sender_label]) + .unwrap() + .get(), + 1 + ); + assert_eq!( + ESCROW_BALANCE + .get_metric_with_label_values(&[&sender_label]) + .unwrap() + .get(), + 1000.0 + ); + + // Stop sender account - this triggers post_stop which should clean up metrics + sender_account.stop_and_wait(None, None).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify all sender-level metrics were cleaned up + assert_eq!( + SENDER_DENIED.with_label_values(&[&sender_label]).get(), + 0, + "SENDER_DENIED should be 0 after cleanup" + ); + assert_eq!( + ESCROW_BALANCE.with_label_values(&[&sender_label]).get(), + 0.0, + "ESCROW_BALANCE should be 0 after cleanup" + ); + assert_eq!( + SENDER_FEE_TRACKER.with_label_values(&[&sender_label]).get(), + 0.0, + "SENDER_FEE_TRACKER should be 0 after cleanup" + ); + assert_eq!( + MAX_FEE_PER_SENDER.with_label_values(&[&sender_label]).get(), + 0.0, + "MAX_FEE_PER_SENDER should be 0 after cleanup" + ); + assert_eq!( + RAV_REQUEST_TRIGGER_VALUE + .with_label_values(&[&sender_label]) + .get(), + 0.0, + "RAV_REQUEST_TRIGGER_VALUE should be 0 after cleanup" + ); + } }