
Fixed buffer pool #1

Open

proteetpaul wants to merge 20 commits into main from fixed-buffer-pool

Conversation

@proteetpaul (Owner)

No description provided.

@proteetpaul (Owner, Author)

/gemini review

@gemini-code-assist

Summary of Changes

Hello @proteetpaul, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a sophisticated fixed buffer pool for memory management, designed to optimize I/O operations, particularly when using io-uring. By pre-allocating and managing memory in a structured way, the system aims to reduce memory allocation overhead and improve data transfer efficiency. The changes integrate this new buffer pool into the io-uring backend, allowing read operations to utilize registered fixed buffers. Additionally, it provides a configurable option for the buffer pool's size, making the system more adaptable to different workloads and environments.

Highlights

  • Fixed Buffer Pool Implementation: Introduced a new memory management system (Arena, Segment, Page, TCache, FixedBufferPool) for efficient, pre-allocated memory buffers, especially beneficial for io-uring operations.
  • io-uring Integration: Enhanced the io-uring backend to leverage the new fixed buffer pool for read operations, allowing for IORING_OP_READ_FIXED to potentially improve I/O performance.
  • Configurable Buffer Pool Size: Added a command-line argument (--fixed-buffer-pool-size-mb) and API parameter to configure the size of the fixed buffer pool, enabling users to tune memory usage.
  • IoTask Trait Refactoring: Modified the IoTask trait to support multiple Submission Queue Entries (SQEs) and Completion Queue Entries (CQEs) per task, accommodating scenarios where a single logical I/O operation might require multiple io-uring submissions (e.g., when reading into multiple fixed buffers).



@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a fixed buffer pool to be used with io_uring for potentially improved I/O performance. This is a substantial change, adding a sophisticated custom memory allocator with thread-local caches, using mmap and unsafe Rust. While the overall direction is sound, the implementation has several areas that need attention. My review highlights a critical compilation error, a potential memory leak in the arena allocator, a memory utilization issue with cross-thread frees, a performance concern in the thread-local cache, and an incomplete feature for fixed-buffer writes. Addressing these points will help ensure the stability and correctness of this new memory management layer.

pub mod pool;
mod segment;
mod arena;
mod global_pool;


critical

This module declaration for global_pool will cause a compilation error because the corresponding file src/common/src/memory/global_pool.rs is not included in this pull request. If this module is intended for future work, it should be commented out to prevent breaking the build.

Suggested change
mod global_pool;
// mod global_pool; // This module seems to be missing.

Comment on lines +96 to +99
pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
    debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
    let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
    self.used_bitmap[segment_idx] = 0;


high

The retire_segment function incorrectly assumes that a segment is always composed of a single slice from the arena, as it only frees one entry in used_bitmap. However, allocate_segment is capable of allocating multiple contiguous slices. If a segment is ever allocated with more than one slice, this function will only free the first one, leading to a memory leak within the arena. To fix this, the Segment should store the number of slices it's composed of, and retire_segment should use this information to free all corresponding entries in used_bitmap.

Comment on lines +151 to +171
fn free(ptr: *mut u8) {
    let segment_ptr = Segment::get_segment_from_ptr(ptr);
    let page_ptr = unsafe { (*segment_ptr).get_page_from_ptr(ptr) };
    unsafe {
        (*page_ptr).free(ptr);
    }
    // If page is local and unused after free, return it to segment
    let thread_id = unsafe { (*segment_ptr).thread_id };
    let cur_cpu = unsafe { libc::sched_getcpu() as usize };
    if cur_cpu == thread_id {
        let should_free_page = unsafe { (*page_ptr).used.load(Ordering::Relaxed) == 0 };
        if should_free_page {
            let local_cache = Self::get_thread_local_cache();
            let mut guard = local_cache.lock().unwrap();
            guard.retire_page(page_ptr);
        }
    } else {
        let pool = FIXED_BUFFER_POOL.get().unwrap();
        pool.foreign_free.fetch_add(1, Ordering::Relaxed);
    }
}


high

