Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ transforms-logs = [
transforms-logs-mezmo = [
"transforms-dedupe",
"transforms-filter",
"transforms-mezmo_datadog_agent_parser",
"transforms-mezmo_reduce",
"transforms-remap",
"transforms-route",
Expand Down Expand Up @@ -818,6 +819,7 @@ transforms-metric_to_log = []
transforms-reduce = ["transforms-impl-reduce"]
transforms-mezmo_reduce = ["transforms-reduce"] # depends on the upstream version for merge_strategies
transforms-mezmo_aggregate = []
transforms-mezmo_datadog_agent_parser = []
transforms-mezmo_log_to_metric = []
transforms-mezmo_log_clustering = ["dep:lru", "dep:blake2", "dep:base64", "dep:tokio-postgres"]
transforms-mezmo_log_classification = ["dep:grok"]
Expand Down
51 changes: 51 additions & 0 deletions src/internal_events/mezmo_datadog_agent_parser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug)]
pub struct MezmoDatadogAgentParserError<'a> {
pub error: &'a str,
pub event_type: Option<&'a str>,
}

#[derive(Debug)]
pub struct MezmoDatadogAgentParserInvalidItem<'a> {
pub error: &'a str,
pub item_type: &'a str,
pub event_type: Option<&'a str>,
}

impl InternalEvent for MezmoDatadogAgentParserError<'_> {
fn emit(self) {
let event_type = self.event_type.unwrap_or("unknown");
error!(
message = "Failed to parse Datadog agent payload.",
error = %self.error,
event_type = %event_type,
internal_log_rate_limit = true,
);
counter!(
"mezmo_datadog_agent_parser_errors_total",
"event_type" => event_type.to_string(),
)
.increment(1);
}
}

impl InternalEvent for MezmoDatadogAgentParserInvalidItem<'_> {
fn emit(self) {
let event_type = self.event_type.unwrap_or("unknown");
error!(
message = "Invalid item error.",
error = %self.error,
item_type = %self.item_type,
event_type = %event_type,
internal_log_rate_limit = true,
);
counter!(
"mezmo_datadog_agent_parser_invalid_items_total",
"item_type" => self.item_type.to_string(),
"event_type" => event_type.to_string(),
)
.increment(1);
}
}
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ mod mezmo_aggregate;
#[cfg(feature = "transforms-mezmo_aggregate_distributed")]
mod mezmo_aggregate_distributed;
pub mod mezmo_config;
#[cfg(feature = "transforms-mezmo_datadog_agent_parser")]
mod mezmo_datadog_agent_parser;
#[cfg(feature = "transforms-mezmo_log_clustering")]
pub(crate) mod mezmo_log_clustering;
#[cfg(any(
Expand Down Expand Up @@ -255,6 +257,8 @@ pub(crate) use self::metric_to_log::*;
pub(crate) use self::mezmo_aggregate::*;
#[cfg(feature = "transforms-mezmo_aggregate_distributed")]
pub(crate) use self::mezmo_aggregate_distributed::*;
#[cfg(feature = "transforms-mezmo_datadog_agent_parser")]
pub(crate) use self::mezmo_datadog_agent_parser::*;
#[cfg(feature = "transforms-mezmo_tag_cardinality_limit")]
pub(crate) use self::mezmo_tag_cardinality_limit::*;
#[cfg(feature = "transforms-mezmo_throttle_distributed")]
Expand Down
93 changes: 93 additions & 0 deletions src/transforms/mezmo_datadog_agent_parser/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use chrono::{TimeZone, Utc};

use crate::config::log_schema;
use crate::event::{LogEvent, ObjectMap, Value};

#[derive(Copy, Clone, Debug)]
pub enum TimestampUnit {
Seconds,
Milliseconds,
Nanoseconds,
}

pub fn get_message_object(log: &LogEvent) -> Result<&ObjectMap, String> {
let message_path = log_schema()
.message_key_target_path()
.ok_or_else(|| "Missing message key".to_string())?;

log.get(message_path)
.and_then(Value::as_object)
.ok_or_else(|| "Message is not an object".to_string())
}

pub fn get_message_object_mut(log: &mut LogEvent) -> Result<&mut ObjectMap, String> {
let message_path = log_schema()
.message_key_target_path()
.ok_or_else(|| "Missing message key".to_string())?;

log.get_mut(message_path)
.and_then(Value::as_object_mut)
.ok_or_else(|| "Message is not an object".to_string())
}

pub fn parse_timestamp(value: &Value, unit: TimestampUnit) -> Option<chrono::DateTime<Utc>> {
match value {
Value::Timestamp(timestamp) => Some(*timestamp),
Value::Integer(value) => {
let (seconds, nanoseconds) = match unit {
TimestampUnit::Seconds => (*value, None),
TimestampUnit::Milliseconds => (
value.div_euclid(1000),
(value.rem_euclid(1000) as u32).checked_mul(1_000_000),
),
TimestampUnit::Nanoseconds => (
value.div_euclid(1_000_000_000),
Some(value.rem_euclid(1_000_000_000) as u32),
),
};
let nanoseconds = nanoseconds.unwrap_or_default();
Utc.timestamp_opt(seconds, nanoseconds).single()
}
Value::Float(value) => {
let value = value.into_inner();
if !value.is_finite() {
return None;
}
let seconds = match unit {
TimestampUnit::Seconds => value,
TimestampUnit::Milliseconds => value / 1000.0,
TimestampUnit::Nanoseconds => value / 1_000_000_000.0,
};
let (seconds, nanos) = split_float_seconds(seconds)?;
Utc.timestamp_opt(seconds, nanos).single()
}

_ => None,
}
}

