
Io uring runtime #2

Open

proteetpaul wants to merge 19 commits into main from io-uring-runtime

Conversation

@proteetpaul
Owner

This commit introduces a multi-threaded runtime that integrates io_uring. Each worker thread has the following components:

  • A local async executor
  • An io_uring instance

Tasks are submitted to a worker thread over a crossbeam channel, and results are propagated back to the caller through a oneshot channel. When a task requires I/O, it yields to the runtime, which submits the I/O operations to the ring and polls for completions. Upon completion, the runtime unparks the corresponding task.
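For illustration, here is a minimal sketch of the submit/response flow described above. The task type, channel wiring, and the io_uring-backed read are assumptions standing in for the PR's actual API, not its exact names:

use std::future::Future;
use std::pin::Pin;

// Assumed task type: a boxed future that a worker's local executor runs.
// The real ExecutorTask in this PR may be defined differently.
type ExecutorTask = Pin<Box<dyn Future<Output = ()> + Send>>;

// Hypothetical async entry point standing in for the PR's
// UringFuture-based file read.
async fn read_file_via_uring(_path: String) -> std::io::Result<Vec<u8>> {
    unimplemented!("placeholder for the io_uring-backed read")
}

// Submit a read to a worker and block the caller until the result arrives.
fn submit_read(
    sender: &crossbeam_channel::Sender<ExecutorTask>,
    path: String,
) -> std::io::Result<Vec<u8>> {
    // The oneshot channel carries the result back to the caller.
    let (tx, rx) = oneshot::channel();

    let task: ExecutorTask = Box::pin(async move {
        let result = read_file_via_uring(path).await; // yields while IO is in flight
        let _ = tx.send(result); // propagate the result; ignore a dropped receiver
    });

    sender.send(task).expect("runtime workers have shut down");
    rx.recv().expect("worker dropped the task") // blocks until the task completes
}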

@gemini-code-assist

Summary of Changes

Hello @proteetpaul, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the project's I/O capabilities by integrating a high-performance io_uring based asynchronous runtime, specifically targeting Linux environments. Concurrently, it introduces a comprehensive overhaul of string comparison and filtering logic within LiquidByteViewArray, enhancing efficiency through new operator abstractions and string fingerprinting. These core improvements are complemented by expanded performance metrics and new analytical studies to validate and guide future optimizations.

Highlights

  • New io_uring Non-Blocking Runtime: Introduced a multi-threaded, non-blocking I/O runtime leveraging Linux's io_uring interface. Each worker thread manages a local async executor and an io_uring instance, handling task submission and completion via crossbeam and oneshot channels.
  • Enhanced String Comparison Logic: Refactored LiquidByteViewArray comparison mechanisms with a new ByteViewOperator enum, enabling more expressive and optimized filtering. This includes specialized handling for equality, ordering, and substring searches, utilizing string fingerprinting for performance gains.
  • Advanced Cache Statistics & Transcoding: Expanded cache statistics to track detailed memory usage per entry type (Arrow, Liquid, Squeezed-Liquid) and added a counter for failed predicate evaluations. Transcoding now supports hints for building metadata like string fingerprints, optimizing the conversion of Arrow arrays to Liquid arrays.
  • New Performance Studies: Added new studies to analyze filter selectivity, prefix differentiability, and string fingerprint performance, providing deeper insights into the efficiency of string-based data structures and filtering strategies.


Ignored Files
  • Ignored by pattern: .github/workflows/** (2)
    • .github/workflows/ci.yml
    • .github/workflows/fuzz.yml

@proteetpaul
Owner Author

/gemini review


@gemini-code-assist bot left a comment


Code Review

This pull request introduces a new io_uring based runtime, which is a significant and valuable addition for high-performance I/O on Linux. The implementation follows a solid multi-threaded executor pattern with per-thread io_uring instances. The refactoring to decouple storage operations from DataFusion's Operator by introducing a local ByteViewOperator is also a great improvement for modularity.

However, I've found a few critical issues in the new runtime implementation that need to be addressed:

  • A memory safety issue related to raw pointers that could lead to use-after-free.
  • A potential for panicking the worker thread on file I/O errors.
  • A busy-looping condition in the worker thread that can lead to high CPU usage when idle.

I've left detailed comments on these points. Once these are addressed, this will be a fantastic enhancement to the project.

Comment on lines +168 to +173
struct AsyncTask {
    // Note: Should change this to Arc in case of a work-stealing scheduler
    pub inner: Rc<RefCell<dyn IoTask>>,
    pub waker: Waker,
    pub completed: *mut AtomicBool,
}


critical

There's a potential memory safety issue here. The completed: *mut AtomicBool is a raw pointer to a field within UringFuture. If the UringFuture is dropped before the I/O operation completes (for example, if it's part of a tokio::select! that resolves another branch, or a timeout), this pointer will become dangling. When the I/O operation eventually completes, the worker thread will attempt to write to this dangling pointer, leading to undefined behavior.

To fix this, you should use a shared ownership mechanism like Arc<AtomicBool> to ensure the AtomicBool lives as long as either the future or the async task needs it.

This would involve changing:

  1. UringFuture to hold an Arc<AtomicBool>.
  2. AsyncTask to also hold an Arc<AtomicBool>.
  3. The poll method of UringFuture to clone() the Arc for the AsyncTask.
  4. The unsafe block in poll_completions can then be removed.
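For concreteness, a minimal sketch of that change, with the IO buffers and the IoTask itself elided (field and method names here are approximations, not the PR's exact code):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::Waker;

struct UringFuture {
    completed: Arc<AtomicBool>, // shared flag instead of *mut AtomicBool
}

struct AsyncTask {
    waker: Waker,
    completed: Arc<AtomicBool>, // clone of the future's flag
}

impl UringFuture {
    fn new() -> Self {
        Self { completed: Arc::new(AtomicBool::new(false)) }
    }

    // Called from poll(): the Arc clone keeps the flag alive even if the
    // future is dropped (e.g. a losing tokio::select! branch) before the
    // IO completes, so the worker never writes through a dangling pointer.
    fn make_task(&self, waker: Waker) -> AsyncTask {
        AsyncTask { waker, completed: Arc::clone(&self.completed) }
    }
}

// In RuntimeWorker::poll_completions the unsafe block disappears:
fn mark_complete(task: &AsyncTask) {
    task.completed.store(true, Ordering::Release);
    task.waker.wake_by_ref();
}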

Comment on lines +146 to +166
fn worker_main_loop(receiver: crossbeam_channel::Receiver<ExecutorTask>) {
    let executor = LocalExecutor::new();
    loop {
        while !receiver.is_empty() {
            let task = receiver.recv()
                .expect("Failed to receive task");
            executor.spawn(task).detach();
        }
        let task_found = executor.try_tick();
        LOCAL_WORKER.with(|worker| {
            let mut worker = worker.borrow_mut();
            if worker.need_submit {
                worker.ring.submit().expect("Failed to submit");
                worker.need_submit = false;
            } else if !task_found && worker.tokens.len() < MAX_CONCURRENT_TASKS as usize {
                worker.ring.submit_and_wait(1).expect("Failed to submit");
            }
            worker.poll_completions();
        });
    }
}


high

The worker_main_loop can enter a busy-loop, consuming 100% CPU when there are no tasks to process and no in-flight I/O operations. This happens because when receiver.is_empty() is true and there are no in-flight I/O operations, the loop continues without blocking.

To fix this, when there are no in-flight I/O operations and no pending tasks in the local executor, the worker should block on receiving a new task from the channel (e.g., using receiver.recv()) instead of busy-looping.

Comment on lines +268 to +273
let file = OpenOptions::new()
    .create(true)
    .truncate(true)
    .write(true)
    .open(path)
    .expect("failed to create file");


high

Using .expect() here will cause the worker thread to panic if the file cannot be created or opened. This could bring down the entire process. It's better to propagate the error by returning a Result.

Suggested change

let file = OpenOptions::new()
    .create(true)
    .truncate(true)
    .write(true)
    .open(path)
    .expect("failed to create file");

let file = OpenOptions::new()
    .create(true)
    .truncate(true)
    .write(true)
    .open(path)?;


@gemini-code-assist bot left a comment


Code Review

This pull request introduces a new io_uring based asynchronous runtime. The implementation is a great step forward, but there are several critical issues that need to be addressed before it can be merged. These include a potential busy-loop in the worker thread, memory safety issues due to raw pointer usage, fragile thread safety due to unsafe impl Send, and blocking I/O calls within the async runtime. I've provided detailed comments and suggestions for each of these points. Additionally, there's an opportunity to refactor some duplicated code to improve maintainability.

Comment on lines +148 to +166
    loop {
        while !receiver.is_empty() {
            let task = receiver.recv()
                .expect("Failed to receive task");
            executor.spawn(task).detach();
        }
        let task_found = executor.try_tick();
        LOCAL_WORKER.with(|worker| {
            let mut worker = worker.borrow_mut();
            if worker.need_submit {
                worker.ring.submit().expect("Failed to submit");
                worker.need_submit = false;
            } else if !task_found && worker.tokens.len() < MAX_CONCURRENT_TASKS as usize {
                worker.ring.submit_and_wait(1).expect("Failed to submit");
            }
            worker.poll_completions();
        });
    }
}


critical

The worker_main_loop can enter a busy loop, consuming 100% CPU when the worker is idle. This happens when there are no new tasks in the receiver, no tasks making progress in the LocalExecutor (task_found is false), and no in-flight I/O operations. In this state, the loop doesn't block and spins continuously.

To fix this, the worker should block waiting for new tasks when idle. A potential approach:

fn worker_main_loop(receiver: crossbeam_channel::Receiver<ExecutorTask>) {
    let executor = LocalExecutor::new();
    loop {
        // Try to tick the executor and poll for IO completions
        let task_found = executor.try_tick();
        let has_inflight_io = LOCAL_WORKER.with(|worker| {
            let mut worker = worker.borrow_mut();
            if worker.need_submit {
                worker.ring.submit().expect("Failed to submit");
                worker.need_submit = false;
            }
            worker.poll_completions();
            worker.tokens.len() < MAX_CONCURRENT_TASKS as usize
        });

        if !task_found && !has_inflight_io {
            // No tasks to run and no inflight IO. Block for new tasks.
            match receiver.recv() {
                Ok(task) => executor.spawn(task).detach(),
                Err(_) => break, // Channel closed, exit loop.
            }
        } else if !task_found && has_inflight_io {
            // No tasks to run, but there is inflight IO. Wait for IO.
            LOCAL_WORKER.with(|worker| {
                let mut worker = worker.borrow_mut();
                worker.ring.submit_and_wait(1).expect("Failed to submit and wait");
                worker.poll_completions();
            });
        }

        // Drain any other pending tasks without blocking
        while let Ok(task) = receiver.try_recv() {
            executor.spawn(task).detach();
        }
    }
}

    // Note: Should change this to Arc in case of a work-stealing scheduler
    pub inner: Rc<RefCell<dyn IoTask>>,
    pub waker: Waker,
    pub completed: *mut AtomicBool,


critical

The use of a raw pointer *mut AtomicBool in AsyncTask to point to UringFuture::completed is not memory safe. If the Future that contains UringFuture is dropped before the I/O operation completes, the UringFuture will be dropped and its memory freed. However, the AsyncTask might still be in the inflight_tasks queue. When the I/O operation eventually completes, poll_completions will attempt to dereference this dangling pointer, leading to a use-after-free vulnerability.

To fix this, use an Arc<AtomicBool> to safely share the completion state between the UringFuture and the AsyncTask. This ensures the AtomicBool remains live as long as either the future or the in-flight task holds a reference to it.

This would involve changing:

  1. UringFuture::completed to Arc<AtomicBool>.
  2. AsyncTask::completed to Arc<AtomicBool>.
  3. In UringFuture::new, initialize with Arc::new(AtomicBool::new(false)).
  4. In UringFuture::poll, clone the Arc for AsyncTask.
  5. In RuntimeWorker::poll_completions, remove the unsafe block and directly store to the Arc.
Suggested change

pub completed: *mut AtomicBool,

pub completed: std::sync::Arc<std::sync::atomic::AtomicBool>,

Comment on lines +268 to +273
let file = OpenOptions::new()
    .create(true)
    .truncate(true)
    .write(true)
    .open(path)
    .expect("failed to create file");


critical

The write function performs a blocking file open operation (OpenOptions::...::open). Since this function is executed on a worker thread of the async runtime, this blocking call will stall the entire worker thread. This prevents it from processing other async tasks or I/O completion events, leading to poor performance and potential deadlocks if other tasks depend on this worker.

To fix this, the file opening should be performed asynchronously using an io_uring operation, similar to how it's done in the read function. You may need to create a new IoTask for opening files in write mode or generalize the existing FileOpenTask.
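For reference, a minimal sketch of an asynchronous open using the io-uring crate's OpenAt opcode, assuming that crate and libc are what this runtime builds on (the FileOpenTask wiring and error handling are elided):

use std::ffi::CString;
use io_uring::{opcode, types, IoUring};

// Open a file for writing via the ring instead of blocking the worker.
fn submit_open_for_write(ring: &mut IoUring, path: &str) -> std::io::Result<()> {
    let c_path = CString::new(path).unwrap();
    let open_e = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), c_path.as_ptr())
        .flags(libc::O_WRONLY | libc::O_CREAT | libc::O_TRUNC)
        .mode(0o644)
        .build()
        .user_data(0x10); // token identifying this open task

    // Safety: c_path must stay alive until the kernel consumes the SQE;
    // a real IoTask would own the CString for the duration of the op.
    unsafe { ring.submission().push(&open_e).expect("submission queue full") };
    ring.submit()?; // the new fd arrives later on the completion queue
    Ok(())
}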

Comment on lines +191 to +192
unsafe impl<T> Send for UringFuture<T> where T: IoTask + 'static {}


high

UringFuture contains Rc<RefCell<T>>, which is not Send. The unsafe impl<T> Send for UringFuture<T> is used to bypass the compiler's safety checks. While this might appear to work with the current non-work-stealing executor, it's a fragile design that can easily lead to data races and undefined behavior if the executor logic changes. It makes the code harder to reason about and maintain.

To ensure thread safety, replace Rc<RefCell<T>> with Arc<Mutex<T>>. This will make UringFuture Send without needing an unsafe implementation, making the code robust and safe by design. This would require changing Rc to Arc and RefCell to a Mutex in both UringFuture and AsyncTask, and updating the call sites to use .lock().unwrap().
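A sketch of the Send-safe shape, with the PR's IoTask trait stubbed out and the remaining fields elided:

use std::sync::{Arc, Mutex};
use std::task::Waker;

// Stand-in for the PR's IoTask trait.
trait IoTask: Send {}

// Arc<Mutex<T>> makes both types Send without any `unsafe impl`, so the
// compiler re-checks thread safety if the executor ever becomes work-stealing.
struct UringFuture<T: IoTask + 'static> {
    inner: Arc<Mutex<T>>,
}

struct AsyncTask {
    inner: Arc<Mutex<dyn IoTask>>,
    waker: Waker,
}

impl<T: IoTask + 'static> UringFuture<T> {
    fn as_task(&self, waker: Waker) -> AsyncTask {
        let inner: Arc<Mutex<dyn IoTask>> = self.inner.clone(); // unsize coercion
        AsyncTask { inner, waker }
    }
}

// Call sites then use inner.lock().unwrap() instead of borrow_mut().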

Comment on lines +191 to +207
/// Return a bytes object holding the result of the read operation.
#[inline]
pub(crate) fn get_result(self: &mut Self) -> Result<Bytes, std::io::Error> {
    if let Some(err) = self.error.take() {
        return Err(err);
    }

    let (start_padding, _) = self.padding();
    let range_len = (self.range.end - self.range.start) as usize;
    let data_start = self.aligned_offset + start_padding;
    let data_end = data_start + range_len;

    let buffer = mem::take(&mut self.buffer);
    let bytes = Bytes::from(buffer);

    Ok(bytes.slice(data_start..data_end))
}


medium

The logic in this get_result function is nearly identical to the existing into_result function. This code duplication can make maintenance harder, as changes might need to be applied in two places.

Consider refactoring the common logic into a private helper function that can be called by both get_result and into_result to avoid duplication.

For example, for FileReadTask:

impl FileReadTask {
    // ...
    fn prepare_result(&mut self) -> Result<Bytes, std::io::Error> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }

        let (start_padding, _) = self.padding();
        let range_len = (self.range.end - self.range.start) as usize;
        let data_start = self.aligned_offset + start_padding;
        let data_end = data_start + range_len;

        let buffer = std::mem::take(&mut self.buffer);
        let bytes = Bytes::from(buffer);

        Ok(bytes.slice(data_start..data_end))
    }

    pub(crate) fn into_result(mut self: Box<Self>) -> Result<Bytes, std::io::Error> {
        self.prepare_result()
    }

    pub(crate) fn get_result(&mut self) -> Result<Bytes, std::io::Error> {
        self.prepare_result()
    }
}

proteetpaul and others added 17 commits February 24, 2026 02:31

  • Implement a buffer pool for fixed buffers to remove memory allocation and pinning overheads during I/O submission. The pool is integrated into the uring threadpool, along with minor optimizations such as syscall batching. (A sketch of fixed-buffer registration follows below.)
  • Syscall batching; handle multiple SQEs per IO task; spawn a batch of tasks on the runtime.
  • Partition batches more evenly, add support for other IO modes, minor code cleanup.
  • Add error handling in pool.rs; run cargo fmt; minor fixes.
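The fixed-buffer pool in the first commit builds on io_uring's registered buffers. A minimal sketch with the io-uring and libc crates, assuming those are the dependencies here (the PR's pool structure itself is elided):

use io_uring::{opcode, types, IoUring};

// Register a buffer once, up front, so reads can reference a pre-pinned
// buffer by index instead of allocating and pinning per IO.
fn read_with_fixed_buffer(fd: i32) -> std::io::Result<Vec<u8>> {
    let mut ring = IoUring::new(8)?;
    let mut buf = vec![0u8; 4096];
    let iovec = libc::iovec {
        iov_base: buf.as_mut_ptr() as *mut libc::c_void,
        iov_len: buf.len(),
    };
    // Safety: the buffer must stay alive and in place while registered.
    unsafe { ring.submitter().register_buffers(&[iovec])? };

    // ReadFixed names the registered buffer by index 0; no re-pinning needed.
    let read_e = opcode::ReadFixed::new(types::Fd(fd), buf.as_mut_ptr(), buf.len() as u32, 0)
        .build()
        .user_data(1);
    unsafe { ring.submission().push(&read_e).expect("submission queue full") };

    ring.submit_and_wait(1)?; // one syscall both submits and waits
    let cqe = ring.completion().next().expect("completion expected");
    let n = cqe.result();
    if n < 0 {
        return Err(std::io::Error::from_raw_os_error(-n));
    }
    buf.truncate(n as usize);
    Ok(buf)
}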
