Skip to content

Commit ecf3fe7

Browse files
alonre24rfsaliev
andauthored
[8.2] [MOD-12668] Implement asynchronous SVS GC execution using SVSMultiThreadJob (#847)
resolve conflicts - use lock guard instead of unique lock Co-authored-by: Rafik Saliev <rafik.f.saliev@intel.com>
1 parent 8f4d7c9 commit ecf3fe7

File tree

5 files changed

+132
-11
lines changed

5 files changed

+132
-11
lines changed

src/VecSim/algorithms/svs/svs_tiered.h

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
213213
// Used to prevent scheduling multiple index update jobs at the same time.
214214
// As far as the update job does a batch update, job queue should have just 1 job at the moment.
215215
std::atomic_flag indexUpdateScheduled = ATOMIC_FLAG_INIT;
216+
// Used to prevent scheduling multiple index GC jobs at the same time.
217+
std::atomic_flag indexGCScheduled = ATOMIC_FLAG_INIT;
216218
// Used to prevent running multiple index update jobs in parallel.
217219
// Even if update jobs scheduled sequentially, they can be started in parallel.
218220
mutable std::mutex updateJobMutex;
@@ -510,6 +512,39 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
510512
index->updateSVSIndex(availableThreads);
511513
}
512514

515+
/**
516+
* @brief Run SVS index GC in a thread-safe manner.
517+
*
518+
* This static wrapper function performs the following actions:
519+
* - Acquires a lock on the index's mainIndexGuard to ensure thread safety during the GC
520+
* - Configures the number of threads for the underlying SVS index update operation.
521+
* - Calls the SVSIndex::runGC() method to perform the actual index update.
522+
* - Clears the indexGCScheduled flag to allow future scheduling.
523+
*
524+
* @param idx Pointer to the VecSimIndex to be updated.
525+
* @param availableThreads The number of threads available for the update operation. Current
526+
* thread us used as well, so the minimal value is 1.
527+
* @note no need to implement extra non-static method, as GC logic is simple enough to be done
528+
* here.
529+
*/
530+
static void SVSIndexGCWrapper(VecSimIndex *idx, size_t availableThreads) {
531+
assert(availableThreads > 0);
532+
auto index = static_cast<TieredSVSIndex<DataType> *>(idx);
533+
assert(index);
534+
535+
std::lock_guard lock{index->mainIndexGuard};
536+
// Release the scheduled flag to allow scheduling again
537+
index->indexGCScheduled.clear();
538+
539+
// Do SVS index GC
540+
index->backendIndex->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
541+
"running asynchronous GC for tiered SVS index");
542+
auto svs_index = index->GetSVSIndex();
543+
svs_index->setNumThreads(std::min(availableThreads, index->backendIndex->indexSize()));
544+
// VecSimIndexAbstract::runGC() is protected
545+
static_cast<VecSimIndexInterface *>(index->backendIndex)->runGC();
546+
}
547+
513548
#ifdef BUILD_TESTS
514549
public:
515550
#endif
@@ -526,6 +561,19 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
526561
this->submitJobs(jobs);
527562
}
528563

564+
void scheduleSVSIndexGC() {
565+
// do not schedule if scheduled already
566+
if (indexGCScheduled.test_and_set()) {
567+
return;
568+
}
569+
570+
auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
571+
auto jobs = SVSMultiThreadJob::createJobs(
572+
this->allocator, SVS_GC_JOB, SVSIndexGCWrapper, this, total_threads,
573+
std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs);
574+
this->submitJobs(jobs);
575+
}
576+
529577
private:
530578
void updateSVSIndex(size_t availableThreads) {
531579
std::set<labelType> to_delete;
@@ -856,10 +904,8 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
856904

857905
void runGC() override {
858906
TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING,
859-
"running asynchronous GC for tiered SVS index");
860-
std::unique_lock<std::shared_mutex> backend_lock{this->mainIndexGuard};
861-
// VecSimIndexAbstract::runGC() is protected
862-
static_cast<VecSimIndexInterface *>(this->backendIndex)->runGC();
907+
"scheduling asynchronous GC for tiered SVS index");
908+
scheduleSVSIndexGC();
863909
}
864910

