Conversation
/gemini review
Summary of Changes

Hello @proteetpaul, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a fixed buffer pool for memory management, designed to optimize I/O operations, particularly when using io_uring. By pre-allocating and managing memory in a structured way, the system aims to reduce memory allocation overhead and improve data transfer efficiency. The changes integrate this new buffer pool into the io_uring backend, allowing read operations to use registered fixed buffers. Additionally, it provides a configurable option for the buffer pool's size, making the system more adaptable to different workloads and environments.
Code Review
This pull request introduces a fixed buffer pool to be used with io_uring for potentially improved I/O performance. This is a substantial change, adding a sophisticated custom memory allocator with thread-local caches, using mmap and unsafe Rust. While the overall direction is sound, the implementation has several areas that need attention. My review highlights a critical compilation error, a potential memory leak in the arena allocator, a memory utilization issue with cross-thread frees, a performance concern in the thread-local cache, and an incomplete feature for fixed-buffer writes. Addressing these points will help ensure the stability and correctness of this new memory management layer.
```rust
pub mod pool;
mod segment;
mod arena;
mod global_pool;
```
This module declaration for global_pool will cause a compilation error because the corresponding file src/common/src/memory/global_pool.rs is not included in this pull request. If this module is intended for future work, it should be commented out to prevent breaking the build.
```suggestion
// mod global_pool; // This module seems to be missing.
```
```rust
pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
    debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
    let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
    self.used_bitmap[segment_idx] = 0;
}
```
The retire_segment function incorrectly assumes that a segment is always composed of a single slice from the arena, as it only frees one entry in used_bitmap. However, allocate_segment is capable of allocating multiple contiguous slices. If a segment is ever allocated with more than one slice, this function will only free the first one, leading to a memory leak within the arena. To fix this, the Segment should store the number of slices it's composed of, and retire_segment should use this information to free all corresponding entries in used_bitmap.
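To make the suggested fix concrete, here is a minimal sketch of a segment that records how many arena slices back it, so retirement clears every corresponding bitmap entry. `Arena`, `Segment`, the field names, and the constant value are illustrative stand-ins for the types in this PR, not its actual definitions.

```rust
// Illustrative sketch: the segment remembers its slice count, and
// retire_segment frees all of its bitmap entries, not just the first.
const SEGMENT_SIZE: usize = 4 * 1024 * 1024; // stand-in value

struct Segment {
    base: usize,        // address of the segment's first byte
    slice_count: usize, // contiguous arena slices backing this segment
}

struct Arena {
    base: usize,          // address of slices[0].ptr in the real code
    used_bitmap: Vec<u8>, // one entry per SEGMENT_SIZE slice
}

impl Arena {
    fn retire_segment(&mut self, segment: &Segment) {
        debug_assert!(self.base <= segment.base);
        let first = (segment.base - self.base) / SEGMENT_SIZE;
        // Free *all* slices the segment occupies.
        for idx in first..first + segment.slice_count {
            self.used_bitmap[idx] = 0;
        }
    }
}
```

The only structural change to the PR would be storing `slice_count` in `Segment` at allocation time and looping over it here.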
```rust
fn free(ptr: *mut u8) {
    let segment_ptr = Segment::get_segment_from_ptr(ptr);
    let page_ptr = unsafe { (*segment_ptr).get_page_from_ptr(ptr) };
    unsafe {
        (*page_ptr).free(ptr);
    }
    // If page is local and unused after free, return it to segment
    let thread_id = unsafe { (*segment_ptr).thread_id };
    let cur_cpu = unsafe { libc::sched_getcpu() as usize };
    if cur_cpu == thread_id {
        let should_free_page = unsafe { (*page_ptr).used.load(Ordering::Relaxed) == 0 };
        if should_free_page {
            let local_cache = Self::get_thread_local_cache();
            let mut guard = local_cache.lock().unwrap();
            guard.retire_page(page_ptr);
        }
    } else {
        let pool = FIXED_BUFFER_POOL.get().unwrap();
        pool.foreign_free.fetch_add(1, Ordering::Relaxed);
    }
}
```
The current implementation for handling "foreign frees" (memory freed by a thread different from the one that allocated it) can lead to inefficient memory utilization. When a foreign free occurs, the memory block is returned to its page, but the page itself is not retired to the segment's free list, even if it becomes empty. It remains tied to the original thread's TCache. This can cause memory to be held by a thread that no longer needs it, making it unavailable to other threads and potentially leading to allocation failures. Consider implementing a mechanism to return these pages to a global pool or to the owner thread's cache to improve memory utilization.
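One way to act on this suggestion is to give each owner cache an "inbox" that foreign threads push freed blocks into, which the owner drains on its next allocation so emptied pages can actually be retired. The sketch below uses a `Mutex<Vec>` for brevity where a lock-free queue (as this PR later adds) would serve the same role; `ForeignFreeInbox` and the use of raw addresses are illustrative, not the PR's API.

```rust
use std::sync::Mutex;

// Sketch: remote threads record the blocks they free; the owner thread
// drains the inbox and recycles the blocks, retiring pages that become
// empty. A Mutex<Vec> stands in for a lock-free MPSC queue.
struct ForeignFreeInbox {
    pending: Mutex<Vec<usize>>, // block addresses freed by other threads
}

impl ForeignFreeInbox {
    fn new() -> Self {
        ForeignFreeInbox { pending: Mutex::new(Vec::new()) }
    }

    // Called on the freeing (non-owner) thread instead of only bumping
    // a foreign_free counter.
    fn push(&self, block_addr: usize) {
        self.pending.lock().unwrap().push(block_addr);
    }

    // Called on the owner thread, e.g. at the top of its allocation path.
    fn drain(&self) -> Vec<usize> {
        std::mem::take(&mut *self.pending.lock().unwrap())
    }
}
```

The key design point is that retirement decisions stay on the owner thread, so the page/free-list invariants are only ever mutated by one thread at a time.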
src/common/src/memory/page.rs (outdated)
```rust
pub struct Page {
    pub(crate) block_size: usize, // Size of objects that are being allocated to this page
    // TODO(): Remove dependency on dynamically allocated memory
    free_list: Mutex<VecDeque<Block>>,
```
Using a Mutex to protect the free_list VecDeque in each Page will likely become a performance bottleneck, as it introduces lock contention on every block allocation and deallocation. For a memory allocator, this can significantly impact performance, especially in highly concurrent scenarios. The TODO comment and the presence of commented-out fields like local_free_list suggest you might already be considering a more scalable approach. I'd recommend exploring lock-free data structures or other techniques to reduce contention here.
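As a sketch of what a lower-contention free list could look like, here is a Treiber-stack design where each free block's first word stores the pointer to the next free block, so neither a `Mutex` nor a side `VecDeque` is needed. This deliberately ignores the ABA problem (which a production allocator would address, e.g. with tagged pointers or by restricting pops to the owner thread), so treat it as a starting point rather than a drop-in replacement.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

// Lock-free intrusive free list: the list links live inside the free
// blocks themselves, so push/pop are a single CAS with no allocation.
struct FreeList {
    head: AtomicPtr<u8>,
}

impl FreeList {
    fn new() -> Self {
        FreeList { head: AtomicPtr::new(ptr::null_mut()) }
    }

    // Safety: `block` must point to at least pointer-sized, pointer-aligned
    // writable memory that stays valid while it sits on the list.
    unsafe fn push(&self, block: *mut u8) {
        let mut head = self.head.load(Ordering::Acquire);
        loop {
            (block as *mut *mut u8).write(head); // link block to old head
            match self.head.compare_exchange_weak(
                head, block, Ordering::AcqRel, Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(cur) => head = cur,
            }
        }
    }

    unsafe fn pop(&self) -> Option<*mut u8> {
        let mut head = self.head.load(Ordering::Acquire);
        while !head.is_null() {
            let next = (head as *mut *mut u8).read();
            match self.head.compare_exchange_weak(
                head, next, Ordering::AcqRel, Ordering::Acquire,
            ) {
                Ok(_) => return Some(head),
                Err(cur) => head = cur,
            }
        }
        None
    }
}
```

Because blocks in this allocator are fixed-size and at least pointer-sized, reusing their storage for the link is free; this is the same trick mimalloc-style allocators use.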
src/common/src/memory/tcache.rs (outdated)
```rust
fn remove_slice_from_span(self: &mut Self, slice: &mut Page) -> bool {
    let span_idx = Self::get_span_idx_from_slice_count(slice.slice_count);
    for i in 0..self.spans[span_idx].len() {
        if self.spans[span_idx][i] == slice {
            self.spans[span_idx].remove(i);
            break;
        }
    }
    return true;
}
```
The remove_slice_from_span function uses a linear scan to find and remove a slice from the spans vector, which has O(n) complexity. Similarly, remove_page_from_used_queue also performs a linear scan. For a memory allocator, which is often in the critical path, this can become a performance bottleneck. The TODO comment indicates you are aware of this. I recommend prioritizing the replacement of Vec with a more suitable data structure, like an intrusive linked list, to achieve O(1) removal and improve performance.
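Besides a full intrusive list, a lighter-weight way to get O(1) removal is to have each page remember its own index in the span vector and use `swap_remove`, patching the index of the element that moves. The sketch below uses a `HashMap` as a stand-in for that back-pointer (in the real allocator it would be a field on `Page` itself); `PageId` stands in for the `*mut Page` the PR stores.

```rust
use std::collections::HashMap;

type PageId = usize; // stand-in for *mut Page

// O(1) insert and remove: `pos` tracks where each page sits in `pages`,
// and removal swap-removes, fixing up the index of the moved element.
struct Span {
    pages: Vec<PageId>,
    pos: HashMap<PageId, usize>,
}

impl Span {
    fn new() -> Self {
        Span { pages: Vec::new(), pos: HashMap::new() }
    }

    fn insert(&mut self, page: PageId) {
        self.pos.insert(page, self.pages.len());
        self.pages.push(page);
    }

    fn remove(&mut self, page: PageId) -> bool {
        let Some(idx) = self.pos.remove(&page) else { return false };
        self.pages.swap_remove(idx); // O(1): last element fills the hole
        if let Some(&moved) = self.pages.get(idx) {
            self.pos.insert(moved, idx); // fix up the element that moved
        }
        true
    }
}
```

An intrusive doubly linked list avoids the hash lookups entirely, but this variant needs no unsafe code and already removes the linear scan from the hot path.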
src/parquet/src/io/io_uring/tasks.rs (outdated)
```rust
data: Bytes,
fd: RawFd,
error: Option<std::io::Error>,
use_fixed_buffers: bool,
```
The FileWriteTask struct has been updated to include a use_fixed_buffers field, but its prepare_sqe implementation doesn't seem to use it. It still uses opcode::Write, which does not leverage pre-registered buffers. To take advantage of the fixed buffer pool for writes, you would need to use opcode::WriteFixed. This appears to be an incomplete implementation of the fixed buffer feature for write operations.
Code Review
This pull request introduces a fixed buffer pool for io_uring to improve I/O performance. The implementation includes a custom memory allocator with an arena, segments, pages, and per-CPU thread caches. While this is a significant feature, the review has identified several critical issues, including a portability problem with sched_getcpu that will cause compilation failures on non-Linux systems, and a compilation error in a test. There are also high-severity concerns regarding memory management that could lead to fragmentation and memory hoarding in long-running services. Additionally, there are medium-severity issues related to performance hotspots in the allocation path and the robustness of internal APIs. These issues should be addressed to ensure the stability, portability, and performance of the new memory allocator.
```rust
fn get_thread_local_cache() -> &'static Mutex<TCache> {
    let cpu = unsafe { libc::sched_getcpu() };
    &FIXED_BUFFER_POOL.get().unwrap().local_caches[cpu as usize]
}
```
libc::sched_getcpu() is a GNU extension and is only available on Linux. Using it without platform-specific conditional compilation (#[cfg(target_os = "linux")]) will cause compilation to fail on other platforms like macOS. You should guard its usage. For non-Linux platforms, you could fall back to a different mechanism or disable per-CPU caching. A simple fallback could be to use CPU 0 for all threads, but this would serialize all threads on one cache, negating the benefits of per-CPU caches.
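For the non-Linux fallback, here is a stdlib-only sketch that assigns each thread a cache shard round-robin on first use, which spreads threads across caches better than pinning everyone to CPU 0. `NUM_CACHES` and all names are illustrative; on Linux the existing `libc::sched_getcpu()` path would remain behind `#[cfg(target_os = "linux")]`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const NUM_CACHES: usize = 8; // stand-in for local_caches.len()

// Each thread claims a shard the first time it touches the allocator.
static NEXT_SHARD: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static MY_SHARD: usize =
        NEXT_SHARD.fetch_add(1, Ordering::Relaxed) % NUM_CACHES;
}

// Portable replacement for the sched_getcpu()-based shard lookup.
fn cache_shard() -> usize {
    MY_SHARD.with(|s| *s)
}
```

Unlike `sched_getcpu()`, this index is stable for a thread's lifetime, which also sidesteps the case where a thread migrates between CPUs mid-operation.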
src/common/src/memory/pool.rs (outdated)
```rust
    // If page is local and unused after free, return it to segment
    let thread_id = unsafe { (*segment_ptr).thread_id };
    let cur_cpu = unsafe { libc::sched_getcpu() as usize };
    if cur_cpu == thread_id {
        let should_free_page = unsafe { (*page_ptr).used.load(Ordering::Relaxed) == 0 };
        if should_free_page {
            let local_cache = Self::get_thread_local_cache();
            let mut guard = local_cache.lock().unwrap();
            guard.retire_page(page_ptr);
        }
    } else {
        let pool = FIXED_BUFFER_POOL.get().unwrap();
        pool.foreign_free.fetch_add(1, Ordering::Relaxed);
    }
}
```
When a buffer is freed from a different thread than it was allocated on (cur_cpu != thread_id), the page is not retired even if it becomes empty. The memory is returned to the page's free list, but the page itself is not returned to the TCache's free spans. This means the page cannot be coalesced or reused for allocations of a different size class, leading to memory fragmentation and hoarding. The foreign_free counter is incremented, but no action is taken to reclaim the page. You should implement a mechanism to handle foreign frees, for example by using a concurrent queue to send the page back to its owner TCache for retirement.
```rust
fn cleanup_pages(self: &mut Self) {
    for i in 0..self.used_pages.len() {
        for page_idx in 0..self.used_pages[i].len() {
            let page = self.used_pages[i][page_idx];
            unsafe {
                if (*page).used.load(Ordering::Relaxed) == 0 {
                    self.retire_page(page);
                    self.used_pages[i].remove(page_idx);
                }
            }
        }
    }
}
```
The function cleanup_pages is defined to iterate over used_pages and retire pages that have become empty. This seems intended to address the memory hoarding issue with foreign frees. However, this function is never called anywhere in the codebase. Without calling it, pages freed by other threads will never be fully reclaimed by their owner TCache, leading to memory fragmentation. You should consider calling this function periodically, for example during allocation when no free pages are available.
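A sketch of that wiring is below, with simplified stand-in types. It also illustrates a separate hazard in the quoted loop: calling `remove(page_idx)` inside `for page_idx in 0..len` skips the element that slides into the removed slot and can index past the shortened vector, so the sweep should use a `while` loop that only advances when nothing was removed.

```rust
// Stand-in types: pages are plain ids, and the "blocks still in use"
// count replaces the atomic `used` counter for illustration.
struct TCache {
    free_pages: Vec<u32>,          // retired pages, ready for reuse
    used_pages: Vec<(u32, usize)>, // (page id, blocks still in use)
}

impl TCache {
    fn cleanup_pages(&mut self) {
        let mut i = 0;
        while i < self.used_pages.len() {
            if self.used_pages[i].1 == 0 {
                // swap_remove moves a new element into slot i,
                // so do NOT advance i on this branch.
                let (page, _) = self.used_pages.swap_remove(i);
                self.free_pages.push(page);
            } else {
                i += 1;
            }
        }
    }

    // Allocation path: sweep for foreign-freed pages only when we would
    // otherwise fail, i.e. lazy reclamation under memory pressure.
    fn take_free_page(&mut self) -> Option<u32> {
        if self.free_pages.is_empty() {
            self.cleanup_pages();
        }
        self.free_pages.pop()
    }
}
```

Calling the sweep only on the slow path keeps the common allocation fast while still bounding how long emptied pages can be stranded.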
```rust
    rlim_max: 0,
};
unsafe { libc::getrlimit(libc::RLIMIT_MEMLOCK, &mut rlimit); }
assert!(64 * 1024 <= rlimit.rlim_max, "rlimit.MEMLOCK should be at least 64 MB to test the fixed-buffer pool. Current rlimit is: {} KB", rlimit.rlim_max);
```
The assertion assert!(64 * 1024 <= rlimit.rlim_max, ...) is incorrect. The message says "at least 64 MB", but 64 * 1024 is 64 KB. The check should be for 64 MB, which is 64 * 1024 * 1024. Also, it's better to check against rlim_cur (the soft limit) as that's what's enforced, not rlim_max.
```suggestion
assert!((64 * 1024 * 1024) as u64 <= rlimit.rlim_cur, "rlimit.MEMLOCK should be at least 64 MB to test the fixed-buffer pool. Current rlimit is: {} KB", rlimit.rlim_cur / 1024);
```
```rust
pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
    debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
    let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
    self.used_bitmap[segment_idx] = 0;
}
```
The retire_segment function will panic on self.slices[0] if self.slices is empty. This can happen if the arena is created with a capacity smaller than SEGMENT_SIZE. While current call paths might avoid this, it's safer to make this internal function more robust. Adding a check for self.slices.is_empty() at the beginning of the function would prevent this potential panic, especially in debug builds.
```rust
pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
    if self.slices.is_empty() {
        return;
    }
    debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
    let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
    self.used_bitmap[segment_idx] = 0;
}
```
src/common/src/memory/tcache.rs (outdated)
```rust
fn remove_slice_from_span(self: &mut Self, slice: &mut Page) -> bool {
    let span_idx = Self::get_span_idx_from_slice_count(slice.slice_count);
    for i in 0..self.spans[span_idx].len() {
        if self.spans[span_idx][i] == slice {
            self.spans[span_idx].remove(i);
            break;
        }
    }
    return true;
}
```
This function iterates through self.spans[span_idx] to find and remove a slice, which is an O(N) operation. For a memory allocator, this can become a performance bottleneck. The TODO comment on line 109 already suggests this. Consider using a data structure that allows for O(1) removal, like an intrusive linked list.
```rust
fn remove_page_from_used_queue(self: &mut Self, page_ptr: *mut Page) {
    let size_class = Self::get_size_class(unsafe { (*page_ptr).block_size });
    if size_class >= NUM_SIZE_CLASSES {
        return
    }
    for i in 0..self.used_pages[size_class].len() {
        if self.used_pages[size_class][i] == page_ptr {
            self.used_pages[size_class].remove(i);
            return
        }
    }
}
```
src/parquet/src/io/io_uring/tasks.rs (outdated)
```rust
pub(crate) fn build(data: Bytes, fd: RawFd, use_fixed_buffers: bool) -> FileWriteTask {
    FileWriteTask {
        data,
        fd,
        error: None,
        use_fixed_buffers,
    }
}
```
The FileWriteTask::build function accepts a use_fixed_buffers boolean flag, but it's not used anywhere in FileWriteTask. The prepare_sqe implementation always uses opcode::Write. If the intention is to support fixed buffer writes, it should use opcode::WriteFixed when this flag is true and the data buffer is from the fixed pool. Currently, the flag has no effect.
/gemini I am observing a data corruption bug when testing the allocator with multiple threads. I don't observe these bugs when testing with a single partition. What could be the possible issue?
Hello @proteetpaul, thanks for reaching out! The data corruption you're observing when testing the allocator with multiple threads, but not with a single partition, strongly suggests a concurrency issue within the fixed buffer pool's memory management. This is a common challenge in high-performance allocators that try to optimize for thread locality. Based on the code, the most likely source of the corruption is how `free` coordinates (or fails to coordinate) between the owning thread and foreign threads.

Recommendation for investigation and fix: the core problem is that the free path and the page-retirement path are not synchronized when the freeing thread is not the owner. Consider routing foreign frees through a dedicated concurrent queue that the owner thread drains, so retirement decisions are only made with a consistent view of the page's state.
/gemini A foreign thread can only add pages to the free list of the owning page cache. How can it lead to a use-after-free error? Also, frees from another thread cannot retire pages. I don't understand how this could lead to double-free issues
Hello @proteetpaul, thanks for the follow-up questions. Let me clarify how the observed behavior can lead to use-after-free and double-free issues, even with foreign frees adding blocks to the owning page's free list. Your understanding that a foreign thread only adds blocks to the owning page's free list is correct. However, the core of the problem lies in the discrepancy between the page's free list and its `used` counter, which are not updated atomically with respect to each other.

How can it lead to a use-after-free error? As noted in my previous comment, the owner thread can observe `used == 0` and retire the page while a foreign thread is still in the middle of pushing a block onto that page's free list; once the page is retired and reused, the foreign thread's late write lands in memory that now belongs to another allocation.
This commit implements a lock-free queue to hold frees by different threads. Freed blocks are cleaned up later, either through periodic cleanup operations or when under memory pressure.
Force-pushed from 6542115 to 561c91a.
Force-pushed from 4d50e93 to ee14daa.
/gemini the buffer pool implementation that is part of this PR is very inefficient in terms of memory usage. Please understand the code and find out possible design flaws or bugs which could cause hoarding of memory in thread-local caches
Code Review
This pull request introduces a significant new feature: a fixed buffer pool for memory management, integrated with io_uring for efficient I/O. The implementation includes a custom memory allocator with thread-local caches, which is a complex piece of engineering. While the overall direction is promising for performance, the introduction of substantial unsafe code and complex memory management logic requires careful scrutiny. I've identified several critical and high-severity issues, including a memory leak, blocking I/O calls in async contexts, and potential panics due to unhandled errors and incorrect assumptions. These should be addressed to ensure the stability and correctness of the new memory pool.
```rust
if direct_io {
    padding = (4096 - (data.len() & 4095)) & 4095;
    let layout = Layout::from_size_align(data.len() + padding, 4096).expect("Failed to create layout");
    assert!((data.len() + padding) % 4096 == 0);
    unsafe {
        let new_ptr = alloc(layout);
        std::ptr::copy_nonoverlapping(ptr, new_ptr, data.len());
        ptr = new_ptr;
    }
}
```
When direct_io is true, a new buffer is allocated using std::alloc::alloc to ensure alignment. However, this allocated memory is never deallocated, leading to a memory leak every time a direct I/O write is performed. You should implement the Drop trait for FileWriteTask to deallocate this buffer if it was allocated. You'll also need to store the Layout and a flag indicating if the buffer was allocated by this task.
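A minimal sketch of that fix is below: an owning wrapper that remembers the `Layout` of the aligned staging buffer and frees it in `Drop`. The type and field names are illustrative, not the PR's actual `FileWriteTask`, and the sketch assumes the data being written is non-empty.

```rust
use std::alloc::{alloc, dealloc, Layout};

// Owns a 4096-aligned copy of the payload, padded to a multiple of 4096
// as O_DIRECT requires, and frees it when dropped.
struct AlignedWriteBuf {
    ptr: *mut u8,
    layout: Layout,
}

impl AlignedWriteBuf {
    fn new(data: &[u8]) -> Self {
        assert!(!data.is_empty(), "zero-sized allocations are not allowed");
        let padding = (4096 - (data.len() & 4095)) & 4095;
        let layout = Layout::from_size_align(data.len() + padding, 4096)
            .expect("invalid layout");
        let ptr = unsafe { alloc(layout) };
        assert!(!ptr.is_null(), "allocation failed");
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr(), ptr, data.len());
            // Zero the padding so uninitialized bytes never reach the disk.
            std::ptr::write_bytes(ptr.add(data.len()), 0, padding);
        }
        AlignedWriteBuf { ptr, layout }
    }

    fn as_slice(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.ptr, self.layout.size()) }
    }
}

impl Drop for AlignedWriteBuf {
    fn drop(&mut self) {
        // The piece the reviewed code is missing: without this, every
        // direct-I/O write leaks its aligned staging buffer.
        unsafe { dealloc(self.ptr, self.layout) };
    }
}
```

In the PR, `FileWriteTask` could hold such a wrapper (or store the `Layout` plus an "owned" flag) so the buffer's lifetime is tied to the task instead of being leaked.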
```rust
let file = OpenOptions::new()
    .read(true)
    .custom_flags(libc::O_DIRECT)
    .open(path)
    .expect("failed to create file");
```
The file is opened here using a blocking OpenOptions::open() call within an async function. This will block the worker thread of the async runtime, which can lead to performance degradation and even deadlocks if all worker threads are blocked. You should wrap this blocking operation in tokio::task::spawn_blocking. The same issue exists in the write function below.
```diff
 }
 let path = self.io_context.liquid_path(entry_id);
-let bytes = self.io_context.read(path, None).await.ok()?;
+let bytes = self.io_context.read(path, None).await.unwrap();
```
Using .unwrap() here will cause a panic if there's an I/O error when reading from the cache file. The previous implementation with .ok()? would gracefully return None. Panicking on I/O errors can make the system less robust. It would be better to handle the error, for example by logging it and returning None.
```suggestion
let bytes = self.io_context.read(path, None).await.ok()?;
```
```diff
 None => {
     let path = self.io_context.liquid_path(entry_id);
-    let bytes = self.io_context.read(path, None).await.ok()?;
+    let bytes = self.io_context.read(path, None).await.unwrap();
```
Using .unwrap() here will cause a panic if there's an I/O error when reading from the cache file. The previous implementation with .ok()? would gracefully return None. Panicking on I/O errors can make the system less robust. It would be better to handle the error, for example by logging it and returning None.
```suggestion
let bytes = self.io_context.read(path, None).await.ok()?;
```
```diff
 CachedData::DiskArrow(_) => {
     let path = self.io_context.arrow_path(entry_id);
-    let bytes = self.io_context.read(path, None).await.ok()?;
+    let bytes = self.io_context.read(path, None).await.unwrap();
```
Using .unwrap() here will cause a panic if there's an I/O error when reading from the cache file. The previous implementation with .ok()? would gracefully return None. Panicking on I/O errors can make the system less robust. It would be better to handle the error, for example by logging it and returning None.
```suggestion
let bytes = self.io_context.read(path, None).await.ok()?;
```
```rust
fn get_thread_local_cache() -> &'static Mutex<TCache> {
    let cpu = unsafe { libc::sched_getcpu() };
    &FIXED_BUFFER_POOL.get().unwrap().local_caches[cpu as usize]
}
```
libc::sched_getcpu() is a Linux-specific function. Using it here will cause compilation to fail on other platforms like macOS or Windows. Since this code is in the common crate, it should ideally be portable. Consider using a crate like core_affinity which provides a cross-platform way to get CPU IDs, or guard this code with #[cfg(target_os = "linux")] and provide a fallback for other platforms (e.g., using a single cache or a thread ID-based approach).
No description provided.