Skip to content

Conversation

@biblicalph
Copy link
Contributor

@biblicalph biblicalph commented Jan 26, 2026

This PR adds support for transforming Datadog agent events into logs, metrics, traces and sketches.
This allows us to deprecate VRL handling of metrics while adding support for Datadog traces and sketches.

For reference the following protos apply:

  1. Logs: https://github.com/DataDog/agent-payload/blob/master/proto/logs/agent_logs_payload.proto
  2. Metrics: https://github.com/DataDog/agent-payload/blob/master/proto/metrics/agent_payload.proto#L91
  3. Traces: https://github.com/DataDog/datadog-agent/blob/main/pkg/proto/datadog/trace/agent_payload.proto

Upstream of vector, the agent payload is intercepted, decoded and normalized. The transform thus expects TracerPayload item for trace events, Sketch for sketch metrics, MetricSeries for metrics and Log for logs.

The implementation is mostly based on the existing Datadog agent source implementation; we don't use that source because it exposes an http server to collect metrics

Ref: LOG-22997

Summary

Vector configuration

How did you test this PR?

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run cargo vdev build licenses to regenerate the license inventory and commit the changes (if any). More details here.

@biblicalph biblicalph self-assigned this Jan 26, 2026
@biblicalph biblicalph marked this pull request as draft January 26, 2026 22:03
@biblicalph biblicalph force-pushed the biblicalph/LOG-22997 branch 2 times, most recently from 39a967b to 89057ec Compare January 26, 2026 22:59
@biblicalph biblicalph marked this pull request as ready for review January 26, 2026 22:59
The transform receives a log event and outputs a single
or multiple events to one of logs, metrics, traces or
 _unmatched ouptuts.
The Datadog agent data is not decoded by this transform;
the incoming event is assumed to be already decoded.
For traces, each event is assumed to be an entry of the
tracerPayloads or traces (legacy) field of the agent
payload. For metrics, each event is assumed to be an
entry of the series field of the agent payload.
The implementation of the transform is mostly based on
the Datadog agent source.

Ref: LOG-22997
@biblicalph biblicalph force-pushed the biblicalph/LOG-22997 branch from 89057ec to 36ebe26 Compare January 26, 2026 23:08
Comment on lines 71 to 83
let secs = value.floor();
let fract = value - secs;
let mut nanos = (fract * 1_000_000_000.0).round();

// Handle rounding overflow (e.g., 0.9999999999 rounding up to 1s)
let secs = if nanos >= 1_000_000_000.0 {
nanos -= 1_000_000_000.0;
secs + 1.0
} else {
secs
};

Some((secs as i64, nanos as u32))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for seconds you likley want to truncate rather than floor (-1.4 is -1 whole seconds, not -2)

you can also cast early to avoid working with secs as a float later, and then use the checked_* methods for overflow checking:

// truncate towards 0 AND perform a saturating cast f64 -> i64
let secs = value.trunc() as i64;
let frac = value - secs as f64;

let mut nanos = (frac * 1_000_000_000.0).round() as i64;

let mut secs = secs;
if nanos >= 1_000_000_000 {
    nanos -= 1_000_000_000;
    secs = secs.checked_add(1)?;
} else if nanos < 0 {
    // also handle the negative offset
    nanos += 1_000_000_000;
    secs = secs.checked_sub(1)?;
}

Some((secs, nanos as u32))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. I avoided trunc to not handle the negative values; that assumes all dates are positive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 23 to 28
let message_obj = {
let log = event
.maybe_as_log_mut()
.ok_or_else(|| "Event is not a log".to_string())?;
get_message_object(log)?.clone()
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this (and below for sketch) will unnecessarily clone the whole message which we pass by reference below to the transform_* functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm cloning the message mostly to ensure that if any step of the transformation fails, the original message is left intact and can be routed to unmatched

Copy link
Member

@mdeltito mdeltito Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as it stands, the transform functions below (and in sketch) already take a &ObjectMap reference, so the cloning isn't preventing any mutation - we are cloning and then passing the clone by ref.

this is equivalent without a clone:

    let message_obj = {
        let log = event
            .maybe_as_log_mut()
            .ok_or_else(|| "Event is not a log".to_string())?;
        get_message_object(log)? // no clone
    };

    let output_messages = match version.as_str() {
        // it's already a reference, remove the `&`
        "v1" => transform_series_v1(message_obj)?,
        "v2" => transform_series_v2(message_obj)?,
        _ => transform_series_v2(message_obj)?,
    };

you can also swap .maybe_as_log_mut() for .maybe_as_log()... and then you will find that the event parameter itself doesn't even need to be mutable.

if let Some(interval) = interval {
time_obj.insert("interval_ms".into(), Value::from(interval * 1000));
}
output.insert("time".into(), Value::Object(time_obj));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if interval is None - is inserting an empty time object correct here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely missed that one. Should be in the if block

Comment on lines 88 to 90
let mut new_event = event.clone();
insert_message(&mut new_event, payload)?;
Ok(new_event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we clone the entire event (including .message) only to overwrite .message below in insert_message.

can we do the removal above outside of insert_message to pare down the event first before adding back the message?

Comment on lines 215 to 217
if !tags.is_empty() {
output.insert("tags".into(), Value::Object(tags.clone()));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: construct a Some(Value::Object(tags)) once outside of this loop, then clone that instead

let tags: Option<Value> = if !tags.is_empty() {
    Some(Value::Object(tags))
} else {
    None
};

//...
for point in points {
  //...
  if let Some(tags) = &tags {
    output.insert("tags".into(), tags.clone());
  }
}

same is true in the other transform functions

Copy link
Contributor Author

@biblicalph biblicalph Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is def a nit so feel free to ignore - but consider:

// current - clone the map and wrap it in Value each iteration
let tags = ...
for foo in bar {
  output.insert("tags".into(), Value::Object(tags.clone()));
}

vs 

// suggested - clone the `Value` directly
let tags = Value::Object(...)
for foo in bar {
  output.insert("tags".into(), tags.clone());
}

Comment on lines 133 to 135
if self.reroute_unmatched {
output.push(Some(UNMATCHED_OUTPUT), event);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason we only reroute logs to _unmatched on error? for the other types we just drop the input events.

ETA: I think the issue is really the ownership of the original event for these types where 1 input event becomes N outputs. One idea would be to have all the transform_* functions (including transform_log) take ownership of the event, and return an error type that carries back the input event, e.g.:

pub struct TransformError {
    pub message: String,
    pub input: Event,
}

that way we can always push the input to _unmatched on errors


fn copy_timestamp_nanos(src: &ObjectMap, src_key: &str, dst: &mut ObjectMap, dst_key: &str) {
if let Some(nanos) = src.get(src_key).and_then(Value::as_integer) {
dst.insert(dst_key.into(), Value::Timestamp(Utc.timestamp_nanos(nanos)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timestamp_nanos will panic when the data is out of range - maybe it would make sense to reuse common::parse_timestamp here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


parser.build_events_from_messages(event, output_messages)
parser
.build_events_from_payloads(event.clone(), output_messages)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clone is still needed here because the method mutates the event

1. Reduce cost of cloning event when emitting
multiple events for a received metric or trace
2. Implement trait for transforming event
3. Move event into transform to reduce cloning.
4. On error, move the error into the event
5. Route the original event to unmatched if
transforming fails. Previously this was done
for logs only.

Ref: LOG-22997
@biblicalph biblicalph force-pushed the biblicalph/LOG-22997 branch 2 times, most recently from 3f41866 to c71ec4c Compare January 28, 2026 03:25
The vector documentation now includes the two flags for
Datadog logs and metrics - likely in newer Vector versions.

This matches the implementation.

Ref: LOG-22997
@biblicalph biblicalph force-pushed the biblicalph/LOG-22997 branch from c71ec4c to 1313aa2 Compare January 28, 2026 04:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants