diff --git a/Cargo.toml b/Cargo.toml index 5764fb841..39304b3fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -769,6 +769,7 @@ transforms-logs = [ transforms-logs-mezmo = [ "transforms-dedupe", "transforms-filter", + "transforms-mezmo_datadog_agent_parser", "transforms-mezmo_reduce", "transforms-remap", "transforms-route", @@ -818,6 +819,7 @@ transforms-metric_to_log = [] transforms-reduce = ["transforms-impl-reduce"] transforms-mezmo_reduce = ["transforms-reduce"] # depends on the upstream version for merge_strategies transforms-mezmo_aggregate = [] +transforms-mezmo_datadog_agent_parser = [] transforms-mezmo_log_to_metric = [] transforms-mezmo_log_clustering = ["dep:lru", "dep:blake2", "dep:base64", "dep:tokio-postgres"] transforms-mezmo_log_classification = ["dep:grok"] diff --git a/src/internal_events/mezmo_datadog_agent_parser.rs b/src/internal_events/mezmo_datadog_agent_parser.rs new file mode 100644 index 000000000..ea01c4bcd --- /dev/null +++ b/src/internal_events/mezmo_datadog_agent_parser.rs @@ -0,0 +1,51 @@ +use metrics::counter; +use vector_lib::internal_event::InternalEvent; + +#[derive(Debug)] +pub struct MezmoDatadogAgentParserError<'a> { + pub error: &'a str, + pub event_type: Option<&'a str>, +} + +#[derive(Debug)] +pub struct MezmoDatadogAgentParserInvalidItem<'a> { + pub error: &'a str, + pub item_type: &'a str, + pub event_type: Option<&'a str>, +} + +impl InternalEvent for MezmoDatadogAgentParserError<'_> { + fn emit(self) { + let event_type = self.event_type.unwrap_or("unknown"); + error!( + message = "Failed to parse Datadog agent payload.", + error = %self.error, + event_type = %event_type, + internal_log_rate_limit = true, + ); + counter!( + "mezmo_datadog_agent_parser_errors_total", + "event_type" => event_type.to_string(), + ) + .increment(1); + } +} + +impl InternalEvent for MezmoDatadogAgentParserInvalidItem<'_> { + fn emit(self) { + let event_type = self.event_type.unwrap_or("unknown"); + error!( + message = "Invalid item error.", + 
error = %self.error, + item_type = %self.item_type, + event_type = %event_type, + internal_log_rate_limit = true, + ); + counter!( + "mezmo_datadog_agent_parser_invalid_items_total", + "item_type" => self.item_type.to_string(), + "event_type" => event_type.to_string(), + ) + .increment(1); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index c4726f719..11f982fb9 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -90,6 +90,8 @@ mod mezmo_aggregate; #[cfg(feature = "transforms-mezmo_aggregate_distributed")] mod mezmo_aggregate_distributed; pub mod mezmo_config; +#[cfg(feature = "transforms-mezmo_datadog_agent_parser")] +mod mezmo_datadog_agent_parser; #[cfg(feature = "transforms-mezmo_log_clustering")] pub(crate) mod mezmo_log_clustering; #[cfg(any( @@ -255,6 +257,8 @@ pub(crate) use self::metric_to_log::*; pub(crate) use self::mezmo_aggregate::*; #[cfg(feature = "transforms-mezmo_aggregate_distributed")] pub(crate) use self::mezmo_aggregate_distributed::*; +#[cfg(feature = "transforms-mezmo_datadog_agent_parser")] +pub(crate) use self::mezmo_datadog_agent_parser::*; #[cfg(feature = "transforms-mezmo_tag_cardinality_limit")] pub(crate) use self::mezmo_tag_cardinality_limit::*; #[cfg(feature = "transforms-mezmo_throttle_distributed")] diff --git a/src/transforms/mezmo_datadog_agent_parser/common.rs b/src/transforms/mezmo_datadog_agent_parser/common.rs new file mode 100644 index 000000000..a78ce6dd0 --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/common.rs @@ -0,0 +1,93 @@ +use chrono::{TimeZone, Utc}; + +use crate::config::log_schema; +use crate::event::{LogEvent, ObjectMap, Value}; + +#[derive(Copy, Clone, Debug)] +pub enum TimestampUnit { + Seconds, + Milliseconds, + Nanoseconds, +} + +pub fn get_message_object(log: &LogEvent) -> Result<&ObjectMap, String> { + let message_path = log_schema() + .message_key_target_path() + .ok_or_else(|| "Missing message key".to_string())?; + + log.get(message_path) 
+ .and_then(Value::as_object) + .ok_or_else(|| "Message is not an object".to_string()) +} + +pub fn get_message_object_mut(log: &mut LogEvent) -> Result<&mut ObjectMap, String> { + let message_path = log_schema() + .message_key_target_path() + .ok_or_else(|| "Missing message key".to_string())?; + + log.get_mut(message_path) + .and_then(Value::as_object_mut) + .ok_or_else(|| "Message is not an object".to_string()) +} + +pub fn parse_timestamp(value: &Value, unit: TimestampUnit) -> Option> { + match value { + Value::Timestamp(timestamp) => Some(*timestamp), + Value::Integer(value) => { + let (seconds, nanoseconds) = match unit { + TimestampUnit::Seconds => (*value, None), + TimestampUnit::Milliseconds => ( + value.div_euclid(1000), + (value.rem_euclid(1000) as u32).checked_mul(1_000_000), + ), + TimestampUnit::Nanoseconds => ( + value.div_euclid(1_000_000_000), + Some(value.rem_euclid(1_000_000_000) as u32), + ), + }; + let nanoseconds = nanoseconds.unwrap_or_default(); + Utc.timestamp_opt(seconds, nanoseconds).single() + } + Value::Float(value) => { + let value = value.into_inner(); + if !value.is_finite() { + return None; + } + let seconds = match unit { + TimestampUnit::Seconds => value, + TimestampUnit::Milliseconds => value / 1000.0, + TimestampUnit::Nanoseconds => value / 1_000_000_000.0, + }; + let (seconds, nanos) = split_float_seconds(seconds)?; + Utc.timestamp_opt(seconds, nanos).single() + } + + _ => None, + } +} + +fn split_float_seconds(value: f64) -> Option<(i64, u32)> { + if value < (i64::MIN as f64) || value > (i64::MAX as f64) { + return None; + } + + // trunc handles negative values better than floor. 
-1.4 is -1, not -2 + let mut secs = value.trunc(); + let mut fractional_secs = value - secs; + + if fractional_secs < 0.0 { + fractional_secs += 1.0; + secs -= 1.0; + } + let mut nanoseconds = (fractional_secs * 1_000_000_000.0).round(); + + if nanoseconds >= 1_000_000_000.0 { + nanoseconds -= 1_000_000_000.0; + secs += 1.0; + } + + if secs < i64::MIN as f64 || secs > i64::MAX as f64 { + return None; + } + Some((secs as i64, nanoseconds as u32)) +} diff --git a/src/transforms/mezmo_datadog_agent_parser/config.rs b/src/transforms/mezmo_datadog_agent_parser/config.rs new file mode 100644 index 000000000..6611b282c --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/config.rs @@ -0,0 +1,184 @@ +use vector_lib::config::{clone_input_definitions, LogNamespace}; +use vector_lib::configurable::configurable_component; +use vector_lib::lookup::lookup_v2::ConfigTargetPath; + +use crate::{ + config::{ + log_schema, DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, + TransformOutput, + }, + schema, + transforms::Transform, +}; + +use super::{ + MezmoDatadogAgentParser, LOGS_OUTPUT, METRICS_OUTPUT, TRACES_OUTPUT, UNMATCHED_OUTPUT, +}; + +fn default_event_type_path() -> ConfigTargetPath { + ConfigTargetPath( + log_schema() + .metadata_key_target_path() + .expect("metadata key must exist") + .clone() + .with_field_appended("x-mezmo-dd-event-type"), + ) +} + +fn default_payload_version_path() -> ConfigTargetPath { + ConfigTargetPath( + log_schema() + .message_key_target_path() + .expect("message key must exist") + .clone() + .with_field_appended("mezmo_payload_version"), + ) +} + +/// Mapping of event type string values to internal types. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(default)] +pub struct EventTypeValues { + /// Value that identifies a log event. + pub log: String, + + /// Value that identifies a metric event. + pub metric: String, + + /// Value that identifies a trace event. 
+ pub trace: String, + + /// Value that identifies a sketch event. + pub sketch: String, +} + +impl Default for EventTypeValues { + fn default() -> Self { + Self { + log: "log".into(), + metric: "metric".into(), + trace: "trace".into(), + sketch: "sketch".into(), + } + } +} + +/// Configuration for the `mezmo_datadog_agent_parser` transform. +#[configurable_component(transform( + "mezmo_datadog_agent_parser", + "Parse and normalize Datadog agent payloads into structured log events." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct MezmoDatadogAgentParserConfig { + /// Path to the event type field in the incoming data. + #[serde(default = "default_event_type_path")] + pub event_type_path: ConfigTargetPath, + + /// Mapping of event type string values to internal types. + #[serde(default)] + pub event_type_values: EventTypeValues, + + /// Path to the payload version field in the incoming data. + #[serde(default = "default_payload_version_path")] + pub payload_version_path: ConfigTargetPath, + + /// Parse log tag strings into arrays when true. + #[serde(default = "crate::serde::default_false")] + pub parse_log_tags: bool, + + /// Split metric name into namespace and name when true. + /// + /// When false, the full metric name is preserved in `name` and `namespace` + /// is omitted. + #[serde(default = "crate::serde::default_true")] + pub split_metric_namespace: bool, + + /// Remove the event type field after identifying the event. + #[serde(default = "crate::serde::default_true")] + pub strip_event_type: bool, + + /// Remove the payload version field before sending to output. + #[serde(default = "crate::serde::default_true")] + pub strip_payload_version: bool, + + /// Route unrecognized event types to the _unmatched output instead of dropping. 
+ #[serde(default = "crate::serde::default_true")] + pub reroute_unmatched: bool, +} + +impl Default for MezmoDatadogAgentParserConfig { + fn default() -> Self { + Self { + event_type_path: default_event_type_path(), + event_type_values: EventTypeValues::default(), + payload_version_path: default_payload_version_path(), + parse_log_tags: false, + split_metric_namespace: true, + strip_event_type: true, + strip_payload_version: true, + reroute_unmatched: true, + } + } +} + +impl GenerateConfig for MezmoDatadogAgentParserConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self::default()).unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "mezmo_datadog_agent_parser")] +impl TransformConfig for MezmoDatadogAgentParserConfig { + async fn build(&self, context: &TransformContext) -> crate::Result { + Ok(Transform::synchronous(MezmoDatadogAgentParser::new( + self, + context.mezmo_ctx.clone(), + ))) + } + + fn input(&self) -> Input { + Input::log() + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + input_definitions: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + let mut outputs = vec![ + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(LOGS_OUTPUT), + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(METRICS_OUTPUT), + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(TRACES_OUTPUT), + ]; + + if self.reroute_unmatched { + outputs.push( + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(UNMATCHED_OUTPUT), + ); + } + + outputs + } + + fn enable_concurrency(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/logs.rs
b/src/transforms/mezmo_datadog_agent_parser/logs.rs new file mode 100644 index 000000000..09a332bb8 --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/logs.rs @@ -0,0 +1,209 @@ +use crate::config::log_schema; +use crate::event::{Event, MaybeAsLogMut, Value}; +use bytes::Bytes; + +use super::common::{get_message_object_mut, parse_timestamp, TimestampUnit}; +use super::{MezmoDatadogAgentParser, TransformDatadogEvent, TransformError}; + +pub(super) struct DatadogLogEvent; + +impl TransformDatadogEvent for DatadogLogEvent { + /// See: https://github.com/DataDog/agent-payload/blob/master/proto/logs/agent_logs_payload.proto + /// The log timestamp is in milliseconds, not seconds + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let log_result = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string()); + + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let message_obj = match get_message_object_mut(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + if parser.parse_log_tags { + let parsed_ddtags = message_obj + .get("ddtags") + .and_then(|value| value.as_bytes()) + .map(parse_ddtags); + + if let Some(parsed_ddtags) = parsed_ddtags { + message_obj.insert("ddtags".into(), parsed_ddtags); + } + } + + let parsed_timestamp = message_obj + .get("timestamp") + .and_then(|value| parse_timestamp(value, TimestampUnit::Milliseconds)); + + if let Some(parsed) = parsed_timestamp { + if let Some(timestamp_path) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_path, parsed); + } + }; + + parser.strip_fields(&mut event); + + Ok(vec![event]) + } +} + +// Mirrors the Datadog agent source parse_ddtags implementation. 
+fn parse_ddtags(ddtags: &Bytes) -> Value { + String::from_utf8_lossy(ddtags) + .split(',') + .filter_map(|tag_entry| { + let trimmed = tag_entry.trim(); + if trimmed.is_empty() { + None + } else { + Some(Value::Bytes(Bytes::from(trimmed.to_string()))) + } + }) + .collect::>() + .into() +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use bytes::Bytes; + use chrono::{TimeZone, Utc}; + + use super::super::TransformDatadogEvent; + use super::DatadogLogEvent; + use crate::config::log_schema; + use crate::event::{Event, EventMetadata, KeyString, LogEvent, Value}; + use crate::transforms::mezmo_datadog_agent_parser::{ + MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, + }; + + fn build_event(message: BTreeMap) -> Event { + Event::Log(LogEvent::from_map( + [( + log_schema() + .message_key() + .expect("message key") + .to_string() + .into(), + Value::Object(message), + )] + .into_iter() + .collect(), + EventMetadata::default(), + )) + } + + #[test] + fn parse_timestamp_and_tags() { + let mut message = BTreeMap::new(); + message.insert("timestamp".into(), Value::Integer(1_700_000_000_123)); + message.insert( + "ddtags".into(), + Value::Bytes(Bytes::from_static(b"env:prod,team:core")), + ); + message.insert("status".into(), Value::Bytes(Bytes::from_static(b"info"))); + + let event = build_event(message); + let config = MezmoDatadogAgentParserConfig { + parse_log_tags: true, + ..Default::default() + }; + let parser = MezmoDatadogAgentParser::new(&config, None); + + let mut results = + DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); + let event = results.pop().expect("transformed event"); + + let log = event.as_log(); + let ts_path = log_schema() + .timestamp_key_target_path() + .expect("timestamp key"); + + let message = log + .get(log_schema().message_key_target_path().expect("message key")) + .and_then(|val| val.as_object()) + .expect("message object"); + + let message_ts = message + .get("timestamp") + 
.and_then(Value::as_integer) + .expect("message timestamp"); + let expected_ts = Utc + .timestamp_millis_opt(message_ts) + .latest() + .expect("valid timestamp"); + + assert_eq!(log.get(ts_path), Some(&Value::Timestamp(expected_ts))); + + assert_eq!( + message.get("ddtags"), + Some(&Value::Array(vec![ + Value::Bytes(Bytes::from_static(b"env:prod")), + Value::Bytes(Bytes::from_static(b"team:core")), + ])) + ); + assert_eq!(message.get("timestamp"), Some(&Value::Integer(message_ts))); + assert_eq!( + message.get("status"), + Some(&Value::Bytes(Bytes::from_static(b"info"))) + ); + } + + #[test] + fn handles_invalid_timestamp() { + let mut message = BTreeMap::new(); + message.insert( + "timestamp".into(), + Value::Bytes(Bytes::from_static(b"not-a-ts")), + ); + + let event = build_event(message); + let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default(), None); + + let mut results = + DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); + let event = results.pop().expect("transformed event"); + + let log = event.as_log(); + let ts_path = log_schema() + .timestamp_key_target_path() + .expect("timestamp key"); + assert!(log.get(ts_path).is_none()); + } + + #[test] + fn does_not_parse_tags_when_disabled() { + let mut message = BTreeMap::new(); + message.insert( + "ddtags".into(), + Value::Bytes(Bytes::from_static(b"env:prod,team:core")), + ); + + let event = build_event(message); + let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default(), None); + + let mut results = + DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); + let event = results.pop().expect("transformed event"); + + let message = event + .as_log() + .get(log_schema().message_key_target_path().expect("message key")) + .and_then(Value::as_object) + .expect("message object"); + + assert_eq!( + message.get("ddtags"), + Some(&Value::Bytes(Bytes::from_static(b"env:prod,team:core"))) + ); + } +} diff --git 
a/src/transforms/mezmo_datadog_agent_parser/metrics.rs b/src/transforms/mezmo_datadog_agent_parser/metrics.rs new file mode 100644 index 000000000..a56f8611f --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/metrics.rs @@ -0,0 +1,1037 @@ +use crate::common::datadog::DatadogMetricType; +use crate::config::log_schema; +use crate::event::{Event, MaybeAsLogMut, MetricKind, ObjectMap, Value}; + +use super::common::{get_message_object, parse_timestamp, TimestampUnit}; +use super::{MezmoDatadogAgentParser, TransformDatadogEvent, TransformError}; + +pub(super) struct DatadogMetricEvent; + +impl TransformDatadogEvent for DatadogMetricEvent { + /// Transform a metric event into the normalized MezmoMetric log format. + /// + /// The incoming event is a log event with the metric data in `.message`. + /// The payload version determines whether this is a v1 or v2 series metric. + /// Metrics data is intercepted and decoded in Kong with each item in the series + /// field emitted as a separate event + /// See: https://github.com/answerbook/pipeline-gateway-kong/blob/6fefc73374b32996b4bbb5ab1052eb2e6e6d3293/kong/plugins/pipeline-routing/lib/datadog.lua#L157 + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let version = parser + .get_payload_version(&event) + .unwrap_or_else(|| "v2".to_string()); + + let log_result = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string()); + + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let message_obj = match get_message_object(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let output_messages = match version.as_str() { + "v1" => transform_series_v1(message_obj, parser.split_metric_namespace), + _ => transform_series_v2(message_obj, parser.split_metric_namespace), + } + .map_err(|msg| TransformError::from(event.clone(), 
&msg))?; + + parser + .build_events_from_payloads(event.clone(), output_messages) + .map_err(|msg| TransformError::from(event, &msg)) + } +} + +pub(super) struct DatadogSketchEvent; + +impl TransformDatadogEvent for DatadogSketchEvent { + /// Transform a sketch event into the normalized MezmoMetric log format. + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let log_result = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string()); + + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + let message_obj = match get_message_object(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let sketch_metrics = + match transform_sketch_payload(message_obj, parser.split_metric_namespace) { + Ok(sketches) => sketches, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + parser + .build_events_from_payloads(event.clone(), sketch_metrics) + .map_err(|msg| TransformError::from(event, &msg)) + } +} + +/// Transforms Datadog series v1 metrics (/api/v1/series) +/// v1 metrics store points as a list of lists ([[timestamp, value]]) +fn transform_series_v1( + message: &ObjectMap, + split_metric_namespace: bool, +) -> Result)>, String> { + let metric_name = message + .get("metric") + .and_then(|v| v.as_str()) + .ok_or_else(|| "Missing metric name".to_string())?; + + let metric_type = message + .get("type") + .ok_or_else(|| "Missing metric type".to_string()) + .and_then(parse_metric_type)?; + let metric_kind = MetricKind::from(&metric_type); + + let points = message + .get("points") + .and_then(|v| v.as_array()) + .ok_or_else(|| "Missing points".to_string())?; + + let mut tags = message + .get("tags") + .and_then(|v| v.as_array()) + .map(parse_tag_array) + .unwrap_or_default(); + + insert_source_type_name(message, &mut tags); + + let tags = if !tags.is_empty() { + 
Some(Value::Object(tags)) + } else { + None + }; + + let (namespace, name) = namespace_name_from_dd_metric(&metric_name, split_metric_namespace); + + let interval = message + .get("interval") + .and_then(|v| v.as_integer()) + .map(|i| i as u32) + .unwrap_or(0); + + let mut outputs = Vec::new(); + + for point in points { + let point = point + .as_array() + .ok_or_else(|| "Missing point value".to_string())?; + + if point.len() < 2 { + return Err("Missing point value".to_string()); + } + + let point_timestamp = parse_point_timestamp(&point[0])?; + let point_value = parse_metric_value(&point[1])?; + + let mut output = ObjectMap::new(); + output.insert("name".into(), Value::from(name.to_string())); + if let Some(ns) = namespace { + output.insert("namespace".into(), Value::from(ns.to_string())); + } + + output.insert("kind".into(), Value::from(metric_kind_as_str(metric_kind))); + output.insert("type".into(), Value::from(metric_type.as_metric_kind_str())); + + if let Some(ref tags) = tags { + output.insert("tags".into(), tags.clone()); + } + + let interval = match metric_type { + DatadogMetricType::Rate => { + // Rates are converted to type: counter + // See: https://github.com/mezmo/vector/blob/f5010f6b3e3cda9f201429c3802cee94efb16586/src/sources/datadog_agent/metrics.rs#L280 + let interval = if interval != 0 { interval } else { 1 }; + output.insert("value".into(), Value::from(point_value * (interval as f64))); + Some(interval) + } + DatadogMetricType::Gauge => { + output.insert("value".into(), Value::from(point_value)); + if interval > 0 { + Some(interval) + } else { + None + } + } + DatadogMetricType::Count => { + output.insert("value".into(), Value::from(point_value)); + None + } + }; + + if let Some(interval) = interval { + let mut time_obj = ObjectMap::new(); + time_obj.insert("interval_ms".into(), Value::from(interval * 1000)); + output.insert("time".into(), Value::Object(time_obj)); + } + + if let Some(host) = message.get("host") { + output.insert("host".into(), 
host.clone()); + } + + outputs.push((output, Some(point_timestamp))); + } + + Ok(outputs) +} + +/// Transforms Datadog series v2 metrics (/api/v2/series) +/// v2 metrics store points as a list of objects ([{timestamp, value}]) +fn transform_series_v2( + message: &ObjectMap, + split_metric_namespace: bool, +) -> Result)>, String> { + let metric_name = message + .get("metric") + .and_then(|v| v.as_str()) + .ok_or_else(|| "Missing metric name".to_string())?; + + let metric_type = message + .get("type") + .ok_or_else(|| "Missing metric type".to_string()) + .and_then(parse_metric_type)?; + let metric_kind = MetricKind::from(&metric_type); + + let points = message + .get("points") + .and_then(|v| v.as_array()) + .ok_or_else(|| "Missing points".to_string())?; + + let mut tags = message + .get("tags") + .and_then(|v| v.as_array()) + .map(parse_tag_array) + .unwrap_or_default(); + + let (host_from_resource, resource_tags) = extract_resources(message); + tags.extend(resource_tags); + insert_source_type_name(message, &mut tags); + + let tags = if !tags.is_empty() { + Some(Value::Object(tags)) + } else { + None + }; + + let (namespace, name) = namespace_name_from_dd_metric(&metric_name, split_metric_namespace); + + let interval = message + .get("interval") + .and_then(|v| v.as_integer()) + .map(|i| i as u32) + .unwrap_or(0); + + let mut outputs = Vec::new(); + + for point in points { + let point = point + .as_object() + .ok_or_else(|| "Missing point value".to_string())?; + + let point_value = point + .get("value") + .ok_or_else(|| "Missing point value".to_string()) + .and_then(parse_metric_value)?; + let point_timestamp = point + .get("timestamp") + .ok_or_else(|| "Missing point timestamp".to_string()) + .and_then(parse_point_timestamp)?; + + let mut output = ObjectMap::new(); + output.insert("name".into(), Value::from(name.to_string())); + if let Some(ns) = namespace { + output.insert("namespace".into(), Value::from(ns.to_string())); + } + output.insert("kind".into(),
Value::from(metric_kind_as_str(metric_kind))); + output.insert("type".into(), Value::from(metric_type.as_metric_kind_str())); + + if let Some(ref tags) = tags { + output.insert("tags".into(), tags.clone()); + } + + let interval = match metric_type { + DatadogMetricType::Rate => { + let interval = if interval != 0 { interval } else { 1 }; + output.insert("value".into(), Value::from(point_value * (interval as f64))); + Some(interval) + } + DatadogMetricType::Gauge => { + output.insert("value".into(), Value::from(point_value)); + if interval > 0 { + Some(interval) + } else { + None + } + } + DatadogMetricType::Count => { + output.insert("value".into(), Value::from(point_value)); + None + } + }; + + if let Some(interval) = interval { + let mut time_obj = ObjectMap::new(); + time_obj.insert("interval_ms".into(), Value::from(interval * 1000)); + output.insert("time".into(), Value::Object(time_obj)); + } + + if let Some(host) = host_from_resource.as_ref() { + output.insert("host".into(), host.clone()); + } + + outputs.push((output, Some(point_timestamp))); + } + + Ok(outputs) +} + +/// Transform a Datadog sketch (custom metric) +/// Schema: https://github.com/DataDog/agent-payload/blob/b14d89ab49cdb6a4618c2ac76c0b21e4e5423c5b/proto/metrics/agent_payload.proto#L91 +/// In Kong, the sketches field of the agent payload is unrolled and forwarded to vector +/// See: https://github.com/answerbook/pipeline-gateway-kong/blob/6fefc73374b32996b4bbb5ab1052eb2e6e6d3293/kong/plugins/pipeline-routing/lib/datadog.lua#L201 +fn transform_sketch_payload( + message: &ObjectMap, + split_metric_namespace: bool, +) -> Result)>, String> { + let metric_name = message + .get("metric") + .and_then(|v| v.as_str()) + .ok_or_else(|| "Missing metric name".to_string())?; + + let mut tags = message + .get("tags") + .and_then(|v| v.as_array()) + .map(parse_tag_array) + .unwrap_or_default(); + + insert_sketch_host_tag(message, &mut tags); + let tags = if !tags.is_empty() { + Some(Value::Object(tags)) + } 
else { + None + }; + + let (namespace, metric_name) = + namespace_name_from_dd_metric(&metric_name, split_metric_namespace); + + let dog_sketches = message + .get("dogsketches") + .and_then(|v| v.as_array()) + .ok_or_else(|| "Missing sketches".to_string())?; + + let mut outputs = Vec::new(); + + for sketch in dog_sketches { + let mut sketch_value = sketch.clone(); + let mut output = ObjectMap::new(); + output.insert("name".into(), Value::from(metric_name.to_string())); + if let Some(namespace) = namespace { + output.insert("namespace".into(), Value::from(namespace.to_string())); + } + + output.insert( + "kind".into(), + Value::from(metric_kind_as_str(MetricKind::Incremental)), + ); + + if let Some(ref tags) = tags { + output.insert("tags".into(), tags.clone()); + } + + output.insert("type".into(), Value::from("sketch")); + + let timestamp = sketch + .as_object() + .and_then(|obj| obj.get("ts")) + .and_then(|value| parse_point_timestamp(value).ok()); + + if let Value::Object(ref mut sketch) = sketch_value { + sketch.remove("ts"); + } + + output.insert("value".into(), sketch_value); + outputs.push((output, timestamp)); + } + + Ok(outputs) +} + +fn parse_tag_array(tags: &[Value]) -> ObjectMap { + tags.iter() + .filter_map(|tag| tag.as_str()) + .map(|tag| { + let (key, val) = match tag.split_once(':') { + Some((prefix, suffix)) => (prefix, Value::from(suffix.to_string())), + None => (tag.as_ref(), Value::Null), + }; + (key.to_string().into(), val) + }) + .collect() +} + +fn parse_metric_type(metric_type: &Value) -> Result { + // Some legacy agents send the metric type as an integer + if let Some(metric_type) = metric_type.as_integer() { + return match metric_type { + 1 => Ok(DatadogMetricType::Count), + 2 => Ok(DatadogMetricType::Rate), + 3 => Ok(DatadogMetricType::Gauge), + _ => Err("Unknown metric type".to_string()), + }; + } + + if let Some(metric_type) = metric_type.as_str() { + let normalized = metric_type.trim().to_ascii_lowercase(); + return match 
normalized.as_str() { + "count" => Ok(DatadogMetricType::Count), + "rate" => Ok(DatadogMetricType::Rate), + "gauge" => Ok(DatadogMetricType::Gauge), + _ => Err("Unknown metric type".to_string()), + }; + } + + Err("Unknown metric type".to_string()) +} + +fn parse_metric_value(value: &Value) -> Result { + match value { + Value::Float(val) => Ok(val.into_inner()), + Value::Integer(val) => Ok(*val as f64), + _ => Err("Missing point value".to_string()), + } +} + +fn insert_source_type_name(message: &ObjectMap, tags: &mut ObjectMap) { + if let Some(source_type_name) = message.get("source_type_name").and_then(|v| v.as_str()) { + if !source_type_name.is_empty() { + tags.insert( + "source_type_name".into(), + Value::from(source_type_name.to_string()), + ); + } + } +} + +fn insert_sketch_host_tag(message: &ObjectMap, tags: &mut ObjectMap) { + let host = match message.get("host") { + Some(host) => host, + None => return, + }; + + let host_key = match log_schema().host_key() { + Some(host_key) => host_key.to_string(), + None => return, + }; + + tags.insert(host_key.into(), host.clone()); +} + +fn extract_resources(message: &ObjectMap) -> (Option, ObjectMap) { + let mut host = None; + let mut resource_tags = ObjectMap::new(); + let resources = match message.get("resources").and_then(|v| v.as_array()) { + Some(resources) => resources, + None => return (host, resource_tags), + }; + + for resource in resources { + let resource_item = match resource.as_object() { + Some(obj) => obj, + None => continue, + }; + + let resource_type = match resource_item.get("type").and_then(|v| v.as_str()) { + Some(resource_type) => resource_type, + None => continue, + }; + + let resource_name = match resource_item.get("name") { + Some(resource_name) => resource_name, + None => continue, + }; + + if resource_type == "host" { + host = Some(resource_name.clone()); + continue; + } + + resource_tags.insert( + format!("resource.{resource_type}").into(), + resource_name.clone(), + ); + } + + (host, 
resource_tags) +} + +fn parse_point_timestamp(value: &Value) -> Result { + if let Some(timestamp) = parse_timestamp(value, TimestampUnit::Seconds) { + return Ok(Value::Timestamp(timestamp)); + } + + let error = match value { + Value::Integer(_) | Value::Float(_) => "Invalid point timestamp", + _ => "Missing point timestamp", + }; + + Err(error.to_string()) +} + +impl<'a> From<&'a DatadogMetricType> for MetricKind { + fn from(value: &'a DatadogMetricType) -> Self { + match value { + DatadogMetricType::Gauge => MetricKind::Absolute, + DatadogMetricType::Count | DatadogMetricType::Rate => MetricKind::Incremental, + } + } +} + +trait AsMetricKindStr { + fn as_metric_kind_str(&self) -> &'static str; +} + +impl AsMetricKindStr for DatadogMetricType { + fn as_metric_kind_str(&self) -> &'static str { + match self { + DatadogMetricType::Gauge => "gauge", + DatadogMetricType::Count | DatadogMetricType::Rate => "counter", + } + } +} + +const fn metric_kind_as_str(kind: MetricKind) -> &'static str { + match kind { + MetricKind::Incremental => "incremental", + MetricKind::Absolute => "absolute", + } +} + +fn namespace_name_from_dd_metric( + dd_metric_name: &str, + split_metric_namespace: bool, +) -> (Option<&str>, &str) { + if !split_metric_namespace { + return (None, dd_metric_name); + } + + match dd_metric_name.split_once('.') { + Some((namespace, name)) => (Some(namespace), name), + None => (None, dd_metric_name), + } +} + +#[cfg(test)] +mod tests { + use super::super::TransformDatadogEvent; + use super::*; + use crate::event::LogEvent; + use crate::transforms::mezmo_datadog_agent_parser::{ + MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, + }; + use chrono::{TimeZone, Utc}; + + fn create_v2_metric_event() -> Event { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.usage", + "type": 3, + "points": [{"timestamp": 1234567890, "value": 
42.5}, {"timestamp": 1234567891, "value": 43.0}], + "tags": ["env:prod", "host:server1"], + "resources": [ + {"type": "host", "name": "myhost"}, + {"type": "cluster", "name": "cluster-a"} + ], + "source_type_name": "my-source" + }), + ); + Event::Log(log) + } + + fn create_v1_metric_event() -> Event { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v1", + "metric": "system.memory.used", + "type": "gauge", + "points": [[1234567890, 1024.0], [1234567891, 1025.0]], + "tags": ["env:staging"], + "host": "testhost", + "source_type_name": "my-source" + }), + ); + Event::Log(log) + } + + #[test] + fn test_transform_v2_metric() { + let event = create_v2_metric_event(); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + assert_eq!(results.len(), 2); + + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("cpu.usage".to_string()) + ); + assert_eq!( + message + .get("namespace") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("system".to_string()) + ); + + let tags = message.get("tags").unwrap().as_object().unwrap(); + assert!(matches!( + tags.get("resource.cluster").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "cluster-a" + )); + assert!(matches!( + tags.get("source_type_name").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "my-source" + )); + + let val = message.get("value").unwrap().as_float().unwrap(); + assert_eq!(val, ordered_float::NotNan::new(42.5).unwrap()); + + let timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + 
.expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + + let second_log = results[1].as_log(); + let second_log_message = second_log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + let second_log_value = second_log_message.get("value").unwrap().as_float().unwrap(); + assert_eq!(second_log_value, ordered_float::NotNan::new(43.0).unwrap()); + } + + #[test] + fn test_transform_v2_metric_no_namespace_split() { + let event = create_v2_metric_event(); + let config = MezmoDatadogAgentParserConfig { + split_metric_namespace: false, + ..Default::default() + }; + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("system.cpu.usage".to_string()) + ); + assert!(message.get("namespace").is_none()); + } + + #[test] + fn test_transform_v1_metric() { + let event = create_v1_metric_event(); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + assert_eq!(results.len(), 2); + + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("memory.used".to_string()) + ); + assert_eq!( + message + .get("namespace") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("system".to_string()) + ); + + let tags = message.get("tags").unwrap().as_object().unwrap(); + assert!(matches!( + 
tags.get("source_type_name").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "my-source" + )); + + let timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + + #[test] + fn test_parse_tag_array() { + let tags = vec![ + Value::from("env:prod"), + Value::from("host:server1"), + Value::from("bare_tag"), + ]; + + let result = parse_tag_array(&tags); + + assert_eq!( + result + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("prod".to_string()) + ); + assert_eq!( + result + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("server1".to_string()) + ); + assert_eq!( + result.get("bare_tag").map(|v| matches!(v, Value::Null)), + Some(true) + ); + } + + #[test] + fn test_rate_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.rate", + "type": 2, // Rate + "interval": 10, // 10s + "points": [{"timestamp": 1234567890, "value": 5.0}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let val = message.get("value").unwrap().as_float().unwrap(); + // value = 5.0 * 10 = 50.0 + assert_eq!(val, ordered_float::NotNan::new(50.0).unwrap()); + + let time_obj = message.get("time").unwrap().as_object().unwrap(); + let interval_ms = time_obj.get("interval_ms").unwrap().as_integer().unwrap(); + // interval_ms = 10 * 1000 = 10000 + 
assert_eq!(interval_ms, 10000); + + assert_eq!( + message.get("kind").unwrap().as_str().unwrap(), + "incremental" + ); + assert_eq!(message.get("type").unwrap().as_str().unwrap(), "counter") + } + + #[test] + fn test_string_metric_type() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.usage", + "type": "GAUGE", + "points": [{"timestamp": 1234567890, "value": 42.5}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + let message = results[0] + .as_log() + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert!(matches!( + message.get("type").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "gauge" + )); + } + + #[test] + fn test_gauge_interval_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.gauge", + "type": 3, // Gauge + "interval": 10, // 10s + "points": [{"timestamp": 1234567890, "value": 5.0}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let time_obj = message.get("time").unwrap().as_object().unwrap(); + let interval_ms = time_obj.get("interval_ms").unwrap().as_integer().unwrap(); + assert_eq!(interval_ms, 10000); + } + + #[test] + fn 
test_count_interval_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.count", + "type": 1, // Count + "interval": 10, // 10s + "points": [{"timestamp": 1234567890, "value": 5.0}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert!(message.get("time").is_none()); + } + + #[test] + fn test_sketch_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "metric": "system.cpu.sketch", + "tags": ["env:prod", "bare_tag"], + "host": "testhost", + "dogsketches": [ + { + "cnt": 12, + "min": 1.0, + "max": 9.0, + "sum": 15.0, + "avg": 4.5, + "k": [1, 2], + "n": [3, 4], + "ts": 1234567890 + } + ] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogSketchEvent::transform(event, &parser).unwrap(); + assert_eq!(results.len(), 1); + + let message = results[0] + .as_log() + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert!(matches!( + message.get("kind").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "incremental" + )); + assert!(matches!( + message + .get("type") + .and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "sketch" + )); + let tags = message.get("tags").unwrap().as_object().unwrap(); + let host_key = log_schema().host_key().unwrap().to_string(); + assert!(matches!( + 
tags.get(host_key.as_str()).and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "testhost" + )); + + let timestamp = results[0] + .as_log() + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + + #[test] + fn test_sketch_message_ts() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "metric": "system.cpu.sketch", + "tags": ["env:prod", "bare_tag"], + "host": "testhost", + "dogsketches": [ + { + "cnt": 12, + "min": 1.0, + "max": 9.0, + "sum": 15.0, + "avg": 4.5, + "k": [1, 2], + "n": [3, 4], + "ts": 1234567890 + } + ] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogSketchEvent::transform(event, &parser).unwrap(); + assert_eq!(results.len(), 1); + + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let value_obj = message.get("value").unwrap().as_object().unwrap(); + assert!(value_obj.get("ts").is_none()); + assert_eq!(value_obj.get("cnt").and_then(|v| v.as_integer()), Some(12)); + + let timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + + #[test] + fn test_parse_point_timestamp_value_variants() { + let timestamp = Utc.timestamp_opt(1234567890, 0).single().unwrap(); + let value = Value::Timestamp(timestamp); + let parsed = parse_point_timestamp(&value).unwrap(); + let parsed_timestamp = parsed.as_timestamp().expect("timestamp should be set"); + assert_eq!(*parsed_timestamp, timestamp); + + let value = 
Value::from(1234567890); + let parsed = parse_point_timestamp(&value).unwrap(); + let parsed_timestamp = parsed.as_timestamp().expect("timestamp should be set"); + assert_eq!( + *parsed_timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + + let value = Value::from(1234567890.0); + let parsed = parse_point_timestamp(&value).unwrap(); + let parsed_timestamp = parsed.as_timestamp().expect("timestamp should be set"); + assert_eq!( + *parsed_timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + + let value = Value::from(f64::INFINITY); + let err = parse_point_timestamp(&value).unwrap_err(); + assert_eq!(err, "Invalid point timestamp"); + + let value = Value::from("not-a-timestamp"); + let err = parse_point_timestamp(&value).unwrap_err(); + assert_eq!(err, "Missing point timestamp"); + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/mod.rs b/src/transforms/mezmo_datadog_agent_parser/mod.rs new file mode 100644 index 000000000..cbcbe13ab --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/mod.rs @@ -0,0 +1,411 @@ +use std::borrow::Cow; + +use vector_lib::lookup::lookup_v2::ConfigTargetPath; +use vector_lib::transform::SyncTransform; + +use mezmo::{user_log_warn, user_trace::MezmoUserLog, MezmoContext}; + +use crate::{ + config::log_schema, + event::{Event, MaybeAsLogMut, ObjectMap, Value}, + internal_events::MezmoDatadogAgentParserError, +}; + +use logs::DatadogLogEvent; +use metrics::{DatadogMetricEvent, DatadogSketchEvent}; +use traces::DatadogTraceEvent; + +mod common; +mod config; +mod logs; +mod metrics; +mod traces; + +pub use config::{EventTypeValues, MezmoDatadogAgentParserConfig}; + +pub const LOGS_OUTPUT: &str = "logs"; +pub const METRICS_OUTPUT: &str = "metrics"; +pub const TRACES_OUTPUT: &str = "traces"; +pub const UNMATCHED_OUTPUT: &str = "_unmatched"; + +#[derive(Clone, Debug)] +pub struct MezmoDatadogAgentParser { + event_type_path: ConfigTargetPath, + event_type_values: EventTypeValues, + 
payload_version_path: ConfigTargetPath, + parse_log_tags: bool, + split_metric_namespace: bool, + strip_event_type: bool, + strip_payload_version: bool, + reroute_unmatched: bool, + mezmo_ctx: Option, +} + +impl MezmoDatadogAgentParser { + pub fn new(config: &MezmoDatadogAgentParserConfig, mezmo_ctx: Option) -> Self { + Self { + event_type_path: config.event_type_path.clone(), + event_type_values: config.event_type_values.clone(), + payload_version_path: config.payload_version_path.clone(), + parse_log_tags: config.parse_log_tags, + split_metric_namespace: config.split_metric_namespace, + strip_event_type: config.strip_event_type, + strip_payload_version: config.strip_payload_version, + reroute_unmatched: config.reroute_unmatched, + mezmo_ctx, + } + } + + fn get_event_type(&self, event: &Event) -> Option { + event + .maybe_as_log() + .and_then(|log| log.get(&self.event_type_path.0)) + .and_then(|v| v.as_str()) + .map(Cow::into_owned) + } + + fn get_payload_version(&self, event: &Event) -> Option { + event + .maybe_as_log() + .and_then(|log| log.get(&self.payload_version_path.0)) + .and_then(|v| v.as_str()) + .map(Cow::into_owned) + } + + fn handle_transform_event( + &self, + event: Event, + output: &mut vector_lib::transform::TransformOutputsBuf, + output_name: &'static str, + event_type_name: &'static str, + user_log_event_type: &'static str, + ) where + T: TransformDatadogEvent, + { + match T::transform(event, self) { + Ok(events) => { + for event in events { + output.push(Some(output_name), event); + } + } + Err(err) => { + user_log_warn!( + self.mezmo_ctx.clone(), + format!( + "Failed to transform {}: {}", + user_log_event_type, err.message + ) + ); + emit!(MezmoDatadogAgentParserError { + error: &err.message, + event_type: Some(event_type_name) + }); + if self.reroute_unmatched { + output.push(Some(UNMATCHED_OUTPUT), *err.input); + } + } + } + } + + pub(super) fn strip_fields(&self, event: &mut Event) { + if let Some(log) = event.maybe_as_log_mut() { + if 
self.strip_event_type { + log.remove(&self.event_type_path.0); + } + if self.strip_payload_version { + log.remove(&self.payload_version_path.0); + } + } + } + + pub(super) fn build_events_from_payloads( + &self, + mut event: Event, + payloads: Vec<(ObjectMap, Option)>, + ) -> Result, String> { + if payloads.is_empty() { + return Ok(Vec::new()); + } + + self.strip_fields(&mut event); + + // Reduce the cost of cloning the event for each payload item + event.maybe_as_log_mut().and_then(|log| { + log_schema() + .message_key_target_path() + .and_then(|path| log.remove(path)) + }); + + payloads + .into_iter() + .map(|payload| { + let mut new_event = event.clone(); + insert_payload(&mut new_event, payload)?; + Ok(new_event) + }) + .collect() + } +} + +fn insert_payload(event: &mut Event, payload: (ObjectMap, Option)) -> Result<(), String> { + let log = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string())?; + + let (message, timestamp) = payload; + + log.insert( + log_schema() + .message_key_target_path() + .ok_or_else(|| "Missing message key".to_string())?, + Value::Object(message), + ); + + if let Some(timestamp) = timestamp { + if let Some(timestamp_path) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_path, timestamp); + } + } + + Ok(()) +} + +impl SyncTransform for MezmoDatadogAgentParser { + fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) { + let event_type = self.get_event_type(&event); + + match event_type.as_deref() { + Some(t) if t == self.event_type_values.log => { + self.handle_transform_event::( + event, + output, + LOGS_OUTPUT, + "log", + "log", + ); + } + Some(t) if t == self.event_type_values.metric => { + self.handle_transform_event::( + event, + output, + METRICS_OUTPUT, + "metric", + "metric", + ); + } + Some(t) if t == self.event_type_values.sketch => { + self.handle_transform_event::( + event, + output, + METRICS_OUTPUT, + "sketch", + "metric", + ); + } + 
Some(t) if t == self.event_type_values.trace => { + self.handle_transform_event::( + event, + output, + TRACES_OUTPUT, + "trace", + "trace", + ); + } + _ => { + if self.reroute_unmatched { + output.push(Some(UNMATCHED_OUTPUT), event); + } + } + } + } +} + +#[derive(Debug)] +pub(super) struct TransformError { + message: String, + // Reduce memory used by Result enums using this error + input: Box, +} + +impl TransformError { + fn from(input: Event, message: &str) -> Self { + Self { + input: Box::new(input), + message: message.to_string(), + } + } +} + +trait TransformDatadogEvent { + fn transform( + event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError>; +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use chrono::{TimeZone, Utc}; + + use super::*; + use crate::config::{log_schema, DataType, TransformOutput}; + use crate::event::{LogEvent, Value}; + use crate::transforms::SyncTransform; + use vector_lib::transform::TransformOutputsBuf; + + fn build_outputs_buf() -> (Vec<&'static str>, TransformOutputsBuf) { + let output_names = vec![LOGS_OUTPUT, METRICS_OUTPUT, TRACES_OUTPUT, UNMATCHED_OUTPUT]; + let buf = TransformOutputsBuf::new_with_capacity( + output_names + .iter() + .map(|output_name| { + TransformOutput::new(DataType::Log, HashMap::new()) + .with_port(output_name.to_owned()) + }) + .collect(), + 1, + ); + (output_names, buf) + } + + fn build_event(event_type: &str, message: Value) -> Event { + let mut log = LogEvent::default(); + log.insert(log_schema().message_key_target_path().unwrap(), message); + let event_type_path = log_schema() + .metadata_key_target_path() + .expect("metadata key must exist") + .with_field_appended("x-mezmo-dd-event-type"); + log.insert(&event_type_path, Value::from(event_type)); + Event::Log(log) + } + + #[test] + fn routes_events_to_expected_outputs() { + let config = MezmoDatadogAgentParserConfig::default(); + let mut parser = MezmoDatadogAgentParser::new(&config, None); + let 
(output_names, mut outputs) = build_outputs_buf(); + + let log_event = build_event( + "log", + serde_json::json!({ + "status": "ok" + }) + .into(), + ); + parser.transform(log_event, &mut outputs); + + let metric_event = build_event( + "metric", + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.usage", + "type": 3, + "points": [{"timestamp": 1234567890, "value": 42.5}], + "tags": [], + "resources": [] + }) + .into(), + ); + parser.transform(metric_event, &mut outputs); + + let sketch_event = build_event( + "sketch", + serde_json::json!({ + "metric": "system.cpu.sketch", + "tags": ["env:prod"], + "host": "testhost", + "dogsketches": [ + { + "cnt": 12, + "min": 1.0, + "max": 9.0, + "sum": 15.0, + "avg": 4.5, + "k": [1, 2], + "n": [3, 4], + "ts": 1234567890 + } + ] + }) + .into(), + ); + parser.transform(sketch_event, &mut outputs); + + let trace_event = build_event( + "trace", + serde_json::json!({ + "mezmo_payload_version": "v2", + "chunks": [] + }) + .into(), + ); + parser.transform(trace_event, &mut outputs); + + let unmatched_event = build_event("unknown", serde_json::json!({}).into()); + parser.transform(unmatched_event, &mut outputs); + + for output_name in output_names { + let events: Vec<_> = outputs.drain_named(output_name).collect(); + match output_name { + LOGS_OUTPUT => assert_eq!(events.len(), 1), + METRICS_OUTPUT => assert_eq!(events.len(), 2), + TRACES_OUTPUT => assert_eq!(events.len(), 1), + UNMATCHED_OUTPUT => assert_eq!(events.len(), 1), + _ => unreachable!("unexpected output"), + } + } + } + + #[test] + fn build_events_from_payloads_sets_message_and_timestamp() { + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + Value::Object(ObjectMap::new()), + ); + let event = Event::Log(log); + + let timestamp = Value::Timestamp(Utc.timestamp_opt(1234567890, 
0).single().unwrap()); + let mut first = ObjectMap::new(); + first.insert("message".into(), Value::from("first")); + let mut second = ObjectMap::new(); + second.insert("message".into(), Value::from("second")); + + let results = parser + .build_events_from_payloads( + event, + vec![ + (first, Some(timestamp.clone())), + (second, Some(timestamp.clone())), + ], + ) + .expect("build events"); + + assert_eq!(results.len(), 2); + for result in results { + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .and_then(Value::as_object) + .expect("message object"); + assert!(matches!( + message.get("message").and_then(Value::as_str), + Some(_) + )); + + let parsed_timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *parsed_timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/traces.rs b/src/transforms/mezmo_datadog_agent_parser/traces.rs new file mode 100644 index 000000000..74d64bca9 --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/traces.rs @@ -0,0 +1,735 @@ +use ordered_float::NotNan; + +use crate::config::log_schema; +use crate::event::{Event, MaybeAsLogMut, ObjectMap, Value}; +use crate::internal_events::MezmoDatadogAgentParserInvalidItem; +use crate::transforms::mezmo_datadog_agent_parser::common::{parse_timestamp, TimestampUnit}; + +use super::common::get_message_object; +use super::{MezmoDatadogAgentParser, TransformDatadogEvent, TransformError}; + +pub(super) struct DatadogTraceEvent; + +impl TransformDatadogEvent for DatadogTraceEvent { + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let version = parser + .get_payload_version(&event) + .unwrap_or_else(|| "v2".to_string()); + + let log_result = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a 
log".to_string()); + + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + let message_obj = match get_message_object(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let result = match version.as_str() { + "v1" => transform_trace_v1(message_obj), + "v2" => transform_tracer_payload(message_obj), + _ => { + return Err(TransformError::from( + event, + &format!("Unsupported payload version: {version}"), + )) + } + }; + let mut message = match result { + Ok(message) => message, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + message.insert("payload_version".into(), Value::from(version.clone())); + + log.insert( + log_schema().message_key_target_path().unwrap(), + Value::Object(message), + ); + parser.strip_fields(&mut event); + + Ok(vec![event]) + } +} + +fn transform_trace_v1(message: &ObjectMap) -> Result { + if message.get("transactions").is_some() { + return Ok(transform_transaction(message)); + } + Ok(transform_api_trace(message)) +} + +/// APITrace: https://github.com/mezmo/vector/blob/f5010f6b3e3cda9f201429c3802cee94efb16586/proto/vector/dd_trace.proto#L20 +/// Transforms an APITrace item +/// See Kong implementation: https://github.com/answerbook/pipeline-gateway-kong/blob/af22369e9c53d319aa808391f06eae5ac0e8d036/kong/plugins/pipeline-routing/lib/datadog.lua#L83 +fn transform_api_trace(input: &ObjectMap) -> ObjectMap { + let mut output = ObjectMap::new(); + + copy_string(input, "hostName", &mut output, "host"); + copy_string(input, "env", &mut output, "env"); + copy_u64(input, "traceID", &mut output, "trace_id"); + copy_timestamp_nanos(input, "startTime", &mut output, "start_time"); + copy_timestamp_nanos(input, "endTime", &mut output, "end_time"); + + if let Some(spans) = input.get("spans").and_then(Value::as_array) { + let transformed: Vec = spans + .iter() + .filter_map(|span_val| { + if let Some(obj) = 
span_val.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Span is not an object", + item_type: "span", + event_type: Some("trace"), + }); + None + } + }) + .map(|span| transform_span(span, None, None, None)) + .collect(); + if !transformed.is_empty() { + output.insert("spans".into(), Value::Array(transformed)); + } + } + + output +} + +/// Transactions are basically spans: https://github.com/mezmo/vector/blob/f5010f6b3e3cda9f201429c3802cee94efb16586/proto/vector/dd_trace.proto#L11 +/// Transforms an APITrace item +/// See Kong implementation: https://github.com/answerbook/pipeline-gateway-kong/blob/af22369e9c53d319aa808391f06eae5ac0e8d036/kong/plugins/pipeline-routing/lib/datadog.lua#L83 +fn transform_transaction(input: &ObjectMap) -> ObjectMap { + let mut output = ObjectMap::new(); + + copy_string(input, "hostName", &mut output, "host"); + copy_string(input, "env", &mut output, "env"); + + let spans: Vec = input + .get("transactions") + .and_then(Value::as_array) + .map(|arr| { + arr.iter() + .filter_map(|txn_val| { + if let Some(obj) = txn_val.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Transaction is not an object", + item_type: "transaction", + event_type: Some("trace"), + }); + None + } + }) + .map(|span| transform_span(span, Some(true), None, None)) + .collect() + }) + .unwrap_or_default(); + + output.insert("spans".into(), Value::Array(spans)); + output +} + +/// TracerPayload: https://github.com/DataDog/datadog-agent/blob/main/pkg/proto/datadog/trace/tracer_payload.proto +fn transform_tracer_payload(input: &ObjectMap) -> Result { + let mut output = ObjectMap::new(); + + copy_string(input, "hostName", &mut output, "host"); + copy_string(input, "env", &mut output, "env"); + copy_string(input, "agentVersion", &mut output, "agent_version"); + copy_string(input, "containerID", &mut output, "container_id"); + copy_string(input, "languageName", &mut output, "language_name"); 
+ copy_string(input, "languageVersion", &mut output, "language_version"); + copy_string(input, "tracerVersion", &mut output, "tracer_version"); + copy_string(input, "runtimeID", &mut output, "runtime_id"); + copy_string(input, "appVersion", &mut output, "app_version"); + copy_float(input, "targetTPS", &mut output, "target_tps"); + copy_float(input, "errorTPS", &mut output, "error_tps"); + copy_bool( + input, + "rareSamplerEnabled", + &mut output, + "rare_sampler_enabled", + ); + + let payload_tags = input.get("tags").and_then(Value::as_object); + + let spans: Vec = input + .get("chunks") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter_map(|chunk| { + if let Some(obj) = chunk.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Chunk is not an object", + item_type: "chunk", + event_type: Some("trace"), + }); + None + } + }) + .flat_map(|chunk| process_chunk(chunk, payload_tags)) + .collect(); + + output.insert("spans".into(), Value::Array(spans)); + Ok(output) +} + +fn process_chunk(chunk: &ObjectMap, payload_tags: Option<&ObjectMap>) -> Vec { + let priority = chunk.get("priority").and_then(Value::as_integer); + let origin = chunk.get("origin").and_then(Value::as_str); + let dropped = chunk.get("droppedTrace").and_then(Value::as_boolean); + let chunk_tags = chunk.get("tags").and_then(Value::as_object); + + chunk + .get("spans") + .and_then(Value::as_array) + .map(|spans| { + spans + .iter() + .filter_map(|span_val| { + if let Some(obj) = span_val.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Span is not an object", + item_type: "span", + event_type: Some("trace"), + }); + None + } + }) + .map(|span| { + let mut transformed = + transform_span(span, dropped, priority, Some((chunk_tags, payload_tags))); + + if let (Some(origin_str), Value::Object(ref mut obj)) = + (&origin, &mut transformed) + { + obj.insert( + "origin".into(), + 
Value::from(origin_str.clone().into_owned()), + ); + } + + transformed + }) + .collect() + }) + .unwrap_or_default() +} + +fn transform_span( + input: &ObjectMap, + dropped: Option, + priority: Option, + extra_tags: Option<(Option<&ObjectMap>, Option<&ObjectMap>)>, // (chunk_tags, payload_tags) +) -> Value { + let mut output = ObjectMap::new(); + + copy_string(input, "service", &mut output, "service"); + copy_string(input, "name", &mut output, "name"); + copy_string(input, "resource", &mut output, "resource"); + copy_string(input, "type", &mut output, "type"); + + copy_u64(input, "traceID", &mut output, "trace_id"); + copy_u64(input, "spanID", &mut output, "span_id"); + copy_u64(input, "parentID", &mut output, "parent_id"); + + copy_timestamp_nanos(input, "start", &mut output, "start"); + copy_i64(input, "duration", &mut output, "duration"); + + copy_i64(input, "error", &mut output, "error"); + + copy_string_object(input, "meta", &mut output, "meta"); + copy_metrics(input, "metrics", &mut output, "metrics"); + copy_meta_struct(input, "metaStruct", &mut output, "meta_struct"); + + // Simplified Tag Merging + let mut tags = ObjectMap::new(); + + if let Some((chunk_tags, payload_tags)) = extra_tags { + if let Some(chunk_tags) = chunk_tags { + merge_string_map(chunk_tags, &mut tags); + } + if let Some(payload_tags) = payload_tags { + merge_string_map(payload_tags, &mut tags); + } + } + if let Some(span_tags) = input.get("tags").and_then(Value::as_object) { + merge_string_map(span_tags, &mut tags); + } + + if !tags.is_empty() { + output.insert("tags".into(), Value::Object(tags)); + } + + if let Some(d) = dropped { + output.insert("dropped".into(), Value::Boolean(d)); + } + if let Some(p) = priority { + output.insert("priority".into(), Value::from(p)); + } + + Value::Object(output) +} + +fn copy_string(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(val) = src.get(src_key) { + if val.as_str().is_some() { + dst.insert(dst_key.into(), 
val.clone()); + } + } +} + +fn copy_u64(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(v) = src.get(src_key).and_then(Value::as_integer) { + if u64::try_from(v).is_ok() { + dst.insert(dst_key.into(), Value::from(v)); + } + } +} + +fn copy_i64(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(v) = src.get(src_key).and_then(Value::as_integer) { + dst.insert(dst_key.into(), Value::from(v)); + } +} + +fn copy_timestamp_nanos(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + let src_timestamp = src + .get(src_key) + .and_then(|v| parse_timestamp(v, TimestampUnit::Nanoseconds)); + + if let Some(timestamp) = src_timestamp { + dst.insert(dst_key.into(), Value::from(timestamp)); + } +} + +fn copy_float(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(f) = src.get(src_key).and_then(value_to_f64) { + if let Ok(nn) = NotNan::new(f) { + dst.insert(dst_key.into(), Value::Float(nn)); + } + } +} + +fn copy_bool(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(b) = src.get(src_key).and_then(Value::as_boolean) { + dst.insert(dst_key.into(), Value::Boolean(b)); + } +} + +fn copy_string_object(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(Value::Object(obj)) = src.get(src_key) { + copy_object_map(obj, dst, dst_key, |value| { + if value.as_str().is_some() { + Some(value.clone()) + } else { + None + } + }); + } +} + +fn copy_metrics(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(Value::Object(obj)) = src.get(src_key) { + copy_object_map(obj, dst, dst_key, |value| { + Some( + value + .as_integer() + .map(|v| v as f64) + .or_else(|| value.as_float().map(|v| v.into_inner())) + .and_then(|f| NotNan::new(f).ok()) + .map(Value::Float) + .unwrap_or(Value::Null), + ) + }); + } +} + +fn copy_meta_struct(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, 
dst_key: &str) { + if let Some(Value::Object(obj)) = src.get(src_key) { + copy_object_map(obj, dst, dst_key, |value| match value { + Value::Bytes(bytes) => Some(Value::Bytes(bytes.clone())), + _ => Some(Value::Null), + }); + } +} + +fn copy_object_map(src: &ObjectMap, dst: &mut ObjectMap, dst_key: &str, mut convert: F) +where + F: FnMut(&Value) -> Option, +{ + if src.is_empty() { + return; + } + + let mapped: ObjectMap = src + .iter() + .filter_map(|(key, value)| convert(value).map(|mapped| (key.clone(), mapped))) + .collect(); + + if !mapped.is_empty() { + dst.insert(dst_key.into(), Value::Object(mapped)); + } +} + +fn merge_string_map(src: &ObjectMap, dst: &mut ObjectMap) { + for (key, value) in src { + if value.as_str().is_some() { + dst.insert(key.clone(), value.clone()); + } + } +} + +fn value_to_f64(value: &Value) -> Option { + match value { + Value::Integer(value) => Some(*value as f64), + Value::Float(value) => Some(value.into_inner()), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::super::TransformDatadogEvent; + use super::*; + use crate::event::LogEvent; + use crate::transforms::mezmo_datadog_agent_parser::{ + MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, + }; + use bytes::Bytes; + use chrono::{TimeZone, Utc}; + + fn build_event(message: Value) -> Event { + let mut log = LogEvent::default(); + log.insert(log_schema().message_key_target_path().unwrap(), message); + Event::Log(log) + } + + #[test] + fn test_transform_v1_trace() { + let event = build_event( + serde_json::json!({ + "hostName": "tracer-host", + "env": "production", + "mezmo_payload_version": "v1", + "traceID": 12345, + "startTime": 1_000_000_000i64, + "endTime": 2_000_000_000i64, + "spans": [ + { + "service": "web", + "name": "http.request", + "resource": "/api/users", + "traceID": 12345, + "spanID": 1, + "parentID": 0, + "start": 1_000_000_000i64, + "duration": 500_000_000i64, + "error": 0, + "meta": {"http.method": "GET"}, + "metrics": {"_sample_rate": 1.0} + } + ] 
+ }) + .into(), + ); + + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); + let result = results.pop().expect("transformed event"); + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("tracer-host".to_string()) + ); + assert_eq!( + message + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("production".to_string()) + ); + assert_eq!( + message.get("trace_id").and_then(Value::as_integer), + Some(12345) + ); + assert_eq!( + message.get("start_time"), + Some(&Value::Timestamp(Utc.timestamp_nanos(1_000_000_000))) + ); + assert_eq!( + message.get("end_time"), + Some(&Value::Timestamp(Utc.timestamp_nanos(2_000_000_000))) + ); + + let spans = message.get("spans").and_then(|v| v.as_array()).unwrap(); + let span = spans[0].as_object().unwrap(); + assert_eq!( + span.get("start"), + Some(&Value::Timestamp(Utc.timestamp_nanos(1_000_000_000))) + ); + assert_eq!( + span.get("trace_id").and_then(Value::as_integer), + Some(12345) + ); + assert_eq!(span.get("span_id").and_then(Value::as_integer), Some(1)); + assert_eq!(span.get("parent_id").and_then(Value::as_integer), Some(0)); + } + + #[test] + fn test_transform_v1_transactions() { + let event = build_event( + serde_json::json!({ + "hostName": "myhost", + "env": "production", + "mezmo_payload_version": "v1", + "transactions": [ + { + "service": "web", + "name": "txn", + "traceID": 12345, + "spanID": 1, + "start": 1_000_000_000i64, + "duration": 500_000_000i64, + "error": 0 + } + ] + }) + .into(), + ); + + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let mut results = 
DatadogTraceEvent::transform(event, &parser).unwrap(); + let result = results.pop().expect("transformed event"); + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let spans = message.get("spans").and_then(|v| v.as_array()).unwrap(); + let span = spans[0].as_object().unwrap(); + assert_eq!(span.get("dropped"), Some(&Value::Boolean(true))); + assert_eq!( + message + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("myhost".to_string()) + ); + assert_eq!( + message + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("production".to_string()) + ); + } + + #[test] + fn test_transform_v2_trace() { + let event = build_event( + serde_json::json!({ + "hostName": "myhost", + "env": "staging", + "mezmo_payload_version": "v2", + "agentVersion": "7.0.0", + "targetTPS": 10.0, + "errorTPS": 1.0, + "rareSamplerEnabled": true, + "containerID": "abc123", + "languageName": "python", + "languageVersion": "3.9", + "tracerVersion": "1.0.0", + "runtimeID": "runtime-123", + "appVersion": "2.0.0", + "tags": {"payload_tag": "value"}, + "chunks": [ + { + "priority": 1, + "origin": "lambda", + "droppedTrace": true, + "tags": {"chunk_tag": "value"}, + "spans": [ + { + "service": "api", + "name": "handler", + "resource": "process", + "traceID": 67890, + "spanID": 2, + "parentID": 1, + "start": 2_000_000_000i64, + "duration": 100_000_000i64, + "error": 0, + "tags": {"span_tag": "value"}, + "metrics": {"_sample_rate": 1}, + "metaStruct": {"blob": "data"} + } + ] + } + ] + }) + .into(), + ); + + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config, None); + + let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); + let result = results.pop().expect("transformed event"); + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + 
.unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("agent_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("7.0.0".to_string()) + ); + assert_eq!( + message + .get("target_tps") + .and_then(Value::as_float) + .map(|value| value.into_inner()), + Some(10.0) + ); + assert_eq!( + message + .get("error_tps") + .and_then(Value::as_float) + .map(|value| value.into_inner()), + Some(1.0) + ); + assert_eq!( + message.get("rare_sampler_enabled"), + Some(&Value::Boolean(true)) + ); + assert_eq!( + message + .get("container_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("abc123".to_string()) + ); + assert_eq!( + message + .get("language_name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("python".to_string()) + ); + assert_eq!( + message + .get("language_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("3.9".to_string()) + ); + assert_eq!( + message + .get("tracer_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("1.0.0".to_string()) + ); + assert_eq!( + message + .get("runtime_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("runtime-123".to_string()) + ); + assert_eq!( + message + .get("app_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("2.0.0".to_string()) + ); + assert_eq!( + message + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("myhost".to_string()) + ); + assert_eq!( + message + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("staging".to_string()) + ); + + let spans = message.get("spans").and_then(|v| v.as_array()).unwrap(); + let span = spans[0].as_object().unwrap(); + let tags = span.get("tags").and_then(|v| v.as_object()).unwrap(); + + assert_eq!( + tags.get("span_tag").and_then(Value::as_str).as_deref(), + Some("value") + ); + assert_eq!( + tags.get("chunk_tag").and_then(Value::as_str).as_deref(), + Some("value") + ); + assert_eq!( + 
tags.get("payload_tag").and_then(Value::as_str).as_deref(), + Some("value") + ); + assert_eq!(span.get("priority").and_then(Value::as_integer), Some(1)); + assert_eq!( + span.get("origin").and_then(Value::as_str).as_deref(), + Some("lambda") + ); + assert_eq!(span.get("dropped"), Some(&Value::Boolean(true))); + assert_eq!( + span.get("start"), + Some(&Value::Timestamp(Utc.timestamp_nanos(2_000_000_000))) + ); + + let metrics = span.get("metrics").and_then(|v| v.as_object()).unwrap(); + assert_eq!( + metrics.get("_sample_rate"), + Some(&Value::Float(NotNan::new(1.0).unwrap())) + ); + + let meta_struct = span.get("meta_struct").and_then(|v| v.as_object()).unwrap(); + assert_eq!( + meta_struct.get("blob"), + Some(&Value::Bytes(Bytes::from_static(b"data"))) + ); + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index c6de2f057..ef9d77823 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -28,6 +28,8 @@ pub mod mezmo_aggregate; pub mod mezmo_aggregate_distributed; #[cfg(feature = "transforms-mezmo_aggregate_v2")] pub mod mezmo_aggregate_v2; +#[cfg(feature = "transforms-mezmo_datadog_agent_parser")] +pub mod mezmo_datadog_agent_parser; #[cfg(feature = "transforms-mezmo_log_classification")] pub mod mezmo_log_classification; #[cfg(feature = "transforms-mezmo_log_clustering")]