Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use ottl::{EvalContextFamily, IndexExpr, PathAccessor, PathResolverMap, Value};
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::Span;

/// Family type for the span filter evaluation context.
Expand All @@ -34,13 +34,13 @@ pub struct SpanFilterContext<'a> {
/// Reference to the span being evaluated.
pub(super) span: &'a Span,
/// Reference to the trace's resource-level tags.
pub(super) resource_tags: &'a SharedTagSet,
pub(super) resource_tags: &'a TagSet,
}

impl<'a> SpanFilterContext<'a> {
/// Creates a context from references to the current span and resource tags.
#[inline]
pub fn new(span: &'a Span, resource_tags: &'a SharedTagSet) -> Self {
pub fn new(span: &'a Span, resource_tags: &'a TagSet) -> Self {
Self { span, resource_tags }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use ottl::{CallbackMap, EnumMap, OttlParser};
use saluki_config::GenericConfiguration;
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::{
components::{transforms::*, ComponentContext},
data_model::event::trace::Span,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl OttlTransform {
/// Each statement is executed in order. For editor statements (e.g. `set`), the `where`
/// clause is evaluated first; if it matches (or is absent), the editor function runs.
/// Errors are handled according to `error_mode`.
fn transform_span(&self, span: &mut Span, resource_tags: &SharedTagSet) {
fn transform_span(&self, span: &mut Span, resource_tags: &TagSet) {
let mut ctx = SpanTransformContext::new(span, resource_tags);

for parser in &self.span_parsers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
//! immutable reference to the resource tags.
//!
//! `attributes` supports both read and write via [`SpanAttributesAccessor`].
//! `resource.attributes` is read-only because [`SharedTagSet`] does not expose mutable access.
//! `resource.attributes` is read-only because [`TagSet`] does not expose mutable access.

use std::collections::HashMap;
use std::sync::Arc;

use ottl::{EvalContextFamily, IndexExpr, PathAccessor, PathResolverMap, Value};
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::Span;
use stringtheory::MetaString;

Expand All @@ -40,13 +40,13 @@ pub struct SpanTransformContext<'a> {
/// Mutable reference to the span being transformed.
pub(super) span: &'a mut Span,
/// Reference to the trace's resource-level tags (read-only).
pub(super) resource_tags: &'a SharedTagSet,
pub(super) resource_tags: &'a TagSet,
}

impl<'a> SpanTransformContext<'a> {
/// Creates a context from a mutable span reference and immutable resource tags.
#[inline]
pub fn new(span: &'a mut Span, resource_tags: &'a SharedTagSet) -> Self {
pub fn new(span: &'a mut Span, resource_tags: &'a TagSet) -> Self {
Self { span, resource_tags }
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ impl PathAccessor<SpanTransformFamily> for SpanAttributesAccessor {

/// Path accessor for `resource.attributes` (trace resource tags).
///
/// Read-only: [`SharedTagSet`] does not expose mutable access, so `set` returns an error.
/// Read-only: [`TagSet`] does not expose mutable access, so `set` returns an error.
#[derive(Debug)]
pub struct ResourceAttributesAccessor;

Expand All @@ -137,15 +137,14 @@ impl PathAccessor<SpanTransformFamily> for ResourceAttributesAccessor {
Ok(value)
}

/// AZH: TODO
/// Always returns an error: `SharedTagSet` is an Arc-based immutable type and `Trace`
/// Always returns an error: `TagSet` is an Arc-based immutable type and `Trace`
/// does not yet expose a mutable way to access resource_tags, so there is no way to write changes
/// back to the trace's resource tags.
fn set<'a>(
&self, _ctx: &mut SpanTransformContext<'a>, path: &str, _indexes: &[IndexExpr], _value: &Value,
) -> ottl::Result<()> {
Err(format!(
"resource.attributes is read-only; setting path `{}` is not supported because SharedTagSet does not expose mutable access",
"resource.attributes is read-only; setting path `{}` is not supported because TagSet does not expose mutable access",
path
)
.into())
Expand Down
1 change: 1 addition & 0 deletions lib/saluki-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ saluki-metrics = { workspace = true }
serde = { workspace = true }
serde_with = { workspace = true }
sha3 = { workspace = true }
smallvec = { workspace = true }
stringtheory = { workspace = true }
tokio = { workspace = true, features = ["rt", "io-util", "macros", "rt-multi-thread"] }
tracing = { workspace = true }
Expand Down
147 changes: 147 additions & 0 deletions lib/saluki-common/src/collections/bitset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use smallvec::SmallVec;

/// A dense, contiguous bitset.
///
/// `ContiguousBitSet` is designed for tracking set membership of dense, contiguous values, such as the indexes of
/// values in a vector. It is able to hold up to 128 values (indices 0–127) inline with no heap allocation. It is _not_
/// suitable for sparse values (values that are far apart in value) as the size of the underlying storage will be tied
/// to the largest value in the set: roughly `((max_value / 64) + 1) * 8` bytes.
///
/// `set`, `clear`, and `is_set` each touch a single word, with the exception of `set` when setting a bit that extends
/// beyond the current capacity, which has to grow the storage first (spilling to the heap once more than two words
/// are needed). `len` and `is_empty` visit every word currently in the set.
#[derive(Clone, Debug, Default)]
pub struct ContiguousBitSet {
// Bit `i` is stored in `words[i / 64]` at bit position `i % 64`; words are appended on demand by `set`.
words: SmallVec<[u64; 2]>,
}

impl ContiguousBitSet {
    /// Creates a new, empty bitset with no bits set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Splits a bit index into the index of its backing word and the mask selecting the bit inside that word.
    #[inline]
    fn locate(index: usize) -> (usize, u64) {
        (index / 64, 1u64 << (index % 64))
    }

    /// Returns the number of bits that are set.
    ///
    /// This visits every backing word, so the cost grows with the highest index that was ever set.
    pub fn len(&self) -> usize {
        self.words.iter().map(|w| w.count_ones() as usize).sum()
    }

    /// Returns `true` if no bits are set.
    ///
    /// Like [`len`](Self::len), this visits every backing word.
    pub fn is_empty(&self) -> bool {
        self.words.iter().all(|w| *w == 0)
    }

    /// Sets the bit at `index`.
    ///
    /// If `index` is beyond the current capacity, the storage is grown (zero-filled) so that it covers `index`.
    pub fn set(&mut self, index: usize) {
        let (word, mask) = Self::locate(index);

        // Grow if needed. The guard matters: `resize` would truncate if asked for a shorter length.
        if word >= self.words.len() {
            self.words.resize(word + 1, 0);
        }

        self.words[word] |= mask;
    }

    /// Clears the bit at `index`.
    ///
    /// If `index` is beyond the current capacity, this is a no-op.
    pub fn clear(&mut self, index: usize) {
        let (word, mask) = Self::locate(index);

        if let Some(w) = self.words.get_mut(word) {
            *w &= !mask;
        }
    }

    /// Returns `true` if the bit at `index` is set.
    ///
    /// Indices beyond the current capacity are reported as not set.
    pub fn is_set(&self, index: usize) -> bool {
        let (word, mask) = Self::locate(index);
        self.words.get(word).is_some_and(|w| (*w & mask) != 0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a bitset in which exactly the given indices have been set, in order.
    fn bitset_with(indices: impl IntoIterator<Item = usize>) -> ContiguousBitSet {
        let mut bitset = ContiguousBitSet::new();
        for index in indices {
            bitset.set(index);
        }
        bitset
    }

    #[test]
    fn empty_bitset() {
        // A fresh bitset reports nothing as set, at either end of the inline range.
        let fresh = ContiguousBitSet::new();

        assert!(fresh.is_empty());
        for index in [0, 127] {
            assert!(!fresh.is_set(index));
        }
        assert_eq!(fresh.len(), 0);
    }

    #[test]
    fn set_and_check() {
        // Cover both edges of each of the two inline words.
        let expected_set = [0, 63, 64, 127];
        let expected_unset = [1, 65];

        let bitset = bitset_with(expected_set);

        for index in expected_set {
            assert!(bitset.is_set(index));
        }
        for index in expected_unset {
            assert!(!bitset.is_set(index));
        }
        assert!(!bitset.is_empty());
        assert_eq!(bitset.len(), 4);
    }

    #[test]
    fn clear_bit() {
        let mut bitset = bitset_with([10]);
        assert!(bitset.is_set(10));

        // Clearing the only set bit leaves the set empty again.
        bitset.clear(10);
        assert!(!bitset.is_set(10));
        assert!(bitset.is_empty());
    }

    #[test]
    fn clear_beyond_capacity_is_noop() {
        let mut bitset = ContiguousBitSet::new();

        // Should not panic or allocate.
        bitset.clear(999);

        assert!(bitset.is_empty());
    }

    #[test]
    fn grows_beyond_inline() {
        // Index 200 lives in the fourth word, past the two inline words.
        let bitset = bitset_with([200]);

        assert!(bitset.is_set(200));
        assert!(!bitset.is_set(199));
        assert!(!bitset.is_empty());
        assert_eq!(bitset.len(), 1);
    }

    #[test]
    fn len_across_words() {
        // Every third index below 192: 64 bits spread over three words.
        let bitset = bitset_with((0..64).map(|k| k * 3));

        assert_eq!(bitset.len(), 64);
    }

    #[test]
    fn clone_independence() {
        let original = bitset_with([5]);

        let mut copy = original.clone();
        copy.set(10);

        // The mutation of the clone must not leak back into the original.
        assert!(original.is_set(5));
        assert!(!original.is_set(10));
        assert!(copy.is_set(5));
        assert!(copy.is_set(10));
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::hash::{FastBuildHasher, NoopU64BuildHasher};

mod bitset;
pub use self::bitset::ContiguousBitSet;

/// A hash set based on the standard library's ([`HashSet`][std::collections::HashSet]) using [`FastHasher`][crate::hash::FastHasher].
pub type FastHashSet<T> = std::collections::HashSet<T, FastBuildHasher>;

Expand Down
10 changes: 5 additions & 5 deletions lib/saluki-components/src/common/otlp/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::LazyLock;
use opentelemetry_semantic_conventions::resource::*;
use otlp_protos::opentelemetry::proto::common::v1::{self as otlp_common, any_value::Value};
use saluki_common::collections::{FastHashMap, FastHashSet};
use saluki_context::tags::{SharedTagSet, TagSet};
use saluki_context::tags::TagSet;

// ============================================================================
// Datadog attribute key constants shared across the encoder and translator
Expand Down Expand Up @@ -144,9 +144,9 @@ pub fn extract_container_tags_from_resource_attributes(attributes: &[otlp_common

/// Extracts container tags from a resource tagset and inserts them into the provided TagSet.
///
/// This mirrors `extract_container_tags_from_resource_attributes`, but operates on a `SharedTagSet` representation of
/// This mirrors `extract_container_tags_from_resource_attributes`, but operates on a `TagSet` representation of
/// the resource.
pub fn extract_container_tags_from_resource_tagset(resource_tags: &SharedTagSet, tags: &mut TagSet) {
pub fn extract_container_tags_from_resource_tagset(resource_tags: &TagSet, tags: &mut TagSet) {
let mut extracted_tags = FastHashSet::default();

for tag in resource_tags {
Expand Down Expand Up @@ -208,10 +208,10 @@ pub fn resource_to_source(resource: &otlp_protos::opentelemetry::proto::resource
None
}

/// Resolves the source metadata from a resource `SharedTagSet`.
/// Resolves the source metadata from a resource `TagSet`.
///
/// This is equivalent to `resource_to_source`, but avoids the OTLP protobuf resource type.
pub fn tags_to_source(resource_tags: &SharedTagSet) -> Option<Source> {
pub fn tags_to_source(resource_tags: &TagSet) -> Option<Source> {
let get = |key: &str| -> Option<&str> { resource_tags.get_single_tag(key).and_then(|t| t.value()) };

// AWS ECS Fargate
Expand Down
4 changes: 2 additions & 2 deletions lib/saluki-components/src/destinations/dsd_stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use saluki_api::{
APIHandler, StatusCode,
};
use saluki_common::time::get_coarse_unix_timestamp;
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::{
components::{
destinations::{Destination, DestinationBuilder, DestinationContext},
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Destination for DogStatsDStats {
#[derive(Eq, Hash, PartialEq, Serialize)]
struct ContextNoOrigin {
name: MetaString,
tags: SharedTagSet,
tags: TagSet,
}
#[derive(Deserialize)]
struct StatsQueryParams {
Expand Down
14 changes: 7 additions & 7 deletions lib/saluki-components/src/encoders/datadog/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use piecemeal::{ScratchBuffer, ScratchWriter};
use saluki_common::strings::StringBuilder;
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::tags::{SharedTagSet, TagSet};
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
use saluki_core::{
Expand Down Expand Up @@ -688,12 +688,12 @@ fn encode_attribute_array_value<S: ScratchBuffer>(
Ok(())
}

fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> {
fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
resource_tags.get_single_tag(key).and_then(|t| t.value())
}

fn resolve_hostname<'a>(
resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
ignore_missing_fields: bool,
) -> Option<&'a str> {
let mut hostname = match source {
Expand All @@ -715,7 +715,7 @@ fn resolve_hostname<'a>(
hostname
}

fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> {
fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
return Some(value);
}
Expand All @@ -728,7 +728,7 @@ fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Opt
get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
}

fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
if let Some(value) = get_resource_tag_value(resource_tags, key) {
return Some(value);
Expand All @@ -746,15 +746,15 @@ fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<
None
}

fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> {
fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
return Some(value);
}
get_resource_tag_value(resource_tags, SERVICE_VERSION)
}

fn resolve_container_tags(
resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
) -> Option<MetaString> {
// TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized
// since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would
Expand Down
Loading
Loading