diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index f60113a6ac..d2951c96f7 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -34,6 +34,7 @@ pub(super) struct BorrowedAggregationKey<'a> { http_status_code: u32, is_synthetics_request: bool, peer_tags: Vec<(&'a str, &'a str)>, + span_derived_primary_tags: Vec<(&'a str, &'a str)>, is_trace_root: bool, http_method: &'a str, http_endpoint: &'a str, @@ -54,6 +55,7 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { http_status_code, is_synthetics_request, peer_tags, + span_derived_primary_tags, is_trace_root, http_method, http_endpoint, @@ -74,6 +76,12 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { .iter() .zip(peer_tags.iter()) .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) + && self.span_derived_primary_tags.len() == span_derived_primary_tags.len() + && self + .span_derived_primary_tags + .iter() + .zip(span_derived_primary_tags.iter()) + .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) && self.is_trace_root == *is_trace_root && self.http_method == http_method && self.http_endpoint == http_endpoint @@ -99,6 +107,7 @@ pub(super) struct OwnedAggregationKey { http_status_code: u32, is_synthetics_request: bool, peer_tags: Vec<(String, String)>, + span_derived_primary_tags: Vec<(String, String)>, is_trace_root: bool, http_method: String, http_endpoint: String, @@ -121,6 +130,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { .iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), + span_derived_primary_tags: value + .span_derived_primary_tags + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), is_trace_root: value.is_trace_root, http_method: value.http_method.to_owned(), http_endpoint: value.http_endpoint.to_owned(), @@ -208,9 +222,14 @@ fn grpc_status_str_to_int_value(v: &str) -> Option { impl<'a> BorrowedAggregationKey<'a> { /// Return an AggregationKey matching the given span. /// - /// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the - /// key. - pub(super) fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { + /// If `peer_tag_keys` is not empty, peer tags from the span will be included in the key + /// (only for client/producer/consumer spans). If `span_derived_primary_tag_keys` is not + /// empty, matching tags will be included unconditionally for all eligible spans. + pub(super) fn from_span>( + span: &'a T, + peer_tag_keys: &'a [String], + span_derived_primary_tag_keys: &'a [String], + ) -> Self { let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default(); let peer_tags = if should_track_peer_tags(span_kind) { // Parse the meta tags of the span and return a list of the peer tags based on the list @@ -226,6 +245,11 @@ impl<'a> BorrowedAggregationKey<'a> { vec![] }; + let span_derived_primary_tags: Vec<(&'a str, &'a str)> = span_derived_primary_tag_keys + .iter() + .filter_map(|key| Some((key.as_str(), span.get_meta(key.as_str())?))) + .collect(); + let http_method = span.get_meta("http.method").unwrap_or_default(); let http_endpoint = span @@ -256,6 +280,7 @@ impl<'a> BorrowedAggregationKey<'a> { .get_meta(TAG_ORIGIN) .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), peer_tags, + span_derived_primary_tags, is_trace_root: span.is_trace_root(), http_method, http_endpoint, @@ -283,6 +308,14 @@ impl From for OwnedAggregationKey { Some((key.to_string(), value.to_string())) }) .collect(), + span_derived_primary_tags: value + .span_derived_primary_tags + .into_iter() + .filter_map(|t| { + let (key, value) = t.split_once(':')?; + Some((key.to_string(), value.to_string())) + }) + .collect(), is_trace_root: value.is_trace_root == 1, http_method: value.http_method, http_endpoint: value.http_endpoint, @@ -418,7 +451,11 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl .map(|c| c.to_string()) .unwrap_or_default(), service_source: key.service_source, - span_derived_primary_tags: vec![], // Todo + span_derived_primary_tags: key + .span_derived_primary_tags + .into_iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(), } } @@ -907,7 +944,7 @@ mod tests { ]; for (span, expected_key) in test_cases { - let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]); + let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]); assert_eq!( OwnedAggregationKey::from(&borrowed_key), expected_key, @@ -920,7 +957,89 @@ mod tests { } for (span, expected_key) in test_cases_with_peer_tags { - let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice()); + let borrowed_key = + BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]); + assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); + assert_eq!( + get_hash(&borrowed_key), + get_hash(&OwnedAggregationKey::from(&borrowed_key)) + ); + } + + let test_span_derived_primary_tag_keys = vec!["env".to_string(), "version".to_string()]; + + let test_cases_with_span_derived_primary_tags: Vec<(SpanSlice, OwnedAggregationKey)> = vec![ + // Span with span-derived primary tags: applied unconditionally (no span.kind gate) + ( + SpanSlice { + service: "service", + name: "op", + resource: "res", + span_id: 1, + parent_id: 0, + meta: HashMap::from([("env", "prod"), ("version", "v1")]), + ..Default::default() + }, + OwnedAggregationKey { + service_name: "service".into(), + operation_name: "op".into(), + resource_name: "res".into(), + is_trace_root: true, + span_derived_primary_tags: vec![ + ("env".into(), "prod".into()), + ("version".into(), "v1".into()), + ], + ..Default::default() + }, + ), + // Server span: span-derived primary tags still apply (unlike peer tags) + ( + SpanSlice { + service: "service", + name: "op", + resource: "res", + span_id: 1, + parent_id: 0, + meta: HashMap::from([("span.kind", "server"), ("env", "staging")]), + ..Default::default() + }, + OwnedAggregationKey { + service_name: "service".into(), + operation_name: "op".into(), + resource_name: "res".into(), + span_kind: "server".into(), + is_trace_root: true, + span_derived_primary_tags: vec![("env".into(), "staging".into())], + ..Default::default() + }, + ), + // Span with no matching keys: empty span_derived_primary_tags + ( + SpanSlice { + service: "service", + name: "op", + resource: "res", + span_id: 1, + parent_id: 0, + meta: HashMap::from([("unrelated_tag", "value")]), + ..Default::default() + }, + OwnedAggregationKey { + service_name: "service".into(), + operation_name: "op".into(), + resource_name: "res".into(), + is_trace_root: true, + ..Default::default() + }, + ), + ]; + + for (span, expected_key) in test_cases_with_span_derived_primary_tags { + let borrowed_key = BorrowedAggregationKey::from_span( + &span, + &[], + test_span_derived_primary_tag_keys.as_slice(), + ); assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); assert_eq!( get_hash(&borrowed_key), diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1ad0ba8b24..8e097f0f7e 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -41,8 +41,8 @@ where /// /// # Aggregation /// Spans are aggregated into time buckets based on their end_time. Within each time bucket there -/// is another level of aggregation based on the spans fields (e.g. resource_name, service_name) -/// and the peer tags if the `peer_tags_aggregation` is enabled. +/// is another level of aggregation based on the spans fields (e.g. resource_name, service_name), +/// peer tags if configured, and span-derived primary tags if configured. /// /// # Span eligibility /// The ingested spans are only aggregated if they are root, top-level, measured or if their @@ -66,6 +66,8 @@ pub struct SpanConcentrator { span_kinds_stats_computed: Vec, /// keys for supplementary tags that describe peer.service entities peer_tag_keys: Vec, + /// keys for user-configured tags used as additional aggregation dimensions + span_derived_primary_tag_keys: Vec, } impl SpanConcentrator { @@ -74,6 +76,9 @@ impl SpanConcentrator { /// - `now` the current system time, used to define the oldest bucket /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation /// - `peer_tags_keys` list of keys considered as peer tags for aggregation + /// + /// Span-derived primary tag keys can be set post-construction via + /// [`set_span_derived_primary_tag_keys`](Self::set_span_derived_primary_tag_keys). pub fn new( bucket_size: Duration, now: SystemTime, @@ -90,6 +95,7 @@ impl SpanConcentrator { buffer_len: 2, span_kinds_stats_computed, peer_tag_keys, + span_derived_primary_tag_keys: vec![], } } @@ -103,6 +109,11 @@ impl SpanConcentrator { self.peer_tag_keys = peer_tags; } + /// Set the list of keys used as span-derived primary tags for aggregation + pub fn set_span_derived_primary_tag_keys(&mut self, keys: Vec) { + self.span_derived_primary_tag_keys = keys; + } + /// Return the bucket size used for aggregation pub fn get_bucket_size(&self) -> Duration { Duration::from_nanos(self.bucket_size) @@ -124,7 +135,11 @@ impl SpanConcentrator { bucket_timestamp = self.oldest_timestamp; } - let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice()); + let agg_key = BorrowedAggregationKey::from_span( + span, + self.peer_tag_keys.as_slice(), + self.span_derived_primary_tag_keys.as_slice(), + ); self.buckets .entry(bucket_timestamp) diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 0a45bb3151..f03711c457 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1044,6 +1044,223 @@ fn test_base_service_peer_tag() { ); } +/// Test that span-derived primary tags create separate aggregation groups for all eligible spans, +/// regardless of span.kind (unlike peer tags which only apply to client/producer/consumer) +#[test] +fn test_span_derived_primary_tags_aggregation() { + let now = SystemTime::now(); + let mut spans = vec![ + // Root span with env=prod + get_test_span_with_meta( + now, + 1, + 0, + 100, + 5, + "A1", + "GET /users", + 0, + &[("env", "prod"), ("version", "v1")], + &[], + ), + // Root span with env=staging: different value, should be a separate bucket + get_test_span_with_meta( + now, + 2, + 0, + 50, + 5, + "A1", + "GET /users", + 0, + &[("env", "staging"), ("version", "v1")], + &[], + ), + // Server span: span-derived primary tags apply unconditionally (no span.kind gate) + get_test_span_with_meta( + now, + 3, + 1, + 80, + 5, + "A1", + "POST /users", + 0, + &[("span.kind", "server"), ("env", "prod"), ("version", "v1")], + &[("_dd.measured", 1.0)], + ), + // Client span: span-derived primary tags and peer tags coexist + get_test_span_with_meta( + now, + 4, + 1, + 60, + 5, + "A1", + "SELECT * FROM users", + 0, + &[ + ("span.kind", "client"), + ("env", "prod"), + ("db.instance", "i-1234"), + ], + &[("_dd.measured", 1.0)], + ), + ]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator_without_keys = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + ); + let mut concentrator_with_keys = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec!["db.instance".to_string()], + ); + concentrator_with_keys + .set_span_derived_primary_tag_keys(vec!["env".to_string(), "version".to_string()]); + + for span in &spans { + concentrator_without_keys.add_span(span); + concentrator_with_keys.add_span(span); + } + + let flushtime = now + + Duration::from_nanos( + concentrator_with_keys.bucket_size * concentrator_with_keys.buffer_len as u64, + ); + + // Without keys: root spans aggregate together, measured spans separate by span_kind + let expected_without_keys = vec![ + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "GET /users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + duration: 150, + hits: 2, + top_level_hits: 2, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "POST /users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + span_kind: "server".to_string(), + duration: 80, + hits: 1, + top_level_hits: 0, + errors: 0, + is_trace_root: pb::Trilean::False.into(), + ..Default::default() + }, + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "SELECT * FROM users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + span_kind: "client".to_string(), + duration: 60, + hits: 1, + top_level_hits: 0, + errors: 0, + is_trace_root: pb::Trilean::False.into(), + ..Default::default() + }, + ]; + + // With keys: each unique (env, version) combination becomes a separate bucket + let expected_with_keys = vec![ + // Root span env=prod, version=v1 + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "GET /users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + duration: 100, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + span_derived_primary_tags: vec!["env:prod".to_string(), "version:v1".to_string()], + ..Default::default() + }, + // Root span env=staging, version=v1 + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "GET /users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + duration: 50, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + span_derived_primary_tags: vec!["env:staging".to_string(), "version:v1".to_string()], + ..Default::default() + }, + // Server span env=prod, version=v1: no span.kind gate for derived tags + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "POST /users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + span_kind: "server".to_string(), + duration: 80, + hits: 1, + top_level_hits: 0, + errors: 0, + is_trace_root: pb::Trilean::False.into(), + span_derived_primary_tags: vec!["env:prod".to_string(), "version:v1".to_string()], + ..Default::default() + }, + // Client span env=prod: also has peer tag db.instance:i-1234 + // "version" key is configured but not present on this span, so only "env" is included + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "SELECT * FROM users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + span_kind: "client".to_string(), + peer_tags: vec!["db.instance:i-1234".to_string()], + duration: 60, + hits: 1, + top_level_hits: 0, + errors: 0, + is_trace_root: pb::Trilean::False.into(), + span_derived_primary_tags: vec!["env:prod".to_string()], + ..Default::default() + }, + ]; + + let stats_without_keys = concentrator_without_keys.flush(flushtime, false); + assert_counts_equal( + expected_without_keys, + stats_without_keys + .first() + .expect("There should be at least one time bucket") + .stats + .clone(), + ); + + let stats_with_keys = concentrator_with_keys.flush(flushtime, false); + assert_counts_equal( + expected_with_keys, + stats_with_keys + .first() + .expect("There should be at least one time bucket") + .stats + .clone(), + ); +} + #[test] fn test_compute_stats_for_span_kind() { let test_cases: Vec<(SpanSlice, bool)> = vec![