Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ee60d29
feat(cli): Add standalone --server and --client modes with full repor…
sberg-rh Apr 7, 2026
fca0f61
feat: Support send_delay and include_first_message in standalone client
sberg-rh Apr 7, 2026
3f5726a
perf: Eliminate per-message heap allocation in standalone client
sberg-rh Apr 7, 2026
33d85d0
feat: Add concurrency support for standalone client/server
sberg-rh Apr 7, 2026
d4ed3a4
test: Add coverage for concurrency, from_stream, and aggregation
sberg-rh Apr 8, 2026
1542f4b
fix: Add async multi-accept server and remove unused parameter
sberg-rh Apr 8, 2026
47848ac
fix: Use OS-assigned ports in tests and document grace period
sberg-rh Apr 8, 2026
02cf1c4
fix: Set streams to blocking mode after tokio into_std conversion
sberg-rh Apr 8, 2026
8a09124
test: Strengthen test assertions for real-world correctness
sberg-rh Apr 8, 2026
7610244
fix: Address round 2 review feedback
sberg-rh Apr 9, 2026
9d336b4
test: Add real-world scenario tests for coverage and correctness
sberg-rh Apr 9, 2026
e71b30e
fix: Set accepted streams to blocking mode in multi-accept servers
sberg-rh Apr 9, 2026
fa5b31f
fix: Address review feedback - grace period, quiet flag, mutex safety
sberg-rh Apr 14, 2026
b427894
fix: Address review feedback - multiple correctness and quality fixes
sberg-rh Apr 14, 2026
551d9f5
feat: Add receive_blocking_timed for accurate one-way latency
sberg-rh Apr 14, 2026
3e17727
fix: Update MessageLatencyRecord calls for PR #105 compatibility
sberg-rh Apr 14, 2026
f9b31ab
refactor: Move standalone logic from main.rs to library crate
sberg-rh Apr 15, 2026
dc2a1fe
fix: Improve server resilience and error visibility
sberg-rh Apr 15, 2026
90d00fc
test: Comprehensive coverage improvements for standalone modules
sberg-rh Apr 15, 2026
15f9c92
perf: Reduce multi-accept server polling interval from 50ms to 5ms
sberg-rh Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/blocking_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ fn main() -> Result<()> {
buffer_size: None,
pmq_priority: 0,

// Standalone mode flags
server: false,
client: false,

// Internal flag (not for external use)
internal_run_as_server: false,

Expand Down
2 changes: 2 additions & 0 deletions examples/blocking_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ fn create_args(
client_affinity: None,
buffer_size: None,
pmq_priority: 0,
server: false,
client: false,
internal_run_as_server: false,
verbose: 0,
quiet: false,
Expand Down
2 changes: 2 additions & 0 deletions src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ impl BenchmarkConfig {
/// # pmq_priority: 0,
/// # include_first_message: false,
/// # blocking: false,
/// # server: false,
/// # client: false,
/// # internal_run_as_server: false,
/// # socket_path: None,
/// # shared_memory_name: None,
Expand Down
141 changes: 128 additions & 13 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const TIMING: &str = "Timing";
const CONCURRENCY: &str = "Concurrency";
const OUTPUT_AND_LOGGING: &str = "Output and Logging";
const ADVANCED: &str = "Advanced";
const STANDALONE: &str = "Standalone Mode";

/// Returns the default IPC mechanism based on the target OS.
///
Expand Down Expand Up @@ -366,27 +367,87 @@ pub struct Args {
#[arg(long, default_value_t = false, help_heading = ADVANCED)]
pub shm_direct: bool,

/// (Internal) Run the process in server-only mode.
/// Run in standalone server mode.
///
/// This is a hidden flag used by the benchmark runner to spawn a child
/// process that acts only as the server. It is not intended for direct
/// use by users.
#[arg(long, hide = true)]
pub internal_run_as_server: bool,
/// Starts the process as a server that listens for incoming client
/// connections. The server waits indefinitely for a client to connect,
/// receives messages, and responds to round-trip requests.
///
/// For TCP and UDS, the server accepts multiple concurrent connections
/// (one handler thread per client). After the first client connects,
/// the server waits 2 seconds before checking if all clients have
/// disconnected. Additional clients should connect within this window.
///
/// This enables running the server and client as independent processes,
/// potentially on different hosts or in different environments (e.g.,
/// host and container).
///
/// Mutually exclusive with --client.
///
/// # Examples
///
/// ```bash
/// # Terminal 1: start server
/// ipc-benchmark --server -m uds --blocking
///
/// # Terminal 2: start client
/// ipc-benchmark --client -m uds --blocking -i 10000
/// ```
#[arg(long, conflicts_with = "client", help_heading = STANDALONE)]
pub server: bool,

// --- Transport-specific arguments for internal use ---
/// (Internal) Specifies the exact socket path for UDS.
#[arg(long, hide = true)]
/// Run in standalone client mode.
///
/// Connects to an already-running server and executes the benchmark
/// workload. If the server is not yet available, the client retries
/// the connection with backoff for up to 30 seconds before failing.
///
/// Mutually exclusive with --server.
///
/// # Examples
///
/// ```bash
/// # Terminal 1: start server
/// ipc-benchmark --server -m tcp --blocking
///
/// # Terminal 2: start client (retries until server is ready)
/// ipc-benchmark --client -m tcp --blocking -i 5000
/// ```
#[arg(long, conflicts_with = "server", help_heading = STANDALONE)]
pub client: bool,

/// Socket path for Unix Domain Sockets in standalone mode.
///
/// Specifies the filesystem path for the UDS socket. Both server
/// and client must use the same path. If not specified, a default
/// path in the system temp directory is used.
#[arg(long, help_heading = STANDALONE)]
pub socket_path: Option<String>,

/// (Internal) Specifies the exact name for Shared Memory.
#[arg(long, hide = true)]
/// Shared memory segment name for standalone mode.
///
/// Specifies the name of the POSIX shared memory segment. Both
/// server and client must use the same name. If not specified,
/// defaults to "ipc_benchmark_shm".
#[arg(long, help_heading = STANDALONE)]
pub shared_memory_name: Option<String>,

/// (Internal) Specifies the exact name for the POSIX Message Queue.
#[arg(long, hide = true)]
/// POSIX Message Queue name for standalone mode.
///
/// Specifies the name of the message queue. Both server and client
/// must use the same name. If not specified, defaults to
/// "ipc_benchmark_pmq".
#[arg(long, help_heading = STANDALONE)]
pub message_queue_name: Option<String>,

/// (Internal) Run the process in server-only mode.
///
/// This is a hidden flag used by the benchmark runner to spawn a child
/// process that acts only as the server. It is not intended for direct
/// use by users.
#[arg(long, hide = true)]
pub internal_run_as_server: bool,

/// (Internal) File path for server to write measured latencies.
///
/// Used internally when spawning the server process to communicate
Expand Down Expand Up @@ -993,4 +1054,58 @@ mod tests {
assert_eq!(args.message_size, 1024);
assert_eq!(args.msg_count, 1000);
}

#[test]
fn test_server_flag() {
let args = Args::parse_from(["ipc-benchmark", "--server", "-m", "tcp"]);
assert!(args.server);
assert!(!args.client);
}

#[test]
fn test_client_flag() {
let args = Args::parse_from(["ipc-benchmark", "--client", "-m", "tcp"]);
assert!(args.client);
assert!(!args.server);
}

#[test]
fn test_server_client_mutually_exclusive() {
let result = Args::try_parse_from(["ipc-benchmark", "--server", "--client", "-m", "tcp"]);
assert!(result.is_err(), "--server and --client should conflict");
}

#[test]
fn test_standalone_defaults() {
let args = Args::parse_from(["ipc-benchmark", "-m", "tcp"]);
assert!(!args.server);
assert!(!args.client);
}

#[test]
fn test_standalone_endpoint_flags() {
let args = Args::parse_from([
"ipc-benchmark",
"--server",
"-m",
"tcp",
"--socket-path",
"/tmp/test.sock",
"--shared-memory-name",
"my_shm",
"--message-queue-name",
"my_pmq",
]);
assert!(args.server);
assert_eq!(args.socket_path, Some("/tmp/test.sock".to_string()));
assert_eq!(args.shared_memory_name, Some("my_shm".to_string()));
assert_eq!(args.message_queue_name, Some("my_pmq".to_string()));
}

#[test]
fn test_server_with_blocking() {
let args = Args::parse_from(["ipc-benchmark", "--server", "-m", "tcp", "--blocking"]);
assert!(args.server);
assert!(args.blocking);
}
}
15 changes: 15 additions & 0 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,21 @@ pub trait BlockingTransport: Send {
/// timeout mechanisms if needed (SO_RCVTIMEO on sockets, etc).
fn receive_blocking(&mut self) -> Result<Message>;

/// Receive a message and capture a monotonic timestamp immediately
/// after the raw bytes are read but before deserialization.
///
/// This provides more accurate one-way latency measurement by
/// excluding deserialization overhead from the receive timestamp.
///
/// The default implementation captures the timestamp after
/// `receive_blocking()` returns (including deserialization).
/// Transport implementations should override this to place the
/// timestamp between raw I/O and deserialization.
fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
let msg = self.receive_blocking()?;
Ok((msg, get_monotonic_time_ns()))
}

/// Close the transport and release resources.
///
/// This method cleanly shuts down the transport, closing connections
Expand Down
36 changes: 36 additions & 0 deletions src/ipc/shared_memory_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,42 @@ impl BlockingTransport for BlockingSharedMemory {
Ok(message)
}

fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
self.ensure_peer_ready()?;

let ring_buffer = self.ring_buffer.ok_or_else(|| {
anyhow!(
"Cannot receive: shared memory not initialized. \
Call start_server_blocking() or start_client_blocking() first."
)
})?;

#[cfg(unix)]
let data = unsafe { (*ring_buffer).read_data_blocking()? };

#[cfg(not(unix))]
let data = loop {
match unsafe { (*ring_buffer).read_data() } {
Ok(d) => break d,
Err(_) => {
if unsafe { (*ring_buffer).shutdown.load(Ordering::Acquire) } {
return Err(anyhow!("Connection closed"));
}
thread::yield_now();
thread::sleep(Duration::from_micros(100));
}
}
};

// Capture timestamp after raw read, before deserialization
let receive_time_ns = crate::ipc::get_monotonic_time_ns();

let message: Message =
bincode::deserialize(&data).context("Failed to deserialize message")?;

Ok((message, receive_time_ns))
}

fn close_blocking(&mut self) -> Result<()> {
debug!("Closing blocking shared memory transport");

Expand Down
9 changes: 9 additions & 0 deletions src/ipc/shared_memory_direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,15 @@ impl BlockingTransport for BlockingSharedMemoryDirect {
Ok(message)
}

fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
// SHM-direct has no deserialization (direct memcpy), so the
// timestamp is captured immediately after the data read and
// before mutex unlock/signal. This uses the default implementation
// since there's no meaningful deserialization to exclude.
let msg = self.receive_blocking()?;
Ok((msg, crate::ipc::get_monotonic_time_ns()))
}

fn close_blocking(&mut self) -> Result<()> {
debug!("Closing direct memory SHM transport");

Expand Down
47 changes: 47 additions & 0 deletions src/ipc/tcp_socket_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ impl BlockingTcpSocket {
}
}

/// Create a transport from a pre-accepted TCP stream.
///
/// Used by the standalone server to wrap each accepted connection
/// in its own transport instance for per-client handler threads.
pub fn from_stream(stream: TcpStream) -> Self {
Self {
listener: None,
stream: Some(stream),
}
}

/// Accept a connection if we haven't already.
/// This is called automatically on first send/receive in server mode.
fn ensure_connection(&mut self) -> Result<()> {
Expand Down Expand Up @@ -293,6 +304,42 @@ impl BlockingTransport for BlockingTcpSocket {
Ok(message)
}

fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
self.ensure_connection()?;

let stream = self.stream.as_mut().context(
"Cannot receive: socket not connected. \
Call start_server_blocking() or start_client_blocking() first.",
)?;

let mut len_bytes = [0u8; 4];
stream.read_exact(&mut len_bytes).context(
"Failed to read message length. \
Connection may be closed or peer disconnected.",
)?;
let len = u32::from_le_bytes(len_bytes) as usize;
if len == 0 || len > Self::MAX_MESSAGE_SIZE {
return Err(anyhow!(
"Invalid message length: {} bytes (allowed: 1..={})",
len,
Self::MAX_MESSAGE_SIZE
));
}

let mut buffer = vec![0u8; len];
stream
.read_exact(&mut buffer)
.context("Failed to read message data")?;

// Capture timestamp after raw read, before deserialization
let receive_time_ns = crate::ipc::get_monotonic_time_ns();

let message: Message =
bincode::deserialize(&buffer).context("Failed to deserialize message")?;

Ok((message, receive_time_ns))
}

fn close_blocking(&mut self) -> Result<()> {
debug!("Closing blocking TCP transport");

Expand Down
47 changes: 47 additions & 0 deletions src/ipc/unix_domain_socket_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ impl BlockingUnixDomainSocket {
}
}

/// Create a transport from a pre-accepted Unix stream.
///
/// Used by the standalone server to wrap each accepted connection
/// in its own transport instance for per-client handler threads.
pub fn from_stream(stream: UnixStream) -> Self {
Self {
listener: None,
stream: Some(stream),
}
}

/// Accept a connection if we haven't already.
/// This is called automatically on first send/receive in server mode.
fn ensure_connection(&mut self) -> Result<()> {
Expand Down Expand Up @@ -334,6 +345,42 @@ impl BlockingTransport for BlockingUnixDomainSocket {
Ok(message)
}

fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
self.ensure_connection()?;

let stream = self.stream.as_mut().context(
"Cannot receive: socket not connected. \
Call start_server_blocking() or start_client_blocking() first.",
)?;

let mut len_bytes = [0u8; 4];
stream.read_exact(&mut len_bytes).context(
"Failed to read message length. \
Connection may be closed or peer disconnected.",
)?;
let len = u32::from_le_bytes(len_bytes) as usize;
if len == 0 || len > Self::MAX_MESSAGE_SIZE {
return Err(anyhow!(
"Invalid message length: {} bytes (allowed: 1..={})",
len,
Self::MAX_MESSAGE_SIZE
));
}

let mut buffer = vec![0u8; len];
stream
.read_exact(&mut buffer)
.context("Failed to read message data")?;

// Capture timestamp after raw read, before deserialization
let receive_time_ns = crate::ipc::get_monotonic_time_ns();

let message: Message =
bincode::deserialize(&buffer).context("Failed to deserialize message")?;

Ok((message, receive_time_ns))
}

fn close_blocking(&mut self) -> Result<()> {
debug!("Closing blocking UDS transport");

Expand Down
Loading
Loading