Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bfacac4
Add async runtime for io uring
proteetpaul Jan 13, 2026
533869e
Fix
proteetpaul Jan 13, 2026
1456d6f
Add a fixed buffer allocator for io_uring
proteetpaul Feb 22, 2026
dcb0fa5
Integrate uring runtime with fixed buffers
proteetpaul Mar 2, 2026
31df6c0
Add benchmark for io_uring runtime
proteetpaul Mar 2, 2026
3399f40
Add a SimpleIoContext to support different IO modes
proteetpaul Mar 2, 2026
89eed17
Remove datafusion dependency in benchmark
proteetpaul Mar 2, 2026
eb3f31b
Fix bug related to io_uring_enter
Mar 2, 2026
44c6e0f
Setup logger
proteetpaul Mar 2, 2026
9180bf8
Bug fix: Process task completion
proteetpaul Mar 2, 2026
cddf627
Bug fix: Register buffers with runtime worker rings
proteetpaul Mar 2, 2026
363f25b
Fix bug in fixed buffer registration in arena
proteetpaul Mar 2, 2026
1ef1abb
Fix bug in runtime worker IO task completion
proteetpaul Mar 2, 2026
0ae5af9
Record flamegraphs in benchmark
proteetpaul Mar 2, 2026
5e739d4
Add support for projection columns
proteetpaul Mar 9, 2026
6eec220
Add some tracepoints and disk usage tracking
proteetpaul Mar 9, 2026
f6e889c
Avoid redundant writes in storage_runner
proteetpaul Mar 9, 2026
a069119
Fix partitioning logic in storage_runner
proteetpaul Mar 9, 2026
6f498d8
Changes
proteetpaul Mar 9, 2026
dc19baa
Formatting changes
proteetpaul Mar 16, 2026
2c62fbd
First attempt at a work-stealing runtime
proteetpaul Apr 1, 2026
93e5dab
Bug fix
proteetpaul Apr 1, 2026
07923de
Fixes
proteetpaul Apr 2, 2026
709be25
Measure cpu time in benchmark
proteetpaul Apr 2, 2026
4fab99c
Changes to storage_runner
proteetpaul Apr 13, 2026
89a8d3a
Fix perf slowdown when running bench in std-blocking mode
proteetpaul Apr 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ liquid-cache-storage = { workspace = true }
liquid-cache-common = { workspace = true }
liquid-cache-local = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
arrow-flight = { workspace = true }
Expand Down Expand Up @@ -67,3 +68,7 @@ path = "bench_server.rs"
[[bin]]
name = "in_process"
path = "in_process.rs"

[[bin]]
name = "storage_runner"
path = "src/storage_runner.rs"
4 changes: 4 additions & 0 deletions benchmark/bench_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ struct CliArgs {
/// IO mode, available options: uring, uring-direct, std-blocking, tokio, std-spawn-blocking
#[arg(long = "io-mode", default_value = "uring-multi-async")]
io_mode: IoMode,

#[arg(long = "fixed-buffer-pool-size-mb", default_value = "0")]
fixed_buffer_pool_size_mb: usize,
}

#[tokio::main]
Expand Down Expand Up @@ -81,6 +84,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
squeeze_policy,
Box::new(NoHydration::new()),
Some(args.io_mode),
args.fixed_buffer_pool_size_mb,
)?;

let liquid_cache_server = Arc::new(liquid_cache_server);
Expand Down
9 changes: 7 additions & 2 deletions benchmark/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use fastrace::prelude::*;
use liquid_cache_benchmarks::{
BenchmarkManifest, InProcessBenchmarkMode, InProcessBenchmarkRunner, setup_observability,
};
use liquid_cache_common::IoMode;
use liquid_cache_common::{IoMode, memory::pool::FixedBufferPool};
use mimalloc::MiMalloc;
use serde::Serialize;
use std::path::PathBuf;
Expand Down Expand Up @@ -70,6 +70,9 @@ struct InProcessBenchmark {
/// IO mode, available options: uring, uring-direct, std-blocking, tokio, std-spawn-blocking
#[arg(long = "io-mode", default_value = "uring-multi-async")]
io_mode: IoMode,

#[arg(long = "fixed-buffer-pool-size-mb", default_value = "0")]
fixed_buffer_pool_size_mb: usize,
}

