From 36ebe261a0506b270c9dfd862d353982f029a46e Mon Sep 17 00:00:00 2001 From: Kwabena Boadu Date: Mon, 26 Jan 2026 16:54:38 -0500 Subject: [PATCH 1/3] feat(mezmo_datadog_agent_parser): Create MezmoDatadogAgentParser The transform receives a log event and outputs a single or multiple events to one of logs, metrics, traces or _unmatched ouptuts. The Datadog agent data is not decoded by this transform; the incoming event is assumed to be already decoded. For traces, each event is assumed to be an entry of the tracerPayloads or traces (legacy) field of the agent payload. For metrics, each event is assumed to be an entry of the series field of the agent payload. The implementation of the transform is mostly based on the Datadog agent source. Ref: LOG-22997 --- Cargo.toml | 2 + .../mezmo_datadog_agent_parser.rs | 51 + src/internal_events/mod.rs | 4 + .../mezmo_datadog_agent_parser/common.rs | 84 ++ .../mezmo_datadog_agent_parser/config.rs | 168 ++++ .../mezmo_datadog_agent_parser/logs.rs | 162 +++ .../mezmo_datadog_agent_parser/metrics.rs | 948 ++++++++++++++++++ .../mezmo_datadog_agent_parser/mod.rs | 354 +++++++ .../mezmo_datadog_agent_parser/traces.rs | 711 +++++++++++++ src/transforms/mod.rs | 2 + 10 files changed, 2486 insertions(+) create mode 100644 src/internal_events/mezmo_datadog_agent_parser.rs create mode 100644 src/transforms/mezmo_datadog_agent_parser/common.rs create mode 100644 src/transforms/mezmo_datadog_agent_parser/config.rs create mode 100644 src/transforms/mezmo_datadog_agent_parser/logs.rs create mode 100644 src/transforms/mezmo_datadog_agent_parser/metrics.rs create mode 100644 src/transforms/mezmo_datadog_agent_parser/mod.rs create mode 100644 src/transforms/mezmo_datadog_agent_parser/traces.rs diff --git a/Cargo.toml b/Cargo.toml index 5764fb841..39304b3fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -769,6 +769,7 @@ transforms-logs = [ transforms-logs-mezmo = [ "transforms-dedupe", "transforms-filter", + "transforms-mezmo_datadog_agent_parser", "transforms-mezmo_reduce", "transforms-remap", "transforms-route", @@ -818,6 +819,7 @@ transforms-metric_to_log = [] transforms-reduce = ["transforms-impl-reduce"] transforms-mezmo_reduce = ["transforms-reduce"] # depends on the upstream version for merge_strategies transforms-mezmo_aggregate = [] +transforms-mezmo_datadog_agent_parser = [] transforms-mezmo_log_to_metric = [] transforms-mezmo_log_clustering = ["dep:lru", "dep:blake2", "dep:base64", "dep:tokio-postgres"] transforms-mezmo_log_classification = ["dep:grok"] diff --git a/src/internal_events/mezmo_datadog_agent_parser.rs b/src/internal_events/mezmo_datadog_agent_parser.rs new file mode 100644 index 000000000..ea01c4bcd --- /dev/null +++ b/src/internal_events/mezmo_datadog_agent_parser.rs @@ -0,0 +1,51 @@ +use metrics::counter; +use vector_lib::internal_event::InternalEvent; + +#[derive(Debug)] +pub struct MezmoDatadogAgentParserError<'a> { + pub error: &'a str, + pub event_type: Option<&'a str>, +} + +#[derive(Debug)] +pub struct MezmoDatadogAgentParserInvalidItem<'a> { + pub error: &'a str, + pub item_type: &'a str, + pub event_type: Option<&'a str>, +} + +impl InternalEvent for MezmoDatadogAgentParserError<'_> { + fn emit(self) { + let event_type = self.event_type.unwrap_or("unknown"); + error!( + message = "Failed to parse Datadog agent payload.", + error = %self.error, + event_type = %event_type, + internal_log_rate_limit = true, + ); + counter!( + "mezmo_datadog_agent_parser_errors_total", + "event_type" => event_type.to_string(), + ) + .increment(1); + } +} + +impl InternalEvent for MezmoDatadogAgentParserInvalidItem<'_> { + fn emit(self) { + let event_type = self.event_type.unwrap_or("unknown"); + error!( + message = "Invalid item error.", + error = %self.error, + item_type = %self.item_type, + event_type = %event_type, + internal_log_rate_limit = true, + ); + counter!( + "mezmo_datadog_agent_parser_invalid_items_total", + "item_type" => self.item_type.to_string(), + "event_type" => event_type.to_string(), + ) + .increment(1); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index c4726f719..11f982fb9 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -90,6 +90,8 @@ mod mezmo_aggregate; #[cfg(feature = "transforms-mezmo_aggregate_distributed")] mod mezmo_aggregate_distributed; pub mod mezmo_config; +#[cfg(feature = "transforms-mezmo_datadog_agent_parser")] +mod mezmo_datadog_agent_parser; #[cfg(feature = "transforms-mezmo_log_clustering")] pub(crate) mod mezmo_log_clustering; #[cfg(any( @@ -255,6 +257,8 @@ pub(crate) use self::metric_to_log::*; pub(crate) use self::mezmo_aggregate::*; #[cfg(feature = "transforms-mezmo_aggregate_distributed")] pub(crate) use self::mezmo_aggregate_distributed::*; +#[cfg(feature = "transforms-mezmo_datadog_agent_parser")] +pub(crate) use self::mezmo_datadog_agent_parser::*; #[cfg(feature = "transforms-mezmo_tag_cardinality_limit")] pub(crate) use self::mezmo_tag_cardinality_limit::*; #[cfg(feature = "transforms-mezmo_throttle_distributed")] diff --git a/src/transforms/mezmo_datadog_agent_parser/common.rs b/src/transforms/mezmo_datadog_agent_parser/common.rs new file mode 100644 index 000000000..331e3e72b --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/common.rs @@ -0,0 +1,84 @@ +use chrono::{TimeZone, Utc}; + +use crate::config::log_schema; +use crate::event::{LogEvent, ObjectMap, Value}; + +#[derive(Copy, Clone, Debug)] +pub enum TimestampUnit { + Seconds, + Milliseconds, +} + +pub fn get_message_object(log: &LogEvent) -> Result<&ObjectMap, String> { + let message_path = log_schema() + .message_key_target_path() + .ok_or_else(|| "Missing message key".to_string())?; + + let message = log + .get(message_path) + .ok_or_else(|| "Missing message field".to_string())?; + + match message { + Value::Object(obj) => Ok(obj), + _ => Err("Message is not an object".to_string()), + } +} + +pub fn get_message_object_mut(log: &mut LogEvent) -> Result<&mut ObjectMap, String> { + let message_path = log_schema() + .message_key_target_path() + .ok_or_else(|| "Missing message key".to_string())?; + + let message = log + .get_mut(message_path) + .ok_or_else(|| "Missing message field".to_string())?; + + match message { + Value::Object(obj) => Ok(obj), + _ => Err("Message is not an object".to_string()), + } +} + +pub fn parse_timestamp(value: &Value, unit: TimestampUnit) -> Option> { + match value { + Value::Timestamp(timestamp) => Some(*timestamp), + Value::Integer(value) => match unit { + TimestampUnit::Seconds => Utc.timestamp_opt(*value, 0).single(), + TimestampUnit::Milliseconds => Utc.timestamp_millis_opt(*value).single(), + }, + Value::Float(value) => { + let value = value.into_inner(); + if !value.is_finite() { + return None; + } + let seconds = match unit { + TimestampUnit::Seconds => value, + TimestampUnit::Milliseconds => value / 1000.0, + }; + let (seconds, nanos) = split_float_seconds(seconds)?; + Utc.timestamp_opt(seconds, nanos).single() + } + + _ => None, + } +} + +fn split_float_seconds(value: f64) -> Option<(i64, u32)> { + if value < (i64::MIN as f64) || value > (i64::MAX as f64) { + return None; + } + + let secs = value.floor(); + let fract = value - secs; + let mut nanos = (fract * 1_000_000_000.0).round(); + + // Handle rounding overflow (e.g., 0.9999999999 rounding up to 1s) + let secs = if nanos >= 1_000_000_000.0 { + nanos -= 1_000_000_000.0; + secs + 1.0 + } else { + secs + }; + + Some((secs as i64, nanos as u32)) +} diff --git a/src/transforms/mezmo_datadog_agent_parser/config.rs b/src/transforms/mezmo_datadog_agent_parser/config.rs new file mode 100644 index 000000000..4ab5d7d37 --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/config.rs @@ -0,0 +1,168 @@ +use vector_lib::config::{clone_input_definitions, LogNamespace}; +use vector_lib::configurable::configurable_component; +use vector_lib::lookup::lookup_v2::ConfigTargetPath; + +use crate::{ + config::{ + log_schema, DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, + TransformOutput, + }, + schema, + transforms::Transform, +}; + +use super::{ + MezmoDatadogAgentParser, LOGS_OUTPUT, METRICS_OUTPUT, TRACES_OUTPUT, UNMATCHED_OUTPUT, +}; + +fn default_event_type_path() -> ConfigTargetPath { + ConfigTargetPath( + log_schema() + .metadata_key_target_path() + .expect("metadata key must exist") + .clone() + .with_field_appended("x-mezmo-dd-event-type"), + ) +} + +fn default_payload_version_path() -> ConfigTargetPath { + ConfigTargetPath( + log_schema() + .message_key_target_path() + .expect("message key must exist") + .clone() + .with_field_appended("mezmo_payload_version"), + ) +} + +/// Mapping of event type string values to internal types. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(default)] +pub struct EventTypeValues { + /// Value that identifies a log event. + pub log: String, + + /// Value that identifies a metric event. + pub metric: String, + + /// Value that identifies a trace event. + pub trace: String, + + /// Value that identifies a sketch event. + pub sketch: String, +} + +impl Default for EventTypeValues { + fn default() -> Self { + Self { + log: "log".into(), + metric: "metric".into(), + trace: "trace".into(), + sketch: "sketch".into(), + } + } +} + +/// Configuration for the `mezmo_datadog_agent_parser` transform. +#[configurable_component(transform( + "mezmo_datadog_agent_parser", + "Parse and normalize Datadog agent payloads into structured log events." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct MezmoDatadogAgentParserConfig { + /// Path to the event type field in the incoming data. + #[serde(default = "default_event_type_path")] + pub event_type_path: ConfigTargetPath, + + /// Mapping of event type string values to internal types. + #[serde(default)] + pub event_type_values: EventTypeValues, + + /// Path to the payload version field in the incoming data. + #[serde(default = "default_payload_version_path")] + pub payload_version_path: ConfigTargetPath, + + /// Remove the event type field after identifying the event. + #[serde(default = "crate::serde::default_true")] + pub strip_event_type: bool, + + /// Remove the payload version field before sending to output. + #[serde(default = "crate::serde::default_true")] + pub strip_payload_version: bool, + + /// Route unrecognized event types to the _unmatched output instead of dropping. + #[serde(default = "crate::serde::default_true")] + pub reroute_unmatched: bool, +} + +impl Default for MezmoDatadogAgentParserConfig { + fn default() -> Self { + Self { + event_type_path: default_event_type_path(), + event_type_values: EventTypeValues::default(), + payload_version_path: default_payload_version_path(), + strip_event_type: false, + strip_payload_version: false, + reroute_unmatched: true, + } + } +} + +impl GenerateConfig for MezmoDatadogAgentParserConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self::default()).unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "mezmo_datadog_agent_parser")] +impl TransformConfig for MezmoDatadogAgentParserConfig { + async fn build(&self, _context: &TransformContext) -> crate::Result { + Ok(Transform::synchronous(MezmoDatadogAgentParser::new(self))) + } + + fn input(&self) -> Input { + Input::log() + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + input_definitions: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + let mut outputs = vec![ + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(LOGS_OUTPUT), + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(METRICS_OUTPUT), + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(TRACES_OUTPUT), + ]; + + if self.reroute_unmatched { + outputs.push( + TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions)) + .with_port(UNMATCHED_OUTPUT), + ); + } + + outputs + } + + fn enable_concurrency(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/logs.rs b/src/transforms/mezmo_datadog_agent_parser/logs.rs new file mode 100644 index 000000000..5818208e2 --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/logs.rs @@ -0,0 +1,162 @@ +use crate::config::log_schema; +use crate::event::{Event, MaybeAsLogMut, Value}; +use bytes::Bytes; + +use super::common::{get_message_object, get_message_object_mut, parse_timestamp, TimestampUnit}; +use super::MezmoDatadogAgentParser; + +/// See: https://github.com/DataDog/agent-payload/blob/master/proto/logs/agent_logs_payload.proto +/// The log timestamp is in milliseconds, not seconds +pub fn transform_log(event: &mut Event, parser: &MezmoDatadogAgentParser) -> Result<(), String> { + let log = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string())?; + + let parsed_timestamp = { + let message_obj = get_message_object(log)?; + message_obj + .get("timestamp") + .and_then(|value| parse_timestamp(value, TimestampUnit::Milliseconds)) + }; + + let parsed_ddtags = { + let message_obj = get_message_object(log)?; + message_obj + .get("ddtags") + .and_then(|value| value.as_bytes()) + .map(parse_ddtags) + }; + + if let Some(parsed_ddtags) = parsed_ddtags { + let message_obj = get_message_object_mut(log)?; + message_obj.insert("ddtags".into(), parsed_ddtags); + } + + if let Some(parsed) = parsed_timestamp { + if let Some(timestamp_path) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_path, parsed); + } + } + + parser.strip_fields(event); + + Ok(()) +} + +// Mirrors the Datadog agent source parse_ddtags implementation. +fn parse_ddtags(ddtags: &Bytes) -> Value { + String::from_utf8_lossy(ddtags) + .split(',') + .filter_map(|tag_entry| { + let trimmed = tag_entry.trim(); + if trimmed.is_empty() { + None + } else { + Some(Value::Bytes(Bytes::from(trimmed.to_string()))) + } + }) + .collect::>() + .into() +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use bytes::Bytes; + use chrono::{TimeZone, Utc}; + + use crate::config::log_schema; + use crate::event::{Event, EventMetadata, KeyString, LogEvent, Value}; + use crate::transforms::mezmo_datadog_agent_parser::{ + MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, + }; + + use super::transform_log; + + fn build_event(message: BTreeMap) -> Event { + Event::Log(LogEvent::from_map( + [( + log_schema() + .message_key() + .expect("message key") + .to_string() + .into(), + Value::Object(message), + )] + .into_iter() + .collect(), + EventMetadata::default(), + )) + } + + #[test] + fn parse_timestamp_and_tags() { + let mut message = BTreeMap::new(); + message.insert("timestamp".into(), Value::Integer(1_700_000_000_123)); + message.insert( + "ddtags".into(), + Value::Bytes(Bytes::from_static(b"env:prod,team:core")), + ); + message.insert("status".into(), Value::Bytes(Bytes::from_static(b"info"))); + + let mut event = build_event(message); + let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default()); + + transform_log(&mut event, &parser).expect("transform should succeed"); + + let log = event.as_log(); + let ts_path = log_schema() + .timestamp_key_target_path() + .expect("timestamp key"); + + let message = log + .get(log_schema().message_key_target_path().expect("message key")) + .and_then(|val| val.as_object()) + .expect("message object"); + + let message_ts = message + .get("timestamp") + .and_then(Value::as_integer) + .expect("message timestamp"); + let expected_ts = Utc + .timestamp_millis_opt(message_ts) + .latest() + .expect("valid timestamp"); + + assert_eq!(log.get(ts_path), Some(&Value::Timestamp(expected_ts))); + + assert_eq!( + message.get("ddtags"), + Some(&Value::Array(vec![ + Value::Bytes(Bytes::from_static(b"env:prod")), + Value::Bytes(Bytes::from_static(b"team:core")), + ])) + ); + assert_eq!(message.get("timestamp"), Some(&Value::Integer(message_ts))); + assert_eq!( + message.get("status"), + Some(&Value::Bytes(Bytes::from_static(b"info"))) + ); + } + + #[test] + fn handles_invalid_timestamp() { + let mut message = BTreeMap::new(); + message.insert( + "timestamp".into(), + Value::Bytes(Bytes::from_static(b"not-a-ts")), + ); + + let mut event = build_event(message); + let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default()); + + transform_log(&mut event, &parser).expect("transform should succeed"); + + let log = event.as_log(); + let ts_path = log_schema() + .timestamp_key_target_path() + .expect("timestamp key"); + assert!(log.get(ts_path).is_none()); + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/metrics.rs b/src/transforms/mezmo_datadog_agent_parser/metrics.rs new file mode 100644 index 000000000..286b1677d --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/metrics.rs @@ -0,0 +1,948 @@ +use crate::common::datadog::DatadogMetricType; +use crate::config::log_schema; +use crate::event::{Event, MaybeAsLogMut, MetricKind, ObjectMap, Value}; + +use super::common::{get_message_object, parse_timestamp, TimestampUnit}; +use super::MezmoDatadogAgentParser; + +/// Transform a metric event into the normalized MezmoMetric log format. +/// +/// The incoming event is a log event with the metric data in `.message`. +/// The payload version determines whether this is a v1 or v2 series metric. +/// Metrics data is intercepted and decoded in Kong with each item in the series +/// field emitted as a separate event +/// See: https://github.com/answerbook/pipeline-gateway-kong/blob/6fefc73374b32996b4bbb5ab1052eb2e6e6d3293/kong/plugins/pipeline-routing/lib/datadog.lua#L157 +pub fn transform_metric( + mut event: Event, + parser: &MezmoDatadogAgentParser, +) -> Result, String> { + let version = parser + .get_payload_version(&event) + .unwrap_or_else(|| "v2".to_string()); + + let message_obj = { + let log = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string())?; + get_message_object(log)?.clone() + }; + + let output_messages = match version.as_str() { + "v1" => transform_series_v1(&message_obj)?, + "v2" => transform_series_v2(&message_obj)?, + _ => transform_series_v2(&message_obj)?, + }; + + parser.build_events_from_messages(event, output_messages) +} + +/// Transform a sketch event into the normalized MezmoMetric log format. +pub fn transform_sketch( + mut event: Event, + parser: &MezmoDatadogAgentParser, +) -> Result, String> { + let message_obj = { + let log = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string())?; + get_message_object(log)?.clone() + }; + + parser.build_events_from_messages(event, transform_sketch_payload(&message_obj)?) +} + +/// Transforms Datadog series v1 metrics (/api/v1/series) +/// v1 metrics store points as a list of lists ([[timestamp, value]]) +fn transform_series_v1(message: &ObjectMap) -> Result)>, String> { + let metric_name = message + .get("metric") + .and_then(|v| v.as_str()) + .ok_or_else(|| "Missing metric name".to_string())?; + + let metric_type = message + .get("type") + .ok_or_else(|| "Missing metric type".to_string()) + .and_then(parse_metric_type)?; + let metric_kind = MetricKind::from(&metric_type); + + let points = message + .get("points") + .and_then(|v| v.as_array()) + .ok_or_else(|| "Missing points".to_string())?; + + let mut tags = message + .get("tags") + .and_then(|v| v.as_array()) + .map(parse_tag_array) + .unwrap_or_default(); + + insert_source_type_name(message, &mut tags); + + let (namespace, name) = namespace_name_from_dd_metric(&metric_name); + + let interval = message + .get("interval") + .and_then(|v| v.as_integer()) + .map(|i| i as u32) + .unwrap_or(0); + + let mut outputs = Vec::new(); + + for point in points { + let point = point + .as_array() + .ok_or_else(|| "Missing point value".to_string())?; + + if point.len() < 2 { + return Err("Missing point value".to_string()); + } + + let point_timestamp = parse_point_timestamp(&point[0])?; + let point_value = parse_metric_value(&point[1])?; + + let mut output = ObjectMap::new(); + output.insert("name".into(), Value::from(name.to_string())); + if let Some(ns) = namespace { + output.insert("namespace".into(), Value::from(ns.to_string())); + } + + output.insert("kind".into(), Value::from(metric_kind_as_str(metric_kind))); + output.insert("type".into(), Value::from(metric_type.as_metric_kind_str())); + + if !tags.is_empty() { + output.insert("tags".into(), Value::Object(tags.clone())); + } + + let interval = match metric_type { + DatadogMetricType::Rate => { + // Rates are converted to type: counter + // See: https://github.com/mezmo/vector/blob/f5010f6b3e3cda9f201429c3802cee94efb16586/src/sources/datadog_agent/metrics.rs#L280 + let interval = if interval != 0 { interval } else { 1 }; + output.insert("value".into(), Value::from(point_value * (interval as f64))); + Some(interval) + } + DatadogMetricType::Gauge => { + output.insert("value".into(), Value::from(point_value)); + if interval > 0 { + Some(interval) + } else { + None + } + } + DatadogMetricType::Count => { + output.insert("value".into(), Value::from(point_value)); + None + } + }; + + let mut time_obj = ObjectMap::new(); + if let Some(interval) = interval { + time_obj.insert("interval_ms".into(), Value::from(interval * 1000)); + } + output.insert("time".into(), Value::Object(time_obj)); + + if let Some(host) = message.get("host") { + output.insert("host".into(), host.clone()); + } + + outputs.push((output, Some(point_timestamp))); + } + + Ok(outputs) +} + +/// Transforms Datadog series v1 metrics (/api/v1/series) +/// v2 metrics store points as a list of objects ([{timestamp, value}]) +fn transform_series_v2(message: &ObjectMap) -> Result)>, String> { + let metric_name = message + .get("metric") + .and_then(|v| v.as_str()) + .ok_or_else(|| "Missing metric name".to_string())?; + + let metric_type = message + .get("type") + .ok_or_else(|| "Missing metric type".to_string()) + .and_then(parse_metric_type)?; + let metric_kind = MetricKind::from(&metric_type); + + let points = message + .get("points") + .and_then(|v| v.as_array()) + .ok_or_else(|| "Missing points".to_string())?; + + let mut tags = message + .get("tags") + .and_then(|v| v.as_array()) + .map(parse_tag_array) + .unwrap_or_default(); + + let (host_from_resource, resource_tags) = extract_resources(message); + tags.extend(resource_tags); + insert_source_type_name(message, &mut tags); + + let (namespace, name) = namespace_name_from_dd_metric(&metric_name); + + let interval = message + .get("interval") + .and_then(|v| v.as_integer()) + .map(|i| i as u32) + .unwrap_or(0); + + let mut outputs = Vec::new(); + + for point in points { + let point = point + .as_object() + .ok_or_else(|| "Missing point value".to_string())?; + + let point_value = point + .get("value") + .ok_or_else(|| "Missing point value".to_string()) + .and_then(parse_metric_value)?; + let point_timestamp = point + .get("timestamp") + .ok_or_else(|| "Missing point timestamp".to_string()) + .and_then(parse_point_timestamp)?; + + let mut output = ObjectMap::new(); + output.insert("name".into(), Value::from(name.to_string())); + if let Some(ns) = namespace { + output.insert("namespace".into(), Value::from(ns.to_string())); + } + output.insert("kind".into(), Value::from(metric_kind_as_str(metric_kind))); + output.insert("type".into(), Value::from(metric_type.as_metric_kind_str())); + + if !tags.is_empty() { + output.insert("tags".into(), Value::Object(tags.clone())); + } + + let interval = match metric_type { + DatadogMetricType::Rate => { + let interval = if interval != 0 { interval } else { 1 }; + output.insert("value".into(), Value::from(point_value * (interval as f64))); + Some(interval) + } + DatadogMetricType::Gauge => { + output.insert("value".into(), Value::from(point_value)); + if interval > 0 { + Some(interval) + } else { + None + } + } + DatadogMetricType::Count => { + output.insert("value".into(), Value::from(point_value)); + None + } + }; + + let mut time_obj = ObjectMap::new(); + if let Some(interval) = interval { + time_obj.insert("interval_ms".into(), Value::from(interval * 1000)); + } + output.insert("time".into(), Value::Object(time_obj)); + + if let Some(host) = host_from_resource.as_ref() { + output.insert("host".into(), host.clone()); + } + + outputs.push((output, Some(point_timestamp))); + } + + Ok(outputs) +} + +/// Transform a Datadog sketch (custom metric) +/// Schema: https://github.com/DataDog/agent-payload/blob/b14d89ab49cdb6a4618c2ac76c0b21e4e5423c5b/proto/metrics/agent_payload.proto#L91 +/// In Kong, the sketches field of the agent payload is unrolled and forwarded to vector +/// See: https://github.com/answerbook/pipeline-gateway-kong/blob/6fefc73374b32996b4bbb5ab1052eb2e6e6d3293/kong/plugins/pipeline-routing/lib/datadog.lua#L201 +fn transform_sketch_payload( + message: &ObjectMap, +) -> Result)>, String> { + let metric_name = message + .get("metric") + .and_then(|v| v.as_str()) + .ok_or_else(|| "Missing metric name".to_string())?; + + let mut tags = message + .get("tags") + .and_then(|v| v.as_array()) + .map(parse_tag_array) + .unwrap_or_default(); + + insert_sketch_host_tag(message, &mut tags); + + let (namespace, metric_name) = namespace_name_from_dd_metric(&metric_name); + + let dog_sketches = message + .get("dogsketches") + .and_then(|v| v.as_array()) + .ok_or_else(|| "Missing sketches".to_string())?; + + let mut outputs = Vec::new(); + + for sketch in dog_sketches { + let mut sketch_value = sketch.clone(); + let mut output = ObjectMap::new(); + output.insert("name".into(), Value::from(metric_name.to_string())); + if let Some(namespace) = namespace { + output.insert("namespace".into(), Value::from(namespace.to_string())); + } + + output.insert( + "kind".into(), + Value::from(metric_kind_as_str(MetricKind::Incremental)), + ); + + if !tags.is_empty() { + output.insert("tags".into(), Value::Object(tags.clone())); + } + + output.insert("type".into(), Value::from("sketch")); + + let timestamp = sketch + .as_object() + .and_then(|obj| obj.get("ts")) + .and_then(|value| parse_point_timestamp(value).ok()); + + if let Value::Object(ref mut sketch) = sketch_value { + sketch.remove("ts"); + } + + output.insert("value".into(), sketch_value); + outputs.push((output, timestamp)); + } + + Ok(outputs) +} + +fn parse_tag_array(tags: &[Value]) -> ObjectMap { + tags.iter() + .filter_map(|tag| tag.as_str()) + .map(|tag| { + let (key, val) = match tag.split_once(':') { + Some((prefix, suffix)) => (prefix, Value::from(suffix.to_string())), + None => (tag.as_ref(), Value::Null), + }; + (key.to_string().into(), val) + }) + .collect() +} + +fn parse_metric_type(metric_type: &Value) -> Result { + // Some legacy agents send the metric type as an integer + if let Some(metric_type) = metric_type.as_integer() { + return match metric_type { + 1 => Ok(DatadogMetricType::Count), + 2 => Ok(DatadogMetricType::Rate), + 3 => Ok(DatadogMetricType::Gauge), + _ => Err("Unknown metric type".to_string()), + }; + } + + if let Some(metric_type) = metric_type.as_str() { + let normalized = metric_type.trim().to_ascii_lowercase(); + return match normalized.as_str() { + "count" => Ok(DatadogMetricType::Count), + "rate" => Ok(DatadogMetricType::Rate), + "gauge" => Ok(DatadogMetricType::Gauge), + _ => Err("Unknown metric type".to_string()), + }; + } + + Err("Unknown metric type".to_string()) +} + +fn parse_metric_value(value: &Value) -> Result { + match value { + Value::Float(val) => Ok(val.into_inner()), + Value::Integer(val) => Ok(*val as f64), + _ => Err("Missing point value".to_string()), + } +} + +fn insert_source_type_name(message: &ObjectMap, tags: &mut ObjectMap) { + if let Some(source_type_name) = message.get("source_type_name").and_then(|v| v.as_str()) { + if !source_type_name.is_empty() { + tags.insert( + "source_type_name".into(), + Value::from(source_type_name.to_string()), + ); + } + } +} + +fn insert_sketch_host_tag(message: &ObjectMap, tags: &mut ObjectMap) { + let host = match message.get("host") { + Some(host) => host, + None => return, + }; + + let host_key = match log_schema().host_key() { + Some(host_key) => host_key.to_string(), + None => return, + }; + + tags.insert(host_key.into(), host.clone()); +} + +fn extract_resources(message: &ObjectMap) -> (Option, ObjectMap) { + let mut host = None; + let mut resource_tags = ObjectMap::new(); + let resources = match message.get("resources").and_then(|v| v.as_array()) { + Some(resources) => resources, + None => return (host, resource_tags), + }; + + for resource in resources { + let resource_item = match resource.as_object() { + Some(obj) => obj, + None => continue, + }; + + let resource_type = match resource_item.get("type").and_then(|v| v.as_str()) { + Some(resource_type) => resource_type, + None => continue, + }; + + let resource_name = match resource_item.get("name") { + Some(resource_name) => resource_name, + None => continue, + }; + + if resource_type == "host" { + host = Some(resource_name.clone()); + continue; + } + + resource_tags.insert( + format!("resource.{resource_type}").into(), + resource_name.clone(), + ); + } + + (host, resource_tags) +} + +fn parse_point_timestamp(value: &Value) -> Result { + if let Some(timestamp) = parse_timestamp(value, TimestampUnit::Seconds) { + return Ok(Value::Timestamp(timestamp)); + } + + let error = match value { + Value::Integer(_) | Value::Float(_) => "Invalid point timestamp", + _ => "Missing point timestamp", + }; + + Err(error.to_string()) +} + +impl<'a> From<&'a DatadogMetricType> for MetricKind { + fn from(value: &'a DatadogMetricType) -> Self { + match value { + DatadogMetricType::Gauge => MetricKind::Absolute, + DatadogMetricType::Count | DatadogMetricType::Rate => MetricKind::Incremental, + } + } +} + +trait AsMetricKindStr { + fn as_metric_kind_str(&self) -> &'static str; +} + +impl AsMetricKindStr for DatadogMetricType { + fn as_metric_kind_str(&self) -> &'static str { + match self { + DatadogMetricType::Gauge => "gauge", + DatadogMetricType::Count | DatadogMetricType::Rate => "counter", + } + } +} + +const fn metric_kind_as_str(kind: MetricKind) -> &'static str { + match kind { + MetricKind::Incremental => "incremental", + MetricKind::Absolute => "absolute", + } +} + +fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) { + match dd_metric_name.split_once('.') { + Some((namespace, name)) => (Some(namespace), name), + None => (None, dd_metric_name), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::event::LogEvent; + use crate::transforms::mezmo_datadog_agent_parser::{ + MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, + }; + use chrono::{TimeZone, Utc}; + + fn create_v2_metric_event() -> Event { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.usage", + "type": 3, + "points": [{"timestamp": 1234567890, "value": 42.5}, {"timestamp": 1234567891, "value": 43.0}], + "tags": ["env:prod", "host:server1"], + "resources": [ + {"type": "host", "name": "myhost"}, + {"type": "cluster", "name": "cluster-a"} + ], + "source_type_name": "my-source" + }), + ); + Event::Log(log) + } + + fn create_v1_metric_event() -> Event { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v1", + "metric": "system.memory.used", + "type": "gauge", + "points": [[1234567890, 1024.0], [1234567891, 1025.0]], + "tags": ["env:staging"], + "host": "testhost", + "source_type_name": "my-source" + }), + ); + Event::Log(log) + } + + #[test] + fn test_transform_v2_metric() { + let event = create_v2_metric_event(); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_metric(event, &parser).unwrap(); + assert_eq!(results.len(), 2); + + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("cpu.usage".to_string()) + ); + assert_eq!( + message + .get("namespace") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("system".to_string()) + ); + + let tags = message.get("tags").unwrap().as_object().unwrap(); + assert!(matches!( + tags.get("resource.cluster").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "cluster-a" + )); + assert!(matches!( + tags.get("source_type_name").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "my-source" + )); + + let val = message.get("value").unwrap().as_float().unwrap(); + assert_eq!(val, ordered_float::NotNan::new(42.5).unwrap()); + + let timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + + let second_log = results[1].as_log(); + let second_log_message = second_log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + let second_log_value = second_log_message.get("value").unwrap().as_float().unwrap(); + assert_eq!(second_log_value, ordered_float::NotNan::new(43.0).unwrap()); + } + + #[test] + fn test_transform_v1_metric() { + let event = create_v1_metric_event(); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_metric(event, &parser).unwrap(); + assert_eq!(results.len(), 2); + + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("memory.used".to_string()) + ); + assert_eq!( + message + .get("namespace") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("system".to_string()) + ); + + let tags = message.get("tags").unwrap().as_object().unwrap(); + assert!(matches!( + tags.get("source_type_name").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "my-source" + )); + + let timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + + #[test] + fn test_parse_tag_array() { + let tags = vec![ + Value::from("env:prod"), + Value::from("host:server1"), + Value::from("bare_tag"), + ]; + + let result = parse_tag_array(&tags); + + assert_eq!( + result + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("prod".to_string()) + ); + assert_eq!( + result + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("server1".to_string()) + ); + assert_eq!( + result.get("bare_tag").map(|v| matches!(v, Value::Null)), + Some(true) + ); + } + + #[test] + fn test_rate_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.rate", + "type": 2, // Rate + "interval": 10, // 10s + "points": [{"timestamp": 1234567890, "value": 5.0}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_metric(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let val = message.get("value").unwrap().as_float().unwrap(); + // value = 5.0 * 10 = 50.0 + assert_eq!(val, ordered_float::NotNan::new(50.0).unwrap()); + + let time_obj = message.get("time").unwrap().as_object().unwrap(); + let interval_ms = time_obj.get("interval_ms").unwrap().as_integer().unwrap(); + // interval_ms = 10 * 1000 = 10000 + assert_eq!(interval_ms, 10000); + + assert_eq!( + message.get("kind").unwrap().as_str().unwrap(), + "incremental" + ); + assert_eq!(message.get("type").unwrap().as_str().unwrap(), "counter") + } + + #[test] + fn test_string_metric_type() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.usage", + "type": "GAUGE", + "points": [{"timestamp": 1234567890, "value": 42.5}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_metric(event, &parser).unwrap(); + let message = results[0] + .as_log() + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert!(matches!( + message.get("type").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "gauge" + )); + } + + #[test] + fn test_gauge_interval_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.gauge", + "type": 3, // Gauge + "interval": 10, // 10s + "points": [{"timestamp": 1234567890, "value": 5.0}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_metric(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let time_obj = message.get("time").unwrap().as_object().unwrap(); + let interval_ms = time_obj.get("interval_ms").unwrap().as_integer().unwrap(); + assert_eq!(interval_ms, 10000); + } + + #[test] + fn test_count_interval_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.count", + "type": 1, // Count + "interval": 10, // 10s + "points": [{"timestamp": 1234567890, "value": 5.0}], + "tags": [], + "resources": [] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_metric(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let time_obj = message.get("time").unwrap().as_object().unwrap(); + assert!(time_obj.get("interval_ms").is_none()); + } + + #[test] + fn test_sketch_metric() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "metric": "system.cpu.sketch", + "tags": ["env:prod", "bare_tag"], + "host": "testhost", + "dogsketches": [ + { + "cnt": 12, + "min": 1.0, + "max": 9.0, + "sum": 15.0, + "avg": 4.5, + "k": [1, 2], + "n": [3, 4], + "ts": 1234567890 + } + ] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_sketch(event, &parser).unwrap(); + assert_eq!(results.len(), 1); + + let message = results[0] + .as_log() + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert!(matches!( + message.get("kind").and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "incremental" + )); + assert!(matches!( + message + .get("type") + .and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "sketch" + )); + let tags = message.get("tags").unwrap().as_object().unwrap(); + let host_key = log_schema().host_key().unwrap().to_string(); + assert!(matches!( + tags.get(host_key.as_str()).and_then(|v| v.as_str()), + Some(value) if value.as_ref() == "testhost" + )); + + let timestamp = results[0] + .as_log() + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + + #[test] + fn test_sketch_message_ts() { + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + serde_json::json!({ + "metric": "system.cpu.sketch", + "tags": ["env:prod", "bare_tag"], + "host": "testhost", + "dogsketches": [ + { + "cnt": 12, + "min": 1.0, + "max": 9.0, + "sum": 15.0, + "avg": 4.5, + "k": [1, 2], + "n": [3, 4], + "ts": 1234567890 + } + ] + }), + ); + let event = Event::Log(log); + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let results = transform_sketch(event, &parser).unwrap(); + assert_eq!(results.len(), 1); + + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let value_obj = message.get("value").unwrap().as_object().unwrap(); + assert!(value_obj.get("ts").is_none()); + assert_eq!(value_obj.get("cnt").and_then(|v| v.as_integer()), Some(12)); + + let timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + + #[test] + fn test_parse_point_timestamp_value_variants() { + let timestamp = Utc.timestamp_opt(1234567890, 0).single().unwrap(); + let value = Value::Timestamp(timestamp); + let parsed = parse_point_timestamp(&value).unwrap(); + let parsed_timestamp = parsed.as_timestamp().expect("timestamp should be set"); + assert_eq!(*parsed_timestamp, timestamp); + + let value = Value::from(1234567890); + let parsed = parse_point_timestamp(&value).unwrap(); + let parsed_timestamp = parsed.as_timestamp().expect("timestamp should be set"); + assert_eq!( + *parsed_timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + + let value = Value::from(1234567890.0); + let parsed = parse_point_timestamp(&value).unwrap(); + let parsed_timestamp = parsed.as_timestamp().expect("timestamp should be set"); + assert_eq!( + *parsed_timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + + let value = Value::from(f64::INFINITY); + let err = parse_point_timestamp(&value).unwrap_err(); + assert_eq!(err, "Invalid point timestamp"); + + let value = Value::from("not-a-timestamp"); + let err = parse_point_timestamp(&value).unwrap_err(); + assert_eq!(err, "Missing point timestamp"); + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/mod.rs b/src/transforms/mezmo_datadog_agent_parser/mod.rs new file mode 100644 index 000000000..7152fc99c --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/mod.rs @@ -0,0 +1,354 @@ +use std::borrow::Cow; + +use vector_lib::lookup::lookup_v2::ConfigTargetPath; +use vector_lib::transform::SyncTransform; + +use crate::{ + config::log_schema, + event::{Event, MaybeAsLogMut, ObjectMap, Value}, + internal_events::MezmoDatadogAgentParserError, +}; + +mod common; +mod config; +mod logs; +mod metrics; +mod traces; + +pub use config::{EventTypeValues, MezmoDatadogAgentParserConfig}; + +pub const LOGS_OUTPUT: &str = "logs"; +pub const METRICS_OUTPUT: &str = "metrics"; +pub const TRACES_OUTPUT: &str = "traces"; +pub const UNMATCHED_OUTPUT: &str = "_unmatched"; + +#[derive(Clone, Debug)] +pub struct MezmoDatadogAgentParser { + event_type_path: ConfigTargetPath, + event_type_values: EventTypeValues, + payload_version_path: ConfigTargetPath, + strip_event_type: bool, + strip_payload_version: bool, + reroute_unmatched: bool, +} + +impl MezmoDatadogAgentParser { + pub fn new(config: &MezmoDatadogAgentParserConfig) -> Self { + Self { + event_type_path: config.event_type_path.clone(), + event_type_values: config.event_type_values.clone(), + payload_version_path: config.payload_version_path.clone(), + strip_event_type: config.strip_event_type, + strip_payload_version: config.strip_payload_version, + reroute_unmatched: config.reroute_unmatched, + } + } + + fn get_event_type(&self, event: &Event) -> Option { + event + .maybe_as_log() + .and_then(|log| log.get(&self.event_type_path.0)) + .and_then(|v| v.as_str()) + .map(Cow::into_owned) + } + + fn get_payload_version(&self, event: &Event) -> Option { + event + .maybe_as_log() + .and_then(|log| log.get(&self.payload_version_path.0)) + .and_then(|v| v.as_str()) + .map(Cow::into_owned) + } + + pub fn strip_fields(&self, event: &mut Event) { + if let Some(log) = event.maybe_as_log_mut() { + if self.strip_event_type { + log.remove(&self.event_type_path.0); + } + if self.strip_payload_version { + log.remove(&self.payload_version_path.0); + } + } + } + + pub fn build_events_from_messages( + &self, + mut event: Event, + payloads: Vec<(ObjectMap, Option)>, + ) -> Result, String> { + if payloads.is_empty() { + return Ok(Vec::new()); + } + + self.strip_fields(&mut event); + + payloads + .into_iter() + .map(|payload| { + let mut new_event = event.clone(); + insert_message(&mut new_event, payload)?; + Ok(new_event) + }) + .collect() + } +} + +fn insert_message(event: &mut Event, payload: (ObjectMap, Option)) -> Result<(), String> { + let log = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string())?; + + let (message, timestamp) = payload; + + log.insert( + log_schema() + .message_key_target_path() + .ok_or_else(|| "Missing message key".to_string())?, + Value::Object(message), + ); + + if let Some(timestamp) = timestamp { + if let Some(timestamp_path) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_path, timestamp); + } + } + + Ok(()) +} + +impl SyncTransform for MezmoDatadogAgentParser { + fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) { + let event_type = self.get_event_type(&event); + + match event_type.as_deref() { + Some(t) if t == self.event_type_values.log => { + let mut event = event; + match logs::transform_log(&mut event, self) { + Ok(()) => output.push(Some(LOGS_OUTPUT), event), + Err(err) => { + emit!(MezmoDatadogAgentParserError { + error: &err, + event_type: Some("log"), + }); + if self.reroute_unmatched { + output.push(Some(UNMATCHED_OUTPUT), event); + } + } + } + } + Some(t) if t == self.event_type_values.metric => { + match metrics::transform_metric(event, self) { + Ok(events) => { + for event in events { + output.push(Some(METRICS_OUTPUT), event); + } + } + Err(err) => { + emit!(MezmoDatadogAgentParserError { + error: &err, + event_type: Some("metric"), + }); + } + } + } + Some(t) if t == self.event_type_values.sketch => { + match metrics::transform_sketch(event, self) { + Ok(events) => { + for event in events { + output.push(Some(METRICS_OUTPUT), event); + } + } + Err(err) => { + emit!(MezmoDatadogAgentParserError { + error: &err, + event_type: Some("sketch"), + }); + } + } + } + Some(t) if t == self.event_type_values.trace => { + match traces::transform_trace(event, self) { + Ok(event) => output.push(Some(TRACES_OUTPUT), event), + Err(err) => { + emit!(MezmoDatadogAgentParserError { + error: &err, + event_type: Some("trace"), + }); + } + } + } + _ => { + if self.reroute_unmatched { + output.push(Some(UNMATCHED_OUTPUT), event); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use chrono::{TimeZone, Utc}; + + use super::*; + use crate::config::{log_schema, DataType, TransformOutput}; + use crate::event::{LogEvent, Value}; + use crate::transforms::SyncTransform; + use vector_lib::transform::TransformOutputsBuf; + + fn build_outputs_buf() -> (Vec<&'static str>, TransformOutputsBuf) { + let output_names = vec![LOGS_OUTPUT, METRICS_OUTPUT, TRACES_OUTPUT, UNMATCHED_OUTPUT]; + let buf = TransformOutputsBuf::new_with_capacity( + output_names + .iter() + .map(|output_name| { + TransformOutput::new(DataType::Log, HashMap::new()) + .with_port(output_name.to_owned()) + }) + .collect(), + 1, + ); + (output_names, buf) + } + + fn build_event(event_type: &str, message: Value) -> Event { + let mut log = LogEvent::default(); + log.insert(log_schema().message_key_target_path().unwrap(), message); + let event_type_path = log_schema() + .metadata_key_target_path() + .expect("metadata key must exist") + .with_field_appended("x-mezmo-dd-event-type"); + log.insert(&event_type_path, Value::from(event_type)); + Event::Log(log) + } + + #[test] + fn routes_events_to_expected_outputs() { + let config = MezmoDatadogAgentParserConfig::default(); + let mut parser = MezmoDatadogAgentParser::new(&config); + let (output_names, mut outputs) = build_outputs_buf(); + + let log_event = build_event( + "log", + serde_json::json!({ + "status": "ok" + }) + .into(), + ); + parser.transform(log_event, &mut outputs); + + let metric_event = build_event( + "metric", + serde_json::json!({ + "mezmo_payload_version": "v2", + "metric": "system.cpu.usage", + "type": 3, + "points": [{"timestamp": 1234567890, "value": 42.5}], + "tags": [], + "resources": [] + }) + .into(), + ); + parser.transform(metric_event, &mut outputs); + + let sketch_event = build_event( + "sketch", + serde_json::json!({ + "metric": "system.cpu.sketch", + "tags": ["env:prod"], + "host": "testhost", + "dogsketches": [ + { + "cnt": 12, + "min": 1.0, + "max": 9.0, + "sum": 15.0, + "avg": 4.5, + "k": [1, 2], + "n": [3, 4], + "ts": 1234567890 + } + ] + }) + .into(), + ); + parser.transform(sketch_event, &mut outputs); + + let trace_event = build_event( + "trace", + serde_json::json!({ + "mezmo_payload_version": "v2", + "chunks": [] + }) + .into(), + ); + parser.transform(trace_event, &mut outputs); + + let unmatched_event = build_event("unknown", serde_json::json!({}).into()); + parser.transform(unmatched_event, &mut outputs); + + for output_name in output_names { + let events: Vec<_> = outputs.drain_named(output_name).collect(); + match output_name { + LOGS_OUTPUT => assert_eq!(events.len(), 1), + METRICS_OUTPUT => assert_eq!(events.len(), 2), + TRACES_OUTPUT => assert_eq!(events.len(), 1), + UNMATCHED_OUTPUT => assert_eq!(events.len(), 1), + _ => unreachable!("unexpected output"), + } + } + } + + #[test] + fn build_events_from_messages_sets_message_and_timestamp() { + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let mut log = LogEvent::default(); + log.insert( + log_schema().message_key_target_path().unwrap(), + Value::Object(ObjectMap::new()), + ); + let event = Event::Log(log); + + let timestamp = Value::Timestamp(Utc.timestamp_opt(1234567890, 0).single().unwrap()); + let mut first = ObjectMap::new(); + first.insert("message".into(), Value::from("first")); + let mut second = ObjectMap::new(); + second.insert("message".into(), Value::from("second")); + + let results = parser + .build_events_from_messages( + event, + vec![ + (first, Some(timestamp.clone())), + (second, Some(timestamp.clone())), + ], + ) + .expect("build events"); + + assert_eq!(results.len(), 2); + for result in results { + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .and_then(Value::as_object) + .expect("message object"); + assert!(matches!( + message.get("message").and_then(Value::as_str), + Some(_) + )); + + let parsed_timestamp = log + .get(log_schema().timestamp_key_target_path().unwrap()) + .and_then(Value::as_timestamp) + .expect("timestamp should be set"); + assert_eq!( + *parsed_timestamp, + Utc.timestamp_opt(1234567890, 0).single().unwrap() + ); + } + } +} diff --git a/src/transforms/mezmo_datadog_agent_parser/traces.rs b/src/transforms/mezmo_datadog_agent_parser/traces.rs new file mode 100644 index 000000000..0563ad444 --- /dev/null +++ b/src/transforms/mezmo_datadog_agent_parser/traces.rs @@ -0,0 +1,711 @@ +use chrono::{TimeZone, Utc}; +use ordered_float::NotNan; + +use crate::config::log_schema; +use crate::event::{Event, MaybeAsLogMut, ObjectMap, Value}; +use crate::internal_events::MezmoDatadogAgentParserInvalidItem; + +use super::common::get_message_object; +use super::MezmoDatadogAgentParser; + +pub fn transform_trace( + mut event: Event, + parser: &MezmoDatadogAgentParser, +) -> Result { + let version = parser + .get_payload_version(&event) + .unwrap_or_else(|| "v2".to_string()); + + let log = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string())?; + + let output_message = { + let message_obj = get_message_object(log)?; + + let mut msg = match version.as_str() { + "v1" => transform_trace_v1(message_obj)?, + "v2" => transform_tracer_payload(message_obj), + _ => return Err(format!("Unsupported payload version: {version}")), + }; + + msg.insert("payload_version".into(), Value::from(version.clone())); + msg + }; + + log.insert( + log_schema().message_key_target_path().unwrap(), + Value::Object(output_message), + ); + + parser.strip_fields(&mut event); + + Ok(event) +} + +fn transform_trace_v1(message: &ObjectMap) -> Result { + if message.get("transactions").is_some() { + return Ok(transform_transaction(message)); + } + Ok(transform_api_trace(message)) +} + +/// APITrace: https://github.com/mezmo/vector/blob/f5010f6b3e3cda9f201429c3802cee94efb16586/proto/vector/dd_trace.proto#L20 +/// Transforms an APITrace item +/// See Kong implementation: https://github.com/answerbook/pipeline-gateway-kong/blob/af22369e9c53d319aa808391f06eae5ac0e8d036/kong/plugins/pipeline-routing/lib/datadog.lua#L83 +fn transform_api_trace(input: &ObjectMap) -> ObjectMap { + let mut output = ObjectMap::new(); + + copy_string(input, "hostName", &mut output, "host"); + copy_string(input, "env", &mut output, "env"); + copy_u64(input, "traceID", &mut output, "trace_id"); + copy_timestamp_nanos(input, "startTime", &mut output, "start_time"); + copy_timestamp_nanos(input, "endTime", &mut output, "end_time"); + + if let Some(spans) = input.get("spans").and_then(Value::as_array) { + let transformed: Vec = spans + .iter() + .filter_map(|span_val| { + if let Some(obj) = span_val.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Span is not an object", + item_type: "span", + event_type: Some("trace"), + }); + None + } + }) + .map(|span| transform_span(span, None, None, None)) + .collect(); + if !transformed.is_empty() { + output.insert("spans".into(), Value::Array(transformed)); + } + } + + output +} + +/// Transactions are basically spans: https://github.com/mezmo/vector/blob/f5010f6b3e3cda9f201429c3802cee94efb16586/proto/vector/dd_trace.proto#L11 +/// Transforms an APITrace item +/// See Kong implementation: https://github.com/answerbook/pipeline-gateway-kong/blob/af22369e9c53d319aa808391f06eae5ac0e8d036/kong/plugins/pipeline-routing/lib/datadog.lua#L83 +fn transform_transaction(input: &ObjectMap) -> ObjectMap { + let mut output = ObjectMap::new(); + + copy_string(input, "hostName", &mut output, "host"); + copy_string(input, "env", &mut output, "env"); + + let spans: Vec = input + .get("transactions") + .and_then(Value::as_array) + .map(|arr| { + arr.iter() + .filter_map(|txn_val| { + if let Some(obj) = txn_val.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Transaction is not an object", + item_type: "transaction", + event_type: Some("trace"), + }); + None + } + }) + .map(|span| transform_span(span, Some(true), None, None)) + .collect() + }) + .unwrap_or_default(); + + output.insert("spans".into(), Value::Array(spans)); + output +} + +/// TracerPayload: https://github.com/DataDog/datadog-agent/blob/main/pkg/proto/datadog/trace/tracer_payload.proto +fn transform_tracer_payload(input: &ObjectMap) -> ObjectMap { + let mut output = ObjectMap::new(); + + copy_string(input, "hostName", &mut output, "host"); + copy_string(input, "env", &mut output, "env"); + copy_string(input, "agentVersion", &mut output, "agent_version"); + copy_string(input, "containerID", &mut output, "container_id"); + copy_string(input, "languageName", &mut output, "language_name"); + copy_string(input, "languageVersion", &mut output, "language_version"); + copy_string(input, "tracerVersion", &mut output, "tracer_version"); + copy_string(input, "runtimeID", &mut output, "runtime_id"); + copy_string(input, "appVersion", &mut output, "app_version"); + copy_float(input, "targetTPS", &mut output, "target_tps"); + copy_float(input, "errorTPS", &mut output, "error_tps"); + copy_bool( + input, + "rareSamplerEnabled", + &mut output, + "rare_sampler_enabled", + ); + + let payload_tags = input.get("tags").and_then(Value::as_object); + + let spans: Vec = input + .get("chunks") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter_map(|chunk| { + if let Some(obj) = chunk.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Chunk is not an object", + item_type: "chunk", + event_type: Some("trace"), + }); + None + } + }) + .flat_map(|chunk| process_chunk(chunk, payload_tags)) + .collect(); + + output.insert("spans".into(), Value::Array(spans)); + output +} + +fn process_chunk(chunk: &ObjectMap, payload_tags: Option<&ObjectMap>) -> Vec { + let priority = chunk.get("priority").and_then(Value::as_integer); + let origin = chunk.get("origin").and_then(Value::as_str); + let dropped = chunk.get("droppedTrace").and_then(Value::as_boolean); + let chunk_tags = chunk.get("tags").and_then(Value::as_object); + + chunk + .get("spans") + .and_then(Value::as_array) + .map(|spans| { + spans + .iter() + .filter_map(|span_val| { + if let Some(obj) = span_val.as_object() { + Some(obj) + } else { + emit!(MezmoDatadogAgentParserInvalidItem { + error: "Span is not an object", + item_type: "span", + event_type: Some("trace"), + }); + None + } + }) + .map(|span| { + let mut transformed = + transform_span(span, dropped, priority, Some((chunk_tags, payload_tags))); + + if let (Some(origin_str), Value::Object(ref mut obj)) = + (&origin, &mut transformed) + { + obj.insert( + "origin".into(), + Value::from(origin_str.clone().into_owned()), + ); + } + + transformed + }) + .collect() + }) + .unwrap_or_default() +} + +fn transform_span( + input: &ObjectMap, + dropped: Option, + priority: Option, + extra_tags: Option<(Option<&ObjectMap>, Option<&ObjectMap>)>, // (chunk_tags, payload_tags) +) -> Value { + let mut output = ObjectMap::new(); + + copy_string(input, "service", &mut output, "service"); + copy_string(input, "name", &mut output, "name"); + copy_string(input, "resource", &mut output, "resource"); + copy_string(input, "type", &mut output, "type"); + + copy_u64(input, "traceID", &mut output, "trace_id"); + copy_u64(input, "spanID", &mut output, "span_id"); + copy_u64(input, "parentID", &mut output, "parent_id"); + + copy_timestamp_nanos(input, "start", &mut output, "start"); + copy_i64(input, "duration", &mut output, "duration"); + + copy_i64(input, "error", &mut output, "error"); + + copy_string_object(input, "meta", &mut output, "meta"); + copy_metrics(input, "metrics", &mut output, "metrics"); + copy_meta_struct(input, "metaStruct", &mut output, "meta_struct"); + + // Simplified Tag Merging + let mut tags = ObjectMap::new(); + + if let Some((chunk_tags, payload_tags)) = extra_tags { + if let Some(chunk_tags) = chunk_tags { + merge_string_map(chunk_tags, &mut tags); + } + if let Some(payload_tags) = payload_tags { + merge_string_map(payload_tags, &mut tags); + } + } + if let Some(span_tags) = input.get("tags").and_then(Value::as_object) { + merge_string_map(span_tags, &mut tags); + } + + if !tags.is_empty() { + output.insert("tags".into(), Value::Object(tags)); + } + + if let Some(d) = dropped { + output.insert("dropped".into(), Value::Boolean(d)); + } + if let Some(p) = priority { + output.insert("priority".into(), Value::from(p)); + } + + Value::Object(output) +} + +fn copy_string(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(val) = src.get(src_key) { + if val.as_str().is_some() { + dst.insert(dst_key.into(), val.clone()); + } + } +} + +fn copy_u64(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(v) = src.get(src_key).and_then(Value::as_integer) { + if u64::try_from(v).is_ok() { + dst.insert(dst_key.into(), Value::from(v)); + } + } +} + +fn copy_i64(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(v) = src.get(src_key).and_then(Value::as_integer) { + dst.insert(dst_key.into(), Value::from(v)); + } +} + +fn copy_timestamp_nanos(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(nanos) = src.get(src_key).and_then(Value::as_integer) { + dst.insert(dst_key.into(), Value::Timestamp(Utc.timestamp_nanos(nanos))); + } +} + +fn copy_float(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(f) = src.get(src_key).and_then(value_to_f64) { + if let Ok(nn) = NotNan::new(f) { + dst.insert(dst_key.into(), Value::Float(nn)); + } + } +} + +fn copy_bool(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(b) = src.get(src_key).and_then(Value::as_boolean) { + dst.insert(dst_key.into(), Value::Boolean(b)); + } +} + +fn copy_string_object(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(Value::Object(obj)) = src.get(src_key) { + copy_object_map(obj, dst, dst_key, |value| { + if value.as_str().is_some() { + Some(value.clone()) + } else { + None + } + }); + } +} + +fn copy_metrics(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(Value::Object(obj)) = src.get(src_key) { + copy_object_map(obj, dst, dst_key, |value| { + Some( + value + .as_integer() + .map(|v| v as f64) + .or_else(|| value.as_float().map(|v| v.into_inner())) + .and_then(|f| NotNan::new(f).ok()) + .map(Value::Float) + .unwrap_or(Value::Null), + ) + }); + } +} + +fn copy_meta_struct(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { + if let Some(Value::Object(obj)) = src.get(src_key) { + copy_object_map(obj, dst, dst_key, |value| match value { + Value::Bytes(bytes) => Some(Value::Bytes(bytes.clone())), + _ => Some(Value::Null), + }); + } +} + +fn copy_object_map(src: &ObjectMap, dst: &mut ObjectMap, dst_key: &str, mut convert: F) +where + F: FnMut(&Value) -> Option, +{ + if src.is_empty() { + return; + } + + let mapped: ObjectMap = src + .iter() + .filter_map(|(key, value)| convert(value).map(|mapped| (key.clone(), mapped))) + .collect(); + + if !mapped.is_empty() { + dst.insert(dst_key.into(), Value::Object(mapped)); + } +} + +fn merge_string_map(src: &ObjectMap, dst: &mut ObjectMap) { + for (key, value) in src { + if value.as_str().is_some() { + dst.insert(key.clone(), value.clone()); + } + } +} + +fn value_to_f64(value: &Value) -> Option { + match value { + Value::Integer(value) => Some(*value as f64), + Value::Float(value) => Some(value.into_inner()), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::event::LogEvent; + use crate::transforms::mezmo_datadog_agent_parser::{ + MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, + }; + use bytes::Bytes; + + fn build_event(message: Value) -> Event { + let mut log = LogEvent::default(); + log.insert(log_schema().message_key_target_path().unwrap(), message); + Event::Log(log) + } + + #[test] + fn test_transform_v1_trace() { + let event = build_event( + serde_json::json!({ + "hostName": "tracer-host", + "env": "production", + "mezmo_payload_version": "v1", + "traceID": 12345, + "startTime": 1_000_000_000i64, + "endTime": 2_000_000_000i64, + "spans": [ + { + "service": "web", + "name": "http.request", + "resource": "/api/users", + "traceID": 12345, + "spanID": 1, + "parentID": 0, + "start": 1_000_000_000i64, + "duration": 500_000_000i64, + "error": 0, + "meta": {"http.method": "GET"}, + "metrics": {"_sample_rate": 1.0} + } + ] + }) + .into(), + ); + + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let result = transform_trace(event, &parser).unwrap(); + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("tracer-host".to_string()) + ); + assert_eq!( + message + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("production".to_string()) + ); + assert_eq!( + message.get("trace_id").and_then(Value::as_integer), + Some(12345) + ); + assert_eq!( + message.get("start_time"), + Some(&Value::Timestamp(Utc.timestamp_nanos(1_000_000_000))) + ); + assert_eq!( + message.get("end_time"), + Some(&Value::Timestamp(Utc.timestamp_nanos(2_000_000_000))) + ); + + let spans = message.get("spans").and_then(|v| v.as_array()).unwrap(); + let span = spans[0].as_object().unwrap(); + assert_eq!( + span.get("start"), + Some(&Value::Timestamp(Utc.timestamp_nanos(1_000_000_000))) + ); + assert_eq!( + span.get("trace_id").and_then(Value::as_integer), + Some(12345) + ); + assert_eq!(span.get("span_id").and_then(Value::as_integer), Some(1)); + assert_eq!(span.get("parent_id").and_then(Value::as_integer), Some(0)); + } + + #[test] + fn test_transform_v1_transactions() { + let event = build_event( + serde_json::json!({ + "hostName": "myhost", + "env": "production", + "mezmo_payload_version": "v1", + "transactions": [ + { + "service": "web", + "name": "txn", + "traceID": 12345, + "spanID": 1, + "start": 1_000_000_000i64, + "duration": 500_000_000i64, + "error": 0 + } + ] + }) + .into(), + ); + + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let result = transform_trace(event, &parser).unwrap(); + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + let spans = message.get("spans").and_then(|v| v.as_array()).unwrap(); + let span = spans[0].as_object().unwrap(); + assert_eq!(span.get("dropped"), Some(&Value::Boolean(true))); + assert_eq!( + message + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("myhost".to_string()) + ); + assert_eq!( + message + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("production".to_string()) + ); + } + + #[test] + fn test_transform_v2_trace() { + let event = build_event( + serde_json::json!({ + "hostName": "myhost", + "env": "staging", + "mezmo_payload_version": "v2", + "agentVersion": "7.0.0", + "targetTPS": 10.0, + "errorTPS": 1.0, + "rareSamplerEnabled": true, + "containerID": "abc123", + "languageName": "python", + "languageVersion": "3.9", + "tracerVersion": "1.0.0", + "runtimeID": "runtime-123", + "appVersion": "2.0.0", + "tags": {"payload_tag": "value"}, + "chunks": [ + { + "priority": 1, + "origin": "lambda", + "droppedTrace": true, + "tags": {"chunk_tag": "value"}, + "spans": [ + { + "service": "api", + "name": "handler", + "resource": "process", + "traceID": 67890, + "spanID": 2, + "parentID": 1, + "start": 2_000_000_000i64, + "duration": 100_000_000i64, + "error": 0, + "tags": {"span_tag": "value"}, + "metrics": {"_sample_rate": 1}, + "metaStruct": {"blob": "data"} + } + ] + } + ] + }) + .into(), + ); + + let config = MezmoDatadogAgentParserConfig::default(); + let parser = MezmoDatadogAgentParser::new(&config); + + let result = transform_trace(event, &parser).unwrap(); + let log = result.as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("agent_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("7.0.0".to_string()) + ); + assert_eq!( + message + .get("target_tps") + .and_then(Value::as_float) + .map(|value| value.into_inner()), + Some(10.0) + ); + assert_eq!( + message + .get("error_tps") + .and_then(Value::as_float) + .map(|value| value.into_inner()), + Some(1.0) + ); + assert_eq!( + message.get("rare_sampler_enabled"), + Some(&Value::Boolean(true)) + ); + assert_eq!( + message + .get("container_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("abc123".to_string()) + ); + assert_eq!( + message + .get("language_name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("python".to_string()) + ); + assert_eq!( + message + .get("language_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("3.9".to_string()) + ); + assert_eq!( + message + .get("tracer_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("1.0.0".to_string()) + ); + assert_eq!( + message + .get("runtime_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("runtime-123".to_string()) + ); + assert_eq!( + message + .get("app_version") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("2.0.0".to_string()) + ); + assert_eq!( + message + .get("host") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("myhost".to_string()) + ); + assert_eq!( + message + .get("env") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("staging".to_string()) + ); + + let spans = message.get("spans").and_then(|v| v.as_array()).unwrap(); + let span = spans[0].as_object().unwrap(); + let tags = span.get("tags").and_then(|v| v.as_object()).unwrap(); + + assert_eq!( + tags.get("span_tag").and_then(Value::as_str).as_deref(), + Some("value") + ); + assert_eq!( + tags.get("chunk_tag").and_then(Value::as_str).as_deref(), + Some("value") + ); + assert_eq!( + tags.get("payload_tag").and_then(Value::as_str).as_deref(), + Some("value") + ); + assert_eq!(span.get("priority").and_then(Value::as_integer), Some(1)); + assert_eq!( + span.get("origin").and_then(Value::as_str).as_deref(), + Some("lambda") + ); + assert_eq!(span.get("dropped"), Some(&Value::Boolean(true))); + assert_eq!( + span.get("start"), + Some(&Value::Timestamp(Utc.timestamp_nanos(2_000_000_000))) + ); + + let metrics = span.get("metrics").and_then(|v| v.as_object()).unwrap(); + assert_eq!( + metrics.get("_sample_rate"), + Some(&Value::Float(NotNan::new(1.0).unwrap())) + ); + + let meta_struct = span.get("meta_struct").and_then(|v| v.as_object()).unwrap(); + assert_eq!( + meta_struct.get("blob"), + Some(&Value::Bytes(Bytes::from_static(b"data"))) + ); + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index c6de2f057..ef9d77823 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -28,6 +28,8 @@ pub mod mezmo_aggregate; pub mod mezmo_aggregate_distributed; #[cfg(feature = "transforms-mezmo_aggregate_v2")] pub mod mezmo_aggregate_v2; +#[cfg(feature = "transforms-mezmo_datadog_agent_parser")] +pub mod mezmo_datadog_agent_parser; #[cfg(feature = "transforms-mezmo_log_classification")] pub mod mezmo_log_classification; #[cfg(feature = "transforms-mezmo_log_clustering")] From 2348d29be4361db8d6ffc31cb70fe87160d458a8 Mon Sep 17 00:00:00 2001 From: Kwabena Boadu Date: Tue, 27 Jan 2026 18:21:26 -0500 Subject: [PATCH 2/3] refactor: Address pull request feedback 1. Reduce cost of cloning event when emitting multiple events for a received metric or trace 2. Implement trait for transforming event 3. Move event into transform to reduce cloning. 4. On error, move the error into the event 5. Route the original event to unmatched if transforming fails. Previously this was done for logs only. Ref: LOG-22997 --- .../mezmo_datadog_agent_parser/common.rs | 49 ++++-- .../mezmo_datadog_agent_parser/logs.rs | 92 +++++----- .../mezmo_datadog_agent_parser/metrics.rs | 164 +++++++++++------- .../mezmo_datadog_agent_parser/mod.rs | 140 +++++++++------ .../mezmo_datadog_agent_parser/traces.rs | 100 +++++++---- 5 files changed, 340 insertions(+), 205 deletions(-) diff --git a/src/transforms/mezmo_datadog_agent_parser/common.rs b/src/transforms/mezmo_datadog_agent_parser/common.rs index 331e3e72b..e975106d0 100644 --- a/src/transforms/mezmo_datadog_agent_parser/common.rs +++ b/src/transforms/mezmo_datadog_agent_parser/common.rs @@ -7,6 +7,7 @@ use crate::event::{LogEvent, ObjectMap, Value}; pub enum TimestampUnit { Seconds, Milliseconds, + Nanoseconds, } pub fn get_message_object(log: &LogEvent) -> Result<&ObjectMap, String> { @@ -42,10 +43,21 @@ pub fn get_message_object_mut(log: &mut LogEvent) -> Result<&mut ObjectMap, Stri pub fn parse_timestamp(value: &Value, unit: TimestampUnit) -> Option> { match value { Value::Timestamp(timestamp) => Some(*timestamp), - Value::Integer(value) => match unit { - TimestampUnit::Seconds => Utc.timestamp_opt(*value, 0).single(), - TimestampUnit::Milliseconds => Utc.timestamp_millis_opt(*value).single(), - }, + Value::Integer(value) => { + let (seconds, nanoseconds) = match unit { + TimestampUnit::Seconds => (*value, None), + TimestampUnit::Milliseconds => ( + value.div_euclid(1000), + (value.rem_euclid(1000) as u32).checked_mul(1_000_000), + ), + TimestampUnit::Nanoseconds => ( + value.div_euclid(1_000_000_000), + Some(value.rem_euclid(1_000_000_000) as u32), + ), + }; + let nanoseconds = nanoseconds.unwrap_or_default(); + Utc.timestamp_opt(seconds, nanoseconds).single() + } Value::Float(value) => { let value = value.into_inner(); if !value.is_finite() { @@ -54,6 +66,7 @@ pub fn parse_timestamp(value: &Value, unit: TimestampUnit) -> Option value, TimestampUnit::Milliseconds => value / 1000.0, + TimestampUnit::Nanoseconds => value / 1_000_000_000.0, }; let (seconds, nanos) = split_float_seconds(seconds)?; Utc.timestamp_opt(seconds, nanos).single() @@ -68,17 +81,23 @@ fn split_float_seconds(value: f64) -> Option<(i64, u32)> { return None; } - let secs = value.floor(); - let fract = value - secs; - let mut nanos = (fract * 1_000_000_000.0).round(); + // trunc handles negative values better than floor. -1.4 is -1, not -2 + let mut secs = value.trunc(); + let mut fractional_secs = value - secs; - // Handle rounding overflow (e.g., 0.9999999999 rounding up to 1s) - let secs = if nanos >= 1_000_000_000.0 { - nanos -= 1_000_000_000.0; - secs + 1.0 - } else { - secs - }; + if fractional_secs < 0.0 { + fractional_secs += 1.0; + secs -= 1.0; + } + let mut nanoseconds = (fractional_secs * 1_000_000_000.0).round(); - Some((secs as i64, nanos as u32)) + if nanoseconds >= 1_000_000_000.0 { + nanoseconds -= 1_000_000_000.0; + secs += 1.0; + } + + if secs < i64::MIN as f64 || secs > i64::MAX as f64 { + return None; + } + Some((secs as i64, nanoseconds as u32)) } diff --git a/src/transforms/mezmo_datadog_agent_parser/logs.rs b/src/transforms/mezmo_datadog_agent_parser/logs.rs index 5818208e2..f419a95c0 100644 --- a/src/transforms/mezmo_datadog_agent_parser/logs.rs +++ b/src/transforms/mezmo_datadog_agent_parser/logs.rs @@ -2,45 +2,55 @@ use crate::config::log_schema; use crate::event::{Event, MaybeAsLogMut, Value}; use bytes::Bytes; -use super::common::{get_message_object, get_message_object_mut, parse_timestamp, TimestampUnit}; -use super::MezmoDatadogAgentParser; - -/// See: https://github.com/DataDog/agent-payload/blob/master/proto/logs/agent_logs_payload.proto -/// The log timestamp is in milliseconds, not seconds -pub fn transform_log(event: &mut Event, parser: &MezmoDatadogAgentParser) -> Result<(), String> { - let log = event - .maybe_as_log_mut() - .ok_or_else(|| "Event is not a log".to_string())?; - - let parsed_timestamp = { - let message_obj = get_message_object(log)?; - message_obj - .get("timestamp") - .and_then(|value| parse_timestamp(value, TimestampUnit::Milliseconds)) - }; - - let parsed_ddtags = { - let message_obj = get_message_object(log)?; - message_obj +use super::common::{get_message_object_mut, parse_timestamp, TimestampUnit}; +use super::{MezmoDatadogAgentParser, TransformDatadogEvent, TransformError}; + +pub(super) struct DatadogLogEvent; + +impl TransformDatadogEvent for DatadogLogEvent { + /// See: https://github.com/DataDog/agent-payload/blob/master/proto/logs/agent_logs_payload.proto + /// The log timestamp is in milliseconds, not seconds + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let log_result = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string()); + + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let message_obj = match get_message_object_mut(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let parsed_ddtags = message_obj .get("ddtags") .and_then(|value| value.as_bytes()) - .map(parse_ddtags) - }; + .map(parse_ddtags); - if let Some(parsed_ddtags) = parsed_ddtags { - let message_obj = get_message_object_mut(log)?; - message_obj.insert("ddtags".into(), parsed_ddtags); - } - - if let Some(parsed) = parsed_timestamp { - if let Some(timestamp_path) = log_schema().timestamp_key_target_path() { - log.insert(timestamp_path, parsed); + if let Some(parsed_ddtags) = parsed_ddtags { + message_obj.insert("ddtags".into(), parsed_ddtags); } - } - parser.strip_fields(event); + let parsed_timestamp = message_obj + .get("timestamp") + .and_then(|value| parse_timestamp(value, TimestampUnit::Milliseconds)); + + if let Some(parsed) = parsed_timestamp { + if let Some(timestamp_path) = log_schema().timestamp_key_target_path() { + log.insert(timestamp_path, parsed); + } + }; - Ok(()) + parser.strip_fields(&mut event); + + Ok(vec![event]) + } } // Mirrors the Datadog agent source parse_ddtags implementation. @@ -66,14 +76,14 @@ mod tests { use bytes::Bytes; use chrono::{TimeZone, Utc}; + use super::super::TransformDatadogEvent; + use super::DatadogLogEvent; use crate::config::log_schema; use crate::event::{Event, EventMetadata, KeyString, LogEvent, Value}; use crate::transforms::mezmo_datadog_agent_parser::{ MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, }; - use super::transform_log; - fn build_event(message: BTreeMap) -> Event { Event::Log(LogEvent::from_map( [( @@ -100,10 +110,12 @@ mod tests { ); message.insert("status".into(), Value::Bytes(Bytes::from_static(b"info"))); - let mut event = build_event(message); + let event = build_event(message); let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default()); - transform_log(&mut event, &parser).expect("transform should succeed"); + let mut results = + DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); + let event = results.pop().expect("transformed event"); let log = event.as_log(); let ts_path = log_schema() @@ -148,10 +160,12 @@ mod tests { Value::Bytes(Bytes::from_static(b"not-a-ts")), ); - let mut event = build_event(message); + let event = build_event(message); let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default()); - transform_log(&mut event, &parser).expect("transform should succeed"); + let mut results = + DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); + let event = results.pop().expect("transformed event"); let log = event.as_log(); let ts_path = log_schema() diff --git a/src/transforms/mezmo_datadog_agent_parser/metrics.rs b/src/transforms/mezmo_datadog_agent_parser/metrics.rs index 286b1677d..239e9607d 100644 --- a/src/transforms/mezmo_datadog_agent_parser/metrics.rs +++ b/src/transforms/mezmo_datadog_agent_parser/metrics.rs @@ -3,52 +3,81 @@ use crate::config::log_schema; use crate::event::{Event, MaybeAsLogMut, MetricKind, ObjectMap, Value}; use super::common::{get_message_object, parse_timestamp, TimestampUnit}; -use super::MezmoDatadogAgentParser; - -/// Transform a metric event into the normalized MezmoMetric log format. -/// -/// The incoming event is a log event with the metric data in `.message`. -/// The payload version determines whether this is a v1 or v2 series metric. -/// Metrics data is intercepted and decoded in Kong with each item in the series -/// field emitted as a separate event -/// See: https://github.com/answerbook/pipeline-gateway-kong/blob/6fefc73374b32996b4bbb5ab1052eb2e6e6d3293/kong/plugins/pipeline-routing/lib/datadog.lua#L157 -pub fn transform_metric( - mut event: Event, - parser: &MezmoDatadogAgentParser, -) -> Result, String> { - let version = parser - .get_payload_version(&event) - .unwrap_or_else(|| "v2".to_string()); - - let message_obj = { - let log = event +use super::{MezmoDatadogAgentParser, TransformDatadogEvent, TransformError}; + +pub(super) struct DatadogMetricEvent; + +impl TransformDatadogEvent for DatadogMetricEvent { + /// Transform a metric event into the normalized MezmoMetric log format. + /// + /// The incoming event is a log event with the metric data in `.message`. + /// The payload version determines whether this is a v1 or v2 series metric. + /// Metrics data is intercepted and decoded in Kong with each item in the series + /// field emitted as a separate event + /// See: https://github.com/answerbook/pipeline-gateway-kong/blob/6fefc73374b32996b4bbb5ab1052eb2e6e6d3293/kong/plugins/pipeline-routing/lib/datadog.lua#L157 + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let version = parser + .get_payload_version(&event) + .unwrap_or_else(|| "v2".to_string()); + + let log_result = event .maybe_as_log_mut() - .ok_or_else(|| "Event is not a log".to_string())?; - get_message_object(log)?.clone() - }; + .ok_or_else(|| "Event is not a log".to_string()); - let output_messages = match version.as_str() { - "v1" => transform_series_v1(&message_obj)?, - "v2" => transform_series_v2(&message_obj)?, - _ => transform_series_v2(&message_obj)?, - }; + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; - parser.build_events_from_messages(event, output_messages) + let message_obj = match get_message_object(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let output_messages = match version.as_str() { + "v1" => transform_series_v1(message_obj), + _ => transform_series_v2(message_obj), + } + .map_err(|msg| TransformError::from(event.clone(), &msg))?; + + parser + .build_events_from_payloads(event.clone(), output_messages) + .map_err(|msg| TransformError::from(event, &msg)) + } } -/// Transform a sketch event into the normalized MezmoMetric log format. -pub fn transform_sketch( - mut event: Event, - parser: &MezmoDatadogAgentParser, -) -> Result, String> { - let message_obj = { - let log = event +pub(super) struct DatadogSketchEvent; + +impl TransformDatadogEvent for DatadogSketchEvent { + /// Transform a sketch event into the normalized MezmoMetric log format. + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let log_result = event .maybe_as_log_mut() - .ok_or_else(|| "Event is not a log".to_string())?; - get_message_object(log)?.clone() - }; + .ok_or_else(|| "Event is not a log".to_string()); - parser.build_events_from_messages(event, transform_sketch_payload(&message_obj)?) + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + let message_obj = match get_message_object(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + + let sketch_metrics = match transform_sketch_payload(message_obj) { + Ok(sketches) => sketches, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + parser + .build_events_from_payloads(event.clone(), sketch_metrics) + .map_err(|msg| TransformError::from(event, &msg)) + } } /// Transforms Datadog series v1 metrics (/api/v1/series) @@ -78,6 +107,12 @@ fn transform_series_v1(message: &ObjectMap) -> Result Result Result Result Result Result (Option<&str>, &str) { #[cfg(test)] mod tests { + use super::super::TransformDatadogEvent; use super::*; use crate::event::LogEvent; use crate::transforms::mezmo_datadog_agent_parser::{ @@ -525,7 +572,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_metric(event, &parser).unwrap(); + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 2); let log = results[0].as_log(); @@ -588,7 +635,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_metric(event, &parser).unwrap(); + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 2); let log = results[0].as_log(); @@ -678,7 +725,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_metric(event, &parser).unwrap(); + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let log = results[0].as_log(); let message = log .get(log_schema().message_key_target_path().unwrap()) @@ -720,7 +767,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_metric(event, &parser).unwrap(); + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let message = results[0] .as_log() .get(log_schema().message_key_target_path().unwrap()) @@ -753,7 +800,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_metric(event, &parser).unwrap(); + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let log = results[0].as_log(); let message = log .get(log_schema().message_key_target_path().unwrap()) @@ -785,7 +832,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_metric(event, &parser).unwrap(); + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let log = results[0].as_log(); let message = log .get(log_schema().message_key_target_path().unwrap()) @@ -793,8 +840,7 @@ mod tests { .as_object() .unwrap(); - let time_obj = message.get("time").unwrap().as_object().unwrap(); - assert!(time_obj.get("interval_ms").is_none()); + assert!(message.get("time").is_none()); } #[test] @@ -824,7 +870,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_sketch(event, &parser).unwrap(); + let results = DatadogSketchEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 1); let message = results[0] @@ -889,7 +935,7 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let results = transform_sketch(event, &parser).unwrap(); + let results = DatadogSketchEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 1); let log = results[0].as_log(); diff --git a/src/transforms/mezmo_datadog_agent_parser/mod.rs b/src/transforms/mezmo_datadog_agent_parser/mod.rs index 7152fc99c..eb3d2a186 100644 --- a/src/transforms/mezmo_datadog_agent_parser/mod.rs +++ b/src/transforms/mezmo_datadog_agent_parser/mod.rs @@ -9,6 +9,10 @@ use crate::{ internal_events::MezmoDatadogAgentParserError, }; +use logs::DatadogLogEvent; +use metrics::{DatadogMetricEvent, DatadogSketchEvent}; +use traces::DatadogTraceEvent; + mod common; mod config; mod logs; @@ -60,7 +64,34 @@ impl MezmoDatadogAgentParser { .map(Cow::into_owned) } - pub fn strip_fields(&self, event: &mut Event) { + fn handle_transform_event( + &self, + event: Event, + output: &mut vector_lib::transform::TransformOutputsBuf, + output_name: &'static str, + event_type_name: &'static str, + ) where + T: TransformDatadogEvent, + { + match T::transform(event, self) { + Ok(events) => { + for event in events { + output.push(Some(output_name), event); + } + } + Err(err) => { + emit!(MezmoDatadogAgentParserError { + error: &err.message, + event_type: Some(event_type_name) + }); + if self.reroute_unmatched { + output.push(Some(UNMATCHED_OUTPUT), *err.input); + } + } + } + } + + pub(super) fn strip_fields(&self, event: &mut Event) { if let Some(log) = event.maybe_as_log_mut() { if self.strip_event_type { log.remove(&self.event_type_path.0); @@ -71,7 +102,7 @@ impl MezmoDatadogAgentParser { } } - pub fn build_events_from_messages( + pub(super) fn build_events_from_payloads( &self, mut event: Event, payloads: Vec<(ObjectMap, Option)>, @@ -82,18 +113,25 @@ impl MezmoDatadogAgentParser { self.strip_fields(&mut event); + // Reduce the cost of cloning the event for each payload item + event.maybe_as_log_mut().and_then(|log| { + log_schema() + .message_key_target_path() + .and_then(|path| log.remove(path)) + }); + payloads .into_iter() .map(|payload| { let mut new_event = event.clone(); - insert_message(&mut new_event, payload)?; + insert_payload(&mut new_event, payload)?; Ok(new_event) }) .collect() } } -fn insert_message(event: &mut Event, payload: (ObjectMap, Option)) -> Result<(), String> { +fn insert_payload(event: &mut Event, payload: (ObjectMap, Option)) -> Result<(), String> { let log = event .maybe_as_log_mut() .ok_or_else(|| "Event is not a log".to_string())?; @@ -122,60 +160,31 @@ impl SyncTransform for MezmoDatadogAgentParser { match event_type.as_deref() { Some(t) if t == self.event_type_values.log => { - let mut event = event; - match logs::transform_log(&mut event, self) { - Ok(()) => output.push(Some(LOGS_OUTPUT), event), - Err(err) => { - emit!(MezmoDatadogAgentParserError { - error: &err, - event_type: Some("log"), - }); - if self.reroute_unmatched { - output.push(Some(UNMATCHED_OUTPUT), event); - } - } - } + self.handle_transform_event::(event, output, LOGS_OUTPUT, "log"); } Some(t) if t == self.event_type_values.metric => { - match metrics::transform_metric(event, self) { - Ok(events) => { - for event in events { - output.push(Some(METRICS_OUTPUT), event); - } - } - Err(err) => { - emit!(MezmoDatadogAgentParserError { - error: &err, - event_type: Some("metric"), - }); - } - } + self.handle_transform_event::( + event, + output, + METRICS_OUTPUT, + "metric", + ); } Some(t) if t == self.event_type_values.sketch => { - match metrics::transform_sketch(event, self) { - Ok(events) => { - for event in events { - output.push(Some(METRICS_OUTPUT), event); - } - } - Err(err) => { - emit!(MezmoDatadogAgentParserError { - error: &err, - event_type: Some("sketch"), - }); - } - } + self.handle_transform_event::( + event, + output, + METRICS_OUTPUT, + "sketch", + ); } Some(t) if t == self.event_type_values.trace => { - match traces::transform_trace(event, self) { - Ok(event) => output.push(Some(TRACES_OUTPUT), event), - Err(err) => { - emit!(MezmoDatadogAgentParserError { - error: &err, - event_type: Some("trace"), - }); - } - } + self.handle_transform_event::( + event, + output, + TRACES_OUTPUT, + "trace", + ); } _ => { if self.reroute_unmatched { @@ -186,6 +195,29 @@ impl SyncTransform for MezmoDatadogAgentParser { } } +#[derive(Debug)] +pub(super) struct TransformError { + message: String, + // Reduce memory used by Result enums using this error + input: Box, +} + +impl TransformError { + fn from(input: Event, message: &str) -> Self { + Self { + input: Box::new(input), + message: message.to_string(), + } + } +} + +trait TransformDatadogEvent { + fn transform( + event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError>; +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -302,7 +334,7 @@ mod tests { } #[test] - fn build_events_from_messages_sets_message_and_timestamp() { + fn build_events_from_payloads_sets_message_and_timestamp() { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); @@ -320,7 +352,7 @@ mod tests { second.insert("message".into(), Value::from("second")); let results = parser - .build_events_from_messages( + .build_events_from_payloads( event, vec![ (first, Some(timestamp.clone())), diff --git a/src/transforms/mezmo_datadog_agent_parser/traces.rs b/src/transforms/mezmo_datadog_agent_parser/traces.rs index 0563ad444..dfbf301de 100644 --- a/src/transforms/mezmo_datadog_agent_parser/traces.rs +++ b/src/transforms/mezmo_datadog_agent_parser/traces.rs @@ -1,46 +1,61 @@ -use chrono::{TimeZone, Utc}; use ordered_float::NotNan; use crate::config::log_schema; use crate::event::{Event, MaybeAsLogMut, ObjectMap, Value}; use crate::internal_events::MezmoDatadogAgentParserInvalidItem; +use crate::transforms::mezmo_datadog_agent_parser::common::{parse_timestamp, TimestampUnit}; use super::common::get_message_object; -use super::MezmoDatadogAgentParser; - -pub fn transform_trace( - mut event: Event, - parser: &MezmoDatadogAgentParser, -) -> Result { - let version = parser - .get_payload_version(&event) - .unwrap_or_else(|| "v2".to_string()); - - let log = event - .maybe_as_log_mut() - .ok_or_else(|| "Event is not a log".to_string())?; - - let output_message = { - let message_obj = get_message_object(log)?; +use super::{MezmoDatadogAgentParser, TransformDatadogEvent, TransformError}; + +pub(super) struct DatadogTraceEvent; + +impl TransformDatadogEvent for DatadogTraceEvent { + fn transform( + mut event: Event, + parser: &MezmoDatadogAgentParser, + ) -> Result, TransformError> { + let version = parser + .get_payload_version(&event) + .unwrap_or_else(|| "v2".to_string()); + + let log_result = event + .maybe_as_log_mut() + .ok_or_else(|| "Event is not a log".to_string()); + + let log = match log_result { + Ok(log) => log, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + let message_obj = match get_message_object(log) { + Ok(message_obj) => message_obj, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; - let mut msg = match version.as_str() { - "v1" => transform_trace_v1(message_obj)?, + let result = match version.as_str() { + "v1" => transform_trace_v1(message_obj), "v2" => transform_tracer_payload(message_obj), - _ => return Err(format!("Unsupported payload version: {version}")), + _ => { + return Err(TransformError::from( + event, + &format!("Unsupported payload version: {version}"), + )) + } }; + let mut message = match result { + Ok(message) => message, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; + message.insert("payload_version".into(), Value::from(version.clone())); - msg.insert("payload_version".into(), Value::from(version.clone())); - msg - }; - - log.insert( - log_schema().message_key_target_path().unwrap(), - Value::Object(output_message), - ); - - parser.strip_fields(&mut event); + log.insert( + log_schema().message_key_target_path().unwrap(), + Value::Object(message), + ); + parser.strip_fields(&mut event); - Ok(event) + Ok(vec![event]) + } } fn transform_trace_v1(message: &ObjectMap) -> Result { @@ -123,7 +138,7 @@ fn transform_transaction(input: &ObjectMap) -> ObjectMap { } /// TracerPayload: https://github.com/DataDog/datadog-agent/blob/main/pkg/proto/datadog/trace/tracer_payload.proto -fn transform_tracer_payload(input: &ObjectMap) -> ObjectMap { +fn transform_tracer_payload(input: &ObjectMap) -> Result { let mut output = ObjectMap::new(); copy_string(input, "hostName", &mut output, "host"); @@ -167,7 +182,7 @@ fn transform_tracer_payload(input: &ObjectMap) -> ObjectMap { .collect(); output.insert("spans".into(), Value::Array(spans)); - output + Ok(output) } fn process_chunk(chunk: &ObjectMap, payload_tags: Option<&ObjectMap>) -> Vec { @@ -292,8 +307,12 @@ fn copy_i64(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) } fn copy_timestamp_nanos(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { - if let Some(nanos) = src.get(src_key).and_then(Value::as_integer) { - dst.insert(dst_key.into(), Value::Timestamp(Utc.timestamp_nanos(nanos))); + let src_timestamp = src + .get(src_key) + .and_then(|v| parse_timestamp(v, TimestampUnit::Nanoseconds)); + + if let Some(timestamp) = src_timestamp { + dst.insert(dst_key.into(), Value::from(timestamp)); } } @@ -384,12 +403,14 @@ fn value_to_f64(value: &Value) -> Option { #[cfg(test)] mod tests { + use super::super::TransformDatadogEvent; use super::*; use crate::event::LogEvent; use crate::transforms::mezmo_datadog_agent_parser::{ MezmoDatadogAgentParser, MezmoDatadogAgentParserConfig, }; use bytes::Bytes; + use chrono::{TimeZone, Utc}; fn build_event(message: Value) -> Event { let mut log = LogEvent::default(); @@ -429,7 +450,8 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let result = transform_trace(event, &parser).unwrap(); + let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); + let result = results.pop().expect("transformed event"); let log = result.as_log(); let message = log .get(log_schema().message_key_target_path().unwrap()) @@ -503,7 +525,8 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let result = transform_trace(event, &parser).unwrap(); + let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); + let result = results.pop().expect("transformed event"); let log = result.as_log(); let message = log .get(log_schema().message_key_target_path().unwrap()) @@ -579,7 +602,8 @@ mod tests { let config = MezmoDatadogAgentParserConfig::default(); let parser = MezmoDatadogAgentParser::new(&config); - let result = transform_trace(event, &parser).unwrap(); + let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); + let result = results.pop().expect("transformed event"); let log = result.as_log(); let message = log .get(log_schema().message_key_target_path().unwrap()) From 1313aa2bc6b66a2a05b5a928fd10f6f4cc885869 Mon Sep 17 00:00:00 2001 From: Kwabena Boadu Date: Tue, 27 Jan 2026 21:47:05 -0500 Subject: [PATCH 3/3] feat: Add parse_log_tags and split_metric_namespace flags The vector documentation now includes the two flags for Datadog logs and metrics - likely in newer Vector versions. This matches the implementation. Ref: LOG-22997 --- .../mezmo_datadog_agent_parser/common.rs | 22 ++--- .../mezmo_datadog_agent_parser/config.rs | 20 ++++- .../mezmo_datadog_agent_parser/logs.rs | 51 ++++++++++-- .../mezmo_datadog_agent_parser/metrics.rs | 83 ++++++++++++++----- .../mezmo_datadog_agent_parser/mod.rs | 33 +++++++- .../mezmo_datadog_agent_parser/traces.rs | 6 +- 6 files changed, 161 insertions(+), 54 deletions(-) diff --git a/src/transforms/mezmo_datadog_agent_parser/common.rs b/src/transforms/mezmo_datadog_agent_parser/common.rs index e975106d0..a78ce6dd0 100644 --- a/src/transforms/mezmo_datadog_agent_parser/common.rs +++ b/src/transforms/mezmo_datadog_agent_parser/common.rs @@ -15,14 +15,9 @@ pub fn get_message_object(log: &LogEvent) -> Result<&ObjectMap, String> { .message_key_target_path() .ok_or_else(|| "Missing message key".to_string())?; - let message = log - .get(message_path) - .ok_or_else(|| "Missing message field".to_string())?; - - match message { - Value::Object(obj) => Ok(obj), - _ => Err("Message is not an object".to_string()), - } + log.get(message_path) + .and_then(Value::as_object) + .ok_or_else(|| "Message is not an object".to_string()) } pub fn get_message_object_mut(log: &mut LogEvent) -> Result<&mut ObjectMap, String> { @@ -30,14 +25,9 @@ pub fn get_message_object_mut(log: &mut LogEvent) -> Result<&mut ObjectMap, Stri .message_key_target_path() .ok_or_else(|| "Missing message key".to_string())?; - let message = log - .get_mut(message_path) - .ok_or_else(|| "Missing message field".to_string())?; - - match message { - Value::Object(obj) => Ok(obj), - _ => Err("Message is not an object".to_string()), - } + log.get_mut(message_path) + .and_then(Value::as_object_mut) + .ok_or_else(|| "Message is not an object".to_string()) } pub fn parse_timestamp(value: &Value, unit: TimestampUnit) -> Option> { diff --git a/src/transforms/mezmo_datadog_agent_parser/config.rs b/src/transforms/mezmo_datadog_agent_parser/config.rs index 4ab5d7d37..6611b282c 100644 --- a/src/transforms/mezmo_datadog_agent_parser/config.rs +++ b/src/transforms/mezmo_datadog_agent_parser/config.rs @@ -84,6 +84,17 @@ pub struct MezmoDatadogAgentParserConfig { #[serde(default = "default_payload_version_path")] pub payload_version_path: ConfigTargetPath, + /// Parse log tag strings into arrays when true. + #[serde(default = "crate::serde::default_false")] + pub parse_log_tags: bool, + + /// Split metric name into namespace and name when true. + /// + /// When false, the full metric name is preserved in `name` and `namespace` + /// is omitted. + #[serde(default = "crate::serde::default_true")] + pub split_metric_namespace: bool, + /// Remove the event type field after identifying the event. #[serde(default = "crate::serde::default_true")] pub strip_event_type: bool, @@ -103,6 +114,8 @@ impl Default for MezmoDatadogAgentParserConfig { event_type_path: default_event_type_path(), event_type_values: EventTypeValues::default(), payload_version_path: default_payload_version_path(), + parse_log_tags: false, + split_metric_namespace: true, strip_event_type: false, strip_payload_version: false, reroute_unmatched: true, @@ -119,8 +132,11 @@ impl GenerateConfig for MezmoDatadogAgentParserConfig { #[async_trait::async_trait] #[typetag::serde(name = "mezmo_datadog_agent_parser")] impl TransformConfig for MezmoDatadogAgentParserConfig { - async fn build(&self, _context: &TransformContext) -> crate::Result { - Ok(Transform::synchronous(MezmoDatadogAgentParser::new(self))) + async fn build(&self, context: &TransformContext) -> crate::Result { + Ok(Transform::synchronous(MezmoDatadogAgentParser::new( + self, + context.mezmo_ctx.clone(), + ))) } fn input(&self) -> Input { diff --git a/src/transforms/mezmo_datadog_agent_parser/logs.rs b/src/transforms/mezmo_datadog_agent_parser/logs.rs index f419a95c0..09a332bb8 100644 --- a/src/transforms/mezmo_datadog_agent_parser/logs.rs +++ b/src/transforms/mezmo_datadog_agent_parser/logs.rs @@ -28,13 +28,15 @@ impl TransformDatadogEvent for DatadogLogEvent { Err(msg) => return Err(TransformError::from(event, &msg)), }; - let parsed_ddtags = message_obj - .get("ddtags") - .and_then(|value| value.as_bytes()) - .map(parse_ddtags); - - if let Some(parsed_ddtags) = parsed_ddtags { - message_obj.insert("ddtags".into(), parsed_ddtags); + if parser.parse_log_tags { + let parsed_ddtags = message_obj + .get("ddtags") + .and_then(|value| value.as_bytes()) + .map(parse_ddtags); + + if let Some(parsed_ddtags) = parsed_ddtags { + message_obj.insert("ddtags".into(), parsed_ddtags); + } } let parsed_timestamp = message_obj @@ -111,7 +113,11 @@ mod tests { message.insert("status".into(), Value::Bytes(Bytes::from_static(b"info"))); let event = build_event(message); - let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default()); + let config = MezmoDatadogAgentParserConfig { + parse_log_tags: true, + ..Default::default() + }; + let parser = MezmoDatadogAgentParser::new(&config, None); let mut results = DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); @@ -161,7 +167,7 @@ mod tests { ); let event = build_event(message); - let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default()); + let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default(), None); let mut results = DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); @@ -173,4 +179,31 @@ mod tests { .expect("timestamp key"); assert!(log.get(ts_path).is_none()); } + + #[test] + fn does_not_parse_tags_when_disabled() { + let mut message = BTreeMap::new(); + message.insert( + "ddtags".into(), + Value::Bytes(Bytes::from_static(b"env:prod,team:core")), + ); + + let event = build_event(message); + let parser = MezmoDatadogAgentParser::new(&MezmoDatadogAgentParserConfig::default(), None); + + let mut results = + DatadogLogEvent::transform(event, &parser).expect("transform should succeed"); + let event = results.pop().expect("transformed event"); + + let message = event + .as_log() + .get(log_schema().message_key_target_path().expect("message key")) + .and_then(Value::as_object) + .expect("message object"); + + assert_eq!( + message.get("ddtags"), + Some(&Value::Bytes(Bytes::from_static(b"env:prod,team:core"))) + ); + } } diff --git a/src/transforms/mezmo_datadog_agent_parser/metrics.rs b/src/transforms/mezmo_datadog_agent_parser/metrics.rs index 239e9607d..a56f8611f 100644 --- a/src/transforms/mezmo_datadog_agent_parser/metrics.rs +++ b/src/transforms/mezmo_datadog_agent_parser/metrics.rs @@ -38,8 +38,8 @@ impl TransformDatadogEvent for DatadogMetricEvent { }; let output_messages = match version.as_str() { - "v1" => transform_series_v1(message_obj), - _ => transform_series_v2(message_obj), + "v1" => transform_series_v1(message_obj, parser.split_metric_namespace), + _ => transform_series_v2(message_obj, parser.split_metric_namespace), } .map_err(|msg| TransformError::from(event.clone(), &msg))?; @@ -70,10 +70,11 @@ impl TransformDatadogEvent for DatadogSketchEvent { Err(msg) => return Err(TransformError::from(event, &msg)), }; - let sketch_metrics = match transform_sketch_payload(message_obj) { - Ok(sketches) => sketches, - Err(msg) => return Err(TransformError::from(event, &msg)), - }; + let sketch_metrics = + match transform_sketch_payload(message_obj, parser.split_metric_namespace) { + Ok(sketches) => sketches, + Err(msg) => return Err(TransformError::from(event, &msg)), + }; parser .build_events_from_payloads(event.clone(), sketch_metrics) .map_err(|msg| TransformError::from(event, &msg)) @@ -82,7 +83,10 @@ impl TransformDatadogEvent for DatadogSketchEvent { /// Transforms Datadog series v1 metrics (/api/v1/series) /// v1 metrics store points as a list of lists ([[timestamp, value]]) -fn transform_series_v1(message: &ObjectMap) -> Result)>, String> { +fn transform_series_v1( + message: &ObjectMap, + split_metric_namespace: bool, +) -> Result)>, String> { let metric_name = message .get("metric") .and_then(|v| v.as_str()) @@ -113,7 +117,7 @@ fn transform_series_v1(message: &ObjectMap) -> Result Result Result)>, String> { +fn transform_series_v2( + message: &ObjectMap, + split_metric_namespace: bool, +) -> Result)>, String> { let metric_name = message .get("metric") .and_then(|v| v.as_str()) @@ -221,7 +228,7 @@ fn transform_series_v2(message: &ObjectMap) -> Result Result Result)>, String> { let metric_name = message .get("metric") @@ -318,7 +326,8 @@ fn transform_sketch_payload( None }; - let (namespace, metric_name) = namespace_name_from_dd_metric(&metric_name); + let (namespace, metric_name) = + namespace_name_from_dd_metric(&metric_name, split_metric_namespace); let dog_sketches = message .get("dogsketches") @@ -512,7 +521,14 @@ const fn metric_kind_as_str(kind: MetricKind) -> &'static str { } } -fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) { +fn namespace_name_from_dd_metric( + dd_metric_name: &str, + split_metric_namespace: bool, +) -> (Option<&str>, &str) { + if !split_metric_namespace { + return (None, dd_metric_name); + } + match dd_metric_name.split_once('.') { Some((namespace, name)) => (Some(namespace), name), None => (None, dd_metric_name), @@ -570,7 +586,7 @@ mod tests { fn test_transform_v2_metric() { let event = create_v2_metric_event(); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogMetricEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 2); @@ -629,11 +645,38 @@ mod tests { assert_eq!(second_log_value, ordered_float::NotNan::new(43.0).unwrap()); } + #[test] + fn test_transform_v2_metric_no_namespace_split() { + let event = create_v2_metric_event(); + let config = MezmoDatadogAgentParserConfig { + split_metric_namespace: false, + ..Default::default() + }; + let parser = MezmoDatadogAgentParser::new(&config, None); + + let results = DatadogMetricEvent::transform(event, &parser).unwrap(); + let log = results[0].as_log(); + let message = log + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .as_object() + .unwrap(); + + assert_eq!( + message + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + Some("system.cpu.usage".to_string()) + ); + assert!(message.get("namespace").is_none()); + } + #[test] fn test_transform_v1_metric() { let event = create_v1_metric_event(); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogMetricEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 2); @@ -723,7 +766,7 @@ mod tests { ); let event = Event::Log(log); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let log = results[0].as_log(); @@ -765,7 +808,7 @@ mod tests { ); let event = Event::Log(log); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let message = results[0] @@ -798,7 +841,7 @@ mod tests { ); let event = Event::Log(log); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let log = results[0].as_log(); @@ -830,7 +873,7 @@ mod tests { ); let event = Event::Log(log); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogMetricEvent::transform(event, &parser).unwrap(); let log = results[0].as_log(); @@ -868,7 +911,7 @@ mod tests { ); let event = Event::Log(log); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogSketchEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 1); @@ -933,7 +976,7 @@ mod tests { ); let event = Event::Log(log); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let results = DatadogSketchEvent::transform(event, &parser).unwrap(); assert_eq!(results.len(), 1); diff --git a/src/transforms/mezmo_datadog_agent_parser/mod.rs b/src/transforms/mezmo_datadog_agent_parser/mod.rs index eb3d2a186..cbcbe13ab 100644 --- a/src/transforms/mezmo_datadog_agent_parser/mod.rs +++ b/src/transforms/mezmo_datadog_agent_parser/mod.rs @@ -3,6 +3,8 @@ use std::borrow::Cow; use vector_lib::lookup::lookup_v2::ConfigTargetPath; use vector_lib::transform::SyncTransform; +use mezmo::{user_log_warn, user_trace::MezmoUserLog, MezmoContext}; + use crate::{ config::log_schema, event::{Event, MaybeAsLogMut, ObjectMap, Value}, @@ -31,20 +33,26 @@ pub struct MezmoDatadogAgentParser { event_type_path: ConfigTargetPath, event_type_values: EventTypeValues, payload_version_path: ConfigTargetPath, + parse_log_tags: bool, + split_metric_namespace: bool, strip_event_type: bool, strip_payload_version: bool, reroute_unmatched: bool, + mezmo_ctx: Option, } impl MezmoDatadogAgentParser { - pub fn new(config: &MezmoDatadogAgentParserConfig) -> Self { + pub fn new(config: &MezmoDatadogAgentParserConfig, mezmo_ctx: Option) -> Self { Self { event_type_path: config.event_type_path.clone(), event_type_values: config.event_type_values.clone(), payload_version_path: config.payload_version_path.clone(), + parse_log_tags: config.parse_log_tags, + split_metric_namespace: config.split_metric_namespace, strip_event_type: config.strip_event_type, strip_payload_version: config.strip_payload_version, reroute_unmatched: config.reroute_unmatched, + mezmo_ctx, } } @@ -70,6 +78,7 @@ impl MezmoDatadogAgentParser { output: &mut vector_lib::transform::TransformOutputsBuf, output_name: &'static str, event_type_name: &'static str, + user_log_event_type: &'static str, ) where T: TransformDatadogEvent, { @@ -80,6 +89,13 @@ impl MezmoDatadogAgentParser { } } Err(err) => { + user_log_warn!( + self.mezmo_ctx.clone(), + format!( + "Failed to transform {}: {}", + user_log_event_type, err.message + ) + ); emit!(MezmoDatadogAgentParserError { error: &err.message, event_type: Some(event_type_name) @@ -160,7 +176,13 @@ impl SyncTransform for MezmoDatadogAgentParser { match event_type.as_deref() { Some(t) if t == self.event_type_values.log => { - self.handle_transform_event::(event, output, LOGS_OUTPUT, "log"); + self.handle_transform_event::( + event, + output, + LOGS_OUTPUT, + "log", + "log", + ); } Some(t) if t == self.event_type_values.metric => { self.handle_transform_event::( @@ -168,6 +190,7 @@ impl SyncTransform for MezmoDatadogAgentParser { output, METRICS_OUTPUT, "metric", + "metric", ); } Some(t) if t == self.event_type_values.sketch => { @@ -176,6 +199,7 @@ impl SyncTransform for MezmoDatadogAgentParser { output, METRICS_OUTPUT, "sketch", + "metric", ); } Some(t) if t == self.event_type_values.trace => { @@ -184,6 +208,7 @@ impl SyncTransform for MezmoDatadogAgentParser { output, TRACES_OUTPUT, "trace", + "trace", ); } _ => { @@ -259,7 +284,7 @@ mod tests { #[test] fn routes_events_to_expected_outputs() { let config = MezmoDatadogAgentParserConfig::default(); - let mut parser = MezmoDatadogAgentParser::new(&config); + let mut parser = MezmoDatadogAgentParser::new(&config, None); let (output_names, mut outputs) = build_outputs_buf(); let log_event = build_event( @@ -336,7 +361,7 @@ mod tests { #[test] fn build_events_from_payloads_sets_message_and_timestamp() { let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let mut log = LogEvent::default(); log.insert( diff --git a/src/transforms/mezmo_datadog_agent_parser/traces.rs b/src/transforms/mezmo_datadog_agent_parser/traces.rs index dfbf301de..74d64bca9 100644 --- a/src/transforms/mezmo_datadog_agent_parser/traces.rs +++ b/src/transforms/mezmo_datadog_agent_parser/traces.rs @@ -448,7 +448,7 @@ mod tests { ); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); let result = results.pop().expect("transformed event"); @@ -523,7 +523,7 @@ mod tests { ); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); let result = results.pop().expect("transformed event"); @@ -600,7 +600,7 @@ mod tests { ); let config = MezmoDatadogAgentParserConfig::default(); - let parser = MezmoDatadogAgentParser::new(&config); + let parser = MezmoDatadogAgentParser::new(&config, None); let mut results = DatadogTraceEvent::transform(event, &parser).unwrap(); let result = results.pop().expect("transformed event");