Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 60 additions & 52 deletions bin/agent-data-plane/src/components/tag_filterlist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use foldhash::fast::RandomState as FoldHashState;
use hashbrown::{HashMap, HashSet};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_context::tags::{Tag, TagSet};
use saluki_context::{tags::Tag, TagSetMutViewState};
use saluki_core::{
components::{
transforms::{Transform, TransformBuilder, TransformContext},
Expand Down Expand Up @@ -194,6 +194,8 @@ impl Transform for TagFilterlist {

let mut watcher = self.configuration.watch_for_updates("metric_tag_filterlist");

let mut view_state = TagSetMutViewState::default();

debug!("Metric Tag Filterlist transform started.");

loop {
Expand All @@ -204,7 +206,7 @@ impl Transform for TagFilterlist {
for event in &mut events {
if let Some(metric) = event.try_as_metric_mut() {
if metric.values().is_sketch() {
let outcome = filter_metric_tags(metric, &self.filters);
let outcome = filter_metric_tags(metric, &mut view_state, &self.filters);
self.telemetry.record(outcome);
}
}
Expand Down Expand Up @@ -233,54 +235,40 @@ fn should_keep_tag(tag: &Tag, is_exclude: bool, names: &HashSet<String, FoldHash
is_exclude != names.contains(tag.as_borrowed().name())
}

#[inline]
fn has_removable_tags(tags: &TagSet, is_exclude: bool, names: &HashSet<String, FoldHashState>) -> bool {
tags.into_iter().any(|tag| !should_keep_tag(tag, is_exclude, names))
}

/// Filter the tags of a distribution metric according to the compiled filter table.
///
/// Both instrumented tags and origin tags are filtered using the same tag key list.
/// If the metric name is not present in `filters`, the metric is left unchanged.
/// If filtering would not change any tags, the metric context is left untouched (zero allocations).
#[inline]
pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) -> FilterMetricTagsOutcome {
pub fn filter_metric_tags(
metric: &mut Metric, state: &mut TagSetMutViewState, filters: &CompiledFilters,
) -> FilterMetricTagsOutcome {
let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else {
return FilterMetricTagsOutcome::RuleMiss;
};

let is_exclude = *is_exclude;

let has_tag_removals = has_removable_tags(metric.context().tags(), is_exclude, tag_names);
let has_origin_removals = has_removable_tags(metric.context().origin_tags(), is_exclude, tag_names);
let mut tag_set_view = metric.context_mut().tags_mut_view(state);
tag_set_view.retain_tags(|tag| should_keep_tag(tag, *is_exclude, tag_names));
tag_set_view.retain_origin_tags(|tag| should_keep_tag(tag, *is_exclude, tag_names));
let total_removed = tag_set_view.finish();

if !has_tag_removals && !has_origin_removals {
return FilterMetricTagsOutcome::NoChange;
}

let mut total_removed = 0;
metric.context_mut().with_tag_sets_mut(|tags, origin_tags| {
if has_tag_removals {
let before = tags.len();
tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names));
total_removed += before - tags.len();
}
if has_origin_removals {
let before = origin_tags.len();
origin_tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names));
total_removed += before - origin_tags.len();
if total_removed == 0 {
FilterMetricTagsOutcome::NoChange
} else {
FilterMetricTagsOutcome::Modified {
removed_tags: total_removed,
}
});

FilterMetricTagsOutcome::Modified {
removed_tags: total_removed,
}
}

#[cfg(test)]
mod tests {
use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader};
use saluki_context::{tags::Tag, Context};
use saluki_context::{
tags::{Tag, TagSet},
Context, TagSetMutViewState,
};
use saluki_core::data_model::event::metric::Metric;
use saluki_metrics::{test::TestRecorder, MetricsBuilder};
use serde_json::json;
Expand Down Expand Up @@ -326,7 +314,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -341,7 +330,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["env:prod"]);
}
Expand All @@ -356,7 +346,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}
Expand All @@ -378,7 +369,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["production", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -393,7 +385,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert!(metric.context().tags().is_empty());
}
Expand All @@ -408,7 +401,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}
Expand All @@ -430,7 +424,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -452,7 +447,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}
Expand All @@ -474,7 +470,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}
Expand All @@ -483,7 +480,8 @@ mod tests {
fn no_config_is_noop() {
let filters = compile_filters(&[]);
let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

Expand Down Expand Up @@ -563,7 +561,8 @@ mod tests {

let mut metric =
distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(origin_tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -579,7 +578,8 @@ mod tests {

let mut metric =
distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(origin_tag_names(&metric), vec!["env:prod"]);
}
Expand All @@ -594,7 +594,8 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["service:web"]);
assert!(metric.context().origin_tags().is_empty());
Expand All @@ -621,7 +622,8 @@ mod tests {
}];
let filters = compile_filters(&entries);

filter_metric_tags(&mut metric1, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric1, &mut state, &filters);

assert_eq!(origin_tag_names(&metric1), vec!["service:web"]);
let metric2_origin: Vec<_> = metric2
Expand Down Expand Up @@ -651,7 +653,8 @@ mod tests {
&["env:prod", "service:web", "host:h1"],
&["env:prod", "host:h1", "region:us-east-1"],
);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(tag_names(&metric), vec!["service:web"]);
assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]);
Expand Down Expand Up @@ -714,7 +717,8 @@ mod tests {
let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

Expand Down Expand Up @@ -765,7 +769,8 @@ mod tests {
let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

Expand Down Expand Up @@ -800,7 +805,8 @@ mod tests {
let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]);
filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
filter_metric_tags(&mut metric, &mut state, &filters);
assert_eq!(tag_names(&metric), vec!["service:web"]);
}

