diff --git a/lib/saluki-components/src/encoders/datadog/traces/mod.rs b/lib/saluki-components/src/encoders/datadog/traces/mod.rs index 0abde3977c..e3bb9b2aa8 100644 --- a/lib/saluki-components/src/encoders/datadog/traces/mod.rs +++ b/lib/saluki-components/src/encoders/datadog/traces/mod.rs @@ -453,14 +453,15 @@ impl TraceEndpointEncoder { let app_version = resolve_app_version(resource_tags); // Resolve sampling metadata. - let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() { + let (priority, dropped_trace, decision_maker, otlp_sr, ets_error) = match trace.sampling() { Some(sampling) => ( sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY), sampling.dropped_trace, sampling.decision_maker.as_deref(), sampling.otlp_sampling_rate.unwrap_or(sampling_rate), + sampling.ets_error, ), - None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate), + None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate, false), }; // Now incrementally build the payload. @@ -561,6 +562,9 @@ impl TraceEndpointEncoder { if let Some(dm) = decision_maker { tags.write_entry(TAG_DECISION_MAKER, dm)?; } + if ets_error { + tags.write_entry("_dd.error_tracking_standalone.error", "true")?; + } self.string_builder.clear(); write!(&mut self.string_builder, "{:.2}", otlp_sr) diff --git a/lib/saluki-components/src/transforms/trace_sampler/mod.rs b/lib/saluki-components/src/transforms/trace_sampler/mod.rs index 7632ae62ed..53b07412e3 100644 --- a/lib/saluki-components/src/transforms/trace_sampler/mod.rs +++ b/lib/saluki-components/src/transforms/trace_sampler/mod.rs @@ -296,11 +296,23 @@ impl TraceSampler { } let now = std::time::SystemTime::now(); - let contains_error = self.trace_contains_error(trace, false); let Some(root_span_idx) = self.get_root_span_index(trace) else { return (false, PRIORITY_AUTO_DROP, "", None); }; + // ETS: only sample traces containing errors (including exception span events); skip all other samplers. + // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1068 + if self.error_tracking_standalone { + if self.trace_contains_error(trace, true) { + let keep = self.error_sampler.sample_error(now, trace, root_span_idx); + let priority = if keep { PRIORITY_AUTO_KEEP } else { PRIORITY_AUTO_DROP }; + return (keep, priority, "", Some(root_span_idx)); + } + return (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx)); + } + + let contains_error = self.trace_contains_error(trace, false); + // Run the rare sampler early, before all other samplers. This mirrors the Go agent behavior // where the rare sampler runs first to catch traces that would otherwise be dropped entirely. // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1078 @@ -443,9 +455,17 @@ impl TraceSampler { if let Some(root_idx) = root_span_idx { self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx); } + // ETS: mark kept error traces so the encoder emits the _dd.error_tracking_standalone.error chunk tag. + if self.error_tracking_standalone { + if let Some(sampling) = trace.sampling_mut() { + sampling.ets_error = true; + } + } return true; } + // ETS: suppress single span sampling and analytics events for dropped traces. + // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L976 if self.error_tracking_standalone { return false; } @@ -1060,4 +1080,105 @@ mod tests { assert!(!keep); assert_eq!(priority, PRIORITY_AUTO_DROP); } + + // ── Error Tracking Standalone tests ───────────────────────────────────────── + // Adapted from datadog-agent/pkg/trace/agent/agent.go runSamplers ETS block. + + fn create_sampler_with_ets() -> TraceSampler { + TraceSampler { + error_tracking_standalone: true, + ..create_test_sampler() + } + } + + /// ETS enabled + trace with error → kept by error sampler. + #[test] + fn ets_keeps_trace_with_error() { + let mut sampler = create_sampler_with_ets(); + + let span = create_test_span(100, 1, 1); // error=1 + let mut trace = create_test_trace(vec![span]); + + let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace); + assert!(keep, "ETS should keep traces with errors"); + assert_eq!(priority, PRIORITY_AUTO_KEEP); + assert_eq!(decision_maker, "", "ETS does not set a decision maker"); + } + + /// ETS enabled + trace without error → dropped; rare/probabilistic/priority not consulted. + #[test] + fn ets_drops_trace_without_error() { + let mut sampler = create_sampler_with_ets(); + + let span = create_test_span(101, 1, 0); // error=0 + let mut trace = create_test_trace(vec![span]); + + let (keep, priority, _, _) = sampler.run_samplers(&mut trace); + assert!(!keep, "ETS should drop traces without errors"); + assert_eq!(priority, PRIORITY_AUTO_DROP); + } + + /// ETS enabled + trace without error → SSS and analytics events are suppressed. + #[test] + fn ets_suppresses_sss_for_dropped_trace() { + let mut sampler = create_sampler_with_ets(); + + // Span with SSS metric — would normally trigger single span sampling. + let mut metrics = saluki_common::collections::FastHashMap::default(); + metrics.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); + let span = create_test_span(102, 1, 0).with_metrics(metrics); + let mut trace = create_test_trace(vec![span]); + + let kept = sampler.process_trace(&mut trace); + assert!(!kept, "ETS should suppress SSS for non-error traces"); + } + + /// ETS enabled + trace with error → `ets_error` flag set on sampling metadata. + #[test] + fn ets_sets_ets_error_flag_on_kept_trace() { + let mut sampler = create_sampler_with_ets(); + + let span = create_test_span(103, 1, 1); // error=1 + let mut trace = create_test_trace(vec![span]); + + let kept = sampler.process_trace(&mut trace); + assert!(kept); + assert!( + trace.sampling().is_some_and(|s| s.ets_error), + "ets_error should be set on kept ETS traces" + ); + } + + /// ETS enabled + trace with exception span event → kept (exception events count as errors in ETS). + #[test] + fn ets_keeps_trace_with_exception_span_event() { + let mut sampler = create_sampler_with_ets(); + + // Span with error=0 but exception span event metadata. + let mut meta = saluki_common::collections::FastHashMap::default(); + meta.insert( + MetaString::from("_dd.span_events.has_exception"), + MetaString::from("true"), + ); + let span = create_test_span(104, 1, 0).with_meta(meta); + let mut trace = create_test_trace(vec![span]); + + let (keep, _, _, _) = sampler.run_samplers(&mut trace); + assert!(keep, "ETS should treat exception span events as errors"); + } + + /// ETS disabled → normal sampling path (probabilistic) is used. + #[test] + fn ets_disabled_uses_normal_sampling() { + let mut sampler = create_test_sampler(); // ETS disabled + sampler.sampling_rate = 1.0; + sampler.probabilistic_sampler_enabled = true; + + let span = create_test_span(105, 1, 0); // no error + let mut trace = create_test_trace(vec![span]); + + let (keep, _, decision_maker, _) = sampler.run_samplers(&mut trace); + assert!(keep, "normal probabilistic sampling should keep the trace"); + assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); + } } diff --git a/lib/saluki-core/src/data_model/event/trace/mod.rs b/lib/saluki-core/src/data_model/event/trace/mod.rs index 9647a8528d..b34ffc4cc0 100644 --- a/lib/saluki-core/src/data_model/event/trace/mod.rs +++ b/lib/saluki-core/src/data_model/event/trace/mod.rs @@ -35,6 +35,12 @@ pub struct TraceSampling { /// This corresponds to the `_dd.otlp_sr` tag and represents the effective sampling rate /// from the OTLP ingest path. pub otlp_sampling_rate: Option, + + /// Whether this trace was kept by Error Tracking Standalone mode. + /// + /// When true, the encoder emits `_dd.error_tracking_standalone.error = "true"` as a chunk + /// tag, signalling to the backend that the trace was sampled under ETS rules. + pub ets_error: bool, } impl TraceSampling { @@ -47,6 +53,7 @@ impl TraceSampling { priority, decision_maker, otlp_sampling_rate, + ets_error: false, } } } @@ -151,6 +158,11 @@ impl Trace { self.sampling.as_ref() } + /// Returns a mutable reference to the trace-level sampling metadata, if present. + pub fn sampling_mut(&mut self) -> Option<&mut TraceSampling> { + self.sampling.as_mut() + } + /// Sets the trace-level sampling metadata. pub fn set_sampling(&mut self, sampling: Option) { self.sampling = sampling;