Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/saluki-components/src/encoders/datadog/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,15 @@ impl TraceEndpointEncoder {
let app_version = resolve_app_version(resource_tags);

// Resolve sampling metadata.
let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
let (priority, dropped_trace, decision_maker, otlp_sr, ets_error) = match trace.sampling() {
Some(sampling) => (
sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
sampling.dropped_trace,
sampling.decision_maker.as_deref(),
sampling.otlp_sampling_rate.unwrap_or(sampling_rate),
sampling.ets_error,
),
None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate),
None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate, false),
};

// Now incrementally build the payload.
Expand Down Expand Up @@ -561,6 +562,9 @@ impl TraceEndpointEncoder {
if let Some(dm) = decision_maker {
tags.write_entry(TAG_DECISION_MAKER, dm)?;
}
if ets_error {
tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
}

self.string_builder.clear();
write!(&mut self.string_builder, "{:.2}", otlp_sr)
Expand Down
123 changes: 122 additions & 1 deletion lib/saluki-components/src/transforms/trace_sampler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,23 @@ impl TraceSampler {
}

let now = std::time::SystemTime::now();
let contains_error = self.trace_contains_error(trace, false);
let Some(root_span_idx) = self.get_root_span_index(trace) else {
return (false, PRIORITY_AUTO_DROP, "", None);
};

// ETS: only sample traces containing errors (including exception span events); skip all other samplers.
// logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1068
if self.error_tracking_standalone {
if self.trace_contains_error(trace, true) {
let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
let priority = if keep { PRIORITY_AUTO_KEEP } else { PRIORITY_AUTO_DROP };
return (keep, priority, "", Some(root_span_idx));
}
return (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx));
}

let contains_error = self.trace_contains_error(trace, false);

// Run the rare sampler early, before all other samplers. This mirrors the Go agent behavior
// where the rare sampler runs first to catch traces that would otherwise be dropped entirely.
// logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1078
Expand Down Expand Up @@ -443,9 +455,17 @@ impl TraceSampler {
if let Some(root_idx) = root_span_idx {
self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
}
// ETS: mark kept error traces so the encoder emits the _dd.error_tracking_standalone.error chunk tag.
if self.error_tracking_standalone {
if let Some(sampling) = trace.sampling_mut() {
sampling.ets_error = true;
}
}
return true;
}

// ETS: suppress single span sampling and analytics events for dropped traces.
// logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L976
if self.error_tracking_standalone {
return false;
}
Expand Down Expand Up @@ -1060,4 +1080,105 @@ mod tests {
assert!(!keep);
assert_eq!(priority, PRIORITY_AUTO_DROP);
}

// ── Error Tracking Standalone tests ─────────────────────────────────────────
// Adapted from datadog-agent/pkg/trace/agent/agent.go runSamplers ETS block.

fn create_sampler_with_ets() -> TraceSampler {
TraceSampler {
error_tracking_standalone: true,
..create_test_sampler()
}
}

/// ETS enabled + trace with error → kept by error sampler.
#[test]
fn ets_keeps_trace_with_error() {
let mut sampler = create_sampler_with_ets();

let span = create_test_span(100, 1, 1); // error=1
let mut trace = create_test_trace(vec![span]);

let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
assert!(keep, "ETS should keep traces with errors");
assert_eq!(priority, PRIORITY_AUTO_KEEP);
assert_eq!(decision_maker, "", "ETS does not set a decision maker");
}

/// ETS enabled + trace without error → dropped; rare/probabilistic/priority not consulted.
#[test]
fn ets_drops_trace_without_error() {
let mut sampler = create_sampler_with_ets();

let span = create_test_span(101, 1, 0); // error=0
let mut trace = create_test_trace(vec![span]);

let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
assert!(!keep, "ETS should drop traces without errors");
assert_eq!(priority, PRIORITY_AUTO_DROP);
}

/// ETS enabled + trace without error → SSS and analytics events are suppressed.
#[test]
fn ets_suppresses_sss_for_dropped_trace() {
let mut sampler = create_sampler_with_ets();

// Span with SSS metric — would normally trigger single span sampling.
let mut metrics = saluki_common::collections::FastHashMap::default();
metrics.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0);
let span = create_test_span(102, 1, 0).with_metrics(metrics);
let mut trace = create_test_trace(vec![span]);

let kept = sampler.process_trace(&mut trace);
assert!(!kept, "ETS should suppress SSS for non-error traces");
}

/// ETS enabled + trace with error → `ets_error` flag set on sampling metadata.
#[test]
fn ets_sets_ets_error_flag_on_kept_trace() {
let mut sampler = create_sampler_with_ets();

let span = create_test_span(103, 1, 1); // error=1
let mut trace = create_test_trace(vec![span]);

let kept = sampler.process_trace(&mut trace);
assert!(kept);
assert!(
trace.sampling().is_some_and(|s| s.ets_error),
"ets_error should be set on kept ETS traces"
);
}

/// ETS enabled + trace with exception span event → kept (exception events count as errors in ETS).
#[test]
fn ets_keeps_trace_with_exception_span_event() {
let mut sampler = create_sampler_with_ets();

// Span with error=0 but exception span event metadata.
let mut meta = saluki_common::collections::FastHashMap::default();
meta.insert(
MetaString::from("_dd.span_events.has_exception"),
MetaString::from("true"),
);
let span = create_test_span(104, 1, 0).with_meta(meta);
let mut trace = create_test_trace(vec![span]);

let (keep, _, _, _) = sampler.run_samplers(&mut trace);
assert!(keep, "ETS should treat exception span events as errors");
}

/// ETS disabled → normal sampling path (probabilistic) is used.
#[test]
fn ets_disabled_uses_normal_sampling() {
let mut sampler = create_test_sampler(); // ETS disabled
sampler.sampling_rate = 1.0;
sampler.probabilistic_sampler_enabled = true;

let span = create_test_span(105, 1, 0); // no error
let mut trace = create_test_trace(vec![span]);

let (keep, _, decision_maker, _) = sampler.run_samplers(&mut trace);
assert!(keep, "normal probabilistic sampling should keep the trace");
assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC);
}
}
12 changes: 12 additions & 0 deletions lib/saluki-core/src/data_model/event/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub struct TraceSampling {
/// This corresponds to the `_dd.otlp_sr` tag and represents the effective sampling rate
/// from the OTLP ingest path.
pub otlp_sampling_rate: Option<f64>,

/// Whether this trace was kept by Error Tracking Standalone mode.
///
/// When true, the encoder emits `_dd.error_tracking_standalone.error = "true"` as a chunk
/// tag, signalling to the backend that the trace was sampled under ETS rules.
pub ets_error: bool,
}

impl TraceSampling {
Expand All @@ -47,6 +53,7 @@ impl TraceSampling {
priority,
decision_maker,
otlp_sampling_rate,
ets_error: false,
}
}
}
Expand Down Expand Up @@ -151,6 +158,11 @@ impl Trace {
self.sampling.as_ref()
}

/// Returns a mutable reference to the trace-level sampling metadata, if present.
pub fn sampling_mut(&mut self) -> Option<&mut TraceSampling> {
self.sampling.as_mut()
}

/// Sets the trace-level sampling metadata.
pub fn set_sampling(&mut self, sampling: Option<TraceSampling>) {
self.sampling = sampling;
Expand Down
Loading