Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
0e6fc77
feat(worker): Add trigger to worker trait
VianneyRuhlmann Feb 11, 2026
305b853
feat(data_pipeline): add SharedRuntime
VianneyRuhlmann Feb 11, 2026
66e9e06
feat(worker): add initial trigger
VianneyRuhlmann Feb 13, 2026
b672262
feat(agent_info): use initial trigger
VianneyRuhlmann Feb 13, 2026
23022a6
feat(stats): implement stats worker
VianneyRuhlmann Feb 13, 2026
6c9c7f3
fix(shared_runtime): fix compile error
VianneyRuhlmann Feb 13, 2026
acc69ae
data-pipeline: pause all workers before joining in before_fork
VianneyRuhlmann Feb 13, 2026
17a9bb8
feat(exporter): use shared runtime in trace exporter
VianneyRuhlmann Feb 16, 2026
238a430
feat(shared_runtime): add worker handle
VianneyRuhlmann Feb 18, 2026
592c24c
refactor(worker): remove stopped status
VianneyRuhlmann Feb 18, 2026
ff6448c
chore(telemetry): move telemetry shutdown to worker
VianneyRuhlmann Feb 20, 2026
0dbce86
chore(shared-runtime): update error types
VianneyRuhlmann Feb 23, 2026
58bff21
chore(shared-runtime): return detailed errors in before_fork
VianneyRuhlmann Feb 26, 2026
16ab63b
chore(runtime): doc
VianneyRuhlmann Feb 27, 2026
3625c0a
test(telemetry): use shared runtime in tests
VianneyRuhlmann Mar 2, 2026
f86890a
test(telemetry): use client builder
VianneyRuhlmann Mar 2, 2026
3d285ae
chore(runtime): fix nit
VianneyRuhlmann Mar 4, 2026
1f8c0cf
refactor(runtime): move shutdown to runtime
VianneyRuhlmann Mar 5, 2026
62a63d6
test(telemetry): Add sleep after send
VianneyRuhlmann Mar 5, 2026
c1270f1
test(telemetry): fix deadlocks in telemetry tests
VianneyRuhlmann Mar 6, 2026
608556e
refactor(runtime): skip shutdown when runtime is None
VianneyRuhlmann Mar 9, 2026
34ec7d9
feat(runtime): add runtime to builder
VianneyRuhlmann Mar 9, 2026
a02eec6
chore(telemetry): remove macro
VianneyRuhlmann Mar 9, 2026
de8bce5
Merge branch 'main' into vianney/implement-shared-runtime
VianneyRuhlmann Mar 9, 2026
3119d5f
docs(runtime): add warnings
VianneyRuhlmann Mar 9, 2026
74425ba
feat(shared_runtime): add shared runtime ffi
VianneyRuhlmann Mar 10, 2026
2869d39
feat(trace_exporter): add shutdown method
VianneyRuhlmann Mar 10, 2026
4b1d0b6
fix(shared_runtime): add on_pause hook to release waker in info fetcher
VianneyRuhlmann Mar 10, 2026
0e74cad
fix(telemetry): add reset hook to telemetry
VianneyRuhlmann Mar 10, 2026
544b840
fix(telemetry): fix spawn and run loop for telemetry
VianneyRuhlmann Mar 10, 2026
a3e2c38
docs(info_fetcher): update doc for running the fetcher
VianneyRuhlmann Mar 10, 2026
67fcf2a
feat(runtime-ffi): remove redundant allocation of Box
VianneyRuhlmann Mar 11, 2026
018028f
Merge branch 'main' into vianney/implement-shared-runtime
VianneyRuhlmann Mar 11, 2026
6420f28
format
VianneyRuhlmann Mar 11, 2026
72f61f6
feat(runtime-ffi): use new handle in trace exporter builder
VianneyRuhlmann Mar 11, 2026
7bac719
chore: catch panics on ffi
VianneyRuhlmann Mar 13, 2026
fc1c81b
feat(telemetry): add check for existing action
VianneyRuhlmann Mar 16, 2026
57de7d4
refactor: move shared runtime to a separate crate
VianneyRuhlmann Mar 20, 2026
fb187ce
chore: apply suggestions
VianneyRuhlmann Mar 23, 2026
0f91bb3
chore(codeowners): add libdd-shared-runtime to codeowners
VianneyRuhlmann Mar 23, 2026
f12131f
docs(shared-runtime-ffi): add comment to error msg
VianneyRuhlmann Mar 23, 2026
1943f13
test: use thread::sleep instead of tokio
VianneyRuhlmann Mar 23, 2026
f1c8ea9
chore: rename join to wait_for_pause
VianneyRuhlmann Mar 23, 2026
16c20b6
docs: update after_fork_child doc
VianneyRuhlmann Mar 23, 2026
366dff5
refactor(shared-runtime): move shared runtime to a module
VianneyRuhlmann Mar 24, 2026
0a2cac4
chore: add debug logs
VianneyRuhlmann Mar 24, 2026
040ccdd
feat(shared_runtime): add block_on method to runtime
VianneyRuhlmann Mar 26, 2026
92e5ddc
chore: undo catch panic change
VianneyRuhlmann Mar 26, 2026
64b9bd2
chore: remove async trait from ddcommon
VianneyRuhlmann Mar 26, 2026
37b8105
refactor: use option to handle null pointer
VianneyRuhlmann Mar 26, 2026
febf920
docs: add comment to telemetry unused action
VianneyRuhlmann Mar 26, 2026
fc45ce9
feat: use futures unordered instead of JoinSet
VianneyRuhlmann Mar 26, 2026
c0c7b2e
fix(telemetry): clear items in telemetry store
VianneyRuhlmann Mar 27, 2026
ec6fbec
feat: use biased select to reduce time-to-pause
VianneyRuhlmann Mar 27, 2026
a523c1d
chore(shared-runtime): address mutex lock order concerns
VianneyRuhlmann Mar 27, 2026
21402c2
style: clippy and fmt
VianneyRuhlmann Mar 27, 2026
f6d69c1
chore: update 3rd party
VianneyRuhlmann Mar 27, 2026
8fdbd73
Merge branch 'main' into vianney/implement-shared-runtime
VianneyRuhlmann Mar 27, 2026
35d34c0
chore: fix conflicts
VianneyRuhlmann Mar 30, 2026
b1b5f9b
chore: remove legacy test
VianneyRuhlmann Mar 30, 2026
15c9352
Merge branch 'main' into vianney/implement-shared-runtime
VianneyRuhlmann Mar 30, 2026
da9ce1e
chore: bump libdd-common version
VianneyRuhlmann Mar 30, 2026
b9fe72e
chore: set shared runtime version
VianneyRuhlmann Mar 30, 2026
5938b42
feat(data-pipeline): port dd-trace-rs trace buffer implementation
paullegranddc Mar 31, 2026
41f26d8
docs: add comments on the export operation
paullegranddc Mar 31, 2026
4e809ee
fix: license 3rd party file
paullegranddc Mar 31, 2026
ca27d63
fix: make timeout infinite of None, instead of disabled
paullegranddc Mar 31, 2026
e6f17c2
perf: add trace buffer benchmarks
paullegranddc Apr 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ libdd-library-config*/ @DataDog/apm-sdk-capabilities-rust
libdd-libunwind*/ @DataDog/libdatadog-profiling
libdd-log*/ @DataDog/apm-common-components-core
libdd-profiling*/ @DataDog/libdatadog-profiling
libdd-shared-runtime*/ @DataDog/apm-common-components-core
libdd-telemetry*/ @DataDog/apm-common-components-core
libdd-tinybytes @DataDog/apm-common-components-core
libdd-trace-normalization @DataDog/serverless @DataDog/libdatadog-apm
Expand Down
27 changes: 26 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ members = [
"spawn_worker",
"tests/spawn_from_lib",
"bin_tests",
"libdd-shared-runtime",
"libdd-shared-runtime-ffi",
"libdd-data-pipeline",
"libdd-data-pipeline-ffi",
"libdd-ddsketch",
Expand Down
6 changes: 3 additions & 3 deletions LICENSE-3rdparty.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
root_name: builder, build_common, tools, libdd-alloc, libdd-crashtracker, libdd-common, libdd-telemetry, libdd-ddsketch, libdd-libunwind-sys, libdd-crashtracker-ffi, libdd-common-ffi, datadog-ffe, datadog-ffe-ffi, datadog-ipc, datadog-ipc-macros, libdd-tinybytes, spawn_worker, cc_utils, libdd-library-config, libdd-trace-protobuf, libdd-library-config-ffi, datadog-live-debugger, libdd-data-pipeline, libdd-dogstatsd-client, libdd-trace-stats, libdd-trace-utils, libdd-trace-normalization, libdd-log, datadog-live-debugger-ffi, libdd-profiling, libdd-profiling-protobuf, libdd-profiling-ffi, libdd-data-pipeline-ffi, libdd-ddsketch-ffi, libdd-log-ffi, libdd-telemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, libdd-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib
root_name: builder, build_common, tools, libdd-alloc, libdd-crashtracker, libdd-common, libdd-telemetry, libdd-ddsketch, libdd-shared-runtime, libdd-libunwind-sys, libdd-crashtracker-ffi, libdd-common-ffi, datadog-ffe, datadog-ffe-ffi, datadog-ipc, datadog-ipc-macros, libdd-tinybytes, spawn_worker, cc_utils, libdd-library-config, libdd-trace-protobuf, libdd-library-config-ffi, datadog-live-debugger, libdd-data-pipeline, libdd-dogstatsd-client, libdd-trace-stats, libdd-trace-utils, libdd-trace-normalization, libdd-log, datadog-live-debugger-ffi, libdd-profiling, libdd-profiling-protobuf, libdd-profiling-ffi, libdd-data-pipeline-ffi, libdd-ddsketch-ffi, libdd-log-ffi, libdd-telemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, libdd-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib, bin_tests
third_party_libraries:
- package_name: addr2line
package_version: 0.24.2
Expand Down Expand Up @@ -30600,9 +30600,9 @@ third_party_libraries:
- package_name: stringmetrics
package_version: 2.2.2
repository: https://github.com/pluots/stringmetrics
license: License specified in file ($CARGO_HOME/registry/src/index.crates.io-1949cf8c6b5b557f/stringmetrics-2.2.2/LICENSE)
license: License specified in file ($CARGO_HOME/registry/src/github.com-25cdd57fae9f0462/stringmetrics-2.2.2/LICENSE)
licenses:
- license: License specified in file ($CARGO_HOME/registry/src/index.crates.io-1949cf8c6b5b557f/stringmetrics-2.2.2/LICENSE)
- license: License specified in file ($CARGO_HOME/registry/src/github.com-25cdd57fae9f0462/stringmetrics-2.2.2/LICENSE)
text: |
Copyright 2022 Trevor Gross

Expand Down
1 change: 0 additions & 1 deletion libdd-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub mod test_utils;
pub mod threading;
pub mod timeout;
pub mod unix_utils;
pub mod worker;

/// Extension trait for `Mutex` to provide a method that acquires a lock, panicking if the lock is
/// poisoned.
Expand Down
12 changes: 0 additions & 12 deletions libdd-common/src/worker.rs

This file was deleted.

1 change: 1 addition & 0 deletions libdd-data-pipeline-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" }

[dependencies]
libdd-data-pipeline = { path = "../libdd-data-pipeline" }
libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" }
libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false }
libdd-tinybytes = { path = "../libdd-tinybytes" }
tracing = { version = "0.1", default-features = false }
2 changes: 1 addition & 1 deletion libdd-data-pipeline-ffi/cbindgen.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ must_use = "DDOG_CHECK_RETURN"