The current implementation for handling "foreign frees" (memory freed by a thread different from the one that allocated it) can lead to inefficient memory utilization. When a foreign free occurs, the memory block is returned to its page, but the page itself is not retired to the segment's free list, even if it becomes empty. It remains tied to the original thread's TCache. This can cause memory to be held by a thread that no longer needs it, making it unavailable to other threads and potentially leading to allocation failures. Consider implementing a mechanism to return these pages to a global pool or to the owner thread's cache to improve memory utilization.

pub struct Page {
    pub(crate) block_size: usize, // Size of objects that are being allocated to this page
    // TODO(): Remove dependency on dynamically allocated memory
    free_list: Mutex<VecDeque<Block>>,


medium

Using a Mutex to protect the free_list VecDeque in each Page will likely become a performance bottleneck, as it introduces lock contention on every block allocation and deallocation. For a memory allocator, this can significantly impact performance, especially in highly concurrent scenarios. The TODO comment and the presence of commented-out fields like local_free_list suggest you might already be considering a more scalable approach. I'd recommend exploring lock-free data structures or other techniques to reduce contention here.

Comment on lines +110 to +119
fn remove_slice_from_span(self: &mut Self, slice: &mut Page) -> bool {
    let span_idx = Self::get_span_idx_from_slice_count(slice.slice_count);
    for i in 0..self.spans[span_idx].len() {
        if self.spans[span_idx][i] == slice {
            self.spans[span_idx].remove(i);
            break;
        }
    }
    return true;
}


medium

The remove_slice_from_span function uses a linear scan to find and remove a slice from the spans vector, which has O(n) complexity. Similarly, remove_page_from_used_queue also performs a linear scan. For a memory allocator, which is often in the critical path, this can become a performance bottleneck. The TODO comment indicates you are aware of this. I recommend prioritizing the replacement of Vec with a more suitable data structure, like an intrusive linked list, to achieve O(1) removal and improve performance.

data: Bytes,
fd: RawFd,
error: Option<std::io::Error>,
use_fixed_buffers: bool,


medium

The FileWriteTask struct has been updated to include a use_fixed_buffers field, but its prepare_sqe implementation doesn't seem to use it. It still uses opcode::Write, which does not leverage pre-registered buffers. To take advantage of the fixed buffer pool for writes, you would need to use opcode::WriteFixed. This appears to be an incomplete implementation of the fixed buffer feature for write operations.

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a fixed buffer pool for io_uring to improve I/O performance. The implementation includes a custom memory allocator with an arena, segments, pages, and per-CPU thread caches. While this is a significant feature, the review has identified several critical issues, including a portability problem with sched_getcpu that will cause compilation failures on non-Linux systems, and a compilation error in a test. There are also high-severity concerns regarding memory management that could lead to fragmentation and memory hoarding in long-running services. Additionally, there are medium-severity issues related to performance hotspots in the allocation path and the robustness of internal APIs. These issues should be addressed to ensure the stability, portability, and performance of the new memory allocator.

Comment on lines +87 to +90
fn get_thread_local_cache() -> &'static Mutex<TCache> {
    let cpu = unsafe { libc::sched_getcpu() };
    &FIXED_BUFFER_POOL.get().unwrap().local_caches[cpu as usize]
}


critical

libc::sched_getcpu() is a GNU extension and is only available on Linux. Using it without platform-specific conditional compilation (#[cfg(target_os = "linux")]) will cause compilation to fail on other platforms like macOS. You should guard its usage. For non-Linux platforms, you could fall back to a different mechanism or disable per-CPU caching. A simple fallback could be to use CPU 0 for all threads, but this would serialize all threads on one cache, negating the benefits of per-CPU caches.

}
// If page is local and unused after free, return it to segment
let thread_id = unsafe { (*segment_ptr).thread_id };
let cur_cpu = unsafe { libc::sched_getcpu() as usize };


critical

The use of libc::sched_getcpu() here is not portable and will fail to compile on non-Linux systems. This should be guarded with #[cfg(target_os = "linux")] and a fallback should be provided for other operating systems.

assert_eq!(total, LEN);

let mut random_bytes = [0u8; LEN];
let mut rng = rand::rng();


critical

