-
Notifications
You must be signed in to change notification settings - Fork 2
feat(mezmo_datadog_agent_parser): Create MezmoDatadogAgentParser #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
39a967b to
89057ec
Compare
The transform receives a log event and outputs a single or multiple events to one of logs, metrics, traces or _unmatched ouptuts. The Datadog agent data is not decoded by this transform; the incoming event is assumed to be already decoded. For traces, each event is assumed to be an entry of the tracerPayloads or traces (legacy) field of the agent payload. For metrics, each event is assumed to be an entry of the series field of the agent payload. The implementation of the transform is mostly based on the Datadog agent source. Ref: LOG-22997
89057ec to
36ebe26
Compare
| let secs = value.floor(); | ||
| let fract = value - secs; | ||
| let mut nanos = (fract * 1_000_000_000.0).round(); | ||
|
|
||
| // Handle rounding overflow (e.g., 0.9999999999 rounding up to 1s) | ||
| let secs = if nanos >= 1_000_000_000.0 { | ||
| nanos -= 1_000_000_000.0; | ||
| secs + 1.0 | ||
| } else { | ||
| secs | ||
| }; | ||
|
|
||
| Some((secs as i64, nanos as u32)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for seconds you likley want to truncate rather than floor (-1.4 is -1 whole seconds, not -2)
you can also cast early to avoid working with secs as a float later, and then use the checked_* methods for overflow checking:
// truncate towards 0 AND perform a saturating cast f64 -> i64
let secs = value.trunc() as i64;
let frac = value - secs as f64;
let mut nanos = (frac * 1_000_000_000.0).round() as i64;
let mut secs = secs;
if nanos >= 1_000_000_000 {
nanos -= 1_000_000_000;
secs = secs.checked_add(1)?;
} else if nanos < 0 {
// also handle the negative offset
nanos += 1_000_000_000;
secs = secs.checked_sub(1)?;
}
Some((secs, nanos as u32))There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good. I avoided trunc to not handle the negative values; that assumes all dates are positive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| let message_obj = { | ||
| let log = event | ||
| .maybe_as_log_mut() | ||
| .ok_or_else(|| "Event is not a log".to_string())?; | ||
| get_message_object(log)?.clone() | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this (and below for sketch) will unnecessarily clone the whole message which we pass by reference below to the transform_* functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm cloning the message mostly to ensure that if any step of the transformation fails, the original message is left intact and can be routed to unmatched
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as it stands, the transform functions below (and in sketch) already take a &ObjectMap reference, so the cloning isn't preventing any mutation - we are cloning and then passing the clone by ref.
this is equivalent without a clone:
let message_obj = {
let log = event
.maybe_as_log_mut()
.ok_or_else(|| "Event is not a log".to_string())?;
get_message_object(log)? // no clone
};
let output_messages = match version.as_str() {
// it's already a reference, remove the `&`
"v1" => transform_series_v1(message_obj)?,
"v2" => transform_series_v2(message_obj)?,
_ => transform_series_v2(message_obj)?,
};you can also swap .maybe_as_log_mut() for .maybe_as_log()... and then you will find that the event parameter itself doesn't even need to be mutable.
| if let Some(interval) = interval { | ||
| time_obj.insert("interval_ms".into(), Value::from(interval * 1000)); | ||
| } | ||
| output.insert("time".into(), Value::Object(time_obj)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if interval is None - is inserting an empty time object correct here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely missed that one. Should be in the if block
| let mut new_event = event.clone(); | ||
| insert_message(&mut new_event, payload)?; | ||
| Ok(new_event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we clone the entire event (including .message) only to overwrite .message below in insert_message.
can we do the removal above outside of insert_message to pare down the event first before adding back the message?
| if !tags.is_empty() { | ||
| output.insert("tags".into(), Value::Object(tags.clone())); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: construct a Some(Value::Object(tags)) once outside of this loop, then clone that instead
let tags: Option<Value> = if !tags.is_empty() {
Some(Value::Object(tags))
} else {
None
};
//...
for point in points {
//...
if let Some(tags) = &tags {
output.insert("tags".into(), tags.clone());
}
}same is true in the other transform functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes this is def a nit so feel free to ignore - but consider:
// current - clone the map and wrap it in Value each iteration
let tags = ...
for foo in bar {
output.insert("tags".into(), Value::Object(tags.clone()));
}
vs
// suggested - clone the `Value` directly
let tags = Value::Object(...)
for foo in bar {
output.insert("tags".into(), tags.clone());
}| if self.reroute_unmatched { | ||
| output.push(Some(UNMATCHED_OUTPUT), event); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason we only reroute logs to _unmatched on error? for the other types we just drop the input events.
ETA: I think the issue is really the ownership of the original event for these types where 1 input event becomes N outputs. One idea would be to have all the transform_* functions (including transform_log) take ownership of the event, and return an error type that carries back the input event, e.g.:
pub struct TransformError {
pub message: String,
pub input: Event,
}that way we can always push the input to _unmatched on errors
|
|
||
| fn copy_timestamp_nanos(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) { | ||
| if let Some(nanos) = src.get(src_key).and_then(Value::as_integer) { | ||
| dst.insert(dst_key.into(), Value::Timestamp(Utc.timestamp_nanos(nanos))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timestamp_nanos will panic when the data is out of range - maybe it would make sense to reuse common::parse_timestamp here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| parser.build_events_from_messages(event, output_messages) | ||
| parser | ||
| .build_events_from_payloads(event.clone(), output_messages) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clone is still needed here because the method mutates the event
1. Reduce cost of cloning event when emitting multiple events for a received metric or trace 2. Implement trait for transforming event 3. Move event into transform to reduce cloning. 4. On error, move the error into the event 5. Route the original event to unmatched if transforming fails. Previously this was done for logs only. Ref: LOG-22997
3f41866 to
c71ec4c
Compare
The vector documentation now includes the two flags for Datadog logs and metrics - likely in newer Vector versions. This matches the implementation. Ref: LOG-22997
c71ec4c to
1313aa2
Compare
This PR adds support for transforming Datadog agent events into logs, metrics, traces and sketches.
This allows us to deprecate VRL handling of metrics while adding support for Datadog traces and sketches.
For reference the following protos apply:
Upstream of vector, the agent payload is intercepted, decoded and normalized. The transform thus expects TracerPayload item for trace events, Sketch for sketch metrics, MetricSeries for metrics and Log for logs.
The implementation is mostly based on the existing Datadog agent source implementation; we don't use that source because it exposes an http server to collect metrics
Ref: LOG-22997
Summary
Vector configuration
How did you test this PR?
Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changeloglabel to this PR.References
Notes
@vectordotdev/vectorto reach out to us regarding this PR.pre-pushhook, please see this template.cargo fmt --allcargo clippy --workspace --all-targets -- -D warningscargo nextest run --workspace(alternatively, you can runcargo test --all)git merge origin masterandgit push.Cargo.lock), pleaserun
cargo vdev build licensesto regenerate the license inventory and commit the changes (if any). More details here.