Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 125 additions & 6 deletions libdd-trace-stats/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(super) struct BorrowedAggregationKey<'a> {
http_status_code: u32,
is_synthetics_request: bool,
peer_tags: Vec<(&'a str, &'a str)>,
span_derived_primary_tags: Vec<(&'a str, &'a str)>,
is_trace_root: bool,
http_method: &'a str,
http_endpoint: &'a str,
Expand All @@ -54,6 +55,7 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
http_status_code,
is_synthetics_request,
peer_tags,
span_derived_primary_tags,
is_trace_root,
http_method,
http_endpoint,
Expand All @@ -74,6 +76,12 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
.iter()
.zip(peer_tags.iter())
.all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2)
&& self.span_derived_primary_tags.len() == span_derived_primary_tags.len()
&& self
.span_derived_primary_tags
.iter()
.zip(span_derived_primary_tags.iter())
.all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2)
&& self.is_trace_root == *is_trace_root
&& self.http_method == http_method
&& self.http_endpoint == http_endpoint
Expand All @@ -99,6 +107,7 @@ pub(super) struct OwnedAggregationKey {
http_status_code: u32,
is_synthetics_request: bool,
peer_tags: Vec<(String, String)>,
span_derived_primary_tags: Vec<(String, String)>,
is_trace_root: bool,
http_method: String,
http_endpoint: String,
Expand All @@ -121,6 +130,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
span_derived_primary_tags: value
.span_derived_primary_tags
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
is_trace_root: value.is_trace_root,
http_method: value.http_method.to_owned(),
http_endpoint: value.http_endpoint.to_owned(),
Expand Down Expand Up @@ -208,9 +222,14 @@ fn grpc_status_str_to_int_value(v: &str) -> Option<u8> {
impl<'a> BorrowedAggregationKey<'a> {
/// Return an AggregationKey matching the given span.
///
/// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the
/// key.
pub(super) fn from_span<T: StatSpan<'a>>(span: &'a T, peer_tag_keys: &'a [String]) -> Self {
/// If `peer_tag_keys` is not empty, peer tags from the span will be included in the key
/// (only for client/producer/consumer spans). If `span_derived_primary_tag_keys` is not
/// empty, each listed key found in the span's meta is included in the key, regardless of
/// the span kind.
pub(super) fn from_span<T: StatSpan<'a>>(
span: &'a T,
peer_tag_keys: &'a [String],
span_derived_primary_tag_keys: &'a [String],
) -> Self {
let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default();
let peer_tags = if should_track_peer_tags(span_kind) {
// Parse the meta tags of the span and return a list of the peer tags based on the list
Expand All @@ -226,6 +245,11 @@ impl<'a> BorrowedAggregationKey<'a> {
vec![]
};

let span_derived_primary_tags: Vec<(&'a str, &'a str)> = span_derived_primary_tag_keys
.iter()
.filter_map(|key| Some((key.as_str(), span.get_meta(key.as_str())?)))
.collect();

let http_method = span.get_meta("http.method").unwrap_or_default();

let http_endpoint = span
Expand Down Expand Up @@ -256,6 +280,7 @@ impl<'a> BorrowedAggregationKey<'a> {
.get_meta(TAG_ORIGIN)
.is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)),
peer_tags,
span_derived_primary_tags,
is_trace_root: span.is_trace_root(),
http_method,
http_endpoint,
Expand Down Expand Up @@ -283,6 +308,14 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
Some((key.to_string(), value.to_string()))
})
.collect(),
span_derived_primary_tags: value
.span_derived_primary_tags
.into_iter()
.filter_map(|t| {
let (key, value) = t.split_once(':')?;
Some((key.to_string(), value.to_string()))
})
.collect(),
is_trace_root: value.is_trace_root == 1,
http_method: value.http_method,
http_endpoint: value.http_endpoint,
Expand Down Expand Up @@ -418,7 +451,11 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl
.map(|c| c.to_string())
.unwrap_or_default(),
service_source: key.service_source,
span_derived_primary_tags: vec![], // Todo
span_derived_primary_tags: key
.span_derived_primary_tags
.into_iter()
.map(|(k, v)| format!("{k}:{v}"))
.collect(),
}
}

Expand Down Expand Up @@ -907,7 +944,7 @@ mod tests {
];

for (span, expected_key) in test_cases {
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]);
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]);
assert_eq!(
OwnedAggregationKey::from(&borrowed_key),
expected_key,
Expand All @@ -920,7 +957,89 @@ mod tests {
}

for (span, expected_key) in test_cases_with_peer_tags {
let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice());
let borrowed_key =
BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]);
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
assert_eq!(
get_hash(&borrowed_key),
get_hash(&OwnedAggregationKey::from(&borrowed_key))
);
}

