diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 6ae8e545a..4841d82c3 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -1690,6 +1690,18 @@ impl Actor for SenderAccount { let _ = UNAGGREGATED_FEES .remove_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]); + let version = match state.sender_type { + crate::agent::sender_accounts_manager::SenderType::Legacy => TAP_V1, + crate::agent::sender_accounts_manager::SenderType::Horizon => TAP_V2, + }; + let _ = UNAGGREGATED_FEES_BY_VERSION.remove_label_values(&[ + &state.sender.to_string(), + &allocation_id.to_string(), + version, + ]); + let _ = INVALID_RECEIPT_FEES + .remove_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]); + // Check for deny conditions - look up correct allocation variant from state let allocation_enum = state .allocation_ids @@ -1770,6 +1782,15 @@ impl Actor for SenderAccount { if let Some(handle) = state.reconciliation_handle.take() { handle.abort(); } + + // Clean up sender-level metrics to avoid stale gauge values + let sender_label = state.sender.to_string(); + let _ = SENDER_DENIED.remove_label_values(&[&sender_label]); + let _ = ESCROW_BALANCE.remove_label_values(&[&sender_label]); + let _ = SENDER_FEE_TRACKER.remove_label_values(&[&sender_label]); + let _ = MAX_FEE_PER_SENDER.remove_label_values(&[&sender_label]); + let _ = RAV_REQUEST_TRIGGER_VALUE.remove_label_values(&[&sender_label]); + Ok(()) } } @@ -1835,7 +1856,11 @@ pub mod tests { Mock, MockServer, ResponseTemplate, }; - use super::{RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS}; + use super::{ + RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, ESCROW_BALANCE, + INVALID_RECEIPT_FEES, MAX_FEE_PER_SENDER, RAV_REQUEST_TRIGGER_VALUE, SENDER_DENIED, + SENDER_FEE_TRACKER, TAP_V1, UNAGGREGATED_FEES_BY_VERSION, + }; use crate::{ agent::{ sender_account::ReceiptFees, sender_accounts_manager::AllocationId, @@ -3100,4 +3125,215 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } + + /// Test that UNAGGREGATED_FEES_BY_VERSION metric is cleaned up when allocation stops + /// + /// This test verifies the fix for stale gauge metrics that were introduced in the + /// Horizon V2 TAP support commit. Previously, UNAGGREGATED_FEES_BY_VERSION was set + /// but never cleaned up when allocations closed, leaving stale values in Prometheus. + #[tokio::test] + async fn test_unaggregated_fees_by_version_cleanup_on_allocation_stop() { + // Use a unique allocation ID for this test to avoid interference from other tests + // (prometheus metrics are global/shared) + let unique_allocation = test_assets::ALLOCATION_ID_1; + + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let (sender_account, mut msg_receiver, prefix, _, _) = + create_sender_account().pgpool(pgpool).call().await; + + // Create a mock sender allocation and link it to the sender account + let (mock_sender_allocation, _, next_unaggregated_fees) = + MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone()); + + let name = format!("{}:{}:{}", prefix, SENDER.1, unique_allocation); + let (allocation, _) = MockSenderAllocation::spawn_linked( + Some(name), + mock_sender_allocation, + (), + sender_account.get_cell(), + ) + .await + .unwrap(); + + // Send unaggregated fees to trigger metric set + next_unaggregated_fees.send(1000).unwrap(); + + // Directly set the metric to simulate the value being recorded + // (We do this because the actual message flow is complex and depends on + // allocation state being properly set up) + let sender_label = SENDER.1.to_string(); + let allocation_label = unique_allocation.to_string(); + UNAGGREGATED_FEES_BY_VERSION + .with_label_values(&[&sender_label, &allocation_label, TAP_V1]) + .set(1000.0); + + // Verify metric was set + let metric_value = UNAGGREGATED_FEES_BY_VERSION + .get_metric_with_label_values(&[&sender_label, &allocation_label, TAP_V1]) + .expect("Metric should exist after being set") + .get(); + assert_eq!( + metric_value, 1000.0, + "Metric should have value 1000.0 after set, got {metric_value}" + ); + + // Stop the allocation - this should trigger ActorTerminated supervision event + // which in turn should clean up the metric + allocation.stop_and_wait(None, None).await.unwrap(); + + // Give time for supervision event to be processed + flush_messages(&mut msg_receiver).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify metric was cleaned up. After remove_label_values, get_metric_with_label_values + // creates a NEW metric with default value 0. So the value changing from 1000 to 0 + // proves the old metric was removed. + // See: https://docs.rs/prometheus/latest/prometheus/core/struct.MetricVec.html + let metric_value_after = UNAGGREGATED_FEES_BY_VERSION + .with_label_values(&[&sender_label, &allocation_label, TAP_V1]) + .get(); + assert_eq!( + metric_value_after, 0.0, + "Metric should be 0 after removal (old value was 1000), got {metric_value_after}" + ); + + sender_account.stop_and_wait(None, None).await.unwrap(); + } + + /// Test that INVALID_RECEIPT_FEES metric is cleaned up when allocation stops + #[tokio::test] + async fn test_invalid_receipt_fees_cleanup_on_allocation_stop() { + let unique_allocation = test_assets::ALLOCATION_ID_1; + + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let (sender_account, mut msg_receiver, prefix, _, _) = + create_sender_account().pgpool(pgpool).call().await; + + let (mock_sender_allocation, _, next_unaggregated_fees) = + MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone()); + + let name = format!("{}:{}:{}", prefix, SENDER.1, unique_allocation); + let (allocation, _) = MockSenderAllocation::spawn_linked( + Some(name), + mock_sender_allocation, + (), + sender_account.get_cell(), + ) + .await + .unwrap(); + + next_unaggregated_fees.send(1000).unwrap(); + + let sender_label = SENDER.1.to_string(); + let allocation_label = unique_allocation.to_string(); + INVALID_RECEIPT_FEES + .with_label_values(&[&sender_label, &allocation_label]) + .set(500.0); + + let metric_value = INVALID_RECEIPT_FEES + .get_metric_with_label_values(&[&sender_label, &allocation_label]) + .expect("Metric should exist after being set") + .get(); + assert_eq!( + metric_value, 500.0, + "Metric should have value 500.0 after set, got {metric_value}" + ); + + allocation.stop_and_wait(None, None).await.unwrap(); + + flush_messages(&mut msg_receiver).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + let metric_value_after = INVALID_RECEIPT_FEES + .with_label_values(&[&sender_label, &allocation_label]) + .get(); + assert_eq!( + metric_value_after, 0.0, + "Metric should be 0 after removal (old value was 500), got {metric_value_after}" + ); + + sender_account.stop_and_wait(None, None).await.unwrap(); + } + + /// Test that sender-level metrics are cleaned up when SenderAccount stops + #[tokio::test] + async fn test_sender_level_gauges_cleanup_on_post_stop() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let (sender_account, mut msg_receiver, _, _, _) = + create_sender_account().pgpool(pgpool).call().await; + + flush_messages(&mut msg_receiver).await; + + let sender_label = SENDER.1.to_string(); + + // Set all sender-level metrics to non-zero values + SENDER_DENIED.with_label_values(&[&sender_label]).set(1); + ESCROW_BALANCE + .with_label_values(&[&sender_label]) + .set(1000.0); + SENDER_FEE_TRACKER + .with_label_values(&[&sender_label]) + .set(500.0); + MAX_FEE_PER_SENDER + .with_label_values(&[&sender_label]) + .set(2000.0); + RAV_REQUEST_TRIGGER_VALUE + .with_label_values(&[&sender_label]) + .set(100.0); + + // Verify metrics were set + assert_eq!( + SENDER_DENIED + .get_metric_with_label_values(&[&sender_label]) + .unwrap() + .get(), + 1 + ); + assert_eq!( + ESCROW_BALANCE + .get_metric_with_label_values(&[&sender_label]) + .unwrap() + .get(), + 1000.0 + ); + + // Stop sender account - this triggers post_stop which should clean up metrics + sender_account.stop_and_wait(None, None).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify all sender-level metrics were cleaned up + assert_eq!( + SENDER_DENIED.with_label_values(&[&sender_label]).get(), + 0, + "SENDER_DENIED should be 0 after cleanup" + ); + assert_eq!( + ESCROW_BALANCE.with_label_values(&[&sender_label]).get(), + 0.0, + "ESCROW_BALANCE should be 0 after cleanup" + ); + assert_eq!( + SENDER_FEE_TRACKER.with_label_values(&[&sender_label]).get(), + 0.0, + "SENDER_FEE_TRACKER should be 0 after cleanup" + ); + assert_eq!( + MAX_FEE_PER_SENDER.with_label_values(&[&sender_label]).get(), + 0.0, + "MAX_FEE_PER_SENDER should be 0 after cleanup" + ); + assert_eq!( + RAV_REQUEST_TRIGGER_VALUE + .with_label_values(&[&sender_label]) + .get(), + 0.0, + "RAV_REQUEST_TRIGGER_VALUE should be 0 after cleanup" + ); + } }