Skip to content
Draft
55 changes: 37 additions & 18 deletions datadog-ipc/src/platform/windows/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

use crate::platform::message::MAX_FDS;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{
cell::RefCell,
future::Future,
Expand Down Expand Up @@ -53,8 +54,8 @@ use windows_sys::Win32::Storage::FileSystem::{
ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
};
use windows_sys::Win32::System::Pipes::{
ConnectNamedPipe, CreateNamedPipeA, PeekNamedPipe, SetNamedPipeHandleState, PIPE_NOWAIT,
PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
ConnectNamedPipe, CreateNamedPipeA, PeekNamedPipe, SetNamedPipeHandleState, WaitNamedPipeA,
PIPE_NOWAIT, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
};
use windows_sys::Win32::System::Threading::{
CreateEventA, SetEvent, WaitForMultipleObjects, WaitForSingleObject, INFINITE,
Expand Down Expand Up @@ -479,24 +480,42 @@ impl SeqpacketConn {
use winapi::um::winnt::{GENERIC_READ, GENERIC_WRITE};

let name = path_to_null_terminated(path.as_ref());
let h = unsafe {
CreateFileA(
name.as_ptr() as *const i8,
GENERIC_READ | GENERIC_WRITE,
0,
null_mut(),
OPEN_EXISTING,
0, // synchronous, non-overlapped
null_mut(),
)
};
if h == INVALID_HANDLE_VALUE {
let deadline = Instant::now() + Duration::from_secs(30);
let h = loop {
let h = unsafe {
CreateFileA(
name.as_ptr() as *const i8,
GENERIC_READ | GENERIC_WRITE,
0,
null_mut(),
OPEN_EXISTING,
0, // synchronous, non-overlapped
null_mut(),
)
};
if h != INVALID_HANDLE_VALUE {
break h;
}
let err = io::Error::last_os_error();
if err.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) {
return Err(io::ErrorKind::ConnectionRefused.into());
if err.raw_os_error() != Some(ERROR_PIPE_BUSY as i32) {
return Err(err);
}
return Err(err);
}
// All pipe instances are busy — block up to 5 s (capped at remaining budget)
// for one to become available, then retry CreateFileA.
let remaining_ms = deadline
.saturating_duration_since(Instant::now())
.as_millis();
if remaining_ms == 0 {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"named pipe busy for 30 s: giving up",
));
}
let wait_ms = remaining_ms.min(5000) as u32;
if unsafe { WaitNamedPipeA(name.as_ptr(), wait_ms) } == 0 {
return Err(io::Error::last_os_error());
}
};

// Upgrade to message read-mode.
let mode = PIPE_READMODE_MESSAGE;
Expand Down
4 changes: 0 additions & 4 deletions datadog-sidecar-ffi/tests/sidecar.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/

Check failure on line 1 in datadog-sidecar-ffi/tests/sidecar.rs

View workflow job for this annotation

GitHub Actions / [windows-latest:] test report

datadog-sidecar-ffi/tests/sidecar.rs.test_ddog_sidecar_register_app

test_ddog_sidecar_register_app
// SPDX-License-Identifier: Apache-2.0
use datadog_sidecar_ffi::*;

Expand Down Expand Up @@ -68,10 +68,6 @@
}

#[test]
#[cfg_attr(
target_os = "windows",
ignore = "APMSP-2356 Investigate flakiness on Windows"
)]
#[cfg_attr(miri, ignore)]
fn test_ddog_sidecar_register_app() {
set_sidecar_per_process();
Expand Down
10 changes: 9 additions & 1 deletion datadog-sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ sendfd = { version = "0.4", features = ["tokio"] }
[target.'cfg(windows)'.dependencies]
libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false }
libdd-crashtracker-ffi = { path = "../libdd-crashtracker-ffi", default-features = false, features = ["collector", "collector_windows"] }
winapi = { version = "0.3.9", features = ["securitybaseapi", "sddl", "winerror", "winbase"] }
winapi = { version = "0.3.9", features = [
"securitybaseapi",
"sddl",
"winerror",
"winbase",
"synchapi",
"handleapi",
"processthreadsapi",
] }
windows-sys = { version = "0.52.0", features = ["Win32_System_SystemInformation"] }

[target.'cfg(windows_seh_wrapper)'.dependencies]
Expand Down
17 changes: 11 additions & 6 deletions datadog-sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use crate::tracer::SHM_LIMITER;
use crate::watchdog::Watchdog;
use crate::{ddog_daemon_entry_point, setup_daemon_process};

