Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/ailego/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ set(EXTRA_LIBS ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS})

if(UNIX AND NOT APPLE)
list(APPEND EXTRA_LIBS ${LIB_RT})
find_library(LIB_AIO NAMES aio)
if(LIB_AIO)
list(APPEND EXTRA_LIBS ${LIB_AIO})
endif()
endif()

if(NOT ANDROID AND AUTO_DETECT_ARCH)
Expand Down Expand Up @@ -123,3 +127,8 @@ cc_library(
LIBS ${EXTRA_LIBS}
VERSION "${GIT_SRCS_VER}"
)

if(LIB_AIO)
target_compile_definitions(zvec_ailego PUBLIC ZVEC_HAS_LIBAIO)
message(STATUS "Found libaio: ${LIB_AIO} (HNSW async prefetch enabled)")
endif()
35 changes: 34 additions & 1 deletion src/ailego/buffer/block_eviction_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,30 @@ bool BlockEvictionQueue::add_single_block(const BlockType &block,
return true;
}

MemoryLimitPool::~MemoryLimitPool() {
drain_free_list();
}

void MemoryLimitPool::drain_free_list() {
std::lock_guard<std::mutex> lock(free_list_mutex_);
size_t drained = 0;
while (free_list_head_) {
char *buf = free_list_head_;
free_list_head_ = *reinterpret_cast<char **>(buf);
ailego_free(buf);
++drained;
}
free_list_count_ = 0;
if (drained > 0) {
LOG_INFO("MemoryLimitPool: drained %zu cached buffers from free list",
drained);
}
}

int MemoryLimitPool::init(size_t pool_size) {
pool_size_ = 0;
BlockEvictionQueue::get_instance().recycle();
drain_free_list();
pool_size_ = pool_size;
LOG_INFO("MemoryLimitPool initialized with pool size: %lu", pool_size_);
return 0;
Expand All @@ -96,6 +117,15 @@ bool MemoryLimitPool::try_acquire_buffer(const size_t buffer_size,
}
desired = expected + buffer_size;
} while (!used_size_.compare_exchange_weak(expected, desired));
{
std::lock_guard<std::mutex> lock(free_list_mutex_);
if (free_list_head_) {
buffer = free_list_head_;
free_list_head_ = *reinterpret_cast<char **>(buffer);
--free_list_count_;
return true;
}
}
buffer = (char *)ailego_aligned_malloc(buffer_size, 4096);
if (!buffer) {
used_size_.fetch_sub(buffer_size);
Expand All @@ -119,7 +149,10 @@ void MemoryLimitPool::release_buffer(char *buffer, const size_t buffer_size) {
desired = expected - buffer_size;
assert(expected >= buffer_size);
} while (!used_size_.compare_exchange_weak(expected, desired));
ailego_free(buffer);
std::lock_guard<std::mutex> lock(free_list_mutex_);
*reinterpret_cast<char **>(buffer) = free_list_head_;
free_list_head_ = buffer;
++free_list_count_;
}

void MemoryLimitPool::release_external(const size_t buffer_size) {
Expand Down
Loading