let test_span_derived_primary_tag_keys = vec!["env".to_string(), "version".to_string()];

let test_cases_with_span_derived_primary_tags: Vec<(SpanSlice, OwnedAggregationKey)> = vec![
// Span with span-derived primary tags: applied unconditionally (no span.kind gate)
(
SpanSlice {
service: "service",
name: "op",
resource: "res",
span_id: 1,
parent_id: 0,
meta: HashMap::from([("env", "prod"), ("version", "v1")]),
..Default::default()
},
OwnedAggregationKey {
service_name: "service".into(),
operation_name: "op".into(),
resource_name: "res".into(),
is_trace_root: true,
span_derived_primary_tags: vec![
("env".into(), "prod".into()),
("version".into(), "v1".into()),
],
..Default::default()
},
),
// Server span: span-derived primary tags still apply (unlike peer tags)
(
SpanSlice {
service: "service",
name: "op",
resource: "res",
span_id: 1,
parent_id: 0,
meta: HashMap::from([("span.kind", "server"), ("env", "staging")]),
..Default::default()
},
OwnedAggregationKey {
service_name: "service".into(),
operation_name: "op".into(),
resource_name: "res".into(),
span_kind: "server".into(),
is_trace_root: true,
span_derived_primary_tags: vec![("env".into(), "staging".into())],
..Default::default()
},
),
// Span with no matching keys: empty span_derived_primary_tags
(
SpanSlice {
service: "service",
name: "op",
resource: "res",
span_id: 1,
parent_id: 0,
meta: HashMap::from([("unrelated_tag", "value")]),
..Default::default()
},
OwnedAggregationKey {
service_name: "service".into(),
operation_name: "op".into(),
resource_name: "res".into(),
is_trace_root: true,
..Default::default()
},
),
];

for (span, expected_key) in test_cases_with_span_derived_primary_tags {
let borrowed_key = BorrowedAggregationKey::from_span(
&span,
&[],
test_span_derived_primary_tag_keys.as_slice(),
);
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
assert_eq!(
get_hash(&borrowed_key),
Expand Down
21 changes: 18 additions & 3 deletions libdd-trace-stats/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ where
///
/// # Aggregation
/// Spans are aggregated into time buckets based on their end_time. Within each time bucket there
/// is another level of aggregation based on the spans fields (e.g. resource_name, service_name)
/// and the peer tags if the `peer_tags_aggregation` is enabled.
/// is another level of aggregation based on the span fields (e.g. resource_name, service_name),
/// peer tags if configured, and span-derived primary tags if configured.
///
/// # Span eligibility
/// The ingested spans are only aggregated if they are root, top-level, measured or if their
Expand All @@ -66,6 +66,8 @@ pub struct SpanConcentrator {
span_kinds_stats_computed: Vec<String>,
/// keys for supplementary tags that describe peer.service entities
peer_tag_keys: Vec<String>,
/// keys for user-configured tags used as additional aggregation dimensions
span_derived_primary_tag_keys: Vec<String>,
}

impl SpanConcentrator {
Expand All @@ -74,6 +76,9 @@ impl SpanConcentrator {
/// - `now` the current system time, used to define the oldest bucket
/// - `span_kinds_stats_computed` list of span kinds eligible for stats computation
/// - `peer_tag_keys` list of keys considered as peer tags for aggregation
///
/// Span-derived primary tag keys can be set post-construction via
/// [`set_span_derived_primary_tag_keys`](Self::set_span_derived_primary_tag_keys).
pub fn new(
bucket_size: Duration,
now: SystemTime,
Expand All @@ -90,6 +95,7 @@ impl SpanConcentrator {
buffer_len: 2,
span_kinds_stats_computed,
peer_tag_keys,
span_derived_primary_tag_keys: vec![],
}
}

Expand All @@ -103,6 +109,11 @@ impl SpanConcentrator {
self.peer_tag_keys = peer_tags;
}

/// Set the list of keys used as span-derived primary tags for aggregation.
///
/// Replaces any previously configured list; a new concentrator starts with an empty list.
/// Each key that is present in a span's meta becomes part of that span's aggregation key.
pub fn set_span_derived_primary_tag_keys(&mut self, keys: Vec<String>) {
self.span_derived_primary_tag_keys = keys;
}

/// Return the bucket size used for aggregation
pub fn get_bucket_size(&self) -> Duration {
Duration::from_nanos(self.bucket_size)
Expand All @@ -124,7 +135,11 @@ impl SpanConcentrator {
bucket_timestamp = self.oldest_timestamp;
}

let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice());
let agg_key = BorrowedAggregationKey::from_span(
span,
self.peer_tag_keys.as_slice(),
self.span_derived_primary_tag_keys.as_slice(),
);

self.buckets
.entry(bucket_timestamp)
Expand Down
Loading
Loading