[parse]
parse_deps = true
include = ["libdd-common", "libdd-common-ffi", "libdd-data-pipeline"]
include = ["libdd-common", "libdd-common-ffi", "libdd-shared-runtime", "libdd-data-pipeline"]
39 changes: 37 additions & 2 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use libdd_common_ffi::{
CharSlice,
{slice::AsBytes, slice::ByteSlice},
};

use libdd_data_pipeline::trace_exporter::{
TelemetryConfig, TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
};
use std::{ptr::NonNull, time::Duration};
use libdd_shared_runtime::SharedRuntime;
use std::{ptr::NonNull, sync::Arc, time::Duration};
use tracing::{debug, error};

#[inline]
Expand Down Expand Up @@ -68,6 +68,7 @@ pub struct TraceExporterConfig {
process_tags: Option<String>,
test_session_token: Option<String>,
connection_timeout: Option<u64>,
shared_runtime: Option<Arc<SharedRuntime>>,
otlp_endpoint: Option<String>,
}

Expand Down Expand Up @@ -415,6 +416,36 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_connection_timeout(
)
}

/// Sets a shared runtime for the TraceExporter to use for background workers.
///
/// `handle` must have been initialized with [`ddog_shared_runtime_new`].
///
/// When set, the exporter will use the provided runtime instead of creating its own.
/// This allows multiple exporters (or other components) to share a single runtime.
/// The config holds a clone of the `Arc` (increments the strong count), so the
/// original handle remains valid and must still be freed with
/// [`ddog_shared_runtime_free`].
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_shared_runtime(
config: Option<&mut TraceExporterConfig>,
handle: Option<NonNull<SharedRuntime>>,
) -> Option<Box<ExporterError>> {
catch_panic!(
match (config, handle) {
(Some(config), Some(handle)) => {
// SAFETY: handle was produced by Arc::into_raw and the Arc is still alive.
// Increment the strong count before reconstructing so the config's Arc
// is independent from the caller's handle.
Arc::increment_strong_count(handle.as_ptr());
config.shared_runtime = Some(Arc::from_raw(handle.as_ptr()));
None
}
_ => gen_error!(ErrorCode::InvalidArgument),
},
gen_error!(ErrorCode::Panic)
)
}

