Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datadog-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ anyhow = { version = "1.0" }
arrayref = "0.3.7"
priority-queue = "2.1.1"
libdd-common = { path = "../libdd-common" }
libdd-capabilities-impl = { path = "../libdd-capabilities-impl" }
datadog-sidecar-macros = { path = "../datadog-sidecar-macros" }

libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] }
Expand Down
4 changes: 2 additions & 2 deletions datadog-sidecar/src/service/agent_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use datadog_ipc::platform::NamedShmHandle;
use futures::future::Shared;
use futures::FutureExt;
use http::uri::PathAndQuery;
use libdd_common::DefaultHttpClient;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_common::{Endpoint, MutexExt};
use libdd_data_pipeline::agent_info::schema::AgentInfoStruct;
use libdd_data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
Expand Down Expand Up @@ -103,7 +103,7 @@ impl AgentInfoFetcher {
fetch_endpoint.url = http::Uri::from_parts(parts).unwrap();
loop {
let fetched =
fetch_info_with_state::<DefaultHttpClient>(&fetch_endpoint, state.as_deref())
fetch_info_with_state::<NativeCapabilities>(&fetch_endpoint, state.as_deref())
.await;
let mut complete_fut = None;
{
Expand Down
2 changes: 1 addition & 1 deletion datadog-sidecar/src/service/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use datadog_ipc::shm_stats::{
ShmSpanConcentrator, DEFAULT_SLOT_COUNT, DEFAULT_STRING_POOL_BYTES, RELOAD_FILL_RATIO,
};
use http::uri::PathAndQuery;
use libdd_capabilities_impl::{HttpClientTrait, NativeCapabilities};
use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities};
use libdd_common::{Endpoint, MutexExt};
use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata};
use std::collections::HashMap;
Expand Down
10 changes: 5 additions & 5 deletions datadog-sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use super::TraceSendData;
use crate::agent_remote_config::AgentRemoteConfigWriter;
use datadog_ipc::platform::NamedShmHandle;
use futures::future::join_all;
use libdd_common::capabilities::HttpClientTrait;
use libdd_common::{DefaultHttpClient, Endpoint, MutexExt};
use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities};
use libdd_common::{Endpoint, MutexExt};
use libdd_trace_utils::trace_utils;
use libdd_trace_utils::trace_utils::SendData;
use libdd_trace_utils::trace_utils::SendDataResult;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub(crate) struct TraceFlusher {
pub(crate) min_force_drop_size_bytes: AtomicU32, // put a limit on memory usage
remote_config: Mutex<AgentRemoteConfigs>,
pub metrics: Mutex<TraceFlusherMetrics>,
client: DefaultHttpClient,
capabilities: NativeCapabilities,
}
impl Default for TraceFlusher {
fn default() -> Self {
Expand All @@ -106,7 +106,7 @@ impl Default for TraceFlusher {
min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32),
remote_config: Mutex::new(Default::default()),
metrics: Mutex::new(Default::default()),
client: DefaultHttpClient::new_client(),
capabilities: NativeCapabilities::new_client(),
}
}
}
Expand Down Expand Up @@ -248,7 +248,7 @@ impl TraceFlusher {

async fn send_and_handle_trace(&self, send_data: SendData) {
let endpoint = send_data.get_target().clone();
let response = send_data.send(&self.client).await;
let response = send_data.send(&self.capabilities).await;
self.metrics.lock_or_panic().update(&response);
match response.last_result {
Ok(response) => {
Expand Down
1 change: 1 addition & 0 deletions libdd-capabilities-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bytes = "1"
http = "1"
libdd-capabilities = { path = "../libdd-capabilities", version = "0.1.0" }
libdd-common = { path = "../libdd-common", version = "3.0.2", default-features = false }
tokio = { version = "1", features = ["time", "rt"] }

[features]
default = ["https"]
Expand Down
6 changes: 4 additions & 2 deletions libdd-capabilities-impl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ Native implementations of `libdd-capabilities` traits.

## Capabilities

- **`DefaultHttpClient`**: HTTP client backed by hyper and the `libdd-common` connector infrastructure (supports Unix sockets, HTTPS with rustls, Windows named pipes).
- **`NativeHttpClient`**: HTTP client backed by hyper and the `libdd-common` connector infrastructure (supports Unix sockets, HTTPS with rustls, Windows named pipes).
- **`NativeSleepCapability`**: Sleep backed by `tokio::time::sleep`.
- **`NativeSpawnCapability`**: Task spawning backed by `tokio::runtime::Handle::spawn`.

## Types

- **`NativeCapabilities`**: Bundle type alias that implements all capability traits using native backends. Currently delegates to `DefaultHttpClient`; as more capability traits are added (spawn, sleep, etc.), this type will implement all of them.
- **`NativeCapabilities`**: Bundle struct that implements all capability traits using native backends. Delegates to `NativeHttpClient`, `NativeSleepCapability`, and `NativeSpawnCapability`.

## Usage

Expand Down
4 changes: 2 additions & 2 deletions libdd-capabilities-impl/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Re-exports `DefaultHttpClient` from `libdd-common`, where it lives alongside
//! Re-exports `NativeHttpClient` from `libdd-common`, where it lives alongside
//! the hyper infrastructure it wraps.
pub use libdd_common::DefaultHttpClient;
pub use libdd_common::NativeHttpClient;
66 changes: 55 additions & 11 deletions libdd-capabilities-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,59 @@
//! etc.). Leaf crates (FFI, benchmarks) pin this type as the generic parameter.

mod http;
pub mod sleep;
pub mod spawn;

use core::future::Future;
use std::time::Duration;

pub use http::DefaultHttpClient;
pub use http::NativeHttpClient;
use libdd_capabilities::http::HttpError;
pub use libdd_capabilities::HttpClientTrait;
pub use libdd_capabilities::HttpClientCapability;
use libdd_capabilities::MaybeSend;
pub use libdd_capabilities::SleepCapability;
pub use libdd_capabilities::SpawnCapability;
pub use sleep::NativeSleepCapability;
pub use spawn::{NativeJoinHandle, NativeSpawnCapability};

/// Bundle struct for native platform capabilities.
///
/// Delegates to [`DefaultHttpClient`] for HTTP. As more capability traits are
/// added (spawn, sleep, etc.), additional fields and impls are added here
/// without changing the type identity — consumers see the same
/// `NativeCapabilities` throughout.
/// Delegates to [`NativeHttpClient`] for HTTP, [`NativeSleepCapability`] for
/// sleep, and [`NativeSpawnCapability`] for task spawning.
///
/// Individual capability traits keep minimal per-function bounds (e.g.
/// functions that only need HTTP require just `H: HttpClientTrait`, not the
/// functions that only need HTTP require just `H: HttpClientCapability`, not the
/// full bundle) so that native callers like the sidecar can use
/// `DefaultHttpClient` directly without pulling in this bundle.
/// `NativeHttpClient` directly without pulling in this bundle.
#[derive(Clone, Debug)]
pub struct NativeCapabilities {
http: DefaultHttpClient,
http: NativeHttpClient,
sleep: NativeSleepCapability,
spawn: NativeSpawnCapability,
}

impl HttpClientTrait for NativeCapabilities {
impl Default for NativeCapabilities {
fn default() -> Self {
Self::new()
}
}

impl NativeCapabilities {
pub fn new() -> Self {
Self {
http: NativeHttpClient::new_client(),
sleep: NativeSleepCapability,
spawn: NativeSpawnCapability,
}
}
}

impl HttpClientCapability for NativeCapabilities {
fn new_client() -> Self {
Self {
http: DefaultHttpClient::new_client(),
http: NativeHttpClient::new_client(),
sleep: NativeSleepCapability,
spawn: NativeSpawnCapability,
}
}

Expand All @@ -46,3 +71,22 @@ impl HttpClientTrait for NativeCapabilities {
self.http.request(req)
}
}

impl SleepCapability for NativeCapabilities {
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
self.sleep.sleep(duration)
}
}

impl SpawnCapability for NativeCapabilities {
type RuntimeContext = tokio::runtime::Handle;
type JoinHandle<T: MaybeSend + 'static> = NativeJoinHandle<T>;

fn spawn<F, T>(&self, future: F, ctx: &tokio::runtime::Handle) -> NativeJoinHandle<T>
where
F: Future<Output = T> + MaybeSend + 'static,
T: MaybeSend + 'static,
{
self.spawn.spawn(future, ctx)
}
}
19 changes: 19 additions & 0 deletions libdd-capabilities-impl/src/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Native sleep implementation backed by `tokio::time::sleep`.

use core::future::Future;
use std::time::Duration;

use libdd_capabilities::maybe_send::MaybeSend;
use libdd_capabilities::sleep::SleepCapability;

#[derive(Clone, Debug)]
pub struct NativeSleepCapability;

impl SleepCapability for NativeSleepCapability {
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
tokio::time::sleep(duration)
}
}
48 changes: 48 additions & 0 deletions libdd-capabilities-impl/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Native spawn implementation backed by `tokio::runtime::Handle::spawn`.

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use libdd_capabilities::maybe_send::MaybeSend;
use libdd_capabilities::spawn::SpawnCapability;
use tokio::task::JoinHandle;

#[derive(Clone, Debug)]
pub struct NativeSpawnCapability;

impl SpawnCapability for NativeSpawnCapability {
type RuntimeContext = tokio::runtime::Handle;
type JoinHandle<T: MaybeSend + 'static> = NativeJoinHandle<T>;

fn spawn<F, T>(&self, future: F, ctx: &tokio::runtime::Handle) -> NativeJoinHandle<T>
where
F: Future<Output = T> + MaybeSend + 'static,
T: MaybeSend + 'static,
{
NativeJoinHandle(ctx.spawn(future))
}
}

/// Newtype wrapping `tokio::task::JoinHandle<T>` that normalises the output to
/// `T` instead of `Result<T, JoinError>`.
///
/// A `JoinError` means the spawned task panicked or was aborted. Workers use
/// `CancellationToken` for graceful shutdown, so `JoinError` indicates a bug.
pub struct NativeJoinHandle<T>(JoinHandle<T>);

impl<T> Future for NativeJoinHandle<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
// JoinHandle<T>: Unpin, so Pin::new is safe.
match Pin::new(&mut self.get_mut().0).poll(cx) {
Poll::Ready(Ok(val)) => Poll::Ready(val),
Poll::Ready(Err(e)) => panic!("spawned task failed: {e}"),
Poll::Pending => Poll::Pending,
}
}
}
8 changes: 4 additions & 4 deletions libdd-capabilities/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ This crate has **zero platform dependencies**: it compiles on any target includi

## Traits

- **`HttpClientTrait`**: Async HTTP request/response using `http::Request<Bytes>` / `http::Response<Bytes>`.
- **`HttpClientCapability`**: Async HTTP request/response using `http::Request<Bytes>` / `http::Response<Bytes>`.
- **`MaybeSend`**: Conditional `Send` bound: equivalent to `Send` on native, auto-implemented for all types on wasm. This bridges the gap between tokio's multi-threaded runtime (requires `Send` futures) and wasm's single-threaded model (where JS interop types are `!Send`).

## Architecture

Three-layer design:

1. **Trait definitions** (this crate): Pure traits, no platform deps.
2. **Core crates** (`libdd-trace-utils`, `libdd-data-pipeline`): Generic over `C: HttpClientTrait`. Depend only on this crate for trait bounds.
2. **Core crates** (`libdd-trace-utils`, `libdd-data-pipeline`): Generic over `C: HttpClientCapability`. Depend only on this crate for trait bounds.
3. **Leaf crates** (FFI, wasm bindings): Pin a concrete type, `NativeCapabilities` from `libdd-capabilities-impl` on native, `WasmCapabilities` from the Node.js binding crate on wasm.

## Usage

```rust
use libdd_capabilities::{HttpClientTrait, MaybeSend};
use libdd_capabilities::{HttpClientCapability, MaybeSend};

async fn fetch<C: HttpClientTrait>(client: &C, req: http::Request<bytes::Bytes>) {
async fn fetch<C: HttpClientCapability>(client: &C, req: http::Request<bytes::Bytes>) {
let response = client.request(req).await.unwrap();
println!("status: {}", response.status());
}
Expand Down
2 changes: 1 addition & 1 deletion libdd-capabilities/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub enum HttpError {
Other(anyhow::Error),
}

pub trait HttpClientTrait: Clone + std::fmt::Debug {
pub trait HttpClientCapability: Clone + std::fmt::Debug {
fn new_client() -> Self;

fn request(
Expand Down
6 changes: 5 additions & 1 deletion libdd-capabilities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

pub mod http;
pub mod maybe_send;
pub mod sleep;
pub mod spawn;

pub use self::http::{HttpClientTrait, HttpError};
pub use self::http::{HttpClientCapability, HttpError};
pub use self::sleep::SleepCapability;
pub use self::spawn::SpawnCapability;
pub use ::http::{Request, Response};
pub use bytes::Bytes;
pub use maybe_send::MaybeSend;
15 changes: 15 additions & 0 deletions libdd-capabilities/src/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Sleep capability trait.
//!
//! Abstracts async sleep so that native code can use `tokio::time::sleep`
//! while wasm delegates to `setTimeout` via `JsFuture`.

use crate::maybe_send::MaybeSend;
use core::future::Future;
use std::time::Duration;

pub trait SleepCapability: Clone + std::fmt::Debug {
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend;
}
26 changes: 26 additions & 0 deletions libdd-capabilities/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Spawn capability trait.
//!
//! Abstracts task spawning so that native code can use `tokio::spawn`
//! while wasm delegates to `wasm_bindgen_futures::spawn_local` with a
//! `RemoteHandle` for join/cancel semantics.

use crate::maybe_send::MaybeSend;
use core::future::Future;

pub trait SpawnCapability: Clone + std::fmt::Debug {
/// Platform-specific context passed to [`spawn`](Self::spawn).
///
/// On native this is typically `tokio::runtime::Handle` — the spawner uses
/// it to schedule the future on the correct runtime. On wasm this is `()`
/// because `spawn_local` does not need an external handle.
type RuntimeContext;
type JoinHandle<T: MaybeSend + 'static>: Future<Output = T> + MaybeSend;

fn spawn<F, T>(&self, future: F, ctx: &Self::RuntimeContext) -> Self::JoinHandle<T>
where
F: Future<Output = T> + MaybeSend + 'static,
T: MaybeSend + 'static;
}
2 changes: 1 addition & 1 deletion libdd-common/src/capabilities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

//! Re-exports of capability trait types for convenience.
pub use libdd_capabilities::{HttpClientTrait, HttpError};
pub use libdd_capabilities::{HttpClientCapability, HttpError};
Loading
Loading