Expand All @@ -815,7 +821,8 @@ mod tests {

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1"]);

let outcome = filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
let outcome = filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(outcome, FilterMetricTagsOutcome::Modified { removed_tags: 1 });
assert_eq!(tag_names(&metric), vec!["env:prod"]);
Expand All @@ -841,7 +848,8 @@ mod tests {
assert!(!metric.context().tags().is_modified());
assert!(!metric.context().origin_tags().is_modified());

let outcome = filter_metric_tags(&mut metric, &filters);
let mut state = TagSetMutViewState::default();
let outcome = filter_metric_tags(&mut metric, &mut state, &filters);

assert_eq!(outcome, FilterMetricTagsOutcome::NoChange);
assert_eq!(tag_names(&metric), vec!["env:prod", "host:h1"]);
Expand Down
7 changes: 4 additions & 3 deletions lib/saluki-context/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{fmt, hash, sync::Arc};

use metrics::Gauge;
use saluki_common::collections::ContiguousBitSet;
use saluki_common::collections::{ContiguousBitSet, PrehashedHashSet};
use stringtheory::MetaString;

use crate::{
hash::{hash_context, ContextKey},
hash::{hash_context, hash_context_with_seen, ContextKey},
tags::{Tag, TagSet},
};

Expand Down Expand Up @@ -303,6 +303,7 @@ pub struct TagSetMutViewState {
tag_addition_removals: ContiguousBitSet,
origin_base_removals: ContiguousBitSet,
origin_addition_removals: ContiguousBitSet,
hash_seen: PrehashedHashSet<u64>,
}

impl TagSetMutViewState {
Expand Down Expand Up @@ -383,7 +384,7 @@ impl<'a, 'b> TagSetMutView<'a, 'b> {
.apply_removals(&self.state.origin_base_removals, &self.state.origin_addition_removals);
}

let (key, _) = hash_context(&inner.name, &inner.tags, &inner.origin_tags);
let (key, _) = hash_context_with_seen(&inner.name, &inner.tags, &inner.origin_tags, &mut self.state.hash_seen);
inner.key = key;

total
Expand Down
Loading