feat: distributed vector index creation#605
Conversation
Change-Id: I3a9c29532bf75d352121054cb592f8c0d8114b66
Change-Id: Ibe1a1d9dcd898e17ab4e4d00075303af15772bb0
Change-Id: Id6c9b3841b0fc13370b6d3785fde99c24f663378
|
First off, huge thanks for @sezruby , we're genuinely happy to have both PRs reviewed in parallel — whether that means landing #601 first and rebasing the HNSW + resolver-split additions on top, landing ours and cherry-picking the recall test and use_residual error from #601, or any combination the
|
|
Thanks for the gracious + accurate comparison @summaryzb To clarify intent on my side: #601 was deliberately scoped small (FLAT/PQ/SQ only, no HNSW, no resolver split) so it could land fast as an incremental step. I'd been planning HNSW + the dim-aware param resolver as follow-ups on top of it. Your PR has those already — that's a real superset. Happy with whichever order maintainers prefer:
No strong preference between those, just want to land something for #165 so OSS users stop hitting |
|
Can you resolve the conflicts first? @summaryzb |
| try { | ||
| // ---- Phase 2: split fragments, parallelize tasks ---- | ||
| val batches = | ||
| IndexUtils.batchFragments(fragmentIds, numSegments, sc.defaultParallelism) |
There was a problem hiding this comment.
runSegments repeats ZonemapIndexJob.run's core and VectorIndexTask.execute repeats ZonemapIndexTask.execute — the 7-line openDatasetBuilder().initialStorageOptions().runtimeNamespace() envelope now exists in 4 verbatim copies plus a near-verbatim 5th, and the two tasks carry divergent replace(true)/withIndexName vs replace(false) contracts on the same commit seam (explained only in a comment).
Suggestion: Hoist a shared IndexUtils.runSegmentTasks plus a shared open/create/encode/close helper parameterized by an IndexOptions builder fn.
There was a problem hiding this comment.
I partially addressed this by hoisting the common segment execution path into IndexUtils.runSegmentTasks。
The remaining duplication is mostly the method-specific IndexOptions builder setup, and keeping it explicit makes the different commit semantics easier to audit。
Given that, I’d prefer not to add another abstraction in this PR unless you feel strongly.
There was a problem hiding this comment.
Agreed — no need for another abstraction here. Hoisting the shared segment-execution path into IndexUtils.runSegmentTasks / batchFragments is the right amount. The remaining duplication is the per-method IndexOptions builder setup, and keeping that explicit is reasonable since the commit semantics genuinely differ (e.g. the IVF path sets replace(true) + withIndexName, while the zonemap segment path uses replace(false) and no name). I don't feel strongly — fine to leave as is.
Change-Id: Ia8faaa46b2a00ac92a29e903d1700d40bad8b219
Change-Id: I424e585630613b49e01df2f9301a1a4c164ed173
|
Re-reviewed after the latest push ( Looks addressed (OK to resolve): Float32-only vector validation with an accurate message; 1. SQ is trained per-segment with no shared artifact —
|
Change-Id: I003a524dd4b4ab5e17002fbbe1d6d9d0d6f1c94e
|
Lance's distributed index commit path silently produces incorrect |
|
+1 , Create job ! By the way, would it be possible to split |
Change-Id: I0ebba5d7a9585fc97692a82aa000fd911d51f50c
Thanks for reviewing, |
Summary
Implements distributed vector index creation for the five IVF families exposed by the SQL grammar (
IVF_FLAT,IVF_PQ,IVF_SQ,IVF_HNSW_PQ,IVF_HNSW_SQ). The pipeline trains IVF centroids and the PQ codebook once on the driver, broadcasts them to executors, then builds per-fragment segments in parallel and commits them atomically throughDataset.commitExistingIndexSegments. Adds a pure parameter resolver/validator with smart defaults plus end-to-end and unit test suites.Problem
Fix: #165
A separate vector pipeline is therefore required: driver-side training, broadcast, executor-parallel quantization, atomic commit — plus a parameter contract for the WITH(...) clause that validates against the column dimension and row count before any work is dispatched.
Approach
Builds on #479 (@jiaoew1991)
Three-phase pipeline (
VectorIndexJob).VectorIndexTask).**Parameter resolution (
VectorIndexParamsResolver). Split into two layers:parseAndValidate(indexType, args, dim, numRows)is pure — no Spark, no Dataset — and is unit-tested directly.resolve(...)is the driver-only wrapper that opens the dataset, reads the column's ArrowFixedSizeList<Float|Double>dimension viaVectorUtils.getVectorArrowDimensionTrade-offs.
IndexType.IVF_HNSW_FLATis intentionally not included — lance-core 7.0.0'sVectorIndexParams.Builder.build()rejects HNSW without a PQ/SQ quantizer, so the dispatcher excludes it explicitly.Driver-side IVF/PQ training is intentional: The training step is the only non-distributed phase, which is why broadcasts are destroyed eagerly. This will be followup when lanceKMeanssupportpartial aggregateabstraction and expose it to compute engine layerChanges
Core indexing pipeline (Scala,
lance-spark-base_2.12/src/main/scala/.../v2/)AddIndexExec.scala: dispatch the five IVF index types through the logical-segment commit path; extractextractNamespaceInfofor reuse; addIndexUtils.useLogicalSegmentCommitandIndexUtils.batchFragments(shared by Zonemap and IVF); validatenum_segmentsonly for logical-segment types; reusecommitIndexSegmentsfor both Zonemap and IVF segments.VectorIndexJob.scala(new): three-phase distributed builder — drivertrainAndBroadcast, executorVectorIndexTask.execute, broadcast cleanup infinally; logs phase transitions for operability.VectorIndexParamsResolver.scala(new): pureparseAndValidateplus driver-sideresolvethat reads dim/rowCount; per-type whitelists, smart defaults, distance alias, validation with type-aware error messages.VectorIndexPlan.scala(new): closure-cleanVectorIndexPlan,IvfPlan,PqPlan,SqPlan,HnswPlan— primitives,Option,IndexTypeenum only.Docs
README.md: add a feature line documenting the new "Distributed vector index build" capability and explicitly listing the five supported families, the training/build split, and the atomic commit behavior.Test Coverage
Pure unit tests —
VectorIndexParamsResolverTest(no Spark, no Dataset)Pure unit tests —
IndexUtilsTestEnd-to-end Spark integration —
BaseAddVectorIndexTest(run by both 3.4 and 3.5 modules)≥4fragments, 320 rows total.IVF_FLAT,IVF_PQ(explicitnum_sub_vectors),IVF_SQ,IVF_HNSW_PQ,IVF_HNSW_SQ; verify segments cover every fragment exactly once.num_sub_vectorsfromdim=32anddim=24; failure whendim=23.num_partitionsinferred from row count when omitted.euclideanalias maps to L2 in a real dataset.num_sub_vectorsfor IVF_FLAT, HNSW params for non-HNSW, unknowndistance_type='manhattan', multiple columns, non-vector column,num_partitions=10000exceeding row count.num_segments=2produces exactly 2 segments;num_segments=100000clamps down to the fragment count; default producesmin(fragmentCount, defaultParallelism)segments.CREATE INDEXwith the same name produces a fresh, disjoint set of segment UUIDs (proves replace semantics).SHOW INDEXESincludes the vector index;DROP INDEXremoves it and re-create after drop succeeds.fragments_indexed=0and commits no segments.VECTOR_SEARCHagainst anIVF_PQ-indexed table returns exactlykrows (round-trip: index is actually used).