ref(eap-outcomes): add category_metrics#7847
Merged
MeredithAnya merged 4 commits intomasterfrom Mar 30, 2026
Merged
Conversation
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Category metrics lost on backpressure during flush
- The aggregator now preserves category metrics when a flush is backpressured and emits them only after the carried-over message is eventually accepted.
Or push these changes by commenting:
@cursor push 0ae1fae826
Preview (0ae1fae826)
diff --git a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
--- a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
+++ b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
@@ -12,7 +12,7 @@
use sentry_arroyo::utils::timing::Deadline;
use sentry_protos::snuba::v1::TraceItem;
-use crate::types::{AggregatedOutcomesBatch, BucketKey};
+use crate::types::{AggregatedOutcomesBatch, BucketKey, CategoryMetrics};
#[derive(Debug, Default)]
struct TraceItemOutcome {
@@ -56,6 +56,8 @@
latest_offsets: HashMap<Partition, u64>,
/// A message rejected by the next step, to be retried on the next poll.
message_carried_over: Option<Message<AggregatedOutcomesBatch>>,
+ /// Category metrics for a carried-over message that has not been accepted yet.
+ carried_over_category_metrics: Option<BTreeMap<u32, CategoryMetrics>>,
/// Commit request carried over from a poll where we had a message to retry.
commit_request_carried_over: Option<CommitRequest>,
/// Temporary option to change the timestamp source from
@@ -80,11 +82,21 @@
batch: AggregatedOutcomesBatch::new(bucket_interval),
latest_offsets: HashMap::new(),
message_carried_over: None,
+ carried_over_category_metrics: None,
commit_request_carried_over: None,
use_item_timestamp,
}
}
+ fn emit_category_metrics(category_metrics: BTreeMap<u32, CategoryMetrics>) {
+ for (category, m) in category_metrics {
+ let cat_str = category.to_string();
+ counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str());
+ counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str());
+ counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str());
+ }
+ }
+
fn flush(&mut self) -> Result<(), StrategyError>
where
TNext: ProcessingStrategy<AggregatedOutcomesBatch>,
@@ -111,16 +123,12 @@
self.last_flush = now;
tracing::info!("flushed {} buckets after {} seconds", num_buckets, seconds);
- for (category, m) in category_metrics {
- let cat_str = category.to_string();
- counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str());
- counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str());
- counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str());
- }
+ Self::emit_category_metrics(category_metrics);
Ok(())
}
Err(SubmitError::MessageRejected(rejected)) => {
self.message_carried_over = Some(rejected.message);
+ self.carried_over_category_metrics = Some(category_metrics);
Ok(())
}
Err(SubmitError::InvalidMessage(e)) => Err(StrategyError::InvalidMessage(e)),
@@ -138,7 +146,11 @@
if let Some(msg) = self.message_carried_over.take() {
match self.next_step.submit(msg) {
- Ok(()) => {}
+ Ok(()) => {
+ if let Some(category_metrics) = self.carried_over_category_metrics.take() {
+ Self::emit_category_metrics(category_metrics);
+ }
+ }
Err(SubmitError::MessageRejected(MessageRejected {
message: carried_message,
})) => {
@@ -552,6 +564,68 @@
}
#[test]
+ fn backpressure_retains_category_metrics_until_retry_succeeds() {
+ struct RejectOnce {
+ rejected: bool,
+ }
+ impl ProcessingStrategy<AggregatedOutcomesBatch> for RejectOnce {
+ fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
+ Ok(None)
+ }
+ fn submit(
+ &mut self,
+ message: Message<AggregatedOutcomesBatch>,
+ ) -> Result<(), SubmitError<AggregatedOutcomesBatch>> {
+ if !self.rejected {
+ self.rejected = true;
+ Err(SubmitError::MessageRejected(MessageRejected { message }))
+ } else {
+ Ok(())
+ }
+ }
+ fn terminate(&mut self) {}
+ fn join(
+ &mut self,
+ _: Option<Duration>,
+ ) -> Result<Option<CommitRequest>, StrategyError> {
+ Ok(None)
+ }
+ }
+
+ let mut aggregator = OutcomesAggregator::new(
+ RejectOnce { rejected: false },
+ 1, // flush after 1 bucket
+ Duration::from_millis(30_000),
+ 60,
+ false,
+ );
+
+ let partition = Partition::new(Topic::new("test"), 0);
+ aggregator
+ .submit(Message::new_broker_message(
+ make_payload(6_000, 1, 2, 3, &[(4, 5)]),
+ partition,
+ 0,
+ Utc::now(),
+ ))
+ .unwrap();
+
+ // poll triggers flush; next_step rejects, so category metrics must be retained.
+ aggregator.poll().unwrap();
+ assert!(aggregator.message_carried_over.is_some());
+ let metrics = aggregator.carried_over_category_metrics.as_ref().unwrap();
+ let category_4 = metrics.get(&4).unwrap();
+ assert_eq!(category_4.messages_seen, 1);
+ assert_eq!(category_4.total_quantity, 5);
+ assert_eq!(category_4.bucket_count, 1);
+
+ // Next poll retries and succeeds, so retained metrics are consumed.
+ aggregator.poll().unwrap();
+ assert!(aggregator.message_carried_over.is_none());
+ assert!(aggregator.carried_over_category_metrics.is_none());
+ }
+
+ #[test]
fn join_honors_timeout_when_message_stays_carried_over() {
struct AlwaysReject;
impl ProcessingStrategy<AggregatedOutcomesBatch> for AlwaysReject {This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
onewland
approved these changes
Mar 30, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.


Moves the following metrics to be
CategoryMetricson the batch itself:accepted_outcomes.bucket_count(exists)accepted_outcomes.total_quantity. (exists)accepted_outcomes.messages_seen(new)Also adds
accepted_outcomes.got_backpressuremetric since right now we don't have any around backpressure right now