Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ libdd-telemetry = { version = "4.0.0", path = "../libdd-telemetry", default-feat
libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" }
libdd-trace-stats = { version = "2.0.0", path = "../libdd-trace-stats" }
libdd-trace-utils = { version = "3.0.1", path = "../libdd-trace-utils", default-features = false }
libdd-trace-obfuscation = { version = "2.0.0", path = "../libdd-trace-obfuscation", optional = true }
libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" }
libdd-dogstatsd-client = { version = "2.0.0", path = "../libdd-dogstatsd-client", default-features = false }
libdd-tinybytes = { version = "1.1.0", path = "../libdd-tinybytes", features = [
Expand Down Expand Up @@ -81,4 +82,7 @@ https = [
"libdd-trace-utils/https",
"libdd-dogstatsd-client/https",
]
stats-obfuscation = [
"libdd-trace-obfuscation"
]
test-utils = []
16 changes: 12 additions & 4 deletions libdd-data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct AgentInfoStruct {
pub peer_tags: Option<Vec<String>>,
/// List of span kinds eligible for stats computation
pub span_kinds_stats_computed: Option<Vec<String>>,
/// Obfuscation version supported by the agent for client-side stats
pub obfuscation_version: Option<u32>,
/// Container tags hash from HTTP response header
pub container_tags_hash: Option<String>,
}
Expand All @@ -54,37 +56,43 @@ pub struct Config {
pub max_memory: Option<f64>,
pub max_cpu: Option<f64>,
pub analyzed_spans_by_service: Option<HashMap<String, HashMap<String, f64>>>,
pub obfuscation: Option<ObfuscationConfig>,
}

#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct ObfuscationConfig {
pub elastic_search: bool,
pub mongo: bool,
pub sql_exec_plan: bool,
pub sql_exec_plan_normalize: bool,
#[cfg(feature = "stats-obfuscation")]
#[serde(default)]
pub sql_obfuscation_mode: libdd_trace_obfuscation::sql::SqlObfuscationMode,
pub http: HttpObfuscationConfig,
pub remove_stack_traces: bool,
pub redis: RedisObfuscationConfig,
pub memcached: MemcachedObfuscationConfig,
}

#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct HttpObfuscationConfig {
pub remove_query_string: bool,
pub remove_path_digits: bool,
}

#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[serde(rename_all = "PascalCase")]
pub struct RedisObfuscationConfig {
pub enabled: bool,
pub remove_all_args: bool,
}

#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[serde(rename_all = "PascalCase")]
pub struct MemcachedObfuscationConfig {
pub enabled: bool,
pub keep_command: bool,
Expand Down
58 changes: 56 additions & 2 deletions libdd-data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

#[cfg(feature = "stats-obfuscation")]
use std::sync::atomic::AtomicBool;
use std::{
borrow::Borrow,
sync::{
Expand Down Expand Up @@ -33,6 +35,8 @@ pub struct StatsExporter<H: HttpClientTrait> {
endpoint: Endpoint,
meta: TracerMetadata,
sequence_id: AtomicU64,
#[cfg(feature = "stats-obfuscation")]
obfuscation_active: Arc<AtomicBool>,
client: H,
}

Expand All @@ -43,14 +47,13 @@ impl<H: HttpClientTrait> StatsExporter<H> {
/// - `concentrator` SpanConcentrator storing the stats to be sent to the agent
/// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent
/// - `endpoint` the Endpoint used to send stats to the agent
/// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the
/// concentrator
pub fn new(
flush_interval: time::Duration,
concentrator: Arc<Mutex<SpanConcentrator>>,
meta: TracerMetadata,
endpoint: Endpoint,
client: H,
#[cfg(feature = "stats-obfuscation")] obfuscation_active: Arc<AtomicBool>,
) -> Self {
Self {
flush_interval,
Expand All @@ -59,6 +62,8 @@ impl<H: HttpClientTrait> StatsExporter<H> {
meta,
sequence_id: AtomicU64::new(0),
client,
#[cfg(feature = "stats-obfuscation")]
obfuscation_active,
}
}

Expand Down Expand Up @@ -91,6 +96,16 @@ impl<H: HttpClientTrait> StatsExporter<H> {
libdd_common::header::APPLICATION_MSGPACK,
);

#[cfg(feature = "stats-obfuscation")]
if self.obfuscation_active.load(Ordering::Relaxed) {
headers.insert(
http::HeaderName::from_static("datadog-obfuscation-version"),
http::HeaderValue::from_static(
crate::trace_exporter::stats::SUPPORTED_OBFUSCATION_VERSION_STR,
),
);
}

let result = send_with_retry(
&self.client,
&self.endpoint,
Expand Down Expand Up @@ -272,6 +287,8 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
NativeCapabilities::new_client(),
#[cfg(feature = "stats-obfuscation")]
Arc::new(AtomicBool::new(false)),
);

let send_status = stats_exporter.send(true).await;
Expand Down Expand Up @@ -299,6 +316,8 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
NativeCapabilities::new_client(),
#[cfg(feature = "stats-obfuscation")]
Arc::new(AtomicBool::new(false)),
);

let send_status = stats_exporter.send(true).await;
Expand Down Expand Up @@ -333,6 +352,8 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
NativeCapabilities::new_client(),
#[cfg(feature = "stats-obfuscation")]
Arc::new(AtomicBool::new(false)),
);

let _handle = shared_runtime
Expand Down Expand Up @@ -374,6 +395,8 @@ mod tests {
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
NativeCapabilities::new_client(),
#[cfg(feature = "stats-obfuscation")]
Arc::new(AtomicBool::new(false)),
);

let _handle = shared_runtime
Expand Down Expand Up @@ -413,4 +436,35 @@ mod tests {
"Non-empty env should be preserved"
);
}
#[cfg(feature = "stats-obfuscation")]
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_send_stats_with_obfuscation_header() {
let server = MockServer::start_async().await;

let mock = server
.mock_async(|when, then| {
when.method(POST)
.header("Content-type", "application/msgpack")
.header("datadog-obfuscation-version", "1")
.path("/v0.6/stats")
.body_includes("libdatadog-test");
then.status(200).body("");
})
.await;

let stats_exporter = StatsExporter::new(
BUCKETS_DURATION,
Arc::new(Mutex::new(get_test_concentrator())),
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
NativeCapabilities::new_client(),
Arc::new(AtomicBool::new(true)),
);

let send_status = stats_exporter.send(true).await;
send_status.unwrap();

mock.assert_async().await;
}
}
Loading
Loading