/// Callable returned by `setup_daemon_process` that blocks until the child sidecar
/// signals it is ready to accept connections.
pub type ReadinessWaiter = Box<dyn FnOnce() -> io::Result<()>>;

/// Configuration for main_loop behavior
pub struct MainLoopConfig {
pub enable_ctrl_c_handler: bool,
Expand Down Expand Up @@ -256,7 +260,7 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {
}
spawn_cfg.append_env("LSAN_OPTIONS", "detect_leaks=0");

setup_daemon_process(listener, &mut spawn_cfg)?;
let wait_ready = setup_daemon_process(listener, &mut spawn_cfg)?;

let mut lib_deps = cfg.library_dependencies;
if let Some(appsec) = cfg.appsec_config.as_ref() {
Expand All @@ -271,6 +275,8 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {
.map_err(io::Error::other)
.context("Could not spawn the sidecar daemon")?;

wait_ready().map_err(|e| anyhow::anyhow!("Sidecar failed to signal readiness: {e}"))?;

Ok(())
}

Expand Down Expand Up @@ -298,9 +304,8 @@ pub fn start_or_connect_to_sidecar(cfg: Config) -> anyhow::Result<SidecarTranspo
err => err.context("Error starting sidecar").err(),
};

Ok(SidecarTransport::from(
liaison
.connect_to_server()
.map_err(|e| err.unwrap_or(e.into()))?,
))
liaison
.connect_to_server()
.map(SidecarTransport::from)
.map_err(|e| err.unwrap_or(e.into()))
}
7 changes: 5 additions & 2 deletions datadog-sidecar/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,17 @@ async fn accept_socket_loop(
pub fn setup_daemon_process(
listener: SeqpacketListener,
spawn_cfg: &mut SpawnWorker,
) -> io::Result<()> {
) -> io::Result<crate::ReadinessWaiter> {
spawn_cfg
.daemonize(true)
.process_name("datadog-ipc-helper")
.pass_fd(unsafe { OwnedFd::from_raw_fd(listener.into_raw_fd()) })
.stdin(Stdio::Null);

Ok(())
// Unix domain sockets have a kernel-level connection backlog: clients can connect
// successfully as soon as bind()+listen() have been called, even before the sidecar
// has called accept(). No readiness wait is needed on Unix.
Ok(Box::new(|| Ok(())))
}

pub fn primary_sidecar_identifier() -> u32 {
Expand Down
106 changes: 99 additions & 7 deletions datadog-sidecar/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,37 @@ use std::ffi::CStr;
use std::io::{self, Error};
use std::os::windows::io::{FromRawHandle, IntoRawHandle, OwnedHandle};
use std::ptr::null_mut;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::select;
use tracing::{error, info};
use winapi::um::processthreadsapi::GetCurrentProcessId;
use winapi::um::synchapi::{CreateEventA, OpenEventA, SetEvent, WaitForSingleObject};
use winapi::um::winbase::WAIT_OBJECT_0;
use winapi::um::winnt::HANDLE;
use winapi::{
shared::{sddl::ConvertSidToStringSidA, winerror::ERROR_INSUFFICIENT_BUFFER},
shared::{
sddl::ConvertSidToStringSidA,
winerror::{ERROR_INSUFFICIENT_BUFFER, WAIT_TIMEOUT},
},
um::{
handleapi::CloseHandle,
processthreadsapi::{GetCurrentProcess, OpenProcessToken},
securitybaseapi::{GetSidSubAuthority, GetSidSubAuthorityCount, GetTokenInformation},
winbase::LocalFree,
winnt::{TokenIntegrityLevel, TokenUser, TOKEN_MANDATORY_LABEL, TOKEN_QUERY, TOKEN_USER},
winnt::{
TokenIntegrityLevel, TokenUser, EVENT_MODIFY_STATE, TOKEN_MANDATORY_LABEL, TOKEN_QUERY,
TOKEN_USER,
},
},
};

/// Monotonic counter so concurrent `setup_daemon_process` calls (e.g. in tests)
/// always get a unique readiness-event name within the same process.
static SPAWN_SEQ: AtomicU64 = AtomicU64::new(0);

/// cbindgen:ignore
#[no_mangle]
pub extern "C" fn ddog_daemon_entry_point(_trampoline_data: &TrampolineData) {
Expand Down Expand Up @@ -63,6 +77,9 @@ pub extern "C" fn ddog_daemon_entry_point(_trampoline_data: &TrampolineData) {
}
};

// Notify the parent that we are ready to accept connections.
signal_sidecar_ready();

Ok((
|handler| accept_socket_loop(listener, closed_future, handler),
cancel,
Expand Down Expand Up @@ -98,12 +115,73 @@ async fn accept_socket_loop(
Ok(())
}

/// Signal the parent process that the sidecar is ready to accept connections.
///
/// The parent's `wait_ready` closure (returned by `setup_daemon_process`) blocks on the
/// named event until this function fires. If the env var is absent (thread mode, tests)
/// this is a no-op.
fn signal_sidecar_ready() {
if let Ok(name) = std::env::var("DD_SIDECAR_READY_EVENT") {
if let Ok(name_c) = std::ffi::CString::new(name) {
unsafe {
let h = OpenEventA(EVENT_MODIFY_STATE, 0, name_c.as_ptr());
if !h.is_null() {
SetEvent(h);
CloseHandle(h);
} else {
tracing::warn!(
"signal_sidecar_ready: OpenEventA failed: {:?}",
io::Error::last_os_error()
);
}
}
}
}
}

/// RAII wrapper that closes a Windows event handle on drop, ensuring no handle
/// leak even if the `ReadinessWaiter` closure is dropped without being called
/// (e.g. when `wait_spawn()` returns an error).
struct OwnedEvent(HANDLE);

// SAFETY: A Windows HANDLE is an opaque process-global integer (pointer-sized
// for historical reasons). Any thread within the process can call CloseHandle /
// WaitForSingleObject on the same handle value, so transferring ownership across
// threads is safe.
unsafe impl Send for OwnedEvent {}

impl Drop for OwnedEvent {
fn drop(&mut self) {
unsafe { CloseHandle(self.0) };
}
}

pub fn setup_daemon_process(
listener: SeqpacketListener,
spawn_cfg: &mut SpawnWorker,
) -> io::Result<()> {
// Ensure unique process names - we spawn one sidecar per console session id (see
// setup/windows.rs for the reasoning)
) -> io::Result<crate::ReadinessWaiter> {
// Create a named manual-reset event that the child sidecar will signal once its
// Tokio runtime is running and it is about to enter the accept loop.
//
// The name includes the PID and a monotonic sequence number so that concurrent
// calls from the same process (e.g. in parallel tests) use distinct events.
let pid = unsafe { GetCurrentProcessId() };
let seq = SPAWN_SEQ.fetch_add(1, Ordering::Relaxed);
let event_name = format!("Local\\dd-sidecar-ready-{}-{}", pid, seq);
let event_name_c = std::ffi::CString::new(event_name.as_str())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let event = unsafe {
CreateEventA(
null_mut(), // default security attributes
1, // manual-reset
0, // initially non-signaled
event_name_c.as_ptr(), // CString::as_ptr() → *const c_char, no extra cast needed
)
};
if event.is_null() {
return Err(io::Error::last_os_error());
}

let raw = listener.into_raw_handle();
let owned = unsafe { OwnedHandle::from_raw_handle(raw) };
spawn_cfg
Expand All @@ -113,8 +191,22 @@ pub fn setup_daemon_process(
))
.pass_handle(owned)
.stdin(Stdio::Null);

Ok(())
spawn_cfg.append_env("DD_SIDECAR_READY_EVENT", &event_name);

// OwnedEvent ensures CloseHandle is called whether the closure is called or dropped.
let guard = OwnedEvent(event);
Ok(Box::new(move || {
let result = unsafe { WaitForSingleObject(guard.0, 30_000) };
// `guard` is dropped here (or when the Box itself is dropped), closing the handle.
match result {
WAIT_OBJECT_0 => Ok(()),
WAIT_TIMEOUT => Err(io::Error::new(
io::ErrorKind::TimedOut,
"sidecar did not signal readiness within 30 s",
)),
_ => Err(io::Error::last_os_error()), // WAIT_FAILED
}
}))
}

pub fn ddog_setup_crashtracking(endpoint: Option<&Endpoint>, metadata: Metadata) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-utils/src/otlp_encoder/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ fn map_span<T: TraceData>(span: &Span<T>, resource_service: &str) -> OtlpSpan {
}

fn map_span_link<T: TraceData>(link: &SpanLink<T>) -> OtlpSpanLink {
let trace_id_128 = (link.trace_id_high as u128) << 64 | (link.trace_id as u128);
let trace_id_128 = ((link.trace_id_high as u128) << 64) | (link.trace_id as u128);
let trace_id_hex = format!("{:032x}", trace_id_128);
let span_id_hex = format!("{:016x}", link.span_id);
let trace_state = if link.tracestate.borrow().is_empty() {
Expand Down
Loading