The call rand::rng() only exists in rand 0.9 and later; on rand 0.8 and earlier it will not compile. If this project depends on an older rand, use rand::thread_rng() to get a thread-local random number generator.

Suggested change
let mut rng = rand::rng();
let mut rng = rand::thread_rng();

Comment on lines +157 to +171
// If page is local and unused after free, return it to segment
let thread_id = unsafe { (*segment_ptr).thread_id };
let cur_cpu = unsafe { libc::sched_getcpu() as usize };
if cur_cpu == thread_id {
    let should_free_page = unsafe { (*page_ptr).used.load(Ordering::Relaxed) == 0 };
    if should_free_page {
        let local_cache = Self::get_thread_local_cache();
        let mut guard = local_cache.lock().unwrap();
        guard.retire_page(page_ptr);
    }
} else {
    let pool = FIXED_BUFFER_POOL.get().unwrap();
    pool.foreign_free.fetch_add(1, Ordering::Relaxed);
}
}


high

When a buffer is freed from a different thread than it was allocated on (cur_cpu != thread_id), the page is not retired even if it becomes empty. The memory is returned to the page's free list, but the page itself is not returned to the TCache's free spans. This means the page cannot be coalesced or reused for allocations of a different size class, leading to memory fragmentation and hoarding. The foreign_free counter is incremented, but no action is taken to reclaim the page. You should implement a mechanism to handle foreign frees, for example by using a concurrent queue to send the page back to its owner TCache for retirement.

Comment on lines +201 to +212
fn cleanup_pages(self: &mut Self) {
    for i in 0..self.used_pages.len() {
        for page_idx in 0..self.used_pages[i].len() {
            let page = self.used_pages[i][page_idx];
            unsafe {
                if (*page).used.load(Ordering::Relaxed) == 0 {
                    self.retire_page(page);
                    self.used_pages[i].remove(page_idx);
                }
            }
        }
    }


high

The function cleanup_pages is defined to iterate over used_pages and retire pages that have become empty. This seems intended to address the memory hoarding issue with foreign frees. However, this function is never called anywhere in the codebase. Without calling it, pages freed by other threads will never be fully reclaimed by their owner TCache, leading to memory fragmentation. You should consider calling this function periodically, for example during allocation when no free pages are available.

rlim_max: 0,
};
unsafe { libc::getrlimit(libc::RLIMIT_MEMLOCK, &mut rlimit); }
assert!(64 * 1024 <= rlimit.rlim_max, "rlimit.MEMLOCK should be at least 64 MB to test the fixed-buffer pool. Current rlimit is: {} KB", rlimit.rlim_max);


medium

The assertion assert!(64 * 1024 <= rlimit.rlim_max, ...) is incorrect. The message says "at least 64 MB", but 64 * 1024 is 64 KB. The check should be for 64 MB, which is 64 * 1024 * 1024. Also, it's better to check against rlim_cur (the soft limit) as that's what's enforced, not rlim_max.

Suggested change
assert!(64 * 1024 <= rlimit.rlim_max, "rlimit.MEMLOCK should be at least 64 MB to test the fixed-buffer pool. Current rlimit is: {} KB", rlimit.rlim_max);
assert!((64 * 1024 * 1024) as u64 <= rlimit.rlim_cur, "rlimit.MEMLOCK should be at least 64 MB to test the fixed-buffer pool. Current rlimit is: {} KB", rlimit.rlim_cur / 1024);

Comment on lines +96 to +100
pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
    debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
    let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
    self.used_bitmap[segment_idx] = 0;
}


medium

The retire_segment function will panic on self.slices[0] if self.slices is empty. This can happen if the arena is created with a capacity smaller than SEGMENT_SIZE. While current call paths might avoid this, it's safer to make this internal function more robust. Adding a check for self.slices.is_empty() at the beginning of the function would prevent this potential panic, especially in debug builds.

    pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
        if self.slices.is_empty() {
            return;
        }
        debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
        let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
        self.used_bitmap[segment_idx] = 0;
    }

Comment on lines +110 to +119
fn remove_slice_from_span(self: &mut Self, slice: &mut Page) -> bool {
    let span_idx = Self::get_span_idx_from_slice_count(slice.slice_count);
    for i in 0..self.spans[span_idx].len() {
        if self.spans[span_idx][i] == slice {
            self.spans[span_idx].remove(i);
            break;
        }
    }
    return true;
}


medium

This function iterates through self.spans[span_idx] to find and remove a slice, which is an O(N) operation. For a memory allocator, this can become a performance bottleneck. The TODO comment on line 109 already suggests this. Consider using a data structure that allows for O(1) removal, like an intrusive linked list.

Comment on lines +135 to +146
fn remove_page_from_used_queue(self: &mut Self, page_ptr: *mut Page) {
    let size_class = Self::get_size_class(unsafe { (*page_ptr).block_size });
    if size_class >= NUM_SIZE_CLASSES {
        return
    }
    for i in 0..self.used_pages[size_class].len() {
        if self.used_pages[size_class][i] == page_ptr {
            self.used_pages[size_class].remove(i);
            return
        }
    }
}


medium

This function iterates through self.used_pages[size_class] to find and remove a page, which is an O(N) operation. This can be inefficient and impact allocation performance. Consider using a data structure that supports faster removal, such as a hash map or a linked list.

Comment on lines 334 to 340
pub(crate) fn build(data: Bytes, fd: RawFd, use_fixed_buffers: bool) -> FileWriteTask {
    FileWriteTask {
        data,
        fd,
        error: None,
        use_fixed_buffers,
    }


medium

The FileWriteTask::build function accepts a use_fixed_buffers boolean flag, but it's not used anywhere in FileWriteTask. The prepare_sqe implementation always uses opcode::Write. If the intention is to support fixed buffer writes, it should use opcode::WriteFixed when this flag is true and the data buffer is from the fixed pool. Currently, the flag has no effect.

@proteetpaul (Owner, Author)

/gemini I am observing a data corruption bug when testing the allocator with multiple threads. I don't observe these bugs when testing with a single partition. What could be the possible issue?

@gemini-code-assist

Hello @proteetpaul, thanks for reaching out! The data corruption you're observing when testing the allocator with multiple threads, but not with a single partition, strongly suggests a concurrency issue within the fixed buffer pool's memory management. This is a common challenge in high-performance allocators that try to optimize for thread locality.

Based on the code, the most likely source of the data corruption lies in how FixedBufferPool::free handles memory deallocated by a "foreign" thread (a thread different from the one that originally allocated the memory or owns the segment).

Here's a breakdown of the potential issues:

  1. Inconsistent Segment State for Foreign Frees: In FixedBufferPool::free (lines 151-171 in src/common/src/memory/pool.rs):

    • When cur_cpu != thread_id (meaning a foreign thread is freeing memory), the else branch is taken. In this branch, pool.foreign_free is incremented, and (*page_ptr).free(ptr) is called. The Page::free method correctly uses a Mutex for its free_list and an AtomicUsize for used, so the individual page's block list is updated safely.
    • However, the critical problem is that the retire_page logic is skipped for foreign frees. This means that the Segment's allocated counter (in src/common/src/memory/segment.rs) is not decremented, and the Page is not returned to the spans list of the owning TCache (in src/common/src/memory/tcache.rs).
  2. Consequences of Inconsistent State:

    • Memory Leaks/Fragmentation: The TCache that owns the Segment will continue to believe that the page is allocated, even though its blocks have been returned to its free_list. This can lead to the owning TCache unnecessarily requesting new Segments from the Arena because it incorrectly perceives its existing Segments as full. This might not directly cause corruption but can lead to memory exhaustion and poor performance.
    • Use-After-Free / Double-Free (Data Corruption): More critically, if the owning TCache later attempts to allocate from a page it believes is still in use (because its Segment's allocated count is high), but that page has been partially or fully freed by foreign threads, it could hand out a pointer to memory that has already been re-allocated by another thread. This is a classic use-after-free scenario, leading to data corruption. Conversely, if the owning TCache eventually tries to retire_page a page that was already effectively freed by a foreign thread, it could lead to double-free issues or inconsistencies during coalescing if the Segment's internal state (e.g., slice_count, slice_offset of adjacent pages) is not what it expects.
    • Race Conditions on Segment Metadata: While TCache operations are protected by a Mutex, the Segment itself is modified directly via raw pointers in retire_page and find_page_from_spans. If the Segment's allocated count or the pages array metadata (like slice_count or slice_offset) becomes inconsistent due to foreign frees not triggering the full retire_page logic, subsequent operations by the owning TCache could encounter corrupted metadata, leading to incorrect memory management decisions and data corruption.

Recommendation for Investigation and Fix:

The core problem is that the Segment's thread_id implies ownership, but FixedBufferPool::free doesn't fully respect this ownership for foreign frees. To resolve this, you need a mechanism to ensure that pages freed by foreign threads are properly processed by the TCache that owns their respective Segments.

Consider these approaches:

  1. Message Passing to Owning TCache: When a foreign thread frees a block, instead of just incrementing foreign_free, it should enqueue the page_ptr (or the block itself) into a thread-safe queue associated with the owning TCache (identified by segment.thread_id). The owning TCache would then periodically process this queue and perform the retire_page logic for these

@proteetpaul (Owner, Author)

  • Use-After-Free / Double-Free (Data Corruption): More critically, if the owning TCache later attempts to allocate from a page it believes is still in use (because its Segment's allocated count is high), but that page has been partially or fully freed by foreign threads, it could hand out a pointer to memory that has already been re-allocated by another thread. This is a classic use-after-free scenario, leading to data corruption. Conversely, if the owning TCache eventually tries to retire_page a page that was already effectively freed by a foreign thread, it could lead to double-free issues or inconsistencies during coalescing if the Segment's internal state (e.g., slice_count, slice_offset of adjacent pages) is not what it expects.

/gemini A foreign thread can only add blocks to the free list of the owning page. How can that lead to a use-after-free error? Also, frees from another thread cannot retire pages, so I don't understand how this could lead to double-free issues.

@gemini-code-assist

Hello @proteetpaul, thanks for the follow-up questions. Let me clarify how the observed behavior can lead to use-after-free and double-free issues, even with foreign frees adding blocks to the owning page's free list.

Your understanding that a foreign thread adds blocks to the free_list of the owning Page is correct. This is handled by (*page_ptr).free(ptr) on line 155 of src/common/src/memory/pool.rs. This operation correctly decrements page.used and makes the individual block available for re-allocation by that specific Page.

However, the core of the problem lies in the discrepancy between the Page's internal state and the TCache's higher-level tracking of page and segment availability when a foreign thread performs the free operation.

How can it lead to a use-after-free error?

As noted in my previous comment, the if cur_cpu == thread_id block (lines 160-166 in src/common/src/memory/pool.rs) is skipped for foreign frees. This means two critical updates are missed:

  1. segment.allocated is not decremented: The Segment (which owns the Page) has an allocated counter (line 12 in src/common/src/memory/segment.rs) that tracks how many pages within it are currently considered

proteet and others added 6 commits January 3, 2026 00:47
This commit fixes the following bugs:
- Sets file offset correctly in read task for fixed buffers
- Sets correct block size in tcache
- Clears free list of page before setting block size

This commit implements a lockfree queue to hold frees by different
threads. Freed blocks are cleaned up later, either through periodic
cleanup operations or when under memory pressure.
@proteetpaul (Owner, Author)

/gemini the buffer pool implementation that is part of this PR is very inefficient in terms of memory usage. Please understand the code and find out possible design flaws or bugs which could cause hoarding of memory in thread-local caches

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a significant new feature: a fixed buffer pool for memory management, integrated with io_uring for efficient I/O. The implementation includes a custom memory allocator with thread-local caches, which is a complex piece of engineering. While the overall direction is promising for performance, the introduction of substantial unsafe code and complex memory management logic requires careful scrutiny. I've identified several critical and high-severity issues, including a memory leak, blocking I/O calls in async contexts, and potential panics due to unhandled errors and incorrect assumptions. These should be addressed to ensure the stability and correctness of the new memory pool.

Comment on lines +336 to +345
if direct_io {
    padding = (4096 - (data.len() & 4095)) & 4095;
    let layout = Layout::from_size_align(data.len() + padding, 4096).expect("Failed to create layout");
    assert!((data.len() + padding) % 4096 == 0);
    unsafe {
        let new_ptr = alloc(layout);
        std::ptr::copy_nonoverlapping(ptr, new_ptr, data.len());
        ptr = new_ptr;
    }
}


critical

When direct_io is true, a new buffer is allocated using std::alloc::alloc to ensure alignment. However, this allocated memory is never deallocated, leading to a memory leak every time a direct I/O write is performed. You should implement the Drop trait for FileWriteTask to deallocate this buffer if it was allocated. You'll also need to store the Layout and a flag indicating if the buffer was allocated by this task.

}

pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
    debug_assert!((self.slices[0].ptr as usize) <= segment as usize);


high

self.slices can be empty if the arena is created with a capacity smaller than SEGMENT_SIZE. Accessing self.slices[0] on this line would cause a panic in that case. It's better to handle this case gracefully, for example by adding a check for self.slices.is_empty() before this line.

assert_eq!(total, LEN);

let mut random_bytes = [0u8; LEN];
let mut rng = rand::rng();


high

rand::rng() was only introduced in rand 0.9; on earlier versions of the rand crate it does not exist. If the project depends on rand 0.8 or older, use rand::thread_rng() to get a thread-local random number generator.

Suggested change
let mut rng = rand::rng();
let mut rng = rand::thread_rng();

Comment on lines +423 to +427
let file = OpenOptions::new()
    .read(true)
    .custom_flags(libc::O_DIRECT)
    .open(path)
    .expect("failed to create file");


high

The file is opened here using a blocking OpenOptions::open() call within an async function. This will block the worker thread of the async runtime, which can lead to performance degradation and even deadlocks if all worker threads are blocked. You should wrap this blocking operation in tokio::task::spawn_blocking. The same issue exists in the write function below.

}
let path = self.io_context.liquid_path(entry_id);
let bytes = self.io_context.read(path, None).await.ok()?;
let bytes = self.io_context.read(path, None).await.unwrap();


high

Using .unwrap() here will cause a panic if there's an I/O error when reading from the cache file. The previous implementation with .ok()? would gracefully return None. Panicking on I/O errors can make the system less robust. It would be better to handle the error, for example by logging it and returning None.

Suggested change
let bytes = self.io_context.read(path, None).await.unwrap();
let bytes = self.io_context.read(path, None).await.ok()?;

None => {
let path = self.io_context.liquid_path(entry_id);
let bytes = self.io_context.read(path, None).await.ok()?;
let bytes = self.io_context.read(path, None).await.unwrap();


high

Using .unwrap() here will cause a panic if there's an I/O error when reading from the cache file. The previous implementation with .ok()? would gracefully return None. Panicking on I/O errors can make the system less robust. It would be better to handle the error, for example by logging it and returning None.

Suggested change
let bytes = self.io_context.read(path, None).await.unwrap();
let bytes = self.io_context.read(path, None).await.ok()?;

CachedData::DiskArrow(_) => {
let path = self.io_context.arrow_path(entry_id);
let bytes = self.io_context.read(path, None).await.ok()?;
let bytes = self.io_context.read(path, None).await.unwrap();


high

Using .unwrap() here will cause a panic if there's an I/O error when reading from the cache file. The previous implementation with .ok()? would gracefully return None. Panicking on I/O errors can make the system less robust. It would be better to handle the error, for example by logging it and returning None.

Suggested change
let bytes = self.io_context.read(path, None).await.unwrap();
let bytes = self.io_context.read(path, None).await.ok()?;

}

fn get_thread_local_cache() -> &'static Mutex<TCache> {
    let cpu = unsafe { libc::sched_getcpu() };


medium

libc::sched_getcpu() is a Linux-specific function. Using it here will cause compilation to fail on other platforms like macOS or Windows. Since this code is in the common crate, it should ideally be portable. Consider using a crate like core_affinity which provides a cross-platform way to get CPU IDs, or guard this code with #[cfg(target_os = "linux")] and provide a fallback for other platforms (e.g., using a single cache or a thread ID-based approach).
