Skip to content

ref(eap-outcomes): add category_metrics #7847

Merged
MeredithAnya merged 4 commits into master from
meredith/3-27-26
Mar 30, 2026
Merged

ref(eap-outcomes): add category_metrics #7847
MeredithAnya merged 4 commits into master from
meredith/3-27-26

Conversation

@MeredithAnya
Copy link
Copy Markdown
Member

Moves the following metrics to be CategoryMetrics on the batch itself:

  • accepted_outcomes.bucket_count (exists)
  • accepted_outcomes.total_quantity (exists)
  • accepted_outcomes.messages_seen (new)

Also adds an accepted_outcomes.got_backpressure metric, since we don't currently have any metrics around backpressure.

@MeredithAnya MeredithAnya requested a review from a team as a code owner March 27, 2026 22:07
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Category metrics lost on backpressure during flush
    • The aggregator now preserves category metrics when a flush is backpressured and emits them only after the carried-over message is eventually accepted.

Create PR

Or push these changes by commenting:

@cursor push 0ae1fae826
Preview (0ae1fae826)
diff --git a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
--- a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
+++ b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
@@ -12,7 +12,7 @@
 use sentry_arroyo::utils::timing::Deadline;
 use sentry_protos::snuba::v1::TraceItem;
 
-use crate::types::{AggregatedOutcomesBatch, BucketKey};
+use crate::types::{AggregatedOutcomesBatch, BucketKey, CategoryMetrics};
 
 #[derive(Debug, Default)]
 struct TraceItemOutcome {
@@ -56,6 +56,8 @@
     latest_offsets: HashMap<Partition, u64>,
     /// A message rejected by the next step, to be retried on the next poll.
     message_carried_over: Option<Message<AggregatedOutcomesBatch>>,
+    /// Category metrics for a carried-over message that has not been accepted yet.
+    carried_over_category_metrics: Option<BTreeMap<u32, CategoryMetrics>>,
     /// Commit request carried over from a poll where we had a message to retry.
     commit_request_carried_over: Option<CommitRequest>,
     /// Temporary option to change the timestamp source from
@@ -80,11 +82,21 @@
             batch: AggregatedOutcomesBatch::new(bucket_interval),
             latest_offsets: HashMap::new(),
             message_carried_over: None,
+            carried_over_category_metrics: None,
             commit_request_carried_over: None,
             use_item_timestamp,
         }
     }
 
+    fn emit_category_metrics(category_metrics: BTreeMap<u32, CategoryMetrics>) {
+        for (category, m) in category_metrics {
+            let cat_str = category.to_string();
+            counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str());
+            counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str());
+            counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str());
+        }
+    }
+
     fn flush(&mut self) -> Result<(), StrategyError>
     where
         TNext: ProcessingStrategy<AggregatedOutcomesBatch>,
@@ -111,16 +123,12 @@
                 self.last_flush = now;
 
                 tracing::info!("flushed {} buckets after {} seconds", num_buckets, seconds);
-                for (category, m) in category_metrics {
-                    let cat_str = category.to_string();
-                    counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str());
-                    counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str());
-                    counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str());
-                }
+                Self::emit_category_metrics(category_metrics);
                 Ok(())
             }
             Err(SubmitError::MessageRejected(rejected)) => {
                 self.message_carried_over = Some(rejected.message);
+                self.carried_over_category_metrics = Some(category_metrics);
                 Ok(())
             }
             Err(SubmitError::InvalidMessage(e)) => Err(StrategyError::InvalidMessage(e)),
@@ -138,7 +146,11 @@
 
         if let Some(msg) = self.message_carried_over.take() {
             match self.next_step.submit(msg) {
-                Ok(()) => {}
+                Ok(()) => {
+                    if let Some(category_metrics) = self.carried_over_category_metrics.take() {
+                        Self::emit_category_metrics(category_metrics);
+                    }
+                }
                 Err(SubmitError::MessageRejected(MessageRejected {
                     message: carried_message,
                 })) => {
@@ -552,6 +564,68 @@
     }
 
     #[test]
+    fn backpressure_retains_category_metrics_until_retry_succeeds() {
+        struct RejectOnce {
+            rejected: bool,
+        }
+        impl ProcessingStrategy<AggregatedOutcomesBatch> for RejectOnce {
+            fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
+                Ok(None)
+            }
+            fn submit(
+                &mut self,
+                message: Message<AggregatedOutcomesBatch>,
+            ) -> Result<(), SubmitError<AggregatedOutcomesBatch>> {
+                if !self.rejected {
+                    self.rejected = true;
+                    Err(SubmitError::MessageRejected(MessageRejected { message }))
+                } else {
+                    Ok(())
+                }
+            }
+            fn terminate(&mut self) {}
+            fn join(
+                &mut self,
+                _: Option<Duration>,
+            ) -> Result<Option<CommitRequest>, StrategyError> {
+                Ok(None)
+            }
+        }
+
+        let mut aggregator = OutcomesAggregator::new(
+            RejectOnce { rejected: false },
+            1, // flush after 1 bucket
+            Duration::from_millis(30_000),
+            60,
+            false,
+        );
+
+        let partition = Partition::new(Topic::new("test"), 0);
+        aggregator
+            .submit(Message::new_broker_message(
+                make_payload(6_000, 1, 2, 3, &[(4, 5)]),
+                partition,
+                0,
+                Utc::now(),
+            ))
+            .unwrap();
+
+        // poll triggers flush; next_step rejects, so category metrics must be retained.
+        aggregator.poll().unwrap();
+        assert!(aggregator.message_carried_over.is_some());
+        let metrics = aggregator.carried_over_category_metrics.as_ref().unwrap();
+        let category_4 = metrics.get(&4).unwrap();
+        assert_eq!(category_4.messages_seen, 1);
+        assert_eq!(category_4.total_quantity, 5);
+        assert_eq!(category_4.bucket_count, 1);
+
+        // Next poll retries and succeeds, so retained metrics are consumed.
+        aggregator.poll().unwrap();
+        assert!(aggregator.message_carried_over.is_none());
+        assert!(aggregator.carried_over_category_metrics.is_none());
+    }
+
+    #[test]
     fn join_honors_timeout_when_message_stays_carried_over() {
         struct AlwaysReject;
         impl ProcessingStrategy<AggregatedOutcomesBatch> for AlwaysReject {

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

@MeredithAnya MeredithAnya merged commit 81ef015 into master Mar 30, 2026
46 checks passed
@MeredithAnya MeredithAnya deleted the meredith/3-27-26 branch March 30, 2026 17:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants