Conversation
Summary of Changes

Hello @proteetpaul, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly upgrades the project's I/O capabilities by integrating a high-performance io_uring-based runtime.
/gemini review
Code Review
This pull request introduces a new io_uring based runtime, which is a significant and valuable addition for high-performance I/O on Linux. The implementation follows a solid multi-threaded executor pattern with per-thread io_uring instances. The refactoring to decouple storage operations from DataFusion's Operator by introducing a local ByteViewOperator is also a great improvement for modularity.
However, I've found a few critical issues in the new runtime implementation that need to be addressed:
- A memory safety issue related to raw pointers that could lead to use-after-free.
- A potential for panicking the worker thread on file I/O errors.
- A busy-looping condition in the worker thread that can lead to high CPU usage when idle.
I've left detailed comments on these points. Once these are addressed, this will be a fantastic enhancement to the project.
```rust
struct AsyncTask {
    // Note: Should change this to Arc in case of a work-stealing scheduler
    pub inner: Rc<RefCell<dyn IoTask>>,
    pub waker: Waker,
    pub completed: *mut AtomicBool,
}
```
There's a potential memory safety issue here. The completed: *mut AtomicBool is a raw pointer to a field within UringFuture. If the UringFuture is dropped before the I/O operation completes (for example, if it's part of a tokio::select! that resolves another branch, or a timeout), this pointer will become dangling. When the I/O operation eventually completes, the worker thread will attempt to write to this dangling pointer, leading to undefined behavior.
To fix this, you should use a shared ownership mechanism like Arc<AtomicBool> to ensure the AtomicBool lives as long as either the future or the async task needs it.
This would involve changing:
UringFutureto hold anArc<AtomicBool>.AsyncTaskto also hold anArc<AtomicBool>.- The
pollmethod ofUringFuturetoclone()theArcfor theAsyncTask. - The
unsafeblock inpoll_completionscan then be removed.
```rust
fn worker_main_loop(receiver: crossbeam_channel::Receiver<ExecutorTask>) {
    let executor = LocalExecutor::new();
    loop {
        while !receiver.is_empty() {
            let task = receiver.recv()
                .expect("Failed to receive task");
            executor.spawn(task).detach();
        }
        let task_found = executor.try_tick();
        LOCAL_WORKER.with(|worker| {
            let mut worker = worker.borrow_mut();
            if worker.need_submit {
                worker.ring.submit().expect("Failed to submit");
                worker.need_submit = false;
            } else if !task_found && worker.tokens.len() < MAX_CONCURRENT_TASKS as usize {
                worker.ring.submit_and_wait(1).expect("Failed to submit");
            }
            worker.poll_completions();
        });
    }
}
```
The worker_main_loop can enter a busy-loop, consuming 100% CPU when there are no tasks to process and no in-flight I/O operations. This happens because when receiver.is_empty() is true and there are no in-flight I/O operations, the loop continues without blocking.
To fix this, when there are no in-flight I/O operations and no pending tasks in the local executor, the worker should block on receiving a new task from the channel (e.g., using receiver.recv()) instead of busy-looping.
```rust
let file = OpenOptions::new()
    .create(true)
    .truncate(true)
    .write(true)
    .open(path)
    .expect("failed to create file");
```
Using .expect() here will cause the worker thread to panic if the file cannot be created or opened. This could bring down the entire process. It's better to propagate the error by returning a Result.
```diff
 let file = OpenOptions::new()
     .create(true)
     .truncate(true)
     .write(true)
-    .open(path)
-    .expect("failed to create file");
+    .open(path)?;
```
Code Review
This pull request introduces a new io_uring based asynchronous runtime. The implementation is a great step forward, but there are several critical issues that need to be addressed before it can be merged. These include a potential busy-loop in the worker thread, memory safety issues due to raw pointer usage, fragile thread safety due to unsafe impl Send, and blocking I/O calls within the async runtime. I've provided detailed comments and suggestions for each of these points. Additionally, there's an opportunity to refactor some duplicated code to improve maintainability.
```rust
loop {
    while !receiver.is_empty() {
        let task = receiver.recv()
            .expect("Failed to receive task");
        executor.spawn(task).detach();
    }
    let task_found = executor.try_tick();
    LOCAL_WORKER.with(|worker| {
        let mut worker = worker.borrow_mut();
        if worker.need_submit {
            worker.ring.submit().expect("Failed to submit");
            worker.need_submit = false;
        } else if !task_found && worker.tokens.len() < MAX_CONCURRENT_TASKS as usize {
            worker.ring.submit_and_wait(1).expect("Failed to submit");
        }
        worker.poll_completions();
    });
}
```
The worker_main_loop can enter a busy loop, consuming 100% CPU when the worker is idle. This happens when there are no new tasks in the receiver, no tasks making progress in the LocalExecutor (task_found is false), and no in-flight I/O operations. In this state, the loop doesn't block and spins continuously.
To fix this, the worker should block waiting for new tasks when idle. A potential approach:
```rust
fn worker_main_loop(receiver: crossbeam_channel::Receiver<ExecutorTask>) {
    let executor = LocalExecutor::new();
    loop {
        // Try to tick the executor and poll for IO completions
        let task_found = executor.try_tick();
        let has_inflight_io = LOCAL_WORKER.with(|worker| {
            let mut worker = worker.borrow_mut();
            if worker.need_submit {
                worker.ring.submit().expect("Failed to submit");
                worker.need_submit = false;
            }
            worker.poll_completions();
            worker.tokens.len() < MAX_CONCURRENT_TASKS as usize
        });
        if !task_found && !has_inflight_io {
            // No tasks to run and no inflight IO. Block for new tasks.
            match receiver.recv() {
                Ok(task) => executor.spawn(task).detach(),
                Err(_) => break, // Channel closed, exit loop.
            }
        } else if !task_found && has_inflight_io {
            // No tasks to run, but there is inflight IO. Wait for IO.
            LOCAL_WORKER.with(|worker| {
                let mut worker = worker.borrow_mut();
                worker.ring.submit_and_wait(1).expect("Failed to submit and wait");
                worker.poll_completions();
            });
        }
        // Drain any other pending tasks without blocking
        while let Ok(task) = receiver.try_recv() {
            executor.spawn(task).detach();
        }
    }
}
```

```rust
// Note: Should change this to Arc in case of a work-stealing scheduler
pub inner: Rc<RefCell<dyn IoTask>>,
pub waker: Waker,
pub completed: *mut AtomicBool,
```
The use of a raw pointer *mut AtomicBool in AsyncTask to point to UringFuture::completed is not memory safe. If the Future that contains UringFuture is dropped before the I/O operation completes, the UringFuture will be dropped and its memory freed. However, the AsyncTask might still be in the inflight_tasks queue. When the I/O operation eventually completes, poll_completions will attempt to dereference this dangling pointer, leading to a use-after-free vulnerability.
To fix this, use an Arc<AtomicBool> to safely share the completion state between the UringFuture and the AsyncTask. This ensures the AtomicBool remains live as long as either the future or the in-flight task holds a reference to it.
This would involve changing:
UringFuture::completedtoArc<AtomicBool>.AsyncTask::completedtoArc<AtomicBool>.- In
UringFuture::new, initialize withArc::new(AtomicBool::new(false)). - In
UringFuture::poll, clone theArcforAsyncTask. - In
RuntimeWorker::poll_completions, remove theunsafeblock and directly store to theArc.
```diff
-pub completed: *mut AtomicBool,
+pub completed: std::sync::Arc<std::sync::atomic::AtomicBool>,
```
```rust
let file = OpenOptions::new()
    .create(true)
    .truncate(true)
    .write(true)
    .open(path)
    .expect("failed to create file");
```
The write function performs a blocking file open operation (OpenOptions::...::open). Since this function is executed on a worker thread of the async runtime, this blocking call will stall the entire worker thread. This prevents it from processing other async tasks or I/O completion events, leading to poor performance and potential deadlocks if other tasks depend on this worker.
To fix this, the file opening should be performed asynchronously using an io_uring operation, similar to how it's done in the read function. You may need to create a new IoTask for opening files in write mode or generalize the existing FileOpenTask.
```rust
unsafe impl<T> Send for UringFuture<T>
where T: IoTask + 'static, {}
```
UringFuture contains Rc<RefCell<T>>, which is not Send. The unsafe impl<T> Send for UringFuture<T> is used to bypass the compiler's safety checks. While this might appear to work with the current non-work-stealing executor, it's a fragile design that can easily lead to data races and undefined behavior if the executor logic changes. It makes the code harder to reason about and maintain.
To ensure thread safety, replace Rc<RefCell<T>> with Arc<Mutex<T>>. This will make UringFuture Send without needing an unsafe implementation, making the code robust and safe by design. This would require changing Rc to Arc and RefCell to a Mutex in both UringFuture and AsyncTask, and updating the call sites to use .lock().unwrap().
```rust
/// Return a bytes object holding the result of the read operation.
#[inline]
pub(crate) fn get_result(self: &mut Self) -> Result<Bytes, std::io::Error> {
    if let Some(err) = self.error.take() {
        return Err(err);
    }

    let (start_padding, _) = self.padding();
    let range_len = (self.range.end - self.range.start) as usize;
    let data_start = self.aligned_offset + start_padding;
    let data_end = data_start + range_len;

    let buffer = mem::take(&mut self.buffer);
    let bytes = Bytes::from(buffer);

    Ok(bytes.slice(data_start..data_end))
}
```
The logic in this get_result function is nearly identical to the existing into_result function. This code duplication can make maintenance harder, as changes might need to be applied in two places.
Consider refactoring the common logic into a private helper function that can be called by both get_result and into_result to avoid duplication.
For example, for FileReadTask:
```rust
impl FileReadTask {
    // ...
    fn prepare_result(&mut self) -> Result<Bytes, std::io::Error> {
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        let (start_padding, _) = self.padding();
        let range_len = (self.range.end - self.range.start) as usize;
        let data_start = self.aligned_offset + start_padding;
        let data_end = data_start + range_len;
        let buffer = std::mem::take(&mut self.buffer);
        let bytes = Bytes::from(buffer);
        Ok(bytes.slice(data_start..data_end))
    }

    pub(crate) fn into_result(mut self: Box<Self>) -> Result<Bytes, std::io::Error> {
        self.prepare_result()
    }

    pub(crate) fn get_result(&mut self) -> Result<Bytes, std::io::Error> {
        self.prepare_result()
    }
}
```

This commit implements a buffer pool for fixed buffers in order to remove memory allocation and pinning overheads during IO submission. The buffer pool is integrated into the uring threadpool mechanism. It also implements minor optimizations in the uring threadpool, such as syscall batching.
- Syscall batching
- Handle multiple SQEs per IO task
- Spawn a batch of tasks on the runtime
Partition batches more evenly, add support for other io modes, minor code cleanup
This commit introduces a multi-threaded runtime that integrates io_uring. Each worker thread has the following components:

- A crossbeam channel receiver for accepting new tasks
- A `LocalExecutor` for driving async tasks
- A thread-local io_uring instance for submitting IOs and polling completions
Tasks are submitted to the worker thread using a crossbeam channel, and results are propagated back to the caller using a oneshot channel. When a task requires IO, it yields to the runtime. The runtime is responsible for submitting IOs to the ring and polling IO completions. Upon completion, the runtime unparks the corresponding task.