fn split_float_seconds(value: f64) -> Option<(i64, u32)> {
if value < (i64::MIN as f64) || value > (i64::MAX as f64) {
return None;
}

// trunc handles negative values better than floor. -1.4 is -1, not -2
let mut secs = value.trunc();
let mut fractional_secs = value - secs;

if fractional_secs < 0.0 {
fractional_secs += 1.0;
secs -= 1.0;
}
let mut nanoseconds = (fractional_secs * 1_000_000_000.0).round();

if nanoseconds >= 1_000_000_000.0 {
nanoseconds -= 1_000_000_000.0;
secs += 1.0;
}

if secs < i64::MIN as f64 || secs > i64::MAX as f64 {
return None;
}
Some((secs as i64, nanoseconds as u32))
}
184 changes: 184 additions & 0 deletions src/transforms/mezmo_datadog_agent_parser/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
use vector_lib::config::{clone_input_definitions, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;

use crate::{
config::{
log_schema, DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
TransformOutput,
},
schema,
transforms::Transform,
};

use super::{
MezmoDatadogAgentParser, LOGS_OUTPUT, METRICS_OUTPUT, TRACES_OUTPUT, UNMATCHED_OUTPUT,
};

fn default_event_type_path() -> ConfigTargetPath {
ConfigTargetPath(
log_schema()
.metadata_key_target_path()
.expect("metadata key must exist")
.clone()
.with_field_appended("x-mezmo-dd-event-type"),
)
}

fn default_payload_version_path() -> ConfigTargetPath {
ConfigTargetPath(
log_schema()
.message_key_target_path()
.expect("message key must exist")
.clone()
.with_field_appended("mezmo_payload_version"),
)
}

/// Mapping of event type string values to internal types.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct EventTypeValues {
/// Value that identifies a log event.
pub log: String,

/// Value that identifies a metric event.
pub metric: String,

/// Value that identifies a trace event.
pub trace: String,

/// Value that identifies a sketch event.
pub sketch: String,
}

impl Default for EventTypeValues {
fn default() -> Self {
Self {
log: "log".into(),
metric: "metric".into(),
trace: "trace".into(),
sketch: "sketch".into(),
}
}
}

/// Configuration for the `mezmo_datadog_agent_parser` transform.
#[configurable_component(transform(
"mezmo_datadog_agent_parser",
"Parse and normalize Datadog agent payloads into structured log events."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct MezmoDatadogAgentParserConfig {
/// Path to the event type field in the incoming data.
#[serde(default = "default_event_type_path")]
pub event_type_path: ConfigTargetPath,

/// Mapping of event type string values to internal types.
#[serde(default)]
pub event_type_values: EventTypeValues,

/// Path to the payload version field in the incoming data.
#[serde(default = "default_payload_version_path")]
pub payload_version_path: ConfigTargetPath,

/// Parse log tag strings into arrays when true.
#[serde(default = "crate::serde::default_false")]
pub parse_log_tags: bool,

/// Split metric name into namespace and name when true.
///
/// When false, the full metric name is preserved in `name` and `namespace`
/// is omitted.
#[serde(default = "crate::serde::default_true")]
pub split_metric_namespace: bool,

/// Remove the event type field after identifying the event.
#[serde(default = "crate::serde::default_true")]
pub strip_event_type: bool,

/// Remove the payload version field before sending to output.
#[serde(default = "crate::serde::default_true")]
pub strip_payload_version: bool,

/// Route unrecognized event types to the _unmatched output instead of dropping.
#[serde(default = "crate::serde::default_true")]
pub reroute_unmatched: bool,
}

impl Default for MezmoDatadogAgentParserConfig {
fn default() -> Self {
Self {
event_type_path: default_event_type_path(),
event_type_values: EventTypeValues::default(),
payload_version_path: default_payload_version_path(),
parse_log_tags: false,
split_metric_namespace: true,
strip_event_type: false,
strip_payload_version: false,
reroute_unmatched: true,
}
}
}

impl GenerateConfig for MezmoDatadogAgentParserConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self::default()).unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "mezmo_datadog_agent_parser")]
impl TransformConfig for MezmoDatadogAgentParserConfig {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
Ok(Transform::synchronous(MezmoDatadogAgentParser::new(
self,
context.mezmo_ctx.clone(),
)))
}

fn input(&self) -> Input {
Input::log()
}

fn outputs(
&self,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
let mut outputs = vec![
TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions))
.with_port(LOGS_OUTPUT),
TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions))
.with_port(METRICS_OUTPUT),
TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions))
.with_port(TRACES_OUTPUT),
];

if self.reroute_unmatched {
outputs.push(
TransformOutput::new(DataType::Log, clone_input_definitions(input_definitions))
.with_port(UNMATCHED_OUTPUT),
);
}

outputs
}

fn enable_concurrency(&self) -> bool {
true
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<MezmoDatadogAgentParserConfig>();
}
}
Loading