diff --git a/datadog-ipc/src/platform/windows/sockets.rs b/datadog-ipc/src/platform/windows/sockets.rs index f0b6e007ee..5b1f9d2df0 100644 --- a/datadog-ipc/src/platform/windows/sockets.rs +++ b/datadog-ipc/src/platform/windows/sockets.rs @@ -24,6 +24,7 @@ use crate::platform::message::MAX_FDS; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use std::{ cell::RefCell, future::Future, @@ -53,8 +54,8 @@ use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, }; use windows_sys::Win32::System::Pipes::{ - ConnectNamedPipe, CreateNamedPipeA, PeekNamedPipe, SetNamedPipeHandleState, PIPE_NOWAIT, - PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT, + ConnectNamedPipe, CreateNamedPipeA, PeekNamedPipe, SetNamedPipeHandleState, WaitNamedPipeA, + PIPE_NOWAIT, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT, }; use windows_sys::Win32::System::Threading::{ CreateEventA, SetEvent, WaitForMultipleObjects, WaitForSingleObject, INFINITE, @@ -479,24 +480,42 @@ impl SeqpacketConn { use winapi::um::winnt::{GENERIC_READ, GENERIC_WRITE}; let name = path_to_null_terminated(path.as_ref()); - let h = unsafe { - CreateFileA( - name.as_ptr() as *const i8, - GENERIC_READ | GENERIC_WRITE, - 0, - null_mut(), - OPEN_EXISTING, - 0, // synchronous, non-overlapped - null_mut(), - ) - }; - if h == INVALID_HANDLE_VALUE { + let deadline = Instant::now() + Duration::from_secs(30); + let h = loop { + let h = unsafe { + CreateFileA( + name.as_ptr() as *const i8, + GENERIC_READ | GENERIC_WRITE, + 0, + null_mut(), + OPEN_EXISTING, + 0, // synchronous, non-overlapped + null_mut(), + ) + }; + if h != INVALID_HANDLE_VALUE { + break h; + } let err = io::Error::last_os_error(); - if err.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) { - return Err(io::ErrorKind::ConnectionRefused.into()); + if err.raw_os_error() != Some(ERROR_PIPE_BUSY as i32) { + return Err(err); } - return Err(err); - } + // All pipe instances are busy — block up to 5 s (capped at remaining budget) + // for one to become available, then retry CreateFileA. + let remaining_ms = deadline + .saturating_duration_since(Instant::now()) + .as_millis(); + if remaining_ms == 0 { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "named pipe busy for 30 s: giving up", + )); + } + let wait_ms = remaining_ms.min(5000) as u32; + if unsafe { WaitNamedPipeA(name.as_ptr(), wait_ms) } == 0 { + return Err(io::Error::last_os_error()); + } + }; // Upgrade to message read-mode. let mode = PIPE_READMODE_MESSAGE; diff --git a/datadog-sidecar-ffi/tests/sidecar.rs b/datadog-sidecar-ffi/tests/sidecar.rs index 1536995d2b..e086b89694 100644 --- a/datadog-sidecar-ffi/tests/sidecar.rs +++ b/datadog-sidecar-ffi/tests/sidecar.rs @@ -68,10 +68,6 @@ fn test_ddog_sidecar_connection() { } #[test] -#[cfg_attr( - target_os = "windows", - ignore = "APMSP-2356 Investigate flakiness on Windows" -)] #[cfg_attr(miri, ignore)] fn test_ddog_sidecar_register_app() { set_sidecar_per_process(); diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index e0837e8f37..91bf7fb866 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -95,7 +95,15 @@ sendfd = { version = "0.4", features = ["tokio"] } [target.'cfg(windows)'.dependencies] libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false } libdd-crashtracker-ffi = { path = "../libdd-crashtracker-ffi", default-features = false, features = ["collector", "collector_windows"] } -winapi = { version = "0.3.9", features = ["securitybaseapi", "sddl", "winerror", "winbase"] } +winapi = { version = "0.3.9", features = [ + "securitybaseapi", + "sddl", + "winerror", + "winbase", + "synchapi", + "handleapi", + "processthreadsapi", +] } windows-sys = { version = "0.52.0", features = ["Win32_System_SystemInformation"] } [target.'cfg(windows_seh_wrapper)'.dependencies] diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index 4da1491bd7..69166c30ed 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -31,6 +31,10 @@ use crate::tracer::SHM_LIMITER; use crate::watchdog::Watchdog; use crate::{ddog_daemon_entry_point, setup_daemon_process}; +/// Callable returned by `setup_daemon_process` that blocks until the child sidecar +/// signals it is ready to accept connections. +pub type ReadinessWaiter = Box io::Result<()>>; + /// Configuration for main_loop behavior pub struct MainLoopConfig { pub enable_ctrl_c_handler: bool, @@ -256,7 +260,7 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> { } spawn_cfg.append_env("LSAN_OPTIONS", "detect_leaks=0"); - setup_daemon_process(listener, &mut spawn_cfg)?; + let wait_ready = setup_daemon_process(listener, &mut spawn_cfg)?; let mut lib_deps = cfg.library_dependencies; if let Some(appsec) = cfg.appsec_config.as_ref() { @@ -271,6 +275,8 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> { .map_err(io::Error::other) .context("Could not spawn the sidecar daemon")?; + wait_ready().map_err(|e| anyhow::anyhow!("Sidecar failed to signal readiness: {e}"))?; + Ok(()) } @@ -298,9 +304,8 @@ pub fn start_or_connect_to_sidecar(cfg: Config) -> anyhow::Result err.context("Error starting sidecar").err(), }; - Ok(SidecarTransport::from( - liaison - .connect_to_server() - .map_err(|e| err.unwrap_or(e.into()))?, - )) + liaison + .connect_to_server() + .map(SidecarTransport::from) + .map_err(|e| err.unwrap_or(e.into())) } diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index be42b52d49..31bf062f34 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -146,14 +146,17 @@ async fn accept_socket_loop( pub fn setup_daemon_process( listener: SeqpacketListener, spawn_cfg: &mut SpawnWorker, -) -> io::Result<()> { +) -> io::Result { spawn_cfg .daemonize(true) .process_name("datadog-ipc-helper") .pass_fd(unsafe { OwnedFd::from_raw_fd(listener.into_raw_fd()) }) .stdin(Stdio::Null); - Ok(()) + // Unix domain sockets have a kernel-level connection backlog: clients can connect + // successfully as soon as bind()+listen() have been called, even before the sidecar + // has called accept(). No readiness wait is needed on Unix. + Ok(Box::new(|| Ok(()))) } pub fn primary_sidecar_identifier() -> u32 { diff --git a/datadog-sidecar/src/windows.rs b/datadog-sidecar/src/windows.rs index 9bebe20d1f..c552f050ee 100644 --- a/datadog-sidecar/src/windows.rs +++ b/datadog-sidecar/src/windows.rs @@ -15,23 +15,37 @@ use std::ffi::CStr; use std::io::{self, Error}; use std::os::windows::io::{FromRawHandle, IntoRawHandle, OwnedHandle}; use std::ptr::null_mut; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::LazyLock; use std::sync::{Arc, Mutex}; use std::time::Instant; use tokio::select; use tracing::{error, info}; +use winapi::um::processthreadsapi::GetCurrentProcessId; +use winapi::um::synchapi::{CreateEventA, OpenEventA, SetEvent, WaitForSingleObject}; +use winapi::um::winbase::WAIT_OBJECT_0; use winapi::um::winnt::HANDLE; use winapi::{ - shared::{sddl::ConvertSidToStringSidA, winerror::ERROR_INSUFFICIENT_BUFFER}, + shared::{ + sddl::ConvertSidToStringSidA, + winerror::{ERROR_INSUFFICIENT_BUFFER, WAIT_TIMEOUT}, + }, um::{ handleapi::CloseHandle, processthreadsapi::{GetCurrentProcess, OpenProcessToken}, securitybaseapi::{GetSidSubAuthority, GetSidSubAuthorityCount, GetTokenInformation}, winbase::LocalFree, - winnt::{TokenIntegrityLevel, TokenUser, TOKEN_MANDATORY_LABEL, TOKEN_QUERY, TOKEN_USER}, + winnt::{ + TokenIntegrityLevel, TokenUser, EVENT_MODIFY_STATE, TOKEN_MANDATORY_LABEL, TOKEN_QUERY, + TOKEN_USER, + }, }, }; +/// Monotonic counter so concurrent `setup_daemon_process` calls (e.g. in tests) +/// always get a unique readiness-event name within the same process. +static SPAWN_SEQ: AtomicU64 = AtomicU64::new(0); + /// cbindgen:ignore #[no_mangle] pub extern "C" fn ddog_daemon_entry_point(_trampoline_data: &TrampolineData) { @@ -63,6 +77,9 @@ pub extern "C" fn ddog_daemon_entry_point(_trampoline_data: &TrampolineData) { } }; + // Notify the parent that we are ready to accept connections. + signal_sidecar_ready(); + Ok(( |handler| accept_socket_loop(listener, closed_future, handler), cancel, @@ -98,12 +115,73 @@ async fn accept_socket_loop( Ok(()) } +/// Signal the parent process that the sidecar is ready to accept connections. +/// +/// The parent's `wait_ready` closure (returned by `setup_daemon_process`) blocks on the +/// named event until this function fires. If the env var is absent (thread mode, tests) +/// this is a no-op. +fn signal_sidecar_ready() { + if let Ok(name) = std::env::var("DD_SIDECAR_READY_EVENT") { + if let Ok(name_c) = std::ffi::CString::new(name) { + unsafe { + let h = OpenEventA(EVENT_MODIFY_STATE, 0, name_c.as_ptr()); + if !h.is_null() { + SetEvent(h); + CloseHandle(h); + } else { + tracing::warn!( + "signal_sidecar_ready: OpenEventA failed: {:?}", + io::Error::last_os_error() + ); + } + } + } + } +} + +/// RAII wrapper that closes a Windows event handle on drop, ensuring no handle +/// leak even if the `ReadinessWaiter` closure is dropped without being called +/// (e.g. when `wait_spawn()` returns an error). +struct OwnedEvent(HANDLE); + +// SAFETY: A Windows HANDLE is an opaque process-global integer (pointer-sized +// for historical reasons). Any thread within the process can call CloseHandle / +// WaitForSingleObject on the same handle value, so transferring ownership across +// threads is safe. +unsafe impl Send for OwnedEvent {} + +impl Drop for OwnedEvent { + fn drop(&mut self) { + unsafe { CloseHandle(self.0) }; + } +} + pub fn setup_daemon_process( listener: SeqpacketListener, spawn_cfg: &mut SpawnWorker, -) -> io::Result<()> { - // Ensure unique process names - we spawn one sidecar per console session id (see - // setup/windows.rs for the reasoning) +) -> io::Result { + // Create a named manual-reset event that the child sidecar will signal once its + // Tokio runtime is running and it is about to enter the accept loop. + // + // The name includes the PID and a monotonic sequence number so that concurrent + // calls from the same process (e.g. in parallel tests) use distinct events. + let pid = unsafe { GetCurrentProcessId() }; + let seq = SPAWN_SEQ.fetch_add(1, Ordering::Relaxed); + let event_name = format!("Local\\dd-sidecar-ready-{}-{}", pid, seq); + let event_name_c = std::ffi::CString::new(event_name.as_str()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let event = unsafe { + CreateEventA( + null_mut(), // default security attributes + 1, // manual-reset + 0, // initially non-signaled + event_name_c.as_ptr(), // CString::as_ptr() → *const c_char, no extra cast needed + ) + }; + if event.is_null() { + return Err(io::Error::last_os_error()); + } + let raw = listener.into_raw_handle(); let owned = unsafe { OwnedHandle::from_raw_handle(raw) }; spawn_cfg @@ -113,8 +191,22 @@ pub fn setup_daemon_process( )) .pass_handle(owned) .stdin(Stdio::Null); - - Ok(()) + spawn_cfg.append_env("DD_SIDECAR_READY_EVENT", &event_name); + + // OwnedEvent ensures CloseHandle is called whether the closure is called or dropped. + let guard = OwnedEvent(event); + Ok(Box::new(move || { + let result = unsafe { WaitForSingleObject(guard.0, 30_000) }; + // `guard` is dropped here (or when the Box itself is dropped), closing the handle. + match result { + WAIT_OBJECT_0 => Ok(()), + WAIT_TIMEOUT => Err(io::Error::new( + io::ErrorKind::TimedOut, + "sidecar did not signal readiness within 30 s", + )), + _ => Err(io::Error::last_os_error()), // WAIT_FAILED + } + })) } pub fn ddog_setup_crashtracking(endpoint: Option<&Endpoint>, metadata: Metadata) -> bool { diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index baa4ac6a89..399aa8cd88 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -173,7 +173,7 @@ fn map_span(span: &Span, resource_service: &str) -> OtlpSpan { } fn map_span_link(link: &SpanLink) -> OtlpSpanLink { - let trace_id_128 = (link.trace_id_high as u128) << 64 | (link.trace_id as u128); + let trace_id_128 = ((link.trace_id_high as u128) << 64) | (link.trace_id as u128); let trace_id_hex = format!("{:032x}", trace_id_128); let span_id_hex = format!("{:016x}", link.span_id); let trace_state = if link.tracestate.borrow().is_empty() {