diff --git a/src/ailego/CMakeLists.txt b/src/ailego/CMakeLists.txt index 29cf22cd1..1e524a1b2 100644 --- a/src/ailego/CMakeLists.txt +++ b/src/ailego/CMakeLists.txt @@ -16,6 +16,10 @@ set(EXTRA_LIBS ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS}) if(UNIX AND NOT APPLE) list(APPEND EXTRA_LIBS ${LIB_RT}) + find_library(LIB_AIO NAMES aio) + if(LIB_AIO) + list(APPEND EXTRA_LIBS ${LIB_AIO}) + endif() endif() if(NOT ANDROID AND AUTO_DETECT_ARCH) @@ -123,3 +127,8 @@ cc_library( LIBS ${EXTRA_LIBS} VERSION "${GIT_SRCS_VER}" ) + +if(LIB_AIO) + target_compile_definitions(zvec_ailego PUBLIC ZVEC_HAS_LIBAIO) + message(STATUS "Found libaio: ${LIB_AIO} (HNSW async prefetch enabled)") +endif() diff --git a/src/ailego/buffer/block_eviction_queue.cc b/src/ailego/buffer/block_eviction_queue.cc index eff930127..cad117b92 100644 --- a/src/ailego/buffer/block_eviction_queue.cc +++ b/src/ailego/buffer/block_eviction_queue.cc @@ -78,9 +78,30 @@ bool BlockEvictionQueue::add_single_block(const BlockType &block, return true; } +MemoryLimitPool::~MemoryLimitPool() { + drain_free_list(); +} + +void MemoryLimitPool::drain_free_list() { + std::lock_guard lock(free_list_mutex_); + size_t drained = 0; + while (free_list_head_) { + char *buf = free_list_head_; + free_list_head_ = *reinterpret_cast(buf); + ailego_free(buf); + ++drained; + } + free_list_count_ = 0; + if (drained > 0) { + LOG_INFO("MemoryLimitPool: drained %zu cached buffers from free list", + drained); + } +} + int MemoryLimitPool::init(size_t pool_size) { pool_size_ = 0; BlockEvictionQueue::get_instance().recycle(); + drain_free_list(); pool_size_ = pool_size; LOG_INFO("MemoryLimitPool initialized with pool size: %lu", pool_size_); return 0; @@ -96,6 +117,15 @@ bool MemoryLimitPool::try_acquire_buffer(const size_t buffer_size, } desired = expected + buffer_size; } while (!used_size_.compare_exchange_weak(expected, desired)); + { + std::lock_guard lock(free_list_mutex_); + if (free_list_head_) { + buffer = free_list_head_; + free_list_head_ = *reinterpret_cast(buffer); + --free_list_count_; + return true; + } + } buffer = (char *)ailego_aligned_malloc(buffer_size, 4096); if (!buffer) { used_size_.fetch_sub(buffer_size); @@ -119,7 +149,10 @@ void MemoryLimitPool::release_buffer(char *buffer, const size_t buffer_size) { desired = expected - buffer_size; assert(expected >= buffer_size); } while (!used_size_.compare_exchange_weak(expected, desired)); - ailego_free(buffer); + std::lock_guard lock(free_list_mutex_); + *reinterpret_cast(buffer) = free_list_head_; + free_list_head_ = buffer; + ++free_list_count_; } void MemoryLimitPool::release_external(const size_t buffer_size) { diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 3318db1ca..316a0297c 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -13,11 +13,13 @@ // limitations under the License. #include +#include #include #include #include #include #include +#include #include #if defined(_MSC_VER) @@ -143,17 +145,19 @@ bool VectorPageTable::extend(size_t new_entry_num) { char *VectorPageTable::acquire_block(block_id_t block_id) { assert(block_id < entry_num_.load(std::memory_order_relaxed)); Entry &e = entry_at(block_id); - while (true) { - int current_count = e.ref_count.load(std::memory_order_acquire); - if (current_count < 0) { - return nullptr; - } - if (e.ref_count.compare_exchange_weak(current_count, current_count + 1, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - return e.buffer; + int old = e.ref_count.fetch_add(1, std::memory_order_acq_rel); + if (ailego_likely(old >= 0)) { + return e.buffer; + } + int cur = old + 1; + while (cur < 0 && cur != std::numeric_limits::min()) { + if (e.ref_count.compare_exchange_weak(cur, cur - 1, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + break; } } + return nullptr; } void VectorPageTable::release_block(block_id_t block_id) { @@ -170,7 +174,8 @@ void VectorPageTable::release_block(block_id_t block_id) { block.owner = this; block.owner_key = block_id; block.version = 0; - BlockEvictionQueue::get_instance().add_single_block(block, 0); + BlockEvictionQueue::get_instance().add_single_block( + block, static_cast(e.evict_priority)); } } } @@ -179,12 +184,7 @@ void VectorPageTable::evict_block(block_id_t block_id) { assert(block_id < entry_num_.load(std::memory_order_relaxed)); Entry &e = entry_at(block_id); int expected = 0; - // Two-phase eviction to prevent data race on e.buffer with - // set_block_acquired. We first CAS to kEvicting (-1), which causes - // set_block_acquired to spin-wait; then do the actual work (flush, free, - // null buffer); finally store INT_MIN ("evicted") which unblocks - // set_block_acquired. - static constexpr int kEvicting = -1; + static constexpr int kEvicting = std::numeric_limits::min() / 2; if (e.ref_count.compare_exchange_strong(expected, kEvicting)) { char *buffer = e.buffer; if (buffer && e.is_dirty.load(std::memory_order_relaxed) && @@ -196,8 +196,6 @@ void VectorPageTable::evict_block(block_id_t block_id) { e.buffer = nullptr; MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); } - // Transition to fully-evicted state. Use release so that the - // set_block_acquired acquire-load sees e.buffer == nullptr. e.ref_count.store(std::numeric_limits::min(), std::memory_order_release); } @@ -208,11 +206,6 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, size_t file_offset) { assert(block_id < entry_num_.load(std::memory_order_acquire)); Entry &e = entry_at(block_id); - // Diagnostics for the kEvicting wait. The wait itself never gives up: - // the only thread that can transition kEvicting -> INT_MIN is the - // evict_block() owner, so abandoning the spin here would orphan the - // entry in kEvicting forever. Instead, we use bounded backoff and emit - // tiered logs so a stuck eviction is observable. using clock = std::chrono::steady_clock; const auto wait_start = clock::now(); auto last_log = wait_start; @@ -228,19 +221,16 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, return e.buffer; } } else if (current_count == std::numeric_limits::min()) { - // Fully evicted — safe to claim this entry for our new buffer. e.buffer = buffer; e.file_offset = file_offset; e.in_evict_queue.store(false, std::memory_order_relaxed); e.is_dirty.store(false, std::memory_order_relaxed); + e.ever_loaded = true; e.ref_count.store(1, std::memory_order_release); return e.buffer; } else { - // kEvicting (-1): eviction is in progress on this entry. - // Tiered backoff: hot spin first, then short sleep, then longer sleep. ++spin_count; if (spin_count < 64) { - // Pure busy wait for the common ~μs case. } else if (spin_count < 1024) { std::this_thread::yield(); } else if (spin_count < 8192) { @@ -248,7 +238,6 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - // Tiered diagnostics: warn once after 100ms, error every 1s after 1s. const auto now = clock::now(); const auto elapsed = now - wait_start; if (!warned && elapsed >= std::chrono::milliseconds(100)) { @@ -272,31 +261,76 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, } } -VecBufferPool::VecBufferPool(const std::string &filename, bool writable) { +VecBufferPool::VecBufferPool(const std::string &filename, bool writable, + bool enable_direct_io) { file_name_ = filename; writable_ = writable; #if defined(_MSC_VER) int flags = writable_ ? (O_RDWR | _O_BINARY) : (O_RDONLY | _O_BINARY); fd_ = _open(filename.c_str(), flags, 0644); + meta_fd_ = _open(filename.c_str(), flags, 0644); + (void)enable_direct_io; // O_DIRECT not supported on this path +#else + int base_flags = writable_ ? O_RDWR : O_RDONLY; + // Metadata channel: always buffered IO. Serves the unaligned + // header/footer/segment_meta reads & writes and benefits from page cache. + meta_fd_ = ::open(filename.c_str(), base_flags, 0644); + // Page-data channel: optionally O_DIRECT; fall back to buffered open when + // the filesystem (tmpfs/overlayfs/...) rejects O_DIRECT. + int data_flags = base_flags; +#ifdef O_DIRECT + if (enable_direct_io) { + data_flags |= O_DIRECT; + } +#endif + fd_ = ::open(filename.c_str(), data_flags, 0644); +#ifdef O_DIRECT + if (fd_ < 0 && (data_flags & O_DIRECT)) { + LOG_WARN( + "VecBufferPool: open with O_DIRECT failed for file[%s] (errno=%d), " + "falling back to buffered IO", + filename.c_str(), errno); + fd_ = ::open(filename.c_str(), base_flags, 0644); + direct_io_enabled_ = false; + } else { + direct_io_enabled_ = (data_flags & O_DIRECT) != 0; + } +#else + (void)enable_direct_io; +#endif +#endif + if (fd_ < 0 || meta_fd_ < 0) { + if (fd_ >= 0) { +#if defined(_MSC_VER) + _close(fd_); +#else + ::close(fd_); +#endif + } + if (meta_fd_ >= 0) { +#if defined(_MSC_VER) + _close(meta_fd_); #else - int flags = writable_ ? O_RDWR : O_RDONLY; - fd_ = ::open(filename.c_str(), flags, 0644); + ::close(meta_fd_); #endif - if (fd_ < 0) { + } throw std::runtime_error("Failed to open file: " + filename); } #if defined(_MSC_VER) struct _stat64 st; if (_fstat64(fd_, &st) < 0) { _close(fd_); + _close(meta_fd_); #else struct stat st; if (fstat(fd_, &st) < 0) { ::close(fd_); + ::close(meta_fd_); #endif throw std::runtime_error("Failed to stat file: " + filename); } file_size_ = st.st_size; + initial_file_size_ = file_size_; // snapshot for skip-pread optimisation } int VecBufferPool::init() { @@ -375,24 +409,41 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { } size_t page_offset = page_id * kVectorPageSize; - size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset); - if (expected_bytes < kVectorPageSize) { - std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes); - } - ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset); - if (read_bytes != static_cast(expected_bytes)) { - LOG_ERROR( - "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " - "offset[%zu], expected[%zu], got[%zd]", - file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes); - MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); - return nullptr; + // Skip pread for pages created by extend_file (beyond the original file + // size at open time) that have never been loaded before. Their on-disk + // content is guaranteed to be zeros (ftruncate). After eviction the + // ever_loaded flag stays true so reloads correctly pread the flushed data. + if (writable_ && page_offset >= initial_file_size_ && + !page_table_.is_ever_loaded(page_id)) { + std::memset(buffer, 0, kVectorPageSize); + } else { + // O_DIRECT requires the IO length to be a multiple of the device block + // size. The backing file size is always page-aligned (IndexMapping + + // append_segment guarantee this), so reading a full page never reads past + // EOF; the tail padding is the file's own zero region. In direct mode we + // MUST read the whole page; the buffered path keeps the legacy short-read + // + zero-pad behaviour. + size_t read_len = direct_io_enabled_ + ? kVectorPageSize + : std::min(kVectorPageSize, file_size_ - page_offset); + if (read_len < kVectorPageSize) { + std::memset(buffer + read_len, 0, kVectorPageSize - read_len); + } + ssize_t read_bytes = zvec_pread(fd_, buffer, read_len, page_offset); + if (read_bytes != static_cast(read_len)) { + LOG_ERROR( + "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " + "offset[%zu], expected[%zu], got[%zd]", + file_name_.c_str(), page_id, page_offset, read_len, read_bytes); + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); + return nullptr; + } } return page_table_.set_block_acquired(page_id, buffer, page_offset); } int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { - ssize_t read_bytes = zvec_pread(fd_, buffer, length, offset); + ssize_t read_bytes = zvec_pread(meta_fd_, buffer, length, offset); if (read_bytes != static_cast(length)) { LOG_ERROR( "Buffer pool failed to read file at offset: file[%s], offset[%zu], " @@ -446,7 +497,7 @@ int VecBufferPool::write_meta(size_t offset, size_t length, file_name_.c_str()); return -1; } - ssize_t w = zvec_pwrite(fd_, buffer, length, offset); + ssize_t w = zvec_pwrite(meta_fd_, buffer, length, offset); if (w != static_cast(length)) { LOG_ERROR( "Buffer pool failed to write meta: file[%s], offset[%zu], " @@ -461,23 +512,72 @@ int VecBufferPool::flush_all() { if (!writable_) { return 0; } + const size_t total = page_table_.entry_num(); + if (total == 0) { + return 0; + } + + static constexpr size_t kBatchPages = 256; + const size_t kBatchSize = kBatchPages * kVectorPageSize; + char *batch_buf = + static_cast(ailego_aligned_malloc(kBatchSize, 4096)); + int rc = 0; size_t total_dirty = 0; size_t fail_count = 0; - for (size_t i = 0; i < page_table_.entry_num(); ++i) { - if (page_table_.is_block_dirty(i)) { - ++total_dirty; - int r = page_table_.flush_block(i); - if (r != 0) { - rc = r; - ++fail_count; + size_t i = 0; + + while (i < total) { + if (!page_table_.is_block_dirty(i)) { + ++i; + continue; + } + + const size_t run_start = i; + size_t run_count = 0; + const size_t limit = batch_buf ? kBatchPages : 1; + while (i < total && run_count < limit && page_table_.is_block_dirty(i)) { + char *buf = page_table_.get_block_buffer(i); + if (!buf) break; + if (batch_buf) { + std::memcpy(batch_buf + run_count * kVectorPageSize, buf, + kVectorPageSize); } + ++run_count; + ++i; } + if (run_count == 0) { + ++i; + continue; + } + total_dirty += run_count; + + bool ok = false; + if (batch_buf && run_count > 0) { + const size_t write_size = run_count * kVectorPageSize; + ssize_t w = zvec_pwrite(fd_, batch_buf, write_size, + run_start * kVectorPageSize); + ok = (w == static_cast(write_size)); + } + if (ok) { + for (size_t j = run_start; j < run_start + run_count; ++j) { + page_table_.clear_dirty(j); + } + } else { + for (size_t j = run_start; j < run_start + run_count; ++j) { + int r = page_table_.flush_block(j); + if (r != 0) { + rc = r; + ++fail_count; + } + } + } + } + + if (batch_buf) { + ailego_free(batch_buf); } if (fail_count != 0) { - // Aggregated diagnostic so that callers (notably ~VecBufferPool, which - // discards the return value) cannot silently lose dirty pages: any - // unflushed page at this point means the on-disk image is now stale. LOG_ERROR( "VecBufferPool::flush_all: %zu/%zu dirty page(s) failed to flush, " "file[%s] last_rc=%d -- on-disk data may be stale.", @@ -495,6 +595,10 @@ bool VecBufferPool::extend_file(size_t new_size) { if (new_size <= file_size_) { return true; } + // The backing file must stay page-aligned so that O_DIRECT full-page reads + // never read past EOF. All current callers pass page-aligned targets. + assert(new_size % kVectorPageSize == 0 && + "extend_file target must be page-aligned for O_DIRECT correctness"); // Pre-validate against the page table's static capacity BEFORE mutating // any on-disk state. Otherwise a successful ftruncate followed by a // failed page_table_.extend() would leave the file size and the page @@ -617,5 +721,53 @@ void VecBufferPoolHandle::acquire_one(block_id_t block_id) { pool_.page_table_.acquire_block(block_id); } +void VecBufferPool::warmup() { + const size_t total_pages = page_table_.entry_num(); + // Read in large sequential chunks to minimize syscall overhead. + // Each chunk = 1024 pages = 4MB (maximize sequential I/O throughput). + static constexpr size_t kChunkPages = 1024; + const size_t kChunkSize = kChunkPages * kVectorPageSize; + + // Aligned buffer for bulk read (O_DIRECT requires alignment). + char *chunk_buf = static_cast(aligned_alloc(4096, kChunkSize)); + if (!chunk_buf) return; + + size_t loaded = 0; + bool pool_full = false; + for (size_t base = 0; base < total_pages && !pool_full; base += kChunkPages) { + const size_t pages_in_chunk = std::min(kChunkPages, total_pages - base); + const size_t read_bytes = pages_in_chunk * kVectorPageSize; + const size_t file_offset = base * kVectorPageSize; + + // One large sequential pread instead of N individual ones. + ssize_t got = zvec_pread(fd_, chunk_buf, read_bytes, file_offset); + if (got != static_cast(read_bytes)) break; + + // Distribute chunk data into individual page buffers. + for (size_t j = 0; j < pages_in_chunk; ++j) { + auto page_id = static_cast(base + j); + // Skip if already loaded. + char *existing = page_table_.acquire_block(page_id); + if (existing) { + page_table_.release_block(page_id); + ++loaded; + continue; + } + // Allocate page buffer from pool (no retry - stop if full). + char *buf = nullptr; + bool found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buf); + if (!found) { pool_full = true; break; } + std::memcpy(buf, chunk_buf + j * kVectorPageSize, kVectorPageSize); + page_table_.set_block_acquired(page_id, buf, file_offset + j * kVectorPageSize); + page_table_.release_block(page_id); + ++loaded; + } + } + free(chunk_buf); + LOG_DEBUG("VecBufferPool::warmup: preloaded %zu/%zu pages for file[%s]", + loaded, total_pages, file_name_.c_str()); +} + } // namespace ailego } // namespace zvec diff --git a/src/core/algorithm/hnsw/hnsw_algorithm.cc b/src/core/algorithm/hnsw/hnsw_algorithm.cc index 8c6fcfe17..251cdfa46 100644 --- a/src/core/algorithm/hnsw/hnsw_algorithm.cc +++ b/src/core/algorithm/hnsw/hnsw_algorithm.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. #include "hnsw_algorithm.h" +#include #include namespace zvec { @@ -81,8 +82,12 @@ int HnswAlgorithm::search(HnswContext *ctx) const { } dist_t dist = ctx->dist_calculator().dist(entry_point); + const auto &upper_entity = + static_cast(ctx->get_entity()); + upper_entity.reset_io_budget(INT32_MAX); for (level_t cur_level = maxLevel; cur_level >= 1; --cur_level) { select_entry_point(cur_level, &entry_point, &dist, ctx); + upper_entity.release_vectors(); } auto &topk_heap = ctx->topk_heap(); @@ -103,6 +108,11 @@ void HnswAlgorithm::select_entry_point(level_t level, HnswContext *ctx) const { const auto &entity = static_cast(ctx->get_entity()); HnswDistCalculator &dc = ctx->dist_calculator(); + uint32_t buf_cap = entity.max_degree(level); + std::vector neighbor_ids(buf_cap); + std::vector neighbor_vecs(buf_cap); + std::vector dists(buf_cap); + while (true) { const auto neighbors = entity.get_neighbors_typed(level, *entry_point); if (ailego_unlikely(ctx->debugging())) { @@ -113,31 +123,35 @@ void HnswAlgorithm::select_entry_point(level_t level, break; } - std::vector neighbor_vec_blocks; - int ret = entity.get_vector_typed(&neighbors[0], size, neighbor_vec_blocks); - if (ailego_unlikely(ctx->debugging())) { - (*ctx->mutable_stats_get_vector())++; + if (size > buf_cap) { + buf_cap = size; + neighbor_ids.resize(buf_cap); + neighbor_vecs.resize(buf_cap); + dists.resize(buf_cap); } - if (ailego_unlikely(ret != 0)) { - break; + for (uint32_t i = 0; i < size; ++i) { + neighbor_ids[i] = neighbors[i]; } - bool find_closer = false; - - std::vector dists(size); - std::vector neighbor_vecs(size); - for (uint32_t i = 0; i < size; ++i) { - neighbor_vecs[i] = neighbor_vec_blocks[i].data(); + if (ailego_unlikely(entity.resolve_vectors(neighbor_ids.data(), size, + neighbor_vecs.data()) != 0)) { + break; + } + if (ailego_unlikely(ctx->debugging())) { + (*ctx->mutable_stats_get_vector())++; } dc.batch_dist(neighbor_vecs.data(), size, dists.data()); - for (uint32_t i = 0; i < size; ++i) { - dist_t cur_dist = dists[i]; + // Release per-hop pages to prevent PinnedPageSet overflow. + // Upper-level pages have high eviction priority, so re-acquire is cheap. + entity.release_vectors(); - if (cur_dist < *dist) { - *entry_point = neighbors[i]; - *dist = cur_dist; + bool find_closer = false; + for (uint32_t i = 0; i < size; ++i) { + if (neighbor_vecs[i] && dists[i] < *dist) { + *entry_point = neighbor_ids[i]; + *dist = dists[i]; find_closer = true; } } @@ -176,7 +190,10 @@ void HnswAlgorithm::add_neighbors(node_id_t id, level_t level, // // Two specialized inner loops, dispatched from search_neighbors(): // -// fast_search_neighbors: mmap/contiguous with direct vector pointers. +// fast_search_neighbors: level-0 unfiltered search for all storage +// modes (mmap/BufferPool/contiguous). Vector +// resolution delegated to entity via +// resolve_vectors()/release_vectors(). // Uses BlockHeap (AVX2) or LinearPool (scalar) // for visited tracking and top-k maintenance. // dual_heap_search_neighbors: CandidateHeap + TopkHeap + VisitFilter. @@ -184,20 +201,19 @@ void HnswAlgorithm::add_neighbors(node_id_t id, level_t level, // search, upper levels, and BufferPool fallback. // ============================================================================ -// mmap/contiguous variant: resolve vectors via get_vector_ptr and use -// LinearPool or BlockHeap for visited tracking + top-k maintenance. -// HeapType must expose reset/set_visited/check_visited/push_block/has_next/pop. template void fast_search_neighbors(const EntityType &entity, HeapType &pool, VisitFilter &visit, HnswDistCalculator &dc, uint32_t topk, uint32_t ef, node_id_t entry_point, dist_t entry_dist, uint32_t prefetch_lines, uint32_t prefetch_offset) { - const uint32_t max_deg = entity.max_degree(0); // level 0 only + const uint32_t max_deg = entity.max_degree(0); const uint32_t cap = std::max(topk, ef); pool.reset(static_cast(cap), static_cast(max_deg)); visit.clear(); + entity.reset_io_budget(static_cast(ef / 2)); + visit.set_visited(entry_point); pool.push_block(&entry_dist, &entry_point, 1); @@ -219,41 +235,53 @@ void fast_search_neighbors(const EntityType &entity, HeapType &pool, neighbor_vecs.resize(buf_capacity); } - const uint32_t po = - std::min(static_cast(neighbors.size()), prefetch_offset); uint32_t unvisited_count = 0; - uint32_t i = 0; - - // Phase 1: scan first `po` neighbors with prefetch. - for (; i < po; ++i) { + for (uint32_t i = 0; i < neighbors.size(); ++i) { node_id_t node = neighbors[i]; if (visit.visited(node)) continue; visit.set_visited(node); - const void *vec_ptr = entity.get_vector_ptr(node); - const char *p = reinterpret_cast(vec_ptr); - for (uint32_t cl = 0; cl < prefetch_lines; ++cl) { - ailego_prefetch(p + cl * 64); - } - neighbor_ids[unvisited_count] = node; - neighbor_vecs[unvisited_count] = vec_ptr; - unvisited_count++; + neighbor_ids[unvisited_count++] = node; } - // Phase 2: scan remaining neighbors. - for (; i < neighbors.size(); ++i) { - node_id_t node = neighbors[i]; - if (visit.visited(node)) continue; - visit.set_visited(node); - neighbor_ids[unvisited_count] = node; - neighbor_vecs[unvisited_count] = entity.get_vector_ptr(node); - unvisited_count++; + if (unvisited_count == 0) continue; + + if (ailego_unlikely(entity.resolve_vectors(neighbor_ids.data(), + unvisited_count, + neighbor_vecs.data()) != 0)) + break; + + // Partition: move resolved vectors (non-null, cache hit) to front. + // Unresolved ones (cache miss) go to the back with FLT_MAX distance. + uint32_t resolved = 0; + for (uint32_t i = 0; i < unvisited_count; ++i) { + if (neighbor_vecs[i]) { + if (i != resolved) { + std::swap(neighbor_vecs[i], neighbor_vecs[resolved]); + std::swap(neighbor_ids[i], neighbor_ids[resolved]); + } + ++resolved; + } } - if (unvisited_count == 0) continue; - dc.batch_dist(neighbor_vecs.data(), unvisited_count, dists.data()); + if (resolved > 0) { + const uint32_t po = std::min(prefetch_offset, resolved); + for (uint32_t i = 0; i < po; ++i) { + const char *p = static_cast(neighbor_vecs[i]); + for (uint32_t cl = 0; cl < prefetch_lines; ++cl) { + ailego_prefetch(p + cl * 64); + } + } + dc.batch_dist(neighbor_vecs.data(), resolved, dists.data()); + } + // Unresolved vectors get FLT_MAX - they won't enter the candidate pool. + for (uint32_t i = resolved; i < unvisited_count; ++i) { + dists[i] = FLT_MAX; + } pool.push_block(dists.data(), neighbor_ids.data(), static_cast(unvisited_count)); + + entity.release_vectors(); } } @@ -379,9 +407,7 @@ void dual_heap_search_neighbors(const EntityType &entity, level_t level, // search_neighbors: Dispatch to fast or dual-heap path. // // - add_node / filtered / upper levels → dual_heap_search_neighbors -// - level-0 unfiltered search: -// MmapMemoryBlock → fast_search_neighbors (BlockHeap/LinearPool) -// BufferPool → dual_heap_search_neighbors (fallback) +// - level-0 unfiltered search → fast_search_neighbors // ============================================================================ template void HnswAlgorithm::search_neighbors(level_t level, @@ -393,7 +419,6 @@ void HnswAlgorithm::search_neighbors(level_t level, HnswDistCalculator &dc = ctx->dist_calculator(); if (!use_pool || ctx->filter().is_valid() || level != 0) { - // Dual-heap path: add_node, filtered search, or upper-level scan. auto run_with_filter = [&](auto &&filter) { dual_heap_search_neighbors( entity, level, entry_point, dist, topk, ctx, dc, @@ -410,36 +435,24 @@ void HnswAlgorithm::search_neighbors(level_t level, run_with_filter(filter); } } else { - // Pool-based path for level-0 unfiltered search. - if constexpr (std::is_same_v) { - const uint32_t prefetch_lines = - ctx->pl() > 0 ? ctx->pl() : (entity.vector_size() + 63) / 64; - - // Fast path: direct pointer access via get_vector_ptr. - // BlockHeap (AVX2) or LinearPool (scalar) for top-k tracking. - const uint32_t topk_v = static_cast(ctx->topk()); - const uint32_t ef_v = ctx->ef(); - const bool avx2_ok = - zvec::ailego::internal::CpuFeatures::static_flags_.AVX2; - - auto &visit = ctx->visit_filter(); - - if (avx2_ok) { - auto &bpool = ctx->block_pool(); - fast_search_neighbors(entity, bpool, visit, dc, topk_v, ef_v, - *entry_point, *dist, prefetch_lines, ctx->po()); - copy_pool_to_topk(bpool, topk); - } else { - auto &lpool = ctx->pool(); - fast_search_neighbors(entity, lpool, visit, dc, topk_v, ef_v, - *entry_point, *dist, prefetch_lines, ctx->po()); - copy_pool_to_topk(lpool, topk); - } + const uint32_t prefetch_lines = + ctx->pl() > 0 ? ctx->pl() : (entity.vector_size() + 63) / 64; + const uint32_t topk_v = static_cast(ctx->topk()); + const uint32_t ef_v = ctx->ef(); + const bool avx2_ok = + zvec::ailego::internal::CpuFeatures::static_flags_.AVX2; + auto &visit = ctx->visit_filter(); + + if (avx2_ok) { + auto &bpool = ctx->block_pool(); + fast_search_neighbors(entity, bpool, visit, dc, topk_v, ef_v, + *entry_point, *dist, prefetch_lines, ctx->po()); + copy_pool_to_topk(bpool, topk); } else { - // BufferPool entities: fallback to dual-heap path. - auto filter = [](node_id_t) { return false; }; - dual_heap_search_neighbors( - entity, level, entry_point, dist, topk, ctx, dc, filter); + auto &lpool = ctx->pool(); + fast_search_neighbors(entity, lpool, visit, dc, topk_v, ef_v, + *entry_point, *dist, prefetch_lines, ctx->po()); + copy_pool_to_topk(lpool, topk); } } } diff --git a/src/core/algorithm/hnsw/hnsw_streamer.cc b/src/core/algorithm/hnsw/hnsw_streamer.cc index 64ff5a747..73ef6d396 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer.cc @@ -269,6 +269,11 @@ int HnswStreamer::open(IndexStorage::Pointer stg) { if (ret != 0) { return ret; } + + if (entity_->storage_mode() == HnswStorageMode::kBufferPool) { + static_cast(entity_.get()) + ->mark_upper_level_pages(); + } IndexMeta index_meta; ret = entity_->get_index_meta(&index_meta); if (ret == IndexError_NoExist) { diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc index 50f15c3ff..2e8099b37 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc @@ -811,6 +811,40 @@ const HnswEntity::Pointer HnswMmapStreamerEntity::clone() const { return HnswEntity::Pointer(entity); } +const HnswEntity::Pointer HnswBufferPoolStreamerEntity::clone() const { + std::vector node_chunks; + node_chunks.reserve(node_chunks_.size()); + for (size_t i = 0UL; i < node_chunks_.size(); ++i) { + node_chunks.emplace_back(node_chunks_[i]->clone()); + if (ailego_unlikely(!node_chunks[i])) { + LOG_ERROR("HnswBufferPoolStreamerEntity get chunk failed in clone"); + return HnswEntity::Pointer(); + } + } + + std::vector upper_neighbor_chunks; + upper_neighbor_chunks.reserve(upper_neighbor_chunks_.size()); + for (size_t i = 0UL; i < upper_neighbor_chunks_.size(); ++i) { + upper_neighbor_chunks.emplace_back(upper_neighbor_chunks_[i]->clone()); + if (ailego_unlikely(!upper_neighbor_chunks[i])) { + LOG_ERROR("HnswBufferPoolStreamerEntity get chunk failed in clone"); + return HnswEntity::Pointer(); + } + } + + auto *entity = new (std::nothrow) HnswBufferPoolStreamerEntity( + stats_, header(), chunk_size_, node_index_mask_bits_, + upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_, node_chunk_bases_, + upper_neighbor_chunk_bases_); + if (ailego_unlikely(!entity)) { + LOG_ERROR("HnswBufferPoolStreamerEntity new failed"); + } + return HnswEntity::Pointer(entity); +} + const HnswEntity::Pointer HnswContiguousStreamerEntity::clone() const { std::vector node_chunks; node_chunks.reserve(node_chunks_.size()); diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 19f8ba161..bc5ebf8d5 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -14,7 +14,11 @@ #pragma once +#include +#include #include +#include +#include #include #include #include @@ -859,14 +863,22 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { return *reinterpret_cast(base + offset); } - //! Direct vector pointer access (no MemoryBlock wrapper). - //! For use in the merged search loop to avoid intermediate allocations. ailego_force_inline const void *get_vector_ptr(node_id_t id) const { uint32_t chunk_idx = id >> node_index_mask_bits_; uint32_t offset = (id & node_index_mask_) * node_size(); return get_node_chunk_base(chunk_idx) + offset; } + ailego_force_inline int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + for (uint32_t i = 0; i < count; ++i) + out[i] = get_vector_ptr(ids[i]); + return 0; + } + + ailego_force_inline void release_vectors() const {} + void reset_io_budget(int32_t) const {} // no-op for mmap mode + protected: //! Get cached base address for a node chunk, syncing if needed ailego_force_inline const char *get_node_chunk_base( @@ -912,7 +924,6 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { mutable std::vector upper_neighbor_chunk_bases_{}; }; -//! Typed entity subclass for buffer pool mode. class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { public: using MemoryBlock = BufferPoolMemoryBlock; @@ -924,6 +935,8 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { return HnswStorageMode::kBufferPool; } + const HnswEntity::Pointer clone() const override; + inline TypedNeighbors get_neighbors_typed(level_t level, node_id_t id) const { return HnswStreamerEntity::get_neighbors_typed(level, id); @@ -939,6 +952,226 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { inline key_t get_key_typed(node_id_t id) const { return HnswStreamerEntity::get_key_typed(id); } + + int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + ensure_pinned_pages(); + if (ailego_unlikely(!pinned_pages_.bound())) return -1; + const size_t vec_sz = vector_size(); + const size_t pg_sz = ailego::kVectorPageSize; + cross_page_used_ = 0; + if (cross_page_arena_.size() < count * vec_sz) + cross_page_arena_.resize(count * vec_sz); + for (uint32_t i = 0; i < count; ++i) { + const size_t abs_off = get_vector_abs_offset(ids[i]); + const auto page_id = static_cast(abs_off / pg_sz); + const size_t intra = abs_off % pg_sz; + if (ailego_likely(intra + vec_sz <= pg_sz)) { + char *page = pinned_pages_.try_get_page(page_id); + if (!page) { + if (io_budget_ > 0) { + page = pinned_pages_.get_page(page_id); + if (ailego_unlikely(!page)) return -1; + --io_budget_; + } else { + out[i] = nullptr; continue; + } + } + out[i] = page + intra; + } else { + const size_t part1 = pg_sz - intra; + char *p1 = pinned_pages_.try_get_page(page_id); + char *p2 = pinned_pages_.try_get_page(page_id + 1); + if (!p1 || !p2) { + if (io_budget_ > 0) { + if (!p1) p1 = pinned_pages_.get_page(page_id); + if (!p2) p2 = pinned_pages_.get_page(page_id + 1); + if (ailego_unlikely(!p1 || !p2)) return -1; + --io_budget_; + } else { + out[i] = nullptr; continue; + } + } + char *scratch = cross_page_arena_.data() + cross_page_used_ * vec_sz; + ++cross_page_used_; + std::memcpy(scratch, p1 + intra, part1); + std::memcpy(scratch + part1, p2, vec_sz - part1); + out[i] = scratch; + } + } + return 0; + } + + void release_vectors() const { + pinned_pages_.release_all(); + } + + //! Reset I/O budget for a new search. budget = max pread calls allowed. + void reset_io_budget(int32_t budget) const { io_budget_ = budget; } + + void mark_upper_level_pages() { + auto *pool = vec_buffer_pool(); + if (!pool) return; + auto ep = entry_point(); + auto max_lvl = cur_max_level(); + if (ep == kInvalidNodeId || max_lvl == 0) return; + + const uint32_t n = doc_cnt(); + std::vector visited(n, false); + std::vector upper_nodes; + upper_nodes.reserve(n / scaling_factor() + 64); + upper_nodes.push_back(ep); + visited[ep] = true; + + for (level_t lvl = max_lvl; lvl >= 1; --lvl) { + for (size_t idx = 0; idx < upper_nodes.size(); ++idx) { + auto id = upper_nodes[idx]; + auto it = upper_neighbor_index_->find(id); + if (it == upper_neighbor_index_->end()) continue; + auto meta = + reinterpret_cast(&it->second); + if (lvl > static_cast(meta->bits.level)) continue; + auto neighbors = get_neighbors_typed(lvl, id); + for (uint32_t i = 0; i < neighbors.size(); ++i) { + auto nid = neighbors[i]; + if (nid < n && !visited[nid]) { + visited[nid] = true; + upper_nodes.push_back(nid); + } + } + } + } + + const size_t pg_sz = ailego::kVectorPageSize; + const size_t vec_sz = vector_size(); + std::vector page_ids; + page_ids.reserve(upper_nodes.size()); + for (auto id : upper_nodes) { + const size_t abs_off = get_vector_abs_offset(id); + page_ids.push_back( + static_cast(abs_off / pg_sz)); + const size_t intra = abs_off % pg_sz; + if (intra + vec_sz > pg_sz) { + page_ids.push_back( + static_cast(abs_off / pg_sz) + 1); + } + } + std::sort(page_ids.begin(), page_ids.end()); + page_ids.erase(std::unique(page_ids.begin(), page_ids.end()), + page_ids.end()); + + size_t marked = 0; + for (auto pid : page_ids) { + pool->page_table_.set_evict_priority(pid, 2); + char *buf = pool->acquire_buffer(pid, 50); + if (buf) { + pool->page_table_.release_block(pid); + ++marked; + } + } + LOG_DEBUG( + "mark_upper_level_pages: marked %zu/%zu pages for %zu upper-level " + "nodes (maxLevel=%d, priority=2)", + marked, page_ids.size(), upper_nodes.size(), (int)max_lvl); + } + + private: + struct PinnedPageSet { + static constexpr size_t kCapacity = 128; + static constexpr size_t kMask = kCapacity - 1; + static constexpr ailego::block_id_t kEmpty = + std::numeric_limits::max(); + + PinnedPageSet() { reset_table(); } + ~PinnedPageSet() { release_all(); } + PinnedPageSet(const PinnedPageSet &) = delete; + PinnedPageSet &operator=(const PinnedPageSet &) = delete; + + void bind(ailego::VecBufferPool *pool) { pool_ = pool; } + bool bound() const { return pool_ != nullptr; } + + char *get_page(ailego::block_id_t page_id) { + size_t slot = static_cast(page_id) & kMask; + for (;;) { + if (ids_[slot] == page_id) return bufs_[slot]; + if (ids_[slot] == kEmpty) { + char *buf = pool_->acquire_buffer(page_id, 50); + if (ailego_unlikely(!buf)) return nullptr; + ids_[slot] = page_id; + bufs_[slot] = buf; + ++count_; + return buf; + } + slot = (slot + 1) & kMask; + } + } + + //! Try to get a page WITHOUT triggering disk I/O. + //! Returns buffer if page is in PinnedPageSet or already in pool memory. + //! Returns nullptr if page would need a pread (cache miss). + char *try_get_page(ailego::block_id_t page_id) { + size_t slot = static_cast(page_id) & kMask; + for (;;) { + if (ids_[slot] == page_id) return bufs_[slot]; + if (ids_[slot] == kEmpty) { + char *buf = pool_->try_acquire_buffer(page_id); + if (!buf) return nullptr; // page not in memory, skip + ids_[slot] = page_id; + bufs_[slot] = buf; + ++count_; + return buf; + } + slot = (slot + 1) & kMask; + } + } + + void release_all() { + if (!pool_) return; + for (size_t i = 0; i < kCapacity; ++i) { + if (ids_[i] != kEmpty) { + pool_->page_table_.release_block(ids_[i]); + ids_[i] = kEmpty; + bufs_[i] = nullptr; + } + } + count_ = 0; + } + + private: + void reset_table() { + std::fill_n(ids_, kCapacity, kEmpty); + std::fill_n(bufs_, kCapacity, nullptr); + count_ = 0; + } + ailego::VecBufferPool *pool_{nullptr}; + ailego::block_id_t ids_[kCapacity]; + char *bufs_[kCapacity]; + size_t count_{0}; + }; + + ailego::VecBufferPool *vec_buffer_pool() const { + if (broker_ && broker_->storage()) { + return broker_->storage()->vec_buffer_pool(); + } + return nullptr; + } + + size_t get_vector_abs_offset(node_id_t id) const { + auto loc = get_vector_chunk_loc(id); + return node_chunks_[loc.first]->abs_data_offset() + loc.second; + } + + void ensure_pinned_pages() const { + if (!pinned_pages_.bound()) { + auto *pool = vec_buffer_pool(); + if (pool) pinned_pages_.bind(pool); + } + } + + mutable PinnedPageSet pinned_pages_; + mutable std::vector cross_page_arena_; + mutable uint32_t cross_page_used_{0}; + mutable int32_t io_budget_{INT32_MAX}; }; //! Typed entity subclass for contiguous memory mode. @@ -1048,18 +1281,25 @@ class HnswContiguousStreamerEntity : public HnswMmapStreamerEntity { return HnswMmapStreamerEntity::get_key_typed(id); } - //! Direct vector pointer from flat vector array (stride = vector_size). - //! For use in the merged search loop to avoid intermediate allocations. ailego_force_inline const void *get_vector_ptr(node_id_t id) const { if (ailego_likely(vector_base_ != nullptr)) { return vector_base_ + static_cast(id) * vector_size(); } - // Fallback to mmap chunk-based access uint32_t chunk_idx = id >> node_index_mask_bits_; uint32_t offset = (id & node_index_mask_) * node_size(); return get_node_chunk_base(chunk_idx) + offset; } + ailego_force_inline int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + for (uint32_t i = 0; i < count; ++i) + out[i] = get_vector_ptr(ids[i]); + return 0; + } + + ailego_force_inline void release_vectors() const {} + void reset_io_budget(int32_t) const {} // no-op for contiguous mode + protected: //! Custom deleter for contiguous memory (munmap / _aligned_free / free) //! Used by shared_ptr to properly release mmap'd memory. diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 3db6c58d9..425da4553 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -449,6 +449,12 @@ class BufferStorage : public IndexStorage { return shared_from_this(); } + size_t abs_data_offset(void) const override { + return segment_info_->segment_header_start_offset + + segment_info_->segment_header->content_offset + + segment_info_->segment.meta()->data_index; + } + protected: friend BufferStorage; // Pointer into BufferStorage::segments_ (unordered_map mapped value). @@ -481,6 +487,9 @@ class BufferStorage : public IndexStorage { if (val != 0) { segment_meta_capacity_ = val; } + // O_DIRECT is always on for memory controllability. + params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_); + enable_direct_io_ = true; return 0; } @@ -506,10 +515,11 @@ class BufferStorage : public IndexStorage { } } - // Open in writable mode when the caller expects to modify the index - // (create_if_missing=true implies write intent, same as MMapFileStorage). + // O_DIRECT is used for both build and search to keep memory + // controllable (no uncontrolled page-cache growth). buffer_pool_ = std::make_shared( - path, /*writable=*/create_if_missing); + path, /*writable=*/create_if_missing, + /*enable_direct_io=*/enable_direct_io_); buffer_pool_handle_ = std::make_shared( buffer_pool_->get_handle()); int ret = ParseToMapping(); @@ -522,6 +532,7 @@ class BufferStorage : public IndexStorage { this->close_index(); return ret; } + buffer_pool_->warmup(); LOG_INFO( "BufferStorage opened: file=%s, writable=%d, max_segment_size=%" PRIu64 ", segment_count=%zu", @@ -804,6 +815,10 @@ class BufferStorage : public IndexStorage { return chain_headers_.front()->magic; } + ailego::VecBufferPool *vec_buffer_pool(void) const override { + return buffer_pool_.get(); + } + protected: //! Initialize index version segment (writes content into an IndexMapping). //! Only intended to be called from init_index() while `mapping` is still @@ -1530,6 +1545,10 @@ class BufferStorage : public IndexStorage { // init_index(). uint32_t segment_meta_capacity_{4096u}; + // When true, the page-data fd is opened with O_DIRECT (metadata fd stays + // buffered). Defaults to false: identical behaviour to the legacy path. + bool enable_direct_io_{false}; + // Per-header-chain file offsets used by flush_index() and append_segment(). struct MetaChain { uint64_t header_start_offset; diff --git a/src/core/utility/utility_params.h b/src/core/utility/utility_params.h index c57e6e980..1b8ba2cef 100644 --- a/src/core/utility/utility_params.h +++ b/src/core/utility/utility_params.h @@ -72,6 +72,10 @@ static const std::string MMAPFILE_STORAGE_FORCE_FLUSH = static const std::string MMAPFILE_STORAGE_SEGMENT_META_CAPACITY = "proxima.mmap_file.storage.segment_meta_capacity"; +//! BufferStorage +static const std::string BUFFER_STORAGE_ENABLE_DIRECT_IO = + "proxima.buffer.storage.enable_direct_io"; + //! MipsConverter static const std::string MIPS_CONVERTER_M_VALUE = "proxima.mips.converter.m_value"; diff --git a/src/include/zvec/ailego/buffer/block_eviction_queue.h b/src/include/zvec/ailego/buffer/block_eviction_queue.h index fa5aff2a5..64a20fec7 100644 --- a/src/include/zvec/ailego/buffer/block_eviction_queue.h +++ b/src/include/zvec/ailego/buffer/block_eviction_queue.h @@ -81,13 +81,6 @@ class BlockEvictionQueue { bool add_single_block(const BlockType &block, int queue_index); - // void clear_dead_node(); - - bool is_valid(EvictableBlockOwner *owner) { - std::shared_lock lock(valid_owners_mutex_); - return valid_owners_.find(owner) != valid_owners_.end(); - } - void set_valid(EvictableBlockOwner *owner) { std::unique_lock lock(valid_owners_mutex_); valid_owners_.insert(owner); @@ -98,9 +91,6 @@ class BlockEvictionQueue { valid_owners_.erase(owner); } - // Atomically checks under the shared lock that the owner is still valid AND - // the block version has not been superseded, preventing TOCTOU races when an - // owner is concurrently destroyed. bool is_valid_and_alive(const BlockType &item); void recycle(); @@ -143,10 +133,17 @@ class MemoryLimitPool { private: MemoryLimitPool() = default; + ~MemoryLimitPool(); + + void drain_free_list(); private: size_t pool_size_{0}; std::atomic used_size_{0}; + + std::mutex free_list_mutex_; + char *free_list_head_{nullptr}; + size_t free_list_count_{0}; }; } // namespace ailego diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 02d19bbc7..9584f60b5 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -50,6 +50,8 @@ class VectorPageTable : public EvictableBlockOwner { std::atomic ref_count; std::atomic in_evict_queue; std::atomic is_dirty; + uint8_t evict_priority{0}; + bool ever_loaded{false}; // true once the page has been loaded at least once char *buffer; size_t file_offset; }; @@ -96,6 +98,13 @@ class VectorPageTable : public EvictableBlockOwner { void evict_block(block_id_t block_id) override; + void set_evict_priority(block_id_t block_id, uint8_t priority) { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + Entry &e = entry_at(block_id); + e.evict_priority = priority; + e.in_evict_queue.store(false, std::memory_order_relaxed); + } + char *set_block_acquired(block_id_t block_id, char *buffer, size_t file_offset); @@ -114,6 +123,19 @@ class VectorPageTable : public EvictableBlockOwner { return entry_at(block_id).is_dirty.load(std::memory_order_relaxed); } + //! Get the raw buffer pointer for a loaded page (nullptr if not loaded). + //! Used by batched flush to memcpy page contents into a coalescing buffer. + char *get_block_buffer(block_id_t block_id) const { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + return entry_at(block_id).buffer; + } + + //! Clear the dirty flag after a successful batched flush. + void clear_dirty(block_id_t block_id) { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + entry_at(block_id).is_dirty.store(false, std::memory_order_relaxed); + } + //! Flush a single dirty block without evicting it. Caller guarantees the //! block is currently loaded (buffer != nullptr). int flush_block(block_id_t block_id) { @@ -151,6 +173,21 @@ class VectorPageTable : public EvictableBlockOwner { return !e.in_evict_queue.load(std::memory_order_relaxed); } + //! Check if a page is loaded (has a non-null buffer). + //! Used by try_acquire_buffer to avoid ref_count leaks on unloaded pages. + bool is_loaded(block_id_t block_id) const { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + return entry_at(block_id).buffer != nullptr; + } + + //! Check if a page has ever been loaded (so pread is needed on reload + //! after eviction). A page that was never loaded can be zero-filled + //! if it lies beyond the initial file size (created by extend_file). + bool is_ever_loaded(block_id_t block_id) const { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + return entry_at(block_id).ever_loaded; + } + private: // Segmented page table: entries are split across fixed-size segments so // that extend() can grow the table without moving existing entries. @@ -202,7 +239,8 @@ class VecBufferPool { static constexpr size_t kMutexBucketCount = 64UL * 1024UL; - VecBufferPool(const std::string &filename, bool writable = false); + VecBufferPool(const std::string &filename, bool writable = false, + bool enable_direct_io = false); ~VecBufferPool() { // Flush any remaining dirty blocks before tearing down memory/fd so that // writes are not silently lost. Safe to call even in read-only mode. @@ -213,8 +251,10 @@ class VecBufferPool { } #if defined(_MSC_VER) _close(fd_); + _close(meta_fd_); #else close(fd_); + close(meta_fd_); #endif } @@ -253,11 +293,32 @@ class VecBufferPool { return file_size_; } + //! Sequentially preload pages into the pool until pool is full. + void warmup(); + + //! Try to acquire a page buffer WITHOUT triggering disk I/O. + //! Returns the buffer pointer if the page is already in memory, + //! or nullptr if the page would need a pread (cache miss). + //! Avoids touching ref_count for unloaded pages to prevent leaks. + char *try_acquire_buffer(block_id_t page_id) { + assert(page_id < page_table_.entry_num()); + // Quick check: if buffer is null, page not loaded - skip without + // incrementing ref_count (acquire_block would leak ref_count on + // pages with ref_count>=0 but buffer==nullptr). + if (!page_table_.is_loaded(page_id)) return nullptr; + return page_table_.acquire_block(page_id); + } + private: - int fd_; + int fd_; // page-data channel: may carry O_DIRECT + int meta_fd_; // metadata channel: always buffered IO size_t file_size_; + size_t initial_file_size_; // file size at open time; pages beyond this + // are created by extend_file and can skip + // pread on first load (content is zeros). std::string file_name_; bool writable_{false}; + bool direct_io_enabled_{false}; // whether O_DIRECT actually took effect public: VectorPageTable page_table_; diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index 049c79917..ca4e24dbb 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -332,6 +332,10 @@ class IndexStorage : public IndexModule { virtual const uint8_t *base_data(void) const { return nullptr; } + + virtual size_t abs_data_offset(void) const { + return 0; + } }; //! Destructor @@ -399,6 +403,10 @@ class IndexStorage : public IndexModule { virtual std::string file_path(void) const { return ""; } + + virtual ailego::VecBufferPool *vec_buffer_pool(void) const { + return nullptr; + } }; } // namespace core