diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index fd470b674d..6dac247b7d 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -61,7 +61,7 @@ jobs: export AWS_LC_FIPS_SYS_NO_ASM=1 fi # shellcheck disable=SC2046 - cargo clippy --workspace --all-targets --all-features -- -D warnings + cargo clippy --workspace --all-targets --all-features -- -D warnings $([ ${{ matrix.rust_version }} = 1.84.1 ] || echo -Aclippy::manual_is_multiple_of) licensecheck: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 2e34470802..114b59c4f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,9 +330,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.83" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", @@ -1395,7 +1395,10 @@ dependencies = [ "io-lifetimes", "libc", "libdd-common", + "libdd-ddsketch", "libdd-tinybytes", + "libdd-trace-protobuf", + "libdd-trace-stats", "memfd", "nix 0.29.0", "page_size", @@ -1409,6 +1412,7 @@ dependencies = [ "tracing-subscriber", "winapi 0.3.9", "windows-sys 0.48.0", + "zwohash", ] [[package]] @@ -1521,6 +1525,7 @@ dependencies = [ "http-body-util", "httpmock", "libc", + "libdd-capabilities-impl", "libdd-common", "libdd-common-ffi", "libdd-crashtracker", @@ -1529,6 +1534,8 @@ dependencies = [ "libdd-dogstatsd-client", "libdd-telemetry", "libdd-tinybytes", + "libdd-trace-protobuf", + "libdd-trace-stats", "libdd-trace-utils", "manual_future", "memory-stats", @@ -1537,6 +1544,7 @@ dependencies = [ "prctl", "priority-queue", "rand 0.8.5", + "rmp-serde", "sendfd", "serde", "serde_json", @@ -1544,6 +1552,7 @@ dependencies = [ "sha2", "simd-json", "spawn_worker", + "sys-info", "tempfile", "tokio", "tokio-util", @@ -2980,7 +2989,6 @@ dependencies = [ "libdd-capabilities", "libdd-capabilities-impl", "libdd-common", - "libdd-ddsketch", "libdd-dogstatsd-client", "libdd-log", "libdd-shared-runtime", @@ -3342,12 +3350,25 @@ dependencies = [ name = "libdd-trace-stats" version = "2.0.0" dependencies = [ + "anyhow", + "async-trait", "criterion", "hashbrown 0.15.1", + "http", + "httpmock", + "libdd-capabilities", + "libdd-capabilities-impl", + "libdd-common", "libdd-ddsketch", + "libdd-shared-runtime", "libdd-trace-protobuf", "libdd-trace-utils", "rand 0.8.5", + "rmp-serde", + "serde", + "tokio", + "tokio-util", + "tracing", ] [[package]] diff --git a/datadog-ipc/Cargo.toml b/datadog-ipc/Cargo.toml index 5cb6383c17..d528f146fb 100644 --- a/datadog-ipc/Cargo.toml +++ b/datadog-ipc/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dependencies] anyhow = { version = "1.0" } +zwohash = "0.1.2" bincode = { version = "1" } futures = { version = "0.3", default-features = false } io-lifetimes = { version = "1.0" } @@ -19,6 +20,9 @@ libdd-tinybytes = { path = "../libdd-tinybytes", optional = true } libdd-common = { path = "../libdd-common" } +libdd-ddsketch = { path = "../libdd-ddsketch" } +libdd-trace-protobuf = { path = "../libdd-trace-protobuf" } +libdd-trace-stats = { path = "../libdd-trace-stats" } datadog-ipc-macros = { path = "../datadog-ipc-macros" } tracing = { version = "0.1", default-features = false } diff --git a/datadog-ipc/src/atomic_option.rs b/datadog-ipc/src/atomic_option.rs new file mode 100644 index 0000000000..0afdfe1732 --- /dev/null +++ b/datadog-ipc/src/atomic_option.rs @@ -0,0 +1,123 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Lock-free `Option` with atomic take, valid for any `T` where +//! `size_of::>() <= 8`. + +use std::cell::UnsafeCell; +use std::mem::{self, MaybeUninit}; +use std::ptr; +use std::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, AtomicU8, Ordering}; + +/// An `Option` that supports lock-free atomic take. +/// +/// # Constraints +/// `size_of::>()` must be `<= 8`. Enforced by a `debug_assert` in +/// `From>`). This holds for niche-optimised types (`NonNull`, +/// `Box`, …) and for any `Option` that fits in a single machine word. +/// +/// # Storage +/// The option is stored in a `UnsafeCell>`, giving it exactly the size +/// and alignment of `Option` itself. `take()` picks the narrowest atomic that +/// covers `size_of::>()` bytes — `AtomicU8` for 1-byte options up to +/// `AtomicU64` for 5–8 byte options. The atomic cast is valid because +/// `align_of::() == align_of::() <= align_of::>()`. +/// +/// # None sentinel +/// The "none" bit-pattern is computed by value (`Option::::None`) rather than +/// assumed to be zero, so the implementation is correct for both niche-optimised +/// types and discriminant-based options. +/// +/// `UnsafeCell` provides the interior-mutability aliasing permission required by +/// Rust's memory model when mutating through a shared reference. +pub struct AtomicOption(UnsafeCell>); + +impl AtomicOption { + /// Encode `val` as a `u64`, transferring ownership into the bit representation. + const fn encode(val: Option) -> u64 { + let mut bits = 0u64; + unsafe { + ptr::copy_nonoverlapping( + ptr::from_ref(&val).cast::(), + ptr::from_mut(&mut bits).cast::(), + size_of::>(), + ); + mem::forget(val); + } + bits + } + + /// Atomically swap the storage with `new_bits`, returning the old bits. + #[inline] + fn atomic_swap(&self, new_bits: u64) -> u64 { + unsafe { + let ptr = self.0.get(); + match size_of::>() { + 1 => (*(ptr as *const AtomicU8)).swap(new_bits as u8, Ordering::AcqRel) as u64, + 2 => (*(ptr as *const AtomicU16)).swap(new_bits as u16, Ordering::AcqRel) as u64, + 3 | 4 => { + (*(ptr as *const AtomicU32)).swap(new_bits as u32, Ordering::AcqRel) as u64 + } + _ => (*(ptr as *const AtomicU64)).swap(new_bits, Ordering::AcqRel), + } + } + } + + /// Reconstruct an `Option` from its `u64` bit representation. + /// + /// # Safety + /// `bits` must hold a valid `Option` bit-pattern in its low + /// `size_of::>()` bytes, as produced by a previous `encode`. + const unsafe fn decode(bits: u64) -> Option { + let mut result = MaybeUninit::>::uninit(); + ptr::copy_nonoverlapping( + ptr::from_ref(&bits).cast::(), + result.as_mut_ptr().cast::(), + size_of::>(), + ); + result.assume_init() + } + + /// Atomically replace the stored value with `None` and return what was there. + /// Returns `None` if the value was already taken. + pub fn take(&self) -> Option { + let old = self.atomic_swap(Self::encode(None)); + // SAFETY: `old` holds a valid `Option` bit-pattern. + unsafe { Self::decode(old) } + } + + /// Atomically store `val`, dropping any previous value. + pub fn set(&self, val: Option) -> Option { + let old = self.atomic_swap(Self::encode(val)); + unsafe { Self::decode(old) } + } + + /// Atomically store `Some(val)`, returning the previous value. + pub fn replace(&self, val: T) -> Option { + self.set(Some(val)) + } + + /// Borrow the current value without taking it. + /// + /// # Safety + /// Must not be called concurrently with [`take`], [`set`], or [`replace`]. + pub unsafe fn as_option(&self) -> &Option { + &*self.0.get() + } +} + +impl From> for AtomicOption { + fn from(val: Option) -> Self { + // we may raise this to 16 once AtomicU128 becomes stable + debug_assert!( + size_of::>() <= size_of::(), + "AtomicOption requires size_of::>() <= 8, got {}", + size_of::>() + ); + Self(UnsafeCell::new(val)) + } +} + +// `AtomicOption` is `Send`/`Sync` when `T: Send` — same contract as `Mutex>`. +unsafe impl Send for AtomicOption {} +unsafe impl Sync for AtomicOption {} diff --git a/datadog-ipc/src/lib.rs b/datadog-ipc/src/lib.rs index 144fcb5442..2419e16729 100644 --- a/datadog-ipc/src/lib.rs +++ b/datadog-ipc/src/lib.rs @@ -12,9 +12,12 @@ pub mod handles; pub mod platform; pub mod rate_limiter; +pub mod shm_stats; +mod atomic_option; pub mod client; pub mod codec; +pub use atomic_option::AtomicOption; pub use client::IpcClientConn; #[cfg(target_os = "linux")] diff --git a/datadog-ipc/src/platform/mem_handle.rs b/datadog-ipc/src/platform/mem_handle.rs index f20dc91672..2539d8dc4a 100644 --- a/datadog-ipc/src/platform/mem_handle.rs +++ b/datadog-ipc/src/platform/mem_handle.rs @@ -3,9 +3,12 @@ use crate::handles::{HandlesTransport, TransferHandles}; use crate::platform::{mmap_handle, munmap_handle, OwnedFileHandle, PlatformHandle}; +use crate::AtomicOption; #[cfg(feature = "tiny-bytes")] use libdd_tinybytes::UnderlyingBytes; use serde::{Deserialize, Serialize}; +#[cfg(target_os = "linux")] +use std::os::fd::AsRawFd; use std::{ffi::CString, io, ptr::NonNull}; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -37,15 +40,16 @@ pub(crate) struct ShmPath { pub struct NamedShmHandle { pub(crate) inner: ShmHandle, - pub(crate) path: Option, + pub(crate) path: AtomicOption>, } impl NamedShmHandle { - pub fn get_path(&self) -> &[u8] { - if let Some(ref shm_path) = &self.path { - shm_path.name.as_bytes() - } else { - b"" + /// # Safety + /// Must not be called concurrently with `unlink()`. + pub unsafe fn get_path(&self) -> &[u8] { + match self.path.as_option() { + Some(shm_path) => shm_path.name.to_bytes(), + None => b"", } } } @@ -87,10 +91,19 @@ where unsafe { self.set_mapping_size(size)?; } - nix::unistd::ftruncate( - self.get_shm().handle.as_owned_fd()?, - self.get_shm().size as libc::off_t, + let new_size = self.get_shm().size as libc::off_t; + let fd = self.get_shm().handle.as_owned_fd()?; + // Use fallocate on Linux to eagerly commit the new pages: ENOSPC at resize time is + // recoverable; a later SIGBUS mid-execution is not. + #[cfg(target_os = "linux")] + nix::fcntl::fallocate( + fd.as_raw_fd(), + nix::fcntl::FallocateFlags::empty(), + 0, + new_size, )?; + #[cfg(not(target_os = "linux"))] + nix::unistd::ftruncate(&fd, new_size)?; Ok(()) } /// # Safety @@ -131,6 +144,16 @@ impl FileBackedHandle for NamedShmHandle { } } +impl MappedMem { + /// Unlink the backing SHM file from the filesystem so new openers get `ENOENT`. + /// Existing mappings remain valid. On Windows the mapping is managed by the OS + /// via handle reference counts and there is no filesystem entry to remove. + #[cfg(unix)] + pub fn unlink(&self) { + self.mem.unlink(); + } +} + impl MappedMem { pub fn as_slice(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast(), self.mem.get_size()) } @@ -152,7 +175,9 @@ impl AsRef<[u8]> for MappedMem { } impl MappedMem { - pub fn get_path(&self) -> &[u8] { + /// # Safety + /// Must not be called concurrently with `unlink()`. + pub unsafe fn get_path(&self) -> &[u8] { self.mem.get_path() } } @@ -167,9 +192,10 @@ impl From> for ShmHandle { } impl From> for NamedShmHandle { - fn from(mut handle: MappedMem) -> NamedShmHandle { + fn from(handle: MappedMem) -> NamedShmHandle { + let path = handle.mem.path.take().into(); NamedShmHandle { - path: handle.mem.path.take(), + path, inner: handle.into(), } } diff --git a/datadog-ipc/src/platform/unix/mem_handle.rs b/datadog-ipc/src/platform/unix/mem_handle.rs index cb94fb058a..abea549d98 100644 --- a/datadog-ipc/src/platform/unix/mem_handle.rs +++ b/datadog-ipc/src/platform/unix/mem_handle.rs @@ -7,6 +7,8 @@ use crate::platform::{ use io_lifetimes::OwnedFd; use libc::{chmod, off_t}; use nix::errno::Errno; +#[cfg(target_os = "linux")] +use nix::fcntl::{fallocate, FallocateFlags}; use nix::fcntl::{open, OFlag}; use nix::sys::mman::{self, mmap, munmap, MapFlags, ProtFlags}; use nix::sys::stat::Mode; @@ -163,6 +165,11 @@ impl NamedShmHandle { pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result { let fd = shm_open(path.as_bytes(), OFlag::O_CREAT | OFlag::O_RDWR, mode)?; + // Use fallocate on Linux to eagerly commit pages: if /dev/shm is full we get ENOSPC + // here (recoverable) rather than SIGBUS mid-execution when a worker writes a slot. + #[cfg(target_os = "linux")] + fallocate(fd.as_raw_fd(), FallocateFlags::empty(), 0, size as off_t)?; + #[cfg(not(target_os = "linux"))] ftruncate(&fd, size as off_t)?; if let Some(uid) = shm_owner_uid() { let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None); @@ -176,13 +183,18 @@ impl NamedShmHandle { Self::new(file.into(), None, size) } + /// Unlink the SHM file from the filesystem without unmapping it. + pub fn unlink(&self) { + let _ = self.path.take(); // Drop of Box calls shm_unlink exactly once + } + fn new(fd: OwnedFd, path: Option, size: usize) -> io::Result { Ok(NamedShmHandle { inner: ShmHandle { handle: fd.into(), size, }, - path: path.map(|path| ShmPath { name: path }), + path: path.map(|path| Box::new(ShmPath { name: path })).into(), }) } } diff --git a/datadog-ipc/src/platform/unix/mem_handle_macos.rs b/datadog-ipc/src/platform/unix/mem_handle_macos.rs index ad1a9c1963..8da792e7dd 100644 --- a/datadog-ipc/src/platform/unix/mem_handle_macos.rs +++ b/datadog-ipc/src/platform/unix/mem_handle_macos.rs @@ -17,7 +17,7 @@ use std::os::fd::{AsFd, OwnedFd}; use std::os::unix::io::AsRawFd; use std::sync::atomic::{AtomicI32, AtomicU32, AtomicUsize, Ordering}; -const MAPPING_MAX_SIZE: usize = 1 << 17; // 128 MiB ought to be enough for everybody? +const MAPPING_MAX_SIZE: usize = 1 << 27; // 128 MiB ought to be enough for everybody? const NOT_COMMITTED: usize = 1 << (usize::BITS - 1); pub(crate) fn mmap_handle(mut handle: T) -> io::Result> { @@ -141,13 +141,18 @@ impl NamedShmHandle { Self::new(fd, None, 0) } + /// Unlink the SHM name from the filesystem without unmapping existing mappings. + pub fn unlink(&self) { + let _ = self.path.take(); // Drop of Box calls shm_unlink exactly once + } + fn new(fd: OwnedFd, path: Option, size: usize) -> io::Result { Ok(NamedShmHandle { inner: ShmHandle { handle: fd.into(), size: size | NOT_COMMITTED, }, - path: path.map(|path| ShmPath { name: path }), + path: path.map(|path| Box::new(ShmPath { name: path })).into(), }) } } @@ -198,6 +203,6 @@ impl>> MappedMem { impl Drop for ShmPath { fn drop(&mut self) { - _ = shm_unlink(path_slice(&self.name)); + _ = shm_unlink(path_slice(self.name.as_c_str())); } } diff --git a/datadog-ipc/src/platform/windows/mem_handle.rs b/datadog-ipc/src/platform/windows/mem_handle.rs index 9832a6c591..73ec538621 100644 --- a/datadog-ipc/src/platform/windows/mem_handle.rs +++ b/datadog-ipc/src/platform/windows/mem_handle.rs @@ -151,7 +151,7 @@ impl NamedShmHandle { handle: unsafe { PlatformHandle::from_raw_handle(handle) }, size, }, - path: Some(ShmPath { name }), + path: Some(ShmPath { name }).map(Box::new).into(), }) } } diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs new file mode 100644 index 0000000000..88d892b518 --- /dev/null +++ b/datadog-ipc/src/shm_stats.rs @@ -0,0 +1,983 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Lock-free shared-memory span stats concentrator. +//! +//! All PHP worker processes open the same SHM file and call +//! [`ShmSpanConcentrator::add_span`]. The sidecar is the creator +//! ([`ShmSpanConcentrator::create`]) and periodically calls +//! [`ShmSpanConcentrator::flush`] to drain the inactive bucket. +//! +//! ## SHM layout +//! ```text +//! [0 .. PAGE_SIZE) ShmHeader (one page) +//! [PAGE_SIZE .. PAGE_SIZE+R) bucket 0 region +//! [PAGE_SIZE+R .. PAGE_SIZE+2R) bucket 1 region +//! +//! Each bucket region (size R): +//! [0 .. HDR_SIZE) ShmBucketHeader +//! [HDR_SIZE .. HDR_SIZE+S*E) ShmEntry array (S = slot_count, E = entry_size) +//! [HDR_SIZE+S*E .. R) string pool (bump-allocated by writers) +//! ``` +//! +//! ## Slot lifecycle +//! ```text +//! key_hash == SLOT_EMPTY (0) → slot is free +//! key_hash == SLOT_INIT (u64::MAX) → slot is being claimed/written +//! key_hash == H (any other) → slot is ready, key hash is H +//! ``` +//! +//! A writer CAS(0→MAX) to claim, writes key + strings (no concurrent readers +//! yet), issues a **Release** fence, then stores `key_hash = H` (Release). +//! +//! ## Double-buffering +//! `ShmHeader::active_idx` tells which bucket PHP workers write to. +//! The sidecar swaps it, waits for `in_flight` to reach 0, then reads + clears. +//! +//! ## Table growth +//! When the active bucket is nearly full the sidecar: +//! 1. Creates a new SHM file at the *same path* (the old file is unlinked from the filesystem but +//! remains accessible to processes that already have it open). +//! 2. Sets `ShmHeader::ready = 0` on the **old** mapping so workers know to re-open the path on +//! their next `add_span` call. +//! 3. Holds onto the old concentrator for ≥ 1 s, flushing it periodically, to absorb any spans that +//! arrived before workers noticed the reload flag. +//! 4. Drops the old concentrator after that grace period. + +use std::cell::UnsafeCell; +use std::ffi::{CStr, CString}; +use std::hash::{Hash, Hasher}; +use std::hint; +use std::io; +use std::sync::atomic::{fence, AtomicI64, AtomicU32, AtomicU64, AtomicU8, Ordering::*}; +use std::sync::Arc; +use std::thread; +use zwohash::ZwoHasher; + +use libdd_ddsketch::DDSketch; +use libdd_trace_protobuf::pb; +use libdd_trace_stats::span_concentrator::{FixedAggregationKey, FlushableConcentrator}; + +use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle}; + +const SHM_VERSION: u32 = 1; + +/// Maximum peer-tag (key, value) pairs per aggregation slot. +pub const MAX_PEER_TAGS: usize = 16; + +/// Number of histogram bins (ok + error each) per aggregation group. +pub const N_BINS: usize = 256; + +/// Upper bound of the highest histogram bin (100 s in nanoseconds). +const MAX_DURATION_NS: u64 = 100_000_000_000; + +const SLOT_EMPTY: u64 = 0; +const SLOT_INIT: u64 = u64::MAX; + +/// Default aggregation slots per bucket. +pub const DEFAULT_SLOT_COUNT: usize = 256; +/// Default per-bucket string pool size. +pub const DEFAULT_STRING_POOL_BYTES: usize = 512 * 1024; + +/// The sidecar should recreate the SHM when slot utilisation exceeds this ratio. +pub const RELOAD_FILL_RATIO: f64 = 0.80; + +/// Max iterations when waiting for `in_flight` to reach zero (~100 µs). +const MAX_FLUSH_WAIT_ITERS: u32 = 100_000; +/// Spin iterations before yielding to the OS scheduler. +const YIELD_AFTER_SPINS: u32 = 8; + +fn bin_for_duration(nanos: i64) -> usize { + if nanos <= 0 { + return 0; + } + let d = nanos as u64; + if d >= MAX_DURATION_NS { + return N_BINS - 1; + } + let scale = (MAX_DURATION_NS as f64).ln() / (N_BINS as f64 - 2.0); + let b = 1.0 + (d as f64).ln() / scale; + (b as usize).clamp(1, N_BINS - 2) +} + +fn bin_representative(bin: usize) -> f64 { + if bin == 0 { + return 0.0; + } + let scale = (MAX_DURATION_NS as f64).ln() / (N_BINS as f64 - 2.0); + ((bin as f64 - 0.5) * scale).exp() +} + +/// Byte range inside a bump-allocated string pool (offset relative to pool start). +/// +/// Used by [`FixedAggregationKey`] when the key is stored in shared memory. +/// Both `offset == 0 && len == 0` and `offset != 0` are valid; a zero-length slice +/// represents an absent / empty string. +#[repr(C)] +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug, PartialOrd, Ord)] +pub struct StringRef { + pub offset: u32, + pub len: u32, +} + +/// Aggregation key – position-independent, no raw pointers. +/// +/// The fixed string fields and scalar fields are grouped in `FixedAggregationKey`. +/// Peer-tag arrays follow separately because they are variable-count and not part of the +/// generic key struct. +#[repr(C)] +struct ShmKeyHeader { + fixed: FixedAggregationKey, + peer_tag_keys: [StringRef; MAX_PEER_TAGS], + peer_tag_values: [StringRef; MAX_PEER_TAGS], + peer_tag_count: u8, +} + +/// Per-group stats. `AtomicU64` is `#[repr(transparent)]` over `u64`, so the +/// layout is identical to plain integers and zero-initialised mmap memory is +/// valid for `AtomicU64::new(0)`. +#[repr(C, align(8))] +struct ShmStats { + /// Total number of spans in this group. + hits: AtomicU64, + /// Number of error spans in this group. + errors: AtomicU64, + /// Sum of all span durations (nanoseconds). + duration_sum: AtomicU64, + /// Number of top-level spans (service-entry or measured). + top_level_hits: AtomicU64, + /// Histogram bins for non-error span durations. + ok_bins: [AtomicU64; N_BINS], + /// Histogram bins for error span durations. + error_bins: [AtomicU64; N_BINS], +} + +/// One slot in the hash table. +#[repr(C)] +struct ShmEntry { + key_hash: AtomicU64, + key: UnsafeCell, + stats: ShmStats, +} + +// SAFETY: ShmEntry lives entirely in shared-memory; all mutations go through +// the atomic protocol described in the module doc. +unsafe impl Sync for ShmEntry {} + +/// Per-bucket control header. +#[repr(C)] +struct ShmBucketHeader { + start_nanos: AtomicU64, + in_flight: AtomicI64, + string_cursor: AtomicU32, +} + +/// Global SHM header (first page of the mapping). +#[repr(C)] +struct ShmHeader { + /// Layout version; checked by [`ShmSpanConcentrator::open`]. Mismatch returns an error. + version: u32, + /// Set to 1 by the sidecar when workers should re-open the SHM at the + /// same path (a new, larger mapping has been created there). + ready: AtomicU8, + /// Index (0 or 1) of the bucket currently being written to by PHP workers. + active_idx: AtomicU8, + /// Width of each time bucket in nanoseconds (e.g. 10 s = 10_000_000_000). + bucket_size_nanos: u64, + /// Number of aggregation slots per bucket (hash-table capacity). + slot_count: u32, + /// Byte size of one full bucket region (header + slots + string pool), page-aligned. + bucket_region_size: u32, + /// Byte capacity of the per-bucket string pool. + string_pool_size: u32, + /// Monotonic counter incremented on every successful flush, used as the stats sequence number. + flush_seq: AtomicU64, +} + +fn bucket_hdr_size() -> usize { + // Align to 8 bytes (AtomicU64 alignment). + let s = size_of::(); + (s + 7) & !7 +} + +fn pool_start_within_bucket(slot_count: u32) -> usize { + bucket_hdr_size() + (slot_count as usize) * size_of::() +} + +fn aligned_bucket_region(slot_count: u32, string_pool_size: u32) -> usize { + let raw = pool_start_within_bucket(slot_count) + string_pool_size as usize; + let page = page_size::get(); + raw.div_ceil(page) * page +} + +fn total_shm_size(slot_count: u32, string_pool_size: u32) -> usize { + page_size::get() + 2 * aligned_bucket_region(slot_count, string_pool_size) +} + +fn bucket_start(bucket_idx: u8, bucket_region_size: u32) -> usize { + page_size::get() + bucket_idx as usize * bucket_region_size as usize +} + +unsafe fn shm_header(base: *const u8) -> &'static ShmHeader { + &*(base as *const ShmHeader) +} + +unsafe fn bucket_header(base: *const u8, bkt_start: usize) -> &'static ShmBucketHeader { + &*(base.add(bkt_start) as *const ShmBucketHeader) +} + +unsafe fn entry_ref(base: *const u8, bkt_start: usize, slot: usize) -> &'static ShmEntry { + let p = base.add(bkt_start + bucket_hdr_size() + slot * size_of::()); + &*(p as *const ShmEntry) +} + +unsafe fn pool_base(base: *const u8, bkt_start: usize, slot_count: u32) -> *const u8 { + base.add(bkt_start + pool_start_within_bucket(slot_count)) +} + +unsafe fn sref_str<'a>(pool: *const u8, sr: StringRef) -> &'a str { + if sr.len == 0 { + return ""; + } + std::str::from_utf8_unchecked(std::slice::from_raw_parts( + pool.add(sr.offset as usize), + sr.len as usize, + )) +} + +fn hash_key(input: &ShmSpanInput<'_>) -> u64 { + let mut h = ZwoHasher::default(); + input.fixed.hash(&mut h); + for &(k, v) in input.peer_tags { + k.hash(&mut h); + v.hash(&mut h); + } + match h.finish() { + SLOT_EMPTY => 1, + SLOT_INIT => SLOT_INIT - 1, + v => v, + } +} + +unsafe fn key_matches(entry: &ShmEntry, input: &ShmSpanInput<'_>, pool: *const u8) -> bool { + let k = &*entry.key.get(); + k.fixed.convert(|sr| unsafe { sref_str(pool, *sr) }) == input.fixed + && (k.peer_tag_count as usize) == input.peer_tags.len() + && input.peer_tags.iter().enumerate().all(|(i, &(ik, iv))| { + sref_str(pool, k.peer_tag_keys[i]) == ik && sref_str(pool, k.peer_tag_values[i]) == iv + }) +} + +unsafe fn alloc_str(pool: *mut u8, cursor: &AtomicU32, pool_size: u32, s: &str) -> StringRef { + let len = s.len() as u32; + if len == 0 { + return StringRef::default(); + } + let mut spins = 0u32; + loop { + let old = cursor.load(Relaxed); + let new = old.saturating_add(len); + if new > pool_size { + return StringRef::default(); + } + if cursor + .compare_exchange_weak(old, new, Relaxed, Relaxed) + .is_ok() + { + std::ptr::copy_nonoverlapping(s.as_ptr(), pool.add(old as usize), len as usize); + return StringRef { offset: old, len }; + } + spins += 1; + if spins % YIELD_AFTER_SPINS == 0 { + thread::yield_now(); + } else { + hint::spin_loop(); + } + } +} + +/// Pre-extracted span stats for one span, ready to be fed into [`ShmSpanConcentrator::add_span`]. +pub struct ShmSpanInput<'a> { + /// Aggregation key fields (everything except peer tags). + pub fixed: FixedAggregationKey<&'a str>, + /// (key, value) peer-tag pairs (capped at `MAX_PEER_TAGS` by the caller). + pub peer_tags: &'a [(&'a str, &'a str)], + // stats + pub duration_ns: i64, + pub is_error: bool, + pub is_top_level: bool, +} + +/// Owned (serializable) version of [`ShmSpanInput`]. +/// +/// Used as the IPC fallback payload when the PHP side cannot open the SHM concentrator yet +/// (e.g. on the very first request, before the sidecar has processed +/// `set_universal_service_tags` and created the SHM file). The sidecar handler receives +/// this struct, writes to the now-existing SHM concentrator, and the span is counted. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct OwnedShmSpanInput { + pub fixed: FixedAggregationKey, + pub peer_tags: Vec<(String, String)>, + pub duration_ns: i64, + pub is_error: bool, + pub is_top_level: bool, +} + +impl OwnedShmSpanInput { + /// Borrow as a [`ShmSpanInput`] for passing to [`ShmSpanConcentrator::add_span`]. + /// + /// `peer_tag_buf` is a caller-supplied scratch buffer; it must outlive the returned value. + pub fn as_shm_input<'a>( + &'a self, + peer_tag_buf: &'a mut Vec<(&'a str, &'a str)>, + ) -> ShmSpanInput<'a> { + peer_tag_buf.clear(); + for (k, v) in &self.peer_tags { + peer_tag_buf.push((k.as_str(), v.as_str())); + } + ShmSpanInput { + fixed: self.fixed.convert(|s: &str| s), + peer_tags: peer_tag_buf.as_slice(), + duration_ns: self.duration_ns, + is_error: self.is_error, + is_top_level: self.is_top_level, + } + } +} + +/// Shared-memory span stats concentrator. +/// +/// Created once by the sidecar; opened (read-write) by each PHP worker. +#[derive(Clone)] +pub struct ShmSpanConcentrator { + mem: Arc>, +} + +unsafe impl Send for ShmSpanConcentrator {} +unsafe impl Sync for ShmSpanConcentrator {} + +impl ShmSpanConcentrator { + /// Create a new SHM concentrator (sidecar side). + /// + /// Unlinks any pre-existing SHM file at `path` before creating the new one. + pub fn create( + path: CString, + bucket_size_nanos: u64, + slot_count: usize, + string_pool_bytes: usize, + ) -> io::Result { + let slot_count = slot_count.max(1) as u32; + let string_pool_size = string_pool_bytes as u32; + let total = total_shm_size(slot_count, string_pool_size); + + // Remove any stale mapping at this path (ignore errors). + #[cfg(unix)] + unsafe { + libc::shm_unlink(path.as_ptr()); + } + + let handle = NamedShmHandle::create(path, total)?; + let mut mem = handle.map()?; + + let base = mem.as_slice_mut().as_mut_ptr(); + unsafe { + // On Windows the named mapping may persist from a previous concentrator lifetime + // (workers still hold handles after the sidecar retired it). Hence explicitly reset it. + #[cfg(windows)] + std::ptr::write_bytes(base, 0, total); + + let hdr = &mut *(base as *mut ShmHeader); + hdr.version = SHM_VERSION; + hdr.bucket_size_nanos = bucket_size_nanos; + hdr.slot_count = slot_count; + hdr.bucket_region_size = aligned_bucket_region(slot_count, string_pool_size) as u32; + hdr.string_pool_size = string_pool_size; + // Signal readiness LAST — workers see ready=0 until this store and fall back + // to IPC, preventing writes to a partially-initialized concentrator. + hdr.ready.store(1, Release); + } + + Ok(ShmSpanConcentrator { mem: Arc::new(mem) }) + } + + /// Open an existing SHM concentrator (PHP worker side). + pub fn open(path: &CStr) -> io::Result { + let handle = NamedShmHandle::open(path)?; + let mem = handle.map()?; + + let base = mem.as_slice().as_ptr(); + unsafe { + let hdr = shm_header(base); + if hdr.ready.load(Relaxed) == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "SHM span concentrator: not yet ready", + )); + } + if hdr.version != SHM_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "SHM span concentrator: incompatible version", + )); + } + } + + Ok(ShmSpanConcentrator { mem: Arc::new(mem) }) + } + + fn base_ptr(&self) -> *const u8 { + self.mem.as_slice().as_ptr() + } + + fn header(&self) -> &ShmHeader { + unsafe { shm_header(self.base_ptr()) } + } + + /// Returns `true` when the sidecar has signalled that workers should + /// re-open the SHM at the same path (a larger mapping has been created). + /// + /// Workers should call this before every `add_span`; when it returns `true` + /// they should drop this handle, call `open(path)`, and retry. + pub fn needs_reload(&self) -> bool { + self.header().ready.load(Acquire) == 0 + } + + /// Unlink the SHM file from the filesystem so that new PHP workers cannot open it. + /// Existing mappings (including this one and any already open in PHP workers) remain + /// valid. Call this *before* `signal_reload` when retiring a concentrator. + /// + /// Uses `Arc::get_mut` to take the path out (preventing a double-unlink on `Drop`). + /// If multiple `Arc` clones are alive the path cannot be taken; the unlink still + /// happens but `Drop` may attempt a harmless second unlink (which returns `ENOENT`). + pub fn unlink(&self) { + #[cfg(unix)] + self.mem.unlink(); + } + + /// Add a span to the currently-active bucket. Thread-safe. + pub fn add_span(&self, input: &ShmSpanInput<'_>) { + let hdr = self.header(); + let slot_count = hdr.slot_count; + let brs = hdr.bucket_region_size; + let pool_size = hdr.string_pool_size; + let base = self.base_ptr(); + + // Claim in-flight on the active bucket, with double-check against swap. + let active = hdr.active_idx.load(Acquire); + let bkt_start = bucket_start(active, brs); + let bh = unsafe { bucket_header(base, bkt_start) }; + bh.in_flight.fetch_add(1, Acquire); + + let (bkt_start, bh) = { + let active2 = hdr.active_idx.load(Acquire); + if active2 != active { + bh.in_flight.fetch_sub(1, Release); + let s2 = bucket_start(active2, brs); + let h2 = unsafe { bucket_header(base, s2) }; + h2.in_flight.fetch_add(1, Acquire); + (s2, h2) + } else { + (bkt_start, bh) + } + }; + + let hash = hash_key(input); + let pool = unsafe { pool_base(base, bkt_start, slot_count) as *mut u8 }; + + let mut slot = (hash as usize) % slot_count as usize; + let mut done = false; + for _ in 0..slot_count { + let entry = unsafe { entry_ref(base, bkt_start, slot) }; + + let mut spins = 0u32; + loop { + match entry.key_hash.load(Acquire) { + SLOT_EMPTY => { + if entry + .key_hash + .compare_exchange(SLOT_EMPTY, SLOT_INIT, Acquire, Relaxed) + .is_ok() + { + unsafe { + Self::write_key(entry, input, pool, &bh.string_cursor, pool_size); + } + // Release on the store synchronises the key write with any + // subsequent Acquire load of the hash — no separate fence needed. + entry.key_hash.store(hash, Release); + Self::update_stats(entry, input); + done = true; + break; + } + spins += 1; + if spins % YIELD_AFTER_SPINS == 0 { + thread::yield_now(); + } else { + hint::spin_loop(); + } + } + SLOT_INIT => { + spins += 1; + if spins % YIELD_AFTER_SPINS == 0 { + thread::yield_now(); + } else { + hint::spin_loop(); + } + } + h if h == hash => { + if unsafe { key_matches(entry, input, pool) } { + Self::update_stats(entry, input); + done = true; + } + break; + } + _ => break, // hash collision, probe next + } + } + + if done { + break; + } + slot = (slot + 1) % slot_count as usize; + } + + bh.in_flight.fetch_sub(1, Release); + } + + unsafe fn write_key( + entry: &ShmEntry, + input: &ShmSpanInput<'_>, + pool: *mut u8, + cursor: &AtomicU32, + pool_size: u32, + ) { + let k = &mut *entry.key.get(); + let fi = &input.fixed; + k.fixed = fi.convert(|s| unsafe { alloc_str(pool, cursor, pool_size, s) }); + let n = input.peer_tags.len().min(MAX_PEER_TAGS); + k.peer_tag_count = n as u8; + for (i, &(tk, tv)) in input.peer_tags[..n].iter().enumerate() { + k.peer_tag_keys[i] = alloc_str(pool, cursor, pool_size, tk); + k.peer_tag_values[i] = alloc_str(pool, cursor, pool_size, tv); + } + } + + fn update_stats(entry: &ShmEntry, input: &ShmSpanInput<'_>) { + let s = &entry.stats; + s.hits.fetch_add(1, Relaxed); + if input.is_error { + s.errors.fetch_add(1, Relaxed); + } + s.duration_sum.fetch_add(input.duration_ns as u64, Relaxed); + if input.is_top_level { + s.top_level_hits.fetch_add(1, Relaxed); + } + let bin = bin_for_duration(input.duration_ns); + if input.is_error { + s.error_bins[bin].fetch_add(1, Relaxed); + } else { + s.ok_bins[bin].fetch_add(1, Relaxed); + } + } + + /// Returns `(used_slots, total_slots)` for the currently-active bucket. + /// + /// The sidecar uses this to decide when to recreate with more slots. + pub fn slot_usage(&self) -> (usize, usize) { + let hdr = self.header(); + let active = hdr.active_idx.load(Acquire); + let bkt_start = bucket_start(active, hdr.bucket_region_size); + let base = self.base_ptr(); + let slot_count = hdr.slot_count as usize; + + let used = (0..slot_count) + .filter(|&s| { + let h = unsafe { entry_ref(base, bkt_start, s) } + .key_hash + .load(Relaxed); + h != SLOT_EMPTY && h != SLOT_INIT + }) + .count(); + + (used, slot_count) + } + + /// Signal workers to re-open the SHM (call before creating a new, larger one). + pub fn signal_reload(&self) { + self.header().ready.store(0, Release); + } + + /// Drain the inactive (or both, if `force`) bucket(s) and return raw stat buckets. + /// + /// This is the low-level building block used by both [`flush`] and the + /// [`FlushableConcentrator`] impl. + pub fn drain_buckets(&self, force: bool) -> Vec { + let hdr = self.header(); + let slot_count = hdr.slot_count; + let brs = hdr.bucket_region_size; + let pool_size = hdr.string_pool_size; + let bucket_nanos = hdr.bucket_size_nanos; + + let mut stat_buckets: Vec = Vec::new(); + + if force { + for idx in 0u8..2 { + if let Some(b) = self.drain_bucket(idx, slot_count, brs, pool_size, bucket_nanos) { + stat_buckets.push(b); + } + } + } else { + let old_active = hdr.active_idx.fetch_xor(1, AcqRel); + if let Some(b) = self.drain_bucket(old_active, slot_count, brs, pool_size, bucket_nanos) + { + stat_buckets.push(b); + } + } + + stat_buckets + } + + /// Flush and return a serialised `ClientStatsPayload`, or `None` if empty. + /// + /// * `force = false` – swap the active bucket, drain the previously-active one. + /// * `force = true` – drain both buckets without swapping (shutdown). + #[allow(clippy::too_many_arguments)] + pub fn flush( + &self, + force: bool, + hostname: String, + env: String, + version: String, + service: String, + runtime_id: String, + ) -> Option { + let stat_buckets = self.drain_buckets(force); + if stat_buckets.is_empty() { + return None; + } + + let seq = self.header().flush_seq.fetch_add(1, Relaxed); + Some(pb::ClientStatsPayload { + hostname, + env, + version, + stats: stat_buckets, + runtime_id, + service, + sequence: seq, + ..Default::default() + }) + } + + fn drain_bucket( + &self, + bucket_idx: u8, + slot_count: u32, + bucket_region_size: u32, + _pool_size: u32, + bucket_size_nanos: u64, + ) -> Option { + let base = self.base_ptr(); + let bkt_start = bucket_start(bucket_idx, bucket_region_size); + let bh = unsafe { bucket_header(base, bkt_start) }; + + // Wait for in-flight writers (bounded to tolerate dead workers). + // The intermediate loads only need Relaxed; a single fence(Acquire) after + // the loop synchronizes with the Release in each writer's in_flight.fetch_sub, + // and covers all subsequent SHM reads in this function and callees. + let mut spins = 0u32; + while bh.in_flight.load(Relaxed) != 0 && spins < MAX_FLUSH_WAIT_ITERS { + spins += 1; + if spins % YIELD_AFTER_SPINS == 0 { + thread::yield_now(); + } else { + hint::spin_loop(); + } + } + fence(Acquire); + + let bucket_start_ts = bh.start_nanos.load(Relaxed); + let pool = unsafe { pool_base(base, bkt_start, slot_count) }; + + let mut grouped: Vec = Vec::new(); + + for slot in 0..slot_count as usize { + let entry = unsafe { entry_ref(base, bkt_start, slot) }; + let h = entry.key_hash.load(Relaxed); + if h == SLOT_EMPTY || h == SLOT_INIT { + continue; + } + + let gs = unsafe { Self::read_entry(entry, pool) }; + if gs.hits > 0 { + grouped.push(gs); + } + + unsafe { + std::ptr::write_bytes(std::ptr::addr_of!(entry.stats) as *mut ShmStats, 0, 1); + } + entry.key_hash.store(SLOT_EMPTY, Release); + } + + bh.string_cursor.store(0, Release); + + let now_ns = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + bh.start_nanos + .store(now_ns - (now_ns % bucket_size_nanos), Release); + + if grouped.is_empty() { + return None; + } + + Some(pb::ClientStatsBucket { + start: bucket_start_ts, + duration: bucket_size_nanos, + stats: grouped, + agent_time_shift: 0, + }) + } + + unsafe fn read_entry(entry: &ShmEntry, pool: *const u8) -> pb::ClientGroupedStats { + let k = &*entry.key.get(); + let f = &k.fixed; + let s = &entry.stats; + + macro_rules! read_str { + ($sref:expr) => {{ + let r: StringRef = $sref; + if r.len == 0 { + String::new() + } else { + String::from_utf8_lossy(std::slice::from_raw_parts( + pool.add(r.offset as usize), + r.len as usize, + )) + .into_owned() + } + }}; + } + + let peer_tags: Vec = (0..k.peer_tag_count as usize) + .map(|i| { + format!( + "{}:{}", + read_str!(k.peer_tag_keys[i]), + read_str!(k.peer_tag_values[i]) + ) + }) + .collect(); + + // fence(Acquire) in drain_bucket's spin-wait loop already synchronises these reads. + let hits = s.hits.load(Relaxed); + let errors = s.errors.load(Relaxed); + let duration_sum = s.duration_sum.load(Relaxed); + let top_level_hits = s.top_level_hits.load(Relaxed); + + let mut ok_sketch = DDSketch::default(); + let mut err_sketch = DDSketch::default(); + for bin in 0..N_BINS { + let ok_count = s.ok_bins[bin].load(Relaxed); + let err_count = s.error_bins[bin].load(Relaxed); + let rep = bin_representative(bin); + if ok_count > 0 { + let _ = ok_sketch.add_with_count(rep, ok_count as f64); + } + if err_count > 0 { + let _ = err_sketch.add_with_count(rep, err_count as f64); + } + } + + pb::ClientGroupedStats { + service: read_str!(f.service_name), + name: read_str!(f.operation_name), + resource: read_str!(f.resource_name), + http_status_code: f.http_status_code, + r#type: read_str!(f.span_type), + db_type: String::new(), + hits, + errors, + duration: duration_sum, + ok_summary: ok_sketch.encode_to_vec(), + error_summary: err_sketch.encode_to_vec(), + synthetics: f.is_synthetics_request, + top_level_hits, + span_kind: read_str!(f.span_kind), + peer_tags, + is_trace_root: if f.is_trace_root { + pb::Trilean::True.into() + } else { + pb::Trilean::False.into() + }, + http_method: read_str!(f.http_method), + http_endpoint: read_str!(f.http_endpoint), + grpc_status_code: f + .grpc_status_code + .map(|c| c.to_string()) + .unwrap_or_default(), + service_source: read_str!(f.service_source), + span_derived_primary_tags: vec![], + } + } +} + +impl FlushableConcentrator for ShmSpanConcentrator { + fn flush_buckets(&mut self, force: bool) -> Vec { + self.drain_buckets(force) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::AtomicU32; + + fn test_path() -> CString { + static COUNTER: AtomicU32 = AtomicU32::new(0); + CString::new(format!( + "/ddtrace-shm-t-{}-{}", + unsafe { libc::getpid() }, + COUNTER.fetch_add(1, SeqCst) + )) + .unwrap() + } + + fn span<'a>(service: &'a str, resource: &'a str, dur: i64) -> ShmSpanInput<'a> { + ShmSpanInput { + fixed: FixedAggregationKey { + service_name: service, + resource_name: resource, + operation_name: "op", + span_type: "web", + span_kind: "server", + http_method: "GET", + http_endpoint: "/", + service_source: "", + http_status_code: 200, + is_synthetics_request: false, + is_trace_root: true, + grpc_status_code: None, + }, + peer_tags: &[], + duration_ns: dur, + is_error: false, + is_top_level: true, + } + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_add_and_flush() { + let c = ShmSpanConcentrator::create( + test_path(), + 10_000_000_000, + DEFAULT_SLOT_COUNT, + DEFAULT_STRING_POOL_BYTES, + ) + .unwrap(); + c.add_span(&span("svc", "res", 1_000_000)); + c.add_span(&span("svc", "res", 2_000_000)); + let bytes = c.flush( + true, + "h".into(), + "e".into(), + "v".into(), + "s".into(), + "r".into(), + ); + assert!(bytes.is_some()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_open_from_worker() { + let path = test_path(); + let creator = ShmSpanConcentrator::create( + path.clone(), + 10_000_000_000, + DEFAULT_SLOT_COUNT, + DEFAULT_STRING_POOL_BYTES, + ) + .unwrap(); + let worker = ShmSpanConcentrator::open(path.as_c_str()).unwrap(); + worker.add_span(&span("svc2", "res2", 5_000_000)); + let bytes = creator.flush( + true, + "h".into(), + "".into(), + "".into(), + "".into(), + "r".into(), + ); + assert!(bytes.is_some()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_needs_reload() { + let path = test_path(); + let creator = ShmSpanConcentrator::create( + path.clone(), + 10_000_000_000, + DEFAULT_SLOT_COUNT, + DEFAULT_STRING_POOL_BYTES, + ) + .unwrap(); + let worker = ShmSpanConcentrator::open(path.as_c_str()).unwrap(); + assert!(!worker.needs_reload()); + creator.signal_reload(); + assert!(worker.needs_reload()); + } + + #[test] + fn test_histogram_bins() { + assert_eq!(bin_for_duration(0), 0); + assert_eq!(bin_for_duration(-1), 0); + assert!(bin_for_duration(1) >= 1); + assert_eq!(bin_for_duration(MAX_DURATION_NS as i64), N_BINS - 1); + assert_eq!(bin_for_duration(i64::MAX), N_BINS - 1); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_flush_empty() { + let c = ShmSpanConcentrator::create( + test_path(), + 10_000_000_000, + DEFAULT_SLOT_COUNT, + DEFAULT_STRING_POOL_BYTES, + ) + .unwrap(); + assert!(c + .flush( + false, + "h".into(), + "e".into(), + "v".into(), + "s".into(), + "r".into() + ) + .is_none()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_slot_usage() { + let c = ShmSpanConcentrator::create( + test_path(), + 10_000_000_000, + DEFAULT_SLOT_COUNT, + DEFAULT_STRING_POOL_BYTES, + ) + .unwrap(); + let (used, total) = c.slot_usage(); + assert_eq!(used, 0); + assert_eq!(total, DEFAULT_SLOT_COUNT); + + c.add_span(&span("svc", "res1", 1_000)); + c.add_span(&span("svc", "res2", 2_000)); + let (used2, _) = c.slot_usage(); + assert_eq!(used2, 2); + } +} diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 4b32b6ae3d..eaa970c40a 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -626,6 +626,8 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config( remote_config_enabled: bool, is_fork: bool, process_tags: &libdd_common_ffi::Vec, + hostname: ffi::CharSlice, + root_service: ffi::CharSlice, ) -> MaybeError { let session_id_str: String = session_id.to_utf8_lossy().into(); let session_config = SessionConfig { @@ -663,6 +665,10 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config( .to_vec(), remote_config_enabled, process_tags: process_tags.to_vec(), + peer_tag_keys: vec![], + span_kinds_stats_computed: vec![], + hostname: hostname.to_utf8_lossy().into(), + root_service: root_service.to_utf8_lossy().into(), }; #[cfg(unix)] try_c!(blocking::set_session_config( diff --git a/datadog-sidecar-ffi/tests/sidecar.rs b/datadog-sidecar-ffi/tests/sidecar.rs index 1536995d2b..fffabf9be0 100644 --- a/datadog-sidecar-ffi/tests/sidecar.rs +++ b/datadog-sidecar-ffi/tests/sidecar.rs @@ -114,6 +114,8 @@ fn test_ddog_sidecar_register_app() { false, false, &process_tags, + "".into(), + "".into(), ) .unwrap_none(); @@ -165,6 +167,8 @@ fn test_ddog_sidecar_register_app() { false, false, &process_tags, + "".into(), + "".into(), ) .unwrap_none(); diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index e0837e8f37..59c749de59 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -23,6 +23,7 @@ datadog-sidecar-macros = { path = "../datadog-sidecar-macros" } libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] } libdd-data-pipeline = { path = "../libdd-data-pipeline" } libdd-trace-utils = { path = "../libdd-trace-utils" } +libdd-trace-stats = { path = "../libdd-trace-stats" } datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]} datadog-live-debugger = { path = "../datadog-live-debugger" } libdd-crashtracker = { path = "../libdd-crashtracker" } @@ -38,6 +39,8 @@ datadog-ipc = { path = "../datadog-ipc", features = ["tiny-bytes"] } datadog-ipc-macros = { path = "../datadog-ipc-macros" } rand = "0.8.3" +rmp-serde = "1.1.1" +libdd-trace-protobuf = { path = "../libdd-trace-protobuf" } serde = { version = "1.0", features = ["derive", "rc"] } serde_with = "3.6.0" bincode = { version = "1.3.3" } @@ -56,6 +59,7 @@ tokio = { version = "1.49.0", features = [ tokio-util = { version = "0.7", features = ["codec"] } prctl = "1.0.0" +sys-info = { version = "0.9.0" } tracing = { version = "0.1", default-features = false } tracing-log = { version = "0.2.0", optional = true } tracing-subscriber = { version = "0.3.22", default-features = false, features = [ @@ -88,6 +92,9 @@ version = "1.49.0" [target.'cfg(not(target_arch = "x86"))'.dependencies] simd-json = "=0.14" +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +libdd-capabilities-impl = { path = "../libdd-capabilities-impl" } + [target.'cfg(unix)'.dependencies] nix = { version = "0.29", features = ["socket", "mman"] } sendfd = { version = "0.4", features = ["tokio"] } diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index d9c0cb79e0..9a42ab5eec 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -419,6 +419,17 @@ pub fn set_test_session_token(transport: &mut SidecarTransport, token: String) - Ok(()) } +/// IPC fallback: send a span directly to the sidecar's SHM concentrator for (env, version). +pub fn add_span_to_concentrator( + transport: &mut SidecarTransport, + env: String, + version: String, + span: datadog_ipc::shm_stats::OwnedShmSpanInput, +) -> io::Result<()> { + lock_sender(transport)?.add_span_to_concentrator(env, version, span); + Ok(()) +} + /// Dumps the current state of the service. pub fn dump(transport: &mut SidecarTransport) -> io::Result { transport.with_retry(|s| s.dump().map_err(|e| io::Error::other(e.to_string()))) diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index e25e1f159f..f5c4271577 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -38,6 +38,7 @@ mod serialized_tracer_header_tags; mod session_info; pub mod sidecar_interface; pub(crate) mod sidecar_server; +pub mod stats_flusher; pub mod telemetry; pub(crate) mod tracing; @@ -65,6 +66,12 @@ pub struct SessionConfig { pub remote_config_capabilities: Vec, pub remote_config_enabled: bool, pub process_tags: Vec, + pub peer_tag_keys: Vec, + pub span_kinds_stats_computed: Vec, + /// Tracer-configured hostname (from `DD_HOSTNAME`). Empty means "not configured". + pub hostname: String, + /// Process-level service name (from `DD_SERVICE`), used as the stats concentrator key. + pub root_service: String, } #[derive(Debug, Deserialize, Serialize)] diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index c21d62edfa..8f60c7a546 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -435,6 +435,19 @@ impl SidecarSender { self.channel.try_send_set_test_session_token(token); } + pub fn add_span_to_concentrator( + &mut self, + env: String, + version: String, + span: datadog_ipc::shm_stats::OwnedShmSpanInput, + ) { + if !self.try_drain_outbox() { + return; + } + self.channel + .try_send_add_span_to_concentrator(env, version, span); + } + pub fn set_read_timeout(&mut self, d: Option) -> io::Result<()> { self.channel.0.set_read_timeout(d) } diff --git a/datadog-sidecar/src/service/session_info.rs b/datadog-sidecar/src/service/session_info.rs index c81b73ca45..efa0204715 100644 --- a/datadog-sidecar/src/service/session_info.rs +++ b/datadog-sidecar/src/service/session_info.rs @@ -24,7 +24,7 @@ use crate::service::{InstanceId, QueueId, RuntimeInfo}; /// /// It contains a list of runtimes, session configuration, tracer configuration, and log guards. /// It also has methods to manage the runtimes and configurations. -#[derive(Default)] +#[derive(Default, Clone)] pub(crate) struct SessionInfo { runtimes: Arc>>, pub(crate) session_config: Arc>>, @@ -45,30 +45,7 @@ pub(crate) struct SessionInfo { pub(crate) pid: Arc, pub(crate) remote_config_enabled: Arc>, pub(crate) process_tags: Arc>>, -} - -impl Clone for SessionInfo { - fn clone(&self) -> Self { - SessionInfo { - runtimes: self.runtimes.clone(), - session_config: self.session_config.clone(), - debugger_config: self.debugger_config.clone(), - tracer_config: self.tracer_config.clone(), - dogstatsd: self.dogstatsd.clone(), - remote_config_options: self.remote_config_options.clone(), - agent_infos: self.agent_infos.clone(), - remote_config_interval: self.remote_config_interval.clone(), - #[cfg(windows)] - remote_config_notify_function: self.remote_config_notify_function.clone(), - #[cfg(windows)] - process_handle: self.process_handle.clone(), - log_guard: self.log_guard.clone(), - session_id: self.session_id.clone(), - pid: self.pid.clone(), - remote_config_enabled: self.remote_config_enabled.clone(), - process_tags: self.process_tags.clone(), - } - } + pub(crate) stats_config: Arc>>, } impl SessionInfo { @@ -174,6 +151,15 @@ impl SessionInfo { } } + pub(crate) fn modify_stats_config(&self, f: F) + where + F: FnOnce(&mut crate::service::stats_flusher::StatsConfig), + { + if let Some(cfg) = &mut *self.stats_config.lock_or_panic() { + f(cfg) + } + } + pub(crate) fn get_trace_config(&self) -> MutexGuard<'_, tracer::Config> { self.tracer_config.lock_or_panic() } diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index 25424cc71a..eea877c7dd 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -213,6 +213,19 @@ pub trait SidecarInterface { /// * `token` - The session token. async fn set_test_session_token(token: String); + /// IPC fallback: add a span directly to the sidecar's SHM concentrator for (env, version). + /// + /// Used when the PHP side cannot open the SHM concentrator yet (startup race: SHM is + /// created by the sidecar after processing `set_universal_service_tags`, but span + /// serialization may run before that message is processed). Because the sidecar processes + /// IPC messages sequentially and `set_universal_service_tags` is sent first (via the + /// priority outbox), the concentrator is guaranteed to exist when this message is processed. + async fn add_span_to_concentrator( + env: String, + version: String, + span: datadog_ipc::shm_stats::OwnedShmSpanInput, + ); + /// Sends a ping to the service. #[blocking] async fn ping(); diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 33dda9a760..27957846f2 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -35,6 +35,10 @@ use crate::service::debugger_diagnostics_bookkeeper::{ }; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; +use crate::service::stats_flusher::{ + flush_all_stats_now, get_or_create_concentrator, stats_endpoint, ConcentratorKey, + SpanConcentratorState, StatsConfig, +}; use crate::service::tracing::trace_flusher::TraceFlusherStats; use crate::tokio_util::run_or_spawn_shared; use datadog_live_debugger::sender::{agent_info_supports_debugger_v2_endpoint, DebuggerType}; @@ -102,6 +106,8 @@ pub struct SidecarServer { remote_configs: RemoteConfigs, /// Diagnostics bookkeeper debugger_diagnostics_bookkeeper: Arc, + /// Per-env&version SHM span concentrators (global across all sessions). + pub(crate) span_concentrators: Arc>>>, } /// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect. @@ -675,6 +681,25 @@ impl SidecarInterface for ConnectionSidecarHandler { }); *session.agent_infos.lock_or_panic() = Some(agent_info); } + *session.stats_config.lock_or_panic() = Some(StatsConfig { + endpoint: stats_endpoint(&config.endpoint).unwrap_or_else(|| config.endpoint.clone()), + flush_interval: config.flush_interval, + hostname: if config.hostname.is_empty() { + sys_info::hostname().unwrap_or_default() + } else { + config.hostname.clone() + }, + process_tags: config + .process_tags + .iter() + .map(|t| t.to_string()) + .collect::>() + .join(","), + root_service: config.root_service.clone(), + language: config.language.clone(), + tracer_version: config.tracer_version.clone(), + }); + session.set_remote_config_invariants(ConfigOptions { invariants: ConfigInvariants { language: config.language, @@ -939,11 +964,35 @@ impl SidecarInterface for ConnectionSidecarHandler { }); } + async fn add_span_to_concentrator( + &self, + _peer: PeerCredentials, + env: String, + version: String, + span: datadog_ipc::shm_stats::OwnedShmSpanInput, + ) { + let session_id = self.session_id.get().map(|s| s.as_str()).unwrap_or(""); + let session = self.server.get_session(session_id); + // Lazily create the concentrator on first IPC span for this (env, version, service). + if let Some(state) = get_or_create_concentrator( + &self.server.span_concentrators, + &env, + &version, + session_id, + &session, + ) { + let mut peer_tag_buf = Vec::new(); + let input = span.as_shm_input(&mut peer_tag_buf); + state.concentrator.add_span(&input); + } + } + async fn flush_traces(&self, _peer: PeerCredentials) { let flusher = self.server.trace_flusher.clone(); if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await { error!("Failed flushing traces: {e:?}"); } + flush_all_stats_now(&self.server.span_concentrators).await; } async fn set_test_session_token(&self, _peer: PeerCredentials, token: String) { @@ -965,6 +1014,10 @@ impl SidecarInterface for ConnectionSidecarHandler { session.modify_trace_config(|trace_cfg| { trace_cfg.set_endpoint_test_token(token.clone()); }); + // Update the stats config so newly created concentrators carry the test token. + session.modify_stats_config(|cfg| { + cfg.endpoint.test_token = token.clone(); + }); // TODO(APMSP-1377): the dogstatsd-client doesn't support test_session tokens yet // session.configure_dogstatsd(|cfg| { // update_cfg(cfg.endpoint.take(), |e| cfg.set_endpoint(e), &token); diff --git a/datadog-sidecar/src/service/stats_flusher.rs b/datadog-sidecar/src/service/stats_flusher.rs new file mode 100644 index 0000000000..92bc498f0c --- /dev/null +++ b/datadog-sidecar/src/service/stats_flusher.rs @@ -0,0 +1,291 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Periodic stats flusher for the SHM span concentrator. +//! +//! The sidecar maintains one `SpanConcentratorState` per (env, version, service) triple +//! (globally, across all sessions) in `SidecarServer::span_concentrators`. +//! Concentrators are created lazily on the first IPC span for a given key, and removed +//! automatically once idle: an empty drain sets the `please_reload` bit (telling PHP workers +//! to stop writing), and the subsequent flush performs a final drain before removal. + +use base64::prelude::BASE64_URL_SAFE_NO_PAD; +use base64::Engine; +use datadog_ipc::shm_stats::{ + ShmSpanConcentrator, DEFAULT_SLOT_COUNT, DEFAULT_STRING_POOL_BYTES, RELOAD_FILL_RATIO, +}; +use http::uri::PathAndQuery; +use libdd_capabilities_impl::{HttpClientTrait, NativeCapabilities}; +use libdd_common::{Endpoint, MutexExt}; +use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; +use std::collections::HashMap; +use std::ffi::CString; +use std::hash::{Hash, Hasher}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; +use tracing::{error, info, warn}; +use zwohash::ZwoHasher; + +/// Build the stats endpoint by appending `/v0.6/stats` to the agent base URL. +/// Returns `None` for agentless mode (API key present) — stats are not supported agentless. +pub(crate) fn stats_endpoint(endpoint: &Endpoint) -> Option { + if endpoint.api_key.is_some() { + return None; + } + let mut parts = endpoint.url.clone().into_parts(); + parts.path_and_query = Some(PathAndQuery::from_static( + libdd_trace_stats::stats_exporter::STATS_ENDPOINT_PATH, + )); + Some(Endpoint { + url: http::Uri::from_parts(parts).ok()?, + ..endpoint.clone() + }) +} + +/// The subset of session configuration needed to create and flush a span stats concentrator. +#[derive(Clone)] +pub(crate) struct StatsConfig { + /// Stats endpoint with final path already baked in. + pub endpoint: Endpoint, + pub flush_interval: Duration, + /// Machine hostname, forwarded to the stats payload `hostname` field. + pub hostname: String, + /// Process-level tags serialised as `"key:value,..."`. + pub process_tags: String, + /// Process-level service name (from `DD_SERVICE`), used as the concentrator key dimension. + pub root_service: String, + /// Language identifier (e.g. "php"). + pub language: String, + /// Tracer library version. + pub tracer_version: String, +} + +/// Map key for the per-(env, version, root-service) concentrator map. +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct ConcentratorKey { + pub env: String, + pub version: String, + pub root_service: String, +} + +/// State held per-(env, version, root-service) for SHM span stats. +pub struct SpanConcentratorState { + pub concentrator: ShmSpanConcentrator, + /// The stats endpoint (with `/v0.6/stats` path baked in) used by the flush loop. + pub(crate) endpoint: Endpoint, + /// Metadata for StatsExporter payload annotation (hostname, env, version, service, …). + pub(crate) meta: StatsMetadata, +} + +// SAFETY: ShmSpanConcentrator is designed for cross-process sharing; all internal state +// uses atomic operations. +unsafe impl Send for SpanConcentratorState {} + +/// Compute the SHM path for an (env, version, root-service) triple's span concentrator. +pub fn env_stats_shm_path(env: &str, version: &str, service: &str) -> CString { + let mut hasher = ZwoHasher::default(); + env.hash(&mut hasher); + version.hash(&mut hasher); + service.hash(&mut hasher); + let hash = hasher.finish(); + + let mut path = format!( + "/ddspsc{}-{}", + crate::primary_sidecar_identifier(), + BASE64_URL_SAFE_NO_PAD.encode(hash.to_ne_bytes()), + ); + path.truncate(31); + #[allow(clippy::unwrap_used)] + CString::new(path).unwrap() +} + +fn make_exporter( + s: &SpanConcentratorState, + endpoint: Endpoint, + flush_interval: Duration, +) -> StatsExporter { + StatsExporter::new( + flush_interval, + Arc::new(Mutex::new(s.concentrator.clone())), + s.meta.clone(), + endpoint, + NativeCapabilities::new_client(), + ) +} + +/// Spawn-and-forget flush loop for a concentrator. +/// +/// **Idle removal**: when a flush produces no data (`send` returns `false`), the +/// `please_reload` bit is set on the SHM, signalling PHP workers to stop writing (they will +/// fall back to the IPC path). On the very next tick, a final force-flush drains any +/// remaining data and the concentrator is removed from the map. This two-phase removal +/// avoids a race between the reload signal and in-flight SHM writes. +pub async fn run_stats_flush_loop( + states: Weak>>>, + map_key: ConcentratorKey, + flush_interval: Duration, +) { + let Some(arc) = states.upgrade() else { + return; + }; + let state = { + let guard = arc.lock_or_panic(); + guard.get(&map_key).cloned() + }; + let Some(state) = state else { + return; + }; + let exporter = make_exporter(&state, state.endpoint.clone(), flush_interval); + + loop { + tokio::time::sleep(flush_interval).await; + let Some(arc) = states.upgrade() else { + break; // sidecar shutting down + }; + + // Fill-check (atomic SHM reads, no lock needed). + let (used, total) = state.concentrator.slot_usage(); + if total > 0 && (used as f64 / total as f64) > RELOAD_FILL_RATIO { + warn!( + "SHM span concentrator for env={} version={} service={} is {:.0}% full \ + ({used}/{total} slots); consider increasing slot count", + map_key.env, + map_key.version, + map_key.root_service, + (used as f64 / total as f64) * 100.0 + ); + } + + match exporter.send(false).await { + Err(e) => warn!( + "Failed to send stats for env={} version={}: {e}", + map_key.env, map_key.version + ), + Ok(true) => {} // data sent — continue + Ok(false) => { + // Empty drain: retire this concentrator. + info!( + "Removing idle SHM span concentrator for env={} version={} service={}", + map_key.env, map_key.version, map_key.root_service, + ); + state.concentrator.signal_reload(); + #[cfg(unix)] + state.concentrator.unlink(); + #[cfg(unix)] // on windows waiting is pointless, because we cannot unlink it + tokio::time::sleep(Duration::from_secs(1)).await; + { + let mut guard = arc.lock_or_panic(); + // Only remove our entry — a fresher one may have been inserted already. + if guard + .get(&map_key) + .map_or(false, |s| Arc::ptr_eq(s, &state)) + { + guard.remove(&map_key); + } + } + if let Err(e) = exporter.send(true).await { + warn!("Failed final stats flush: {e}"); + } + break; + } + } + } +} + +/// Look up or create the SHM span concentrator for `(env, version, service)`. +/// +/// Called lazily from `add_span_to_concentrator` when the PHP worker could not write to SHM +/// directly (SHM not yet available). Creating on first IPC span — rather than eagerly in +/// `set_universal_service_tags` — lets the concentrator key track the actual span env/version +/// rather than the root-span-only values reported at request start. +/// +/// Returns `None` when stats config is not available (agentless or not yet configured). +pub(crate) fn get_or_create_concentrator( + concentrators: &Arc>>>, + env: &str, + version: &str, + runtime_id: &str, + session: &crate::service::session_info::SessionInfo, +) -> Option> { + let config = session + .stats_config + .lock() + .unwrap_or_else(|e| e.into_inner()) + .clone()?; + + if config.endpoint.api_key.is_some() { + return None; // agentless — no stats + } + + let service_name = config.root_service.clone(); + + let map_key = ConcentratorKey { + env: env.to_owned(), + version: version.to_owned(), + root_service: service_name.clone(), + }; + let mut guard = concentrators.lock_or_panic(); + + if let Some(s) = guard.get(&map_key) { + if !s.concentrator.needs_reload() { + return Some(s.clone()); + } + // Entry is being retired (reload signalled) — fall through to create a fresh one. + } + + let path = env_stats_shm_path(env, version, &service_name); + + let meta = StatsMetadata { + hostname: config.hostname.clone(), + env: env.to_owned(), + app_version: version.to_owned(), + runtime_id: runtime_id.to_owned(), + language: config.language.clone(), + tracer_version: config.tracer_version.clone(), + process_tags: config.process_tags.clone(), + service: service_name.clone(), + ..Default::default() + }; + + match ShmSpanConcentrator::create( + path.clone(), + 10_000_000_000, + DEFAULT_SLOT_COUNT, + DEFAULT_STRING_POOL_BYTES, + ) { + Ok(concentrator) => { + let state = Arc::new(SpanConcentratorState { + concentrator, + endpoint: config.endpoint.clone(), + meta, + }); + guard.insert(map_key.clone(), state.clone()); + let weak = Arc::downgrade(concentrators); + let flush_interval = config.flush_interval; + tokio::spawn(async move { + run_stats_flush_loop(weak, map_key, flush_interval).await; + }); + Some(state) + } + Err(e) => { + error!("Failed to create SHM span stats concentrator for env={env} version={version} service={service_name}: {e}"); + None + } + } +} + +/// Immediately flush all active SHM span concentrators and send the results to the agent. +pub async fn flush_all_stats_now( + state: &Arc>>>, +) { + let states: Vec> = { + let guard = state.lock_or_panic(); + guard.values().cloned().collect() + }; + for s in states { + let exporter = make_exporter(&s, s.endpoint.clone(), Duration::from_secs(10)); + if let Err(e) = exporter.send(false).await { + warn!("flush_all_stats_now: failed to send stats: {e}"); + } + } +} diff --git a/datadog-sidecar/src/shm_remote_config.rs b/datadog-sidecar/src/shm_remote_config.rs index 9015f83ea9..98114fe1e2 100644 --- a/datadog-sidecar/src/shm_remote_config.rs +++ b/datadog-sidecar/src/shm_remote_config.rs @@ -268,7 +268,10 @@ impl MultiTargetHandlers for ConfigFileStora serialized.push(b'\n'); for file in files.iter() { #[allow(clippy::unwrap_used)] - serialized.extend_from_slice(file.handle.lock_or_panic().as_ref().unwrap().get_path()); + // SAFETY: no concurrent unlink() on this handle. + serialized.extend_from_slice(unsafe { + file.handle.lock_or_panic().as_ref().unwrap().get_path() + }); serialized.push(b':'); if let Some(ref limiter) = file.limiter { serialized.extend_from_slice(limiter.index().to_string().as_bytes()); diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index 8d5db11a82..44fa6ef67f 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -34,9 +34,8 @@ libdd-common = { version = "3.0.2", path = "../libdd-common", default-features = libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" } libdd-telemetry = { version = "4.0.0", path = "../libdd-telemetry", default-features = false, optional = true} libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" } -libdd-trace-stats = { version = "2.0.0", path = "../libdd-trace-stats" } +libdd-trace-stats = { version = "2.0.0", path = "../libdd-trace-stats", default-features = false } libdd-trace-utils = { version = "3.0.1", path = "../libdd-trace-utils", default-features = false } -libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } libdd-dogstatsd-client = { version = "2.0.0", path = "../libdd-dogstatsd-client", default-features = false } libdd-tinybytes = { version = "1.1.0", path = "../libdd-tinybytes", features = [ "bytes_string", @@ -78,6 +77,7 @@ telemetry = ["libdd-telemetry"] https = [ "libdd-common/https", "libdd-telemetry?/https", + "libdd-trace-stats/https", "libdd-trace-utils/https", "libdd-dogstatsd-client/https", ] diff --git a/libdd-data-pipeline/src/agent_info/schema.rs b/libdd-data-pipeline/src/agent_info/schema.rs index 6023bef01a..1a3a191f44 100644 --- a/libdd-data-pipeline/src/agent_info/schema.rs +++ b/libdd-data-pipeline/src/agent_info/schema.rs @@ -37,6 +37,21 @@ pub struct AgentInfoStruct { pub span_kinds_stats_computed: Option>, /// Container tags hash from HTTP response header pub container_tags_hash: Option, + /// Exact-match tag filters applied before stats computation (root span only). + pub filter_tags: Option, + /// Regex-match tag filters applied before stats computation (root span only). + pub filter_tags_regex: Option, + /// Regex patterns for root-span resource names; matching traces are excluded from stats. + pub ignore_resources: Option>, +} + +/// Require/reject lists for tag-based trace filters exposed by the agent /info endpoint. +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +pub struct FilterTagsConfig { + /// All listed filters must match at least one root-span tag for the trace to be accepted. + pub require: Option>, + /// If any listed filter matches a root-span tag the trace is rejected. + pub reject: Option>, } #[allow(missing_docs)] diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 33d080c022..2af59a472b 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -13,8 +13,6 @@ pub mod agent_info; mod health_metrics; pub(crate) mod otlp; -#[allow(missing_docs)] -pub mod stats_exporter; #[cfg(feature = "telemetry")] pub(crate) mod telemetry; #[allow(missing_docs)] diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 7014b0e7a4..6a15438248 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -9,8 +9,6 @@ #[cfg(not(target_arch = "wasm32"))] use crate::agent_info::schema::AgentInfo; -#[cfg(not(target_arch = "wasm32"))] -use crate::stats_exporter; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientTrait, MaybeSend}; #[cfg(not(target_arch = "wasm32"))] @@ -18,6 +16,8 @@ use libdd_common::Endpoint; use libdd_common::MutexExt; use libdd_shared_runtime::{SharedRuntime, WorkerHandle}; use libdd_trace_stats::span_concentrator::SpanConcentrator; +#[cfg(not(target_arch = "wasm32"))] +use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; use std::sync::{Arc, Mutex}; use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] @@ -25,6 +25,7 @@ use tracing::{debug, error}; #[cfg(not(target_arch = "wasm32"))] use super::add_path; +use super::TracerMetadata; #[cfg(not(target_arch = "wasm32"))] pub(crate) const DEFAULT_STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = @@ -35,7 +36,7 @@ pub(crate) const STATS_ENDPOINT: &str = "/v0.6/stats"; #[cfg(not(target_arch = "wasm32"))] /// Context struct that groups immutable parameters used by stats functions pub(crate) struct StatsContext<'a> { - pub metadata: &'a super::TracerMetadata, + pub metadata: &'a TracerMetadata, pub endpoint_url: &'a http::Uri, pub shared_runtime: &'a SharedRuntime, } @@ -104,10 +105,10 @@ fn create_and_start_stats_worker, client: H, ) -> anyhow::Result<()> { - let stats_exporter = stats_exporter::StatsExporter::::new( + let stats_exporter = StatsExporter::::new( bucket_size, stats_concentrator.clone(), - ctx.metadata.clone(), + StatsMetadata::from(ctx.metadata.clone()), Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)), client, ); @@ -255,3 +256,23 @@ pub(crate) fn is_stats_worker_active(client_side_stats: &ArcSwap for StatsMetadata { + fn from(m: TracerMetadata) -> StatsMetadata { + StatsMetadata { + hostname: m.hostname, + env: m.env, + app_version: m.app_version, + runtime_id: m.runtime_id, + language: m.language, + lang_version: m.language_version, + lang_interpreter: m.language_interpreter, + lang_vendor: m.language_interpreter_vendor, + tracer_version: m.tracer_version, + git_commit_sha: m.git_commit_sha, + process_tags: m.process_tags, + service: m.service, + } + } +} diff --git a/libdd-trace-stats/Cargo.toml b/libdd-trace-stats/Cargo.toml index 78946aa69f..1e66693b51 100644 --- a/libdd-trace-stats/Cargo.toml +++ b/libdd-trace-stats/Cargo.toml @@ -10,10 +10,24 @@ license.workspace = true autobenches = false [dependencies] +anyhow = "1.0" +libdd-capabilities = { path = "../libdd-capabilities", version = "0.1.0" } +libdd-common = { version = "3.0.2", path = "../libdd-common", default-features = false } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } +libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" } libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" } libdd-trace-utils = { version = "3.0.1", path = "../libdd-trace-utils", default-features = false } hashbrown = { version = "0.15" } +http = "1.1" +rmp-serde = "1.1.1" +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1.23", features = ["macros", "time"], default-features = false } +tokio-util = "0.7.11" +tracing = { version = "0.1", default-features = false } +async-trait = "0.1.85" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +libdd-capabilities-impl = { version = "0.1.0", path = "../libdd-capabilities-impl" } [lib] bench = false @@ -25,4 +39,11 @@ path = "benches/main.rs" [dev-dependencies] criterion = "0.5.1" +httpmock = "0.8.0-alpha.1" +libdd-trace-utils = { path = "../libdd-trace-utils", features = ["test-utils"] } rand = "0.8.5" +tokio = { version = "1.23", features = ["rt-multi-thread", "macros", "test-util", "time"], default-features = false } + +[features] +default = ["https"] +https = ["libdd-common/https"] diff --git a/libdd-trace-stats/README.md b/libdd-trace-stats/README.md index 1d9ef38a7f..789b35f0e0 100644 --- a/libdd-trace-stats/README.md +++ b/libdd-trace-stats/README.md @@ -14,6 +14,7 @@ Compute aggregated statistics from distributed tracing spans with time-bucketed - **Span Filtering**: Filter spans by top-level, measured, or span.kind - **Time Bucketing**: Configurable bucket sizes for aggregation - **Statistics Export**: Generate statistics payloads for Datadog backend +- **Stats Exporter** *(non-wasm)*: Periodic HTTP flusher backed by any `HttpClientTrait` implementation ## Span Concentrator @@ -41,6 +42,28 @@ Only certain spans are aggregated: When flushed, the concentrator keeps the most recent buckets and returns older buckets as statistics. +## Stats Exporter + +`StatsExporter` wraps a `FlushableConcentrator` and periodically drains it, encoding the resulting `ClientStatsPayload` as msgpack and POSTing it to the agent's `/v0.6/stats` endpoint. + +```rust +use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; +use libdd_capabilities_impl::NativeCapabilities; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +let exporter = StatsExporter::::new( + Duration::from_secs(10), + Arc::new(Mutex::new(concentrator)), + StatsMetadata { service: "my-service".into(), ..Default::default() }, + endpoint, + NativeCapabilities::new_client(), +); + +// Flush and send (force=true drains all buckets on shutdown) +exporter.send(false).await?; +``` + ## Example Usage ```rust diff --git a/libdd-trace-stats/src/lib.rs b/libdd-trace-stats/src/lib.rs index c086768772..ca62eeb1ab 100644 --- a/libdd-trace-stats/src/lib.rs +++ b/libdd-trace-stats/src/lib.rs @@ -9,3 +9,5 @@ //! This crate provides utilities to compute stats from traces. pub mod span_concentrator; +#[cfg(not(target_arch = "wasm32"))] +pub mod stats_exporter; diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index f60113a6ac..ac32c24620 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -8,6 +8,7 @@ use hashbrown::HashMap; use libdd_trace_protobuf::pb; use libdd_trace_utils::span::SpanText; +use std::borrow::Borrow; use crate::span_concentrator::StatSpan; @@ -23,62 +24,74 @@ const GRPC_STATUS_CODE_FIELD: &[&str] = &[ "grpc.status.code", ]; +/// Aggregation key fields shared across all concentrator implementations — everything +/// **except** peer tags. +/// +/// `T` is the string representation: +/// * `&'a str` — borrowed references used in [`BorrowedAggregationKey`] +/// * `String` — owned values used in `OwnedAggregationKey` +/// * `StringRef` — offset+len into a SHM string pool, used in `ShmKeyHeader` +#[derive( + Clone, Default, Hash, Eq, PartialEq, Debug, PartialOrd, serde::Serialize, serde::Deserialize, +)] +pub struct FixedAggregationKey { + pub resource_name: T, + pub service_name: T, + pub operation_name: T, + pub span_type: T, + pub span_kind: T, + pub http_method: T, + pub http_endpoint: T, + pub service_source: T, + pub http_status_code: u32, + pub grpc_status_code: Option, + pub is_synthetics_request: bool, + pub is_trace_root: bool, +} + +impl FixedAggregationKey { + /// Map all string fields through `f`, preserving scalar fields. + pub fn convert<'a, V: 'a, I: ?Sized + 'a, F: Fn(&'a I) -> V>( + &'a self, + f: F, + ) -> FixedAggregationKey + where + T: Borrow, + { + FixedAggregationKey { + resource_name: f(self.resource_name.borrow()), + service_name: f(self.service_name.borrow()), + operation_name: f(self.operation_name.borrow()), + span_type: f(self.span_type.borrow()), + span_kind: f(self.span_kind.borrow()), + http_method: f(self.http_method.borrow()), + http_endpoint: f(self.http_endpoint.borrow()), + service_source: f(self.service_source.borrow()), + http_status_code: self.http_status_code, + grpc_status_code: self.grpc_status_code, + is_synthetics_request: self.is_synthetics_request, + is_trace_root: self.is_trace_root, + } + } +} + #[derive(Clone, Hash, PartialEq, Eq)] /// Represent a stats aggregation key borrowed from span data pub(super) struct BorrowedAggregationKey<'a> { - resource_name: &'a str, - service_name: &'a str, - operation_name: &'a str, - span_type: &'a str, - span_kind: &'a str, - http_status_code: u32, - is_synthetics_request: bool, + fixed: FixedAggregationKey<&'a str>, peer_tags: Vec<(&'a str, &'a str)>, - is_trace_root: bool, - http_method: &'a str, - http_endpoint: &'a str, - grpc_status_code: Option, - service_source: &'a str, } impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { #[inline] - fn equivalent( - &self, - OwnedAggregationKey { - resource_name, - service_name, - operation_name, - span_type, - span_kind, - http_status_code, - is_synthetics_request, - peer_tags, - is_trace_root, - http_method, - http_endpoint, - grpc_status_code, - service_source, - }: &OwnedAggregationKey, - ) -> bool { - self.resource_name == resource_name - && self.service_name == service_name - && self.operation_name == operation_name - && self.span_type == span_type - && self.span_kind == span_kind - && self.http_status_code == *http_status_code - && self.is_synthetics_request == *is_synthetics_request - && self.peer_tags.len() == peer_tags.len() + fn equivalent(&self, other: &OwnedAggregationKey) -> bool { + self.fixed == other.fixed.convert(|s| s) + && self.peer_tags.len() == other.peer_tags.len() && self .peer_tags .iter() - .zip(peer_tags.iter()) + .zip(other.peer_tags.iter()) .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) - && self.is_trace_root == *is_trace_root - && self.http_method == http_method - && self.http_endpoint == http_endpoint - && self.grpc_status_code == *grpc_status_code - && self.service_source == service_source } } @@ -91,41 +104,19 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { /// borrowed key /// * Running the Equivalent trait on an owned key derived from a borrowed key should produce true pub(super) struct OwnedAggregationKey { - resource_name: String, - service_name: String, - operation_name: String, - span_type: String, - span_kind: String, - http_status_code: u32, - is_synthetics_request: bool, + fixed: FixedAggregationKey, peer_tags: Vec<(String, String)>, - is_trace_root: bool, - http_method: String, - http_endpoint: String, - grpc_status_code: Option, - service_source: String, } impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { fn from(value: &BorrowedAggregationKey<'_>) -> Self { OwnedAggregationKey { - resource_name: value.resource_name.to_owned(), - service_name: value.service_name.to_owned(), - operation_name: value.operation_name.to_owned(), - span_type: value.span_type.to_owned(), - span_kind: value.span_kind.to_owned(), - http_status_code: value.http_status_code, - is_synthetics_request: value.is_synthetics_request, + fixed: value.fixed.convert(str::to_owned), peer_tags: value .peer_tags .iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), - is_trace_root: value.is_trace_root, - http_method: value.http_method.to_owned(), - http_endpoint: value.http_endpoint.to_owned(), - grpc_status_code: value.grpc_status_code, - service_source: value.service_source.to_owned(), } } } @@ -210,7 +201,7 @@ impl<'a> BorrowedAggregationKey<'a> { /// /// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the /// key. - pub(super) fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { + pub fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default(); let peer_tags = if should_track_peer_tags(span_kind) { // Parse the meta tags of the span and return a list of the peer tags based on the list @@ -246,21 +237,23 @@ impl<'a> BorrowedAggregationKey<'a> { let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default(); Self { - resource_name: span.resource(), - service_name: span.service(), - operation_name: span.name(), - span_type: span.r#type(), - span_kind, - http_status_code: status_code, - is_synthetics_request: span - .get_meta(TAG_ORIGIN) - .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), + fixed: FixedAggregationKey { + resource_name: span.resource(), + service_name: span.service(), + operation_name: span.name(), + span_type: span.r#type(), + span_kind, + http_method, + http_endpoint, + service_source, + http_status_code: status_code, + grpc_status_code, + is_synthetics_request: span + .get_meta(TAG_ORIGIN) + .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), + is_trace_root: span.is_trace_root(), + }, peer_tags, - is_trace_root: span.is_trace_root(), - http_method, - http_endpoint, - grpc_status_code, - service_source, } } } @@ -268,13 +261,20 @@ impl<'a> BorrowedAggregationKey<'a> { impl From for OwnedAggregationKey { fn from(value: pb::ClientGroupedStats) -> Self { Self { - resource_name: value.resource, - service_name: value.service, - operation_name: value.name, - span_type: value.r#type, - span_kind: value.span_kind, - http_status_code: value.http_status_code, - is_synthetics_request: value.synthetics, + fixed: FixedAggregationKey { + resource_name: value.resource, + service_name: value.service, + operation_name: value.name, + span_type: value.r#type, + span_kind: value.span_kind, + http_method: value.http_method, + http_endpoint: value.http_endpoint, + service_source: value.service_source, + http_status_code: value.http_status_code, + grpc_status_code: value.grpc_status_code.parse().ok(), + is_synthetics_request: value.synthetics, + is_trace_root: value.is_trace_root == 1, + }, peer_tags: value .peer_tags .into_iter() @@ -283,11 +283,6 @@ impl From for OwnedAggregationKey { Some((key.to_string(), value.to_string())) }) .collect(), - is_trace_root: value.is_trace_root == 1, - http_method: value.http_method, - http_endpoint: value.http_endpoint, - grpc_status_code: value.grpc_status_code.parse().ok(), - service_source: value.service_source, } } } @@ -383,12 +378,13 @@ impl StatsBucket { /// Create a ClientGroupedStats struct based on the given AggregationKey and GroupedStats fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::ClientGroupedStats { + let f = key.fixed; pb::ClientGroupedStats { - service: key.service_name, - name: key.operation_name, - resource: key.resource_name, - http_status_code: key.http_status_code, - r#type: key.span_type, + service: f.service_name, + name: f.operation_name, + resource: f.resource_name, + http_status_code: f.http_status_code, + r#type: f.span_type, db_type: String::new(), // db_type is not used yet (see proto definition) hits: group.hits, @@ -397,27 +393,27 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl ok_summary: group.ok_summary.encode_to_vec(), error_summary: group.error_summary.encode_to_vec(), - synthetics: key.is_synthetics_request, + synthetics: f.is_synthetics_request, top_level_hits: group.top_level_hits, - span_kind: key.span_kind, + span_kind: f.span_kind, peer_tags: key .peer_tags .into_iter() .map(|(k, v)| format!("{k}:{v}")) .collect(), - is_trace_root: if key.is_trace_root { + is_trace_root: if f.is_trace_root { pb::Trilean::True.into() } else { pb::Trilean::False.into() }, - http_method: key.http_method, - http_endpoint: key.http_endpoint, - grpc_status_code: key + http_method: f.http_method, + http_endpoint: f.http_endpoint, + grpc_status_code: f .grpc_status_code .map(|c| c.to_string()) .unwrap_or_default(), - service_source: key.service_source, + service_source: f.service_source, span_derived_primary_tags: vec![], // Todo } } @@ -436,6 +432,21 @@ mod tests { hasher.finish() } + impl FixedAggregationKey { + fn into_key(self) -> OwnedAggregationKey { + OwnedAggregationKey { + fixed: self, + peer_tags: vec![], + } + } + fn into_key_with_peers(self, peer_tags: Vec<(String, String)>) -> OwnedAggregationKey { + OwnedAggregationKey { + fixed: self, + peer_tags, + } + } + } + #[test] fn test_aggregation_key_from_span() { let test_cases: Vec<(SpanBytes, OwnedAggregationKey)> = vec![ @@ -449,13 +460,14 @@ mod tests { parent_id: 0, ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with span kind ( @@ -468,14 +480,15 @@ mod tests { meta: HashMap::from([("span.kind".into(), "client".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with peer tags but peertags aggregation disabled ( @@ -491,14 +504,15 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with multiple peer tags but peertags aggregation disabled ( @@ -516,14 +530,15 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "producer".into(), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with multiple peer tags but peertags aggregation disabled and span kind is // server @@ -542,14 +557,15 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "server".into(), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span from synthetics ( @@ -562,14 +578,15 @@ mod tests { meta: HashMap::from([("_dd.origin".into(), "synthetics-browser".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: true, is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with status code in meta ( @@ -582,7 +599,7 @@ mod tests { meta: HashMap::from([("http.status_code".into(), "418".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -590,7 +607,8 @@ mod tests { is_trace_root: true, http_status_code: 418, ..Default::default() - }, + } + .into_key(), ), // Span with invalid status code in meta ( @@ -603,14 +621,15 @@ mod tests { meta: HashMap::from([("http.status_code".into(), "x".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with status code in metrics ( @@ -623,7 +642,7 @@ mod tests { metrics: HashMap::from([("http.status_code".into(), 418.0)]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), @@ -631,7 +650,8 @@ mod tests { is_trace_root: true, http_status_code: 418, ..Default::default() - }, + } + .into_key(), ), // Span with http.method and http.route ( @@ -647,7 +667,7 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "GET /api/v1/users".into(), @@ -656,7 +676,8 @@ mod tests { is_synthetics_request: false, is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with http.method and http.endpoint (http.endpoint takes precedence) ( @@ -673,7 +694,7 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "POST /users/create".into(), @@ -682,7 +703,8 @@ mod tests { is_synthetics_request: false, is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with grpc status from meta as named string ( @@ -690,11 +712,12 @@ mod tests { meta: HashMap::from([("rpc.grpc.status_code".into(), "OK".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { grpc_status_code: Some(0), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with grpc status from meta as numeric string ( @@ -702,11 +725,12 @@ mod tests { meta: HashMap::from([("rpc.grpc.status_code".into(), "14".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { grpc_status_code: Some(14), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with grpc status from meta with StatusCode. prefix ( @@ -714,11 +738,12 @@ mod tests { meta: HashMap::from([("grpc.code".into(), "StatusCode.UNAVAILABLE".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { grpc_status_code: Some(14), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with grpc status from metrics takes precedence over meta ( @@ -730,11 +755,12 @@ mod tests { metrics: HashMap::from([("rpc.grpc.status_code".into(), 2.0)]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { grpc_status_code: Some(7), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with grpc status from metrics via secondary key ( @@ -742,11 +768,12 @@ mod tests { metrics: HashMap::from([("grpc.code".into(), 3.0)]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { grpc_status_code: Some(3), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with invalid grpc status string ( @@ -754,10 +781,11 @@ mod tests { meta: HashMap::from([("rpc.grpc.status_code".into(), "NOPE".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), // Span with service source set by integration ( @@ -770,14 +798,15 @@ mod tests { meta: HashMap::from([("_dd.svc_src".into(), "redis".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), is_trace_root: true, service_source: "redis".into(), ..Default::default() - }, + } + .into_key(), ), // Span with service source set by configuration option ( @@ -790,14 +819,15 @@ mod tests { meta: HashMap::from([("_dd.svc_src".into(), "opt.split_by_tag".into())]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), is_trace_root: true, service_source: "opt.split_by_tag".into(), ..Default::default() - }, + } + .into_key(), ), // Span without service source (default service name) ( @@ -809,14 +839,15 @@ mod tests { parent_id: 0, ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), is_trace_root: true, service_source: "".into(), ..Default::default() - }, + } + .into_key(), ), ]; @@ -838,15 +869,15 @@ mod tests { meta: HashMap::from([("span.kind", "client"), ("aws.s3.bucket", "bucket-a")]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), is_trace_root: true, - peer_tags: vec![("aws.s3.bucket".into(), "bucket-a".into())], ..Default::default() - }, + } + .into_key_with_peers(vec![("aws.s3.bucket".into(), "bucket-a".into())]), ), // Span with multiple peer tags with peertags aggregation enabled ( @@ -864,19 +895,19 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "producer".into(), - peer_tags: vec![ - ("aws.s3.bucket".into(), "bucket-a".into()), - ("db.instance".into(), "dynamo.test.us1".into()), - ("db.system".into(), "dynamodb".into()), - ], is_trace_root: true, ..Default::default() - }, + } + .into_key_with_peers(vec![ + ("aws.s3.bucket".into(), "bucket-a".into()), + ("db.instance".into(), "dynamo.test.us1".into()), + ("db.system".into(), "dynamodb".into()), + ]), ), // Span with multiple peer tags with peertags aggregation enabled and span kind is // server @@ -895,14 +926,15 @@ mod tests { ]), ..Default::default() }, - OwnedAggregationKey { + FixedAggregationKey { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), span_kind: "server".into(), is_trace_root: true, ..Default::default() - }, + } + .into_key(), ), ]; diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1ad0ba8b24..151d8fc2ef 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -6,12 +6,28 @@ use std::time::{self, Duration, SystemTime}; use libdd_trace_protobuf::pb; -use aggregation::{BorrowedAggregationKey, StatsBucket}; -use stat_span::StatSpan; +use aggregation::StatsBucket; mod aggregation; +use aggregation::BorrowedAggregationKey; +pub use aggregation::FixedAggregationKey; -mod stat_span; +pub mod stat_span; +pub use stat_span::StatSpan; + +/// Concentrators that can provide raw time buckets for export implement this trait. +/// +/// `StatsExporter` is generic over `C: FlushableConcentrator` so it can work with +/// both the in-process [`SpanConcentrator`] and the SHM-backed `ShmSpanConcentrator`. +pub trait FlushableConcentrator { + fn flush_buckets(&mut self, force: bool) -> Vec; +} + +impl FlushableConcentrator for SpanConcentrator { + fn flush_buckets(&mut self, force: bool) -> Vec { + self.flush(SystemTime::now(), force) + } +} /// Return a Duration between t and the unix epoch /// If t is before the unix epoch return 0 @@ -27,7 +43,7 @@ fn align_timestamp(t: u64, bucket_size: u64) -> u64 { } /// Return true if the span is eligible for stats computation -fn is_span_eligible<'a, T>(span: &'a T, span_kinds_stats_computed: &[String]) -> bool +pub fn is_span_eligible<'a, T>(span: &'a T, span_kinds_stats_computed: &[String]) -> bool where T: StatSpan<'a>, { @@ -93,11 +109,21 @@ impl SpanConcentrator { } } + /// Return the list of span kinds eligible for stats computation + pub fn span_kinds(&self) -> &[String] { + &self.span_kinds_stats_computed + } + /// Set the list of span kinds eligible for stats computation pub fn set_span_kinds(&mut self, span_kinds: Vec) { self.span_kinds_stats_computed = span_kinds; } + /// Return the list of keys considered as peer_tags for aggregation + pub fn peer_tag_keys(&self) -> &[String] { + &self.peer_tag_keys + } + /// Set the list of keys considered as peer_tags for aggregation pub fn set_peer_tags(&mut self, peer_tags: Vec) { self.peer_tag_keys = peer_tags; diff --git a/libdd-data-pipeline/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs similarity index 85% rename from libdd-data-pipeline/src/stats_exporter.rs rename to libdd-trace-stats/src/stats_exporter.rs index e23b0b5f4b..855a8b6d3f 100644 --- a/libdd-data-pipeline/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - borrow::Borrow, sync::{ atomic::{AtomicU64, Ordering}, Arc, Mutex, @@ -10,45 +9,77 @@ use std::{ time, }; -use crate::trace_exporter::TracerMetadata; +use crate::span_concentrator::{FlushableConcentrator, SpanConcentrator}; use async_trait::async_trait; use libdd_capabilities::{HttpClientTrait, MaybeSend}; use libdd_common::Endpoint; use libdd_shared_runtime::Worker; use libdd_trace_protobuf::pb; -use libdd_trace_stats::span_concentrator::SpanConcentrator; use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy}; +use std::fmt::Debug; use tracing::error; -const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; +pub const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; + +/// Metadata needed by the stats exporter to annotate payloads and HTTP requests. +#[derive(Clone, Default, Debug)] +pub struct StatsMetadata { + pub hostname: String, + pub env: String, + pub app_version: String, + pub runtime_id: String, + pub language: String, + pub lang_version: String, + pub lang_interpreter: String, + pub lang_vendor: String, + pub tracer_version: String, + pub git_commit_sha: String, + pub process_tags: String, + pub service: String, +} + +impl StatsMetadata { + /// Build the HTTP headers accepted by the agent's `/v0.6/stats` endpoint. + pub fn to_stats_headers(&self) -> http::HeaderMap { + let mut map = http::HeaderMap::new(); + if let Ok(v) = self.language.parse() { + map.insert("Datadog-Tracer-Language", v); + } + if let Ok(v) = self.tracer_version.parse() { + map.insert("Datadog-Tracer-Version", v); + } + map + } +} /// An exporter that concentrates and sends stats to the agent. /// /// `H` is the HTTP client implementation, see [`HttpClientTrait`]. Leaf crates /// pin it to a concrete type. #[derive(Debug)] -pub struct StatsExporter { +pub struct StatsExporter { flush_interval: time::Duration, - concentrator: Arc>, + concentrator: Arc>, endpoint: Endpoint, - meta: TracerMetadata, + meta: StatsMetadata, sequence_id: AtomicU64, client: H, } -impl StatsExporter { +impl StatsExporter { /// Return a new StatsExporter /// /// - `flush_interval` the interval on which the concentrator is flushed - /// - `concentrator` SpanConcentrator storing the stats to be sent to the agent + /// - `concentrator` an impl of `FlushableConcentrator` storing the stats to be sent to the + /// agent /// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent /// - `endpoint` the Endpoint used to send stats to the agent /// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the /// concentrator pub fn new( flush_interval: time::Duration, - concentrator: Arc>, - meta: TracerMetadata, + concentrator: Arc>, + meta: StatsMetadata, endpoint: Endpoint, client: H, ) -> Self { @@ -77,15 +108,15 @@ impl StatsExporter { /// # Panic /// Will panic if another thread panicked while holding the concentrator lock in which /// case stats cannot be flushed since the concentrator might be corrupted. - pub async fn send(&self, force_flush: bool) -> anyhow::Result<()> { + /// Returns `Ok(true)` if stats were sent, `Ok(false)` if the concentrator had nothing to send. + pub async fn send(&self, force_flush: bool) -> anyhow::Result { let payload = self.flush(force_flush); if payload.stats.is_empty() { - return Ok(()); + return Ok(false); } let body = rmp_serde::encode::to_vec_named(&payload)?; - let mut headers: http::HeaderMap = self.meta.borrow().into(); - + let mut headers = self.meta.to_stats_headers(); headers.insert( http::header::CONTENT_TYPE, libdd_common::header::APPLICATION_MSGPACK, @@ -101,7 +132,7 @@ impl StatsExporter { .await; match result { - Ok(_) => Ok(()), + Ok(_) => Ok(true), Err(err) => { error!(?err, "Error with the StateExporter when sending stats"); anyhow::bail!("Failed to send stats: {err}"); @@ -121,37 +152,37 @@ impl StatsExporter { fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload { let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed); encode_stats_payload( - self.meta.borrow(), + &self.meta, sequence, #[allow(clippy::unwrap_used)] - self.concentrator - .lock() - .unwrap() - .flush(time::SystemTime::now(), force_flush), + self.concentrator.lock().unwrap().flush_buckets(force_flush), ) } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl Worker for StatsExporter { +impl< + H: HttpClientTrait + MaybeSend + Sync + Debug + 'static, + C: FlushableConcentrator + Send + Debug, + > Worker for StatsExporter +{ async fn trigger(&mut self) { tokio::time::sleep(self.flush_interval).await; } /// Flush and send stats on every trigger. async fn run(&mut self) { - let _ = self.send(false).await; + let _ = self.send(false).await; // bool return ignored by Worker } async fn shutdown(&mut self) { - // Force flush all stats on shutdown let _ = self.send(true).await; } } fn encode_stats_payload( - meta: &TracerMetadata, + meta: &StatsMetadata, sequence: u64, buckets: Vec, ) -> pb::ClientStatsPayload { @@ -211,8 +242,8 @@ mod tests { let _ = is_sync::>; } - fn get_test_metadata() -> TracerMetadata { - TracerMetadata { + fn get_test_metadata() -> StatsMetadata { + StatsMetadata { hostname: "libdatadog-test".into(), env: "test".into(), app_version: "0.0.0".into(),