impl InProcessBenchmark {
Expand All @@ -88,7 +91,8 @@ impl InProcessBenchmark {
.with_cache_dir(self.cache_dir.clone())
.with_query_filter(self.query_index)
.with_io_mode(self.io_mode)
.with_output_dir(self.output_dir.clone());
.with_output_dir(self.output_dir.clone())
.with_fixed_buffer_pool_size_mb(self.fixed_buffer_pool_size_mb);
runner.run(manifest, self, output).await?;
Ok(())
}
Expand All @@ -102,6 +106,7 @@ async fn main() -> Result<()> {
let _guard = root.set_local_parent();

benchmark.run().await?;
FixedBufferPool::print_stats();
fastrace::flush();
Ok(())
}
9 changes: 9 additions & 0 deletions benchmark/src/inprocess_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ pub struct InProcessBenchmarkRunner {
pub io_mode: IoMode,
pub output_dir: Option<PathBuf>,
pub collect_perf_events: bool,
pub fixed_buffer_pool_size_mb: usize,
}

impl Default for InProcessBenchmarkRunner {
Expand All @@ -224,6 +225,7 @@ impl InProcessBenchmarkRunner {
io_mode: IoMode::default(),
output_dir: None,
collect_perf_events: false,
fixed_buffer_pool_size_mb: 0,
}
}

Expand Down Expand Up @@ -282,6 +284,11 @@ impl InProcessBenchmarkRunner {
self
}

pub fn with_fixed_buffer_pool_size_mb(mut self, fixed_buffer_pool_size_mb: usize) -> Self {
self.fixed_buffer_pool_size_mb = fixed_buffer_pool_size_mb;
self
}

#[fastrace::trace]
async fn setup_context(
&self,
Expand Down Expand Up @@ -348,6 +355,7 @@ impl InProcessBenchmarkRunner {
.with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
.with_io_mode(self.io_mode)
.with_eager_shredding(true)
.with_fixed_buffer_pool_size_mb(self.fixed_buffer_pool_size_mb)
.build(session_config)?;
(v.0, Some(v.1))
}
Expand All @@ -359,6 +367,7 @@ impl InProcessBenchmarkRunner {
.with_hydration_policy(Box::new(NoHydration::new()))
.with_squeeze_policy(Box::new(TranscodeEvict))
.with_io_mode(self.io_mode)
.with_fixed_buffer_pool_size_mb(self.fixed_buffer_pool_size_mb)
.build(session_config)?;
(v.0, Some(v.1))
}
Expand Down
77 changes: 76 additions & 1 deletion benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub mod client_runner;
pub mod inprocess_runner;
mod manifest;
mod observability;
mod tracepoints;
pub mod tracepoints;
pub mod utils;

pub use client_runner::*;
Expand Down Expand Up @@ -487,6 +487,81 @@ impl Display for IterationResult {
}
}

/// Table layout matching [`IterationResult`]'s [`Display`] (borders, row style, disk formatting).
/// When `uring_runnable` is `Some`, includes work-stealing executor `Runnable::run` timing (see storage runner).
pub fn format_storage_iteration_metrics(
iteration: usize,
iteration_wall: Duration,
disk_read: u64,
disk_written: u64,
uring_runnable: Option<(f64, f64)>,
) -> String {
struct StorageIterationTable {
iteration: usize,
iteration_wall_ms: u64,
uring_runnable: Option<(f64, f64)>,
disk_read: u64,
disk_written: u64,
}

impl std::fmt::Display for StorageIterationTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const INNER: usize = 50;
write_border_top(f, INNER)?;
write_kv_row(
f,
INNER,
"Iteration:",
&format!("{}", self.iteration),
)?;
write_kv_row(
f,
INNER,
"Iteration wall:",
&format!("{} ms", format_number(self.iteration_wall_ms)),
)?;
if let Some((runnable_wall_ms, wall_minus_runnable_ms)) = self.uring_runnable {
write_kv_row(
f,
INNER,
"Runnable wall (sum):",
&format!("{:.3} ms", runnable_wall_ms),
)?;
write_kv_row(
f,
INNER,
"Wall minus runnable:",
&format!("{:.3} ms", wall_minus_runnable_ms),
)?;
}
write_border_sep(f, INNER)?;
write_kv_row(
f,
INNER,
"Disk (Read/Write):",
&format!(
"{} / {}",
format_bytes(self.disk_read),
format_bytes(self.disk_written)
),
)?;
write_border_bottom(f, INNER)
}
}

let iteration_wall_ms = (iteration_wall.as_secs_f64() * 1000.0).round() as u64;
format!(
"{}",
StorageIterationTable {
iteration,
iteration_wall_ms,
uring_runnable,
disk_read,
disk_written,
}
)
}

fn format_number(n: u64) -> String {
let s = n.to_string();
let chars: Vec<char> = s.chars().collect();
Expand Down
Loading
Loading