/// Enables OTLP HTTP/JSON export and sets the endpoint URL.
///
/// When set, traces are sent to this URL in OTLP HTTP/JSON format instead of the Datadog
Expand Down Expand Up @@ -497,6 +528,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
builder.enable_health_metrics();
}

if let Some(runtime) = config.shared_runtime.clone() {
builder.set_shared_runtime(runtime);
}

if let Some(ref url) = config.otlp_endpoint {
builder.set_otlp_endpoint(url);
}
Expand Down
9 changes: 8 additions & 1 deletion libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ autobenches = false
[dependencies]
anyhow = { version = "1.0" }
arc-swap = "1.7.1"
async-trait = "0.1"
http = "1.1"
http-body-util = "0.1"
tracing = { version = "0.1", default-features = false }
Expand All @@ -25,12 +26,13 @@ sha2 = "0.10"
either = "1.13.0"
tokio = { version = "1.23", features = [
"rt",
"sync",
"test-util",
"time",
], default-features = false }
uuid = { version = "1.10.0", features = ["v4"] }
tokio-util = "0.7.11"
libdd-common = { version = "3.0.2", path = "../libdd-common", default-features = false }
libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" }
libdd-telemetry = { version = "4.0.0", path = "../libdd-telemetry", default-features = false }
libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" }
libdd-trace-stats = { version = "2.0.0", path = "../libdd-trace-stats" }
Expand All @@ -45,8 +47,13 @@ libdd-tinybytes = { version = "1.1.0", path = "../libdd-tinybytes", features = [
[lib]
bench = false

[[bench]]
name = "trace_buffer"
harness = false

[dev-dependencies]
libdd-log = { path = "../libdd-log" }
libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" }
clap = { version = "4.0", features = ["derive"] }
criterion = "0.5.1"
libdd-trace-utils = { path = "../libdd-trace-utils", features = [
Expand Down
108 changes: 108 additions & 0 deletions libdd-data-pipeline/benches/trace_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig, TraceChunk};
use libdd_data_pipeline::trace_exporter::{
agent_response::AgentResponse, error::TraceExporterError, TraceExporter,
};
use libdd_shared_runtime::SharedRuntime;

// ~300-byte payload approximating a real span's size.
type Span = [u8; 300];

// Number of chunks each sender thread sends per benchmark iteration.
const CHUNKS_PER_SENDER: usize = 100;

// Simulates async IO by sleeping 2ms per export batch.
#[derive(Debug)]
struct SleepExport;

impl Export<Span> for SleepExport {
fn export_trace_chunks<'a: 'c, 'b: 'c, 'c>(
&'a mut self,
_trace_chunks: Vec<TraceChunk<Span>>,
_trace_exporter: &'b TraceExporter,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<AgentResponse, TraceExporterError>> + Send + 'c,
>,
> {
Box::pin(async {
tokio::time::sleep(Duration::from_millis(2)).await;
Ok(AgentResponse::Unchanged)
})
}
}

fn setup_buffer() -> (Arc<SharedRuntime>, Arc<TraceBuffer<Span>>) {
let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new"));
let mut builder = TraceExporter::builder();
builder.set_shared_runtime(rt.clone());
let cfg = TraceBufferConfig::new()
.max_buffered_spans(100_000)
.span_flush_threshold(1_000)
.max_flush_interval(Duration::from_secs(2));
let (buf, worker) = TraceBuffer::new(
cfg,
Box::new(|_| {}),
Box::new(SleepExport),
builder.build().expect("TraceExporter::build"),
);
rt.spawn_worker(worker).expect("spawn_worker");
(rt, Arc::new(buf))
}

fn bench_trace_buffer(c: &mut Criterion) {
let mut group = c.benchmark_group("trace_buffer");

// (label, inter-send delay)
let workloads: &[(&str, Option<Duration>)] = &[
("no_delay", None),
("1us_delay", Some(Duration::from_micros(1))),
("10us_delay", Some(Duration::from_micros(100))),
];

for &(delay_label, delay) in workloads {
for num_senders in [1_usize, 2, 4, 8] {
let (rt, sender) = setup_buffer();

group.throughput(Throughput::Elements(
(num_senders * CHUNKS_PER_SENDER) as u64,
));

group.bench_function(
BenchmarkId::new(format!("{}_senders", num_senders), delay_label),
|b| {
b.iter(|| {
std::thread::scope(|s| {
for _ in 0..num_senders {
let sender = sender.clone();
s.spawn(move || {
for _ in 0..CHUNKS_PER_SENDER {
// BatchFull errors are expected under high load.
let _ = sender.send_chunk(vec![[0u8; 300]]);
if let Some(d) = delay {
std::thread::sleep(d);
}
}
});
}
});
});
},
);

rt.shutdown(None).expect("runtime shutdown");
}
}

group.finish();
}

criterion_group!(benches, bench_trace_buffer);
criterion_main!(benches);
9 changes: 7 additions & 2 deletions libdd-data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use libdd_data_pipeline::trace_exporter::{
use libdd_log::logger::{
logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget,
};
use libdd_shared_runtime::SharedRuntime;
use libdd_trace_protobuf::pb;
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, UNIX_EPOCH},
};

Expand Down Expand Up @@ -53,6 +55,8 @@ fn main() {
.expect("Failed to configure logger");
logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level");

let shared_runtime = Arc::new(SharedRuntime::new().expect("Failed to create runtime"));

let args = Args::parse();
let telemetry_cfg = TelemetryConfig::default();
let mut builder = TraceExporter::builder();
Expand All @@ -67,6 +71,7 @@ fn main() {
.set_language_version(env!("CARGO_PKG_RUST_VERSION"))
.set_input_format(TraceExporterInputFormat::V04)
.set_output_format(TraceExporterOutputFormat::V04)
.set_shared_runtime(shared_runtime.clone())
.enable_telemetry(telemetry_cfg)
.enable_stats(Duration::from_secs(10));
let exporter = builder.build().expect("Failed to build TraceExporter");
Expand All @@ -86,7 +91,7 @@ fn main() {
let data = rmp_serde::to_vec_named(&traces).expect("Failed to serialize traces");

exporter.send(data.as_ref()).expect("Failed to send traces");
exporter
shared_runtime
.shutdown(None)
.expect("Failed to shutdown exporter");
.expect("Failed to shutdown runtime");
}
Loading
Loading