865911
void acquireSharedLocks() override {

src/VecSim/vec_sim_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ typedef enum {
243243
HNSW_SEARCH_JOB,
244244
HNSW_SWAP_JOB,
245245
SVS_BATCH_UPDATE_JOB,
246+
SVS_GC_JOB,
246247
INVALID_JOB // to indicate that finding a JobType >= INVALID_JOB is an error
247248
} JobType;
248249

tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimSVS, BM_FUNC_NAME(BM_RunGC), DATA_TYPE_INDE
1919
BENCHMARK_REGISTER_F(BM_VecSimSVS, BM_FUNC_NAME(BM_RunGC))
2020
->Unit(benchmark::kMillisecond)
2121
->Iterations(1)
22-
->Arg(50)
23-
->Arg(100)
24-
->Arg(250)
25-
->Arg(500)
26-
->ArgName("num_deletions");
22+
->ArgsProduct({{50, 100, 500}, {1, 4}})
23+
->ArgNames({"num_deletions", "thread_count"});
2724

2825
// AddLabel one by one
2926
BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimSVS, BM_FUNC_NAME(BM_AddLabelOneByOne), DATA_TYPE_INDEX_T)

tests/benchmark/bm_vecsim_svs.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,13 @@ template <typename index_type_t>
407407
void BM_VecSimSVS<index_type_t>::RunGC(benchmark::State &st) {
408408

409409
size_t num_deletions = st.range(0);
410-
auto mock_thread_pool = tieredIndexMock(1);
411-
ASSERT_EQ(mock_thread_pool.thread_pool_size, 1);
410+
auto mock_thread_pool = tieredIndexMock(st.range(1));
411+
ASSERT_EQ(mock_thread_pool.thread_pool_size, st.range(1));
412412
auto *tiered_index = CreateTieredSVSIndexFromFile(mock_thread_pool, 1);
413413

414+
// start threadpool
415+
mock_thread_pool.init_threads();
416+
414417
// Delete vectors, not yet triggering consolidation.
415418
for (size_t i = 0; i < num_deletions; ++i) {
416419
int ret = VecSimIndex_DeleteVector(tiered_index, i);
@@ -421,6 +424,7 @@ void BM_VecSimSVS<index_type_t>::RunGC(benchmark::State &st) {
421424
ASSERT_EQ(info.svsInfo.numberOfMarkedDeletedNodes, num_deletions);
422425
for (auto _ : st) {
423426
VecSimTieredIndex_GC(tiered_index);
427+
mock_thread_pool.thread_pool_wait();
424428
};
425429
// num deleted should be 0
426430
info = VecSimIndex_DebugInfo(tiered_index);

tests/unit/test_svs_tiered.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2763,8 +2763,19 @@ TYPED_TEST(SVSTieredIndexTestBasic, runGCAPI) {
27632763
ASSERT_EQ(tiered_index->GetSVSIndex()->indexStorageSize(), n);
27642764
auto size_before_gc = tiered_index->getAllocationSize();
27652765

2766+
auto jobs_before_gc = mock_thread_pool.jobQ.size();
27662767
// Run the GC API call, expect that we will clean up the SVS index.
27672768
VecSimTieredIndex_GC(tiered_index);
2769+
// Expected that GC jobs were added to the queue.
2770+
ASSERT_EQ(mock_thread_pool.jobQ.size(), jobs_before_gc + mock_thread_pool.thread_pool_size);
2771+
// Run GC twice.
2772+
VecSimTieredIndex_GC(tiered_index);
2773+
// Expected that no new GC jobs were added to the queue.
2774+
ASSERT_EQ(mock_thread_pool.jobQ.size(), jobs_before_gc + mock_thread_pool.thread_pool_size);
2775+
// Wait for any pending jobs to complete. As far as SVS GC is done via a job.
2776+
mock_thread_pool.init_threads();
2777+
mock_thread_pool.thread_pool_join();
2778+
// Validate sizes after GC.
27682779
ASSERT_EQ(tiered_index->indexSize(), n - threshold);
27692780
ASSERT_EQ(tiered_index->GetBackendIndex()->indexSize(), n - threshold);
27702781
ASSERT_EQ(tiered_index->GetSVSIndex()->indexStorageSize(), n - threshold);
@@ -2775,6 +2786,68 @@ TYPED_TEST(SVSTieredIndexTestBasic, runGCAPI) {
27752786
EXPECT_EQ(tiered_index->statisticInfo().numberOfMarkedDeleted, 0);
27762787
}
27772788

2789+
TYPED_TEST(SVSTieredIndexTestBasic, runGCParallel) {
2790+
// Create TieredSVS index instance with a mock queue.
2791+
size_t dim = 4;
2792+
size_t threshold = 1024;
2793+
const size_t n = threshold * 4;
2794+
SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2};
2795+
VecSimParams svs_params = CreateParams(params);
2796+
auto mock_thread_pool = tieredIndexMock();
2797+
2798+
// Force trigger the first update job for 64 first vectors.
2799+
auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 64);
2800+
ASSERT_INDEX(tiered_index);
2801+
auto allocator = tiered_index->getAllocator();
2802+
2803+
// Insert n vectors directly to SVS.
2804+
std::srand(10); // create pseudo random generator with any arbitrary seed.
2805+
for (size_t i = 0; i < n; i++) {
2806+
TEST_DATA_T vector[dim];
2807+
for (size_t j = 0; j < dim; j++) {
2808+
vector[j] = std::rand() / (TEST_DATA_T)RAND_MAX;
2809+
}
2810+
VecSimIndex_AddVector(tiered_index->GetBackendIndex(), vector, i);
2811+
}
2812+
2813+
ASSERT_EQ(tiered_index->indexSize(), n);
2814+
ASSERT_EQ(tiered_index->GetBackendIndex()->indexSize(), n);
2815+
2816+
// Initialize the thread pool to start processing jobs.
2817+
mock_thread_pool.init_threads();
2818+
2819+
// Run the mess of add, delete, GC
2820+
for (size_t i = 0; i < threshold; i++) {
2821+
// Run GC for every 64 iterations.
2822+
if (i % 64 == 0) {
2823+
VecSimTieredIndex_GC(tiered_index);
2824+
}
2825+
// Add a new vector
2826+
TEST_DATA_T vector[dim];
2827+
for (size_t j = 0; j < dim; j++) {
2828+
vector[j] = std::rand() / (TEST_DATA_T)RAND_MAX;
2829+
}
2830+
VecSimIndex_AddVector(tiered_index, vector, n + i);
2831+
// Delete an existing vector
2832+
tiered_index->deleteVector(i + threshold);
2833+
}
2834+
// Final GC after all operations.
2835+
VecSimTieredIndex_GC(tiered_index);
2836+
// Wait for any pending jobs to complete. As far as SVS GC is done via a job.
2837+
mock_thread_pool.thread_pool_join();
2838+
2839+
// Validate sizes after GC.
2840+
auto tiered_size = tiered_index->indexSize();
2841+
auto flat_size = tiered_index->GetFlatIndex()->indexSize();
2842+
auto backend_size = tiered_index->GetBackendIndex()->indexSize();
2843+
ASSERT_EQ(tiered_size, n);
2844+
ASSERT_EQ(tiered_size, backend_size + flat_size);
2845+
// Expect that GC cleaned all the deleted vectors.
2846+
ASSERT_EQ(tiered_index->GetSVSIndex()->indexStorageSize(), backend_size);
2847+
ASSERT_EQ(tiered_index->GetSVSIndex()->getNumMarkedDeleted(), 0);
2848+
EXPECT_EQ(tiered_index->statisticInfo().numberOfMarkedDeleted, 0);
2849+
}
2850+
27782851
TYPED_TEST(SVSTieredIndexTestBasic, switchDeleteModes) {
27792852
// Create TieredSVS index instance with a mock queue.
27802853
size_t dim = 16;

0 commit comments

Comments
 (0)