Skip to content

feat: distributed vector index creation#605

Open
summaryzb wants to merge 8 commits into
lance-format:mainfrom
summaryzb:distribute_vec_create
Open

feat: distributed vector index creation#605
summaryzb wants to merge 8 commits into
lance-format:mainfrom
summaryzb:distribute_vec_create

Conversation

@summaryzb

@summaryzb summaryzb commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Summary

Implements distributed vector index creation for the five IVF families exposed by the SQL grammar (IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_PQ, IVF_HNSW_SQ). The pipeline trains IVF centroids and the PQ codebook once on the driver, broadcasts them to executors, then builds per-fragment segments in parallel and commits them atomically through Dataset.commitExistingIndexSegments. Adds a pure parameter resolver/validator with smart defaults plus end-to-end and unit test suites.

Problem

Fix: #165
A separate vector pipeline is therefore required: driver-side training, broadcast, executor-parallel quantization, atomic commit — plus a parameter contract for the WITH(...) clause that validates against the column dimension and row count before any work is dispatched.

Approach

Builds on #479 (@jiaoew1991)
Three-phase pipeline (VectorIndexJob).

  • Phase 1 — Driver training and broadcast.**
  • Phase 2 — Executor-parallel segment build (VectorIndexTask).**
  • Phase 3 — Driver atomic commit.** `

Parameter resolution (VectorIndexParamsResolver). Split into two layers:

  • parseAndValidate(indexType, args, dim, numRows) is pure — no Spark, no Dataset — and is unit-tested directly.
  • resolve(...) is the driver-only wrapper that opens the dataset, reads the column's Arrow FixedSizeList<Float|Double> dimension via VectorUtils.getVectorArrowDimension

Trade-offs.

  • IndexType.IVF_HNSW_FLAT is intentionally not included — lance-core 7.0.0's VectorIndexParams.Builder.build() rejects HNSW without a PQ/SQ quantizer, so the dispatcher excludes it explicitly.
  • Driver-side IVF/PQ training is intentional: The training step is the only non-distributed phase, which is why broadcasts are destroyed eagerly. This will be followup when lance KMeans support partial aggregate abstraction and expose it to compute engine layer

Changes

Core indexing pipeline (Scala, lance-spark-base_2.12/src/main/scala/.../v2/)

  • AddIndexExec.scala: dispatch the five IVF index types through the logical-segment commit path; extract extractNamespaceInfo for reuse; add IndexUtils.useLogicalSegmentCommit and IndexUtils.batchFragments (shared by Zonemap and IVF); validate num_segments only for logical-segment types; reuse commitIndexSegments for both Zonemap and IVF segments.
  • VectorIndexJob.scala (new): three-phase distributed builder — driver trainAndBroadcast, executor VectorIndexTask.execute, broadcast cleanup in finally; logs phase transitions for operability.
  • VectorIndexParamsResolver.scala (new): pure parseAndValidate plus driver-side resolve that reads dim/rowCount; per-type whitelists, smart defaults, distance alias, validation with type-aware error messages.
  • VectorIndexPlan.scala (new): closure-clean VectorIndexPlan, IvfPlan, PqPlan, SqPlan, HnswPlan — primitives, Option, IndexType enum only.

Docs

  • README.md: add a feature line documenting the new "Distributed vector index build" capability and explicitly listing the five supported families, the training/build split, and the atomic commit behavior.

Test Coverage

Pure unit tests — VectorIndexParamsResolverTest (no Spark, no Dataset)
Pure unit tests — IndexUtilsTest
End-to-end Spark integration — BaseAddVectorIndexTest (run by both 3.4 and 3.5 modules)

  • Fixture sanity: 4 INSERTs of 80 rows each → ≥4 fragments, 320 rows total.
  • Create each of IVF_FLAT, IVF_PQ (explicit num_sub_vectors), IVF_SQ, IVF_HNSW_PQ, IVF_HNSW_SQ; verify segments cover every fragment exactly once.
  • Default inference end-to-end: num_sub_vectors from dim=32 and dim=24; failure when dim=23.
  • num_partitions inferred from row count when omitted.
  • euclidean alias maps to L2 in a real dataset.
  • Negative paths surface the resolver errors via the SQL planner: num_sub_vectors for IVF_FLAT, HNSW params for non-HNSW, unknown distance_type='manhattan', multiple columns, non-vector column, num_partitions=10000 exceeding row count.
  • num_segments=2 produces exactly 2 segments; num_segments=100000 clamps down to the fragment count; default produces min(fragmentCount, defaultParallelism) segments.
  • Re-running CREATE INDEX with the same name produces a fresh, disjoint set of segment UUIDs (proves replace semantics).
  • SHOW INDEXES includes the vector index; DROP INDEX removes it and re-create after drop succeeds.
  • Creating an index on an empty table returns fragments_indexed=0 and commits no segments.
  • VECTOR_SEARCH against an IVF_PQ-indexed table returns exactly k rows (round-trip: index is actually used).

Change-Id: I3a9c29532bf75d352121054cb592f8c0d8114b66
@github-actions github-actions Bot added the enhancement New feature or request label Jun 10, 2026
Change-Id: Ibe1a1d9dcd898e17ab4e4d00075303af15772bb0
Change-Id: Id6c9b3841b0fc13370b6d3785fde99c24f663378
@summaryzb

Copy link
Copy Markdown
Contributor Author

First off, huge thanks for @sezruby , we're genuinely happy to have both PRs reviewed in parallel — whether that means landing #601 first and rebasing the HNSW + resolver-split additions on top, landing ours and cherry-picking the recall test and use_residual error from #601, or any combination the
maintainers prefer. Either way, thanks again for the work — it made it much easier to sanity-check our own design.
The differences, as honestly as I can lay them out:

Dimension PR #601 (sezruby) distribute_vec_create (ours)
Index types covered IVF_FLAT, IVF_PQ, IVF_SQ (HNSW deferred) Same three + IVF_HNSW_PQ, IVF_HNSW_SQ with full HnswBuildParams (m, ef_construction, max_level, prefetch_distance)
WITH-args parsing VectorIndexSpec.fromArgs — single entry point Split: pure VectorIndexParamsResolver.parseAndValidate(indexType, args, dim, numRows) (no Spark, no Dataset) + thin resolve(...) wrapper that reads dim/rowCount from Dataset
Validation tests Covered in AddIndexTest end-to-end 323-line VectorIndexParamsResolverTest unit-tests the validation matrix without a SparkSession
PQ num_sub_vectors defaults (per PR text) Dim-aware: dim/16 if divisible, else dim/8, else error asking user to specify; also enforces dim % nsv == 0
Fragment batching Vector-only path Extracted IndexUtils.batchFragments(fragmentIds, numSegments, defaultParallelism), reused by both ZonemapIndexJob and VectorIndexJob so num_segments clamping behaves identically
replace=true on per-task createIndex Avoided by not setting withIndexName per task We set withIndexName per task and pass replace(true) to bypass the "Index name X already exists" manifest check before commitExistingIndexSegments runs
Broadcast cleanup (per PR text) Centroid + codebook broadcasts destroyed in finally; extra guard destroys centroidsBC if codebook training/broadcast throws, to prevent orphan broadcasts on failure
use_residual rejection Explicit parse-time error with rationale ("would degrade PQ recall") Implicit via per-index-type key whitelist; generic error message — #601's approach is friendlier
Recall regression test Real top-K recall ≥ 0.5 guard with clustered 8-dim embeddings, IVF_PQ + cosine — explicitly catches the lance-6 silent-L2 bug Not yet; BaseAddVectorIndexTest (669 lines) covers creation, recreate-replaces (UUID disjointness)

@sezruby

sezruby commented Jun 10, 2026

Copy link
Copy Markdown

Thanks for the gracious + accurate comparison @summaryzb

To clarify intent on my side: #601 was deliberately scoped small (FLAT/PQ/SQ only, no HNSW, no resolver split) so it could land fast as an incremental step. I'd been planning HNSW + the dim-aware param resolver as follow-ups on top of it. Your PR has those already — that's a real superset.

Happy with whichever order maintainers prefer:

No strong preference between those, just want to land something for #165 so OSS users stop hitting Unsupported index method: ivf_pq. Will defer to maintainer call.

@LuciferYang

Copy link
Copy Markdown
Collaborator

Can you resolve the conflicts first? @summaryzb

try {
// ---- Phase 2: split fragments, parallelize tasks ----
val batches =
IndexUtils.batchFragments(fragmentIds, numSegments, sc.defaultParallelism)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runSegments repeats ZonemapIndexJob.run's core and VectorIndexTask.execute repeats ZonemapIndexTask.execute — the 7-line openDatasetBuilder().initialStorageOptions().runtimeNamespace() envelope now exists in 4 verbatim copies plus a near-verbatim 5th, and the two tasks carry divergent replace(true)/withIndexName vs replace(false) contracts on the same commit seam (explained only in a comment).

Suggestion: Hoist a shared IndexUtils.runSegmentTasks plus a shared open/create/encode/close helper parameterized by an IndexOptions builder fn.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I partially addressed this by hoisting the common segment execution path into IndexUtils.runSegmentTasks
The remaining duplication is mostly the method-specific IndexOptions builder setup, and keeping it explicit makes the different commit semantics easier to audit。
Given that, I’d prefer not to add another abstraction in this PR unless you feel strongly.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — no need for another abstraction here. Hoisting the shared segment-execution path into IndexUtils.runSegmentTasks / batchFragments is the right amount. The remaining duplication is the per-method IndexOptions builder setup, and keeping that explicit is reasonable since the commit semantics genuinely differ (e.g. the IVF path sets replace(true) + withIndexName, while the zonemap segment path uses replace(false) and no name). I don't feel strongly — fine to leave as is.

Comment thread docs/src/operations/ddl/create-index.md Outdated
Change-Id: Ia8faaa46b2a00ac92a29e903d1700d40bad8b219
Change-Id: I8b42fcf94708dcbc1eb901c73522be5d2c1b347e
Change-Id: I424e585630613b49e01df2f9301a1a4c164ed173
@LuciferYang

Copy link
Copy Markdown
Collaborator

Re-reviewed after the latest push (0b63292a). Most of the earlier threads look addressed — noting them below — with 3 items remaining (1 carryover + 2 new).

Looks addressed (OK to resolve): Float32-only vector validation with an accurate message; findField wrapped in try/catch → "Column not found. Available: …"; Locale.ROOT on distance_type and the %f test formatter; broadcast cleanup handling InterruptedException/NonFatal (no silent catch _: Throwable); empty-table path running buildIndexType + parseAndValidate before the fragmentIds.isEmpty return; version pinning via AddIndexTableSnapshot.withVersion(ds.version()); the IndexUtils.batchFragments/runSegmentTasks/openDataset/createIndexSegment dedup; and the VECTOR umbrella-type test assertion.


1. SQ is trained per-segment with no shared artifact — VectorIndexJob.scala:227 (also :164–169)

High — possible correctness issue (carryover)

IVF centroids and the PQ codebook are trained once on the driver and broadcast, but for IVF_SQ / IVF_HNSW_SQ there's no equivalent shared artifact — each executor task builds its own SQ quantizer inside createIndex from only that segment's rows. If the per-dimension quantization ranges differ across segments, committing them as one logical index could produce inconsistent distances at query time. Does Lance reconcile SQ bounds across segments at commit, or should SQ be driver-trained and broadcast like the PQ codebook? If per-segment SQ is actually safe, a short comment explaining why would help.

2. Dead code: VectorIndexParamsResolver.resolve(...) + two VectorIndexPlan helpers — VectorIndexParamsResolver.scala:318; VectorIndexPlan.scala:38,41

Medium — cleanup

resolve has no call sites — AddIndexExec's snapshot block inlines the same findField / validateVectorFieldForIndex / parseAndValidate steps, and unlike resolve it pins the dataset version. Since the version-pinned snapshot path is the one in use, resolve (which opens its own unpinned dataset) is now redundant and should be removed to avoid two divergent driver-side entry points. The object-level scaladoc (VectorIndexParamsResolver.scala:32–34) also still describes resolve as "the dataset-touching wrapper" entry point — stale, remove it with the method. While here, VectorIndexPlan.requiresPq and requiresHnsw (:38,41) are also unreferenced (zero call sites in main or test) — same cleanup pass.

3. train is rejected for IVF, so deferred training is broken for vector indexes (contradicts the docs) — VectorIndexParamsResolver.scala:39,109; AddIndexExec.scala:155,368,1181; create-index.md:45,323

Medium — bug / doc mismatch

Deferred training (train=false) is blocked for IVF at two layers, so the documented "train=false supported for IVF_*" behavior does not work.

  • Layer 1 (parse-time): for every IVF create, the snapshot block calls parseAndValidate(indexType, args, …) with the raw args (AddIndexExec.scala:155) before the train=false branch runs. Its whitelist CommonKeys (VectorIndexParamsResolver.scala:39) does not include train, and it rejects on key presence, not value, so any explicit traintrain=false or train=true — throws "train is not a recognized parameter for IVF_PQ. Allowed: …" at VectorIndexParamsResolver.scala:109. The eager build works only because the default omits the train key entirely.
  • Layer 2 (commit-time): even if train were whitelisted, the if (!train) branch routes every type — including IVF — to commitEmptyIndex, which builds only ScalarIndexParams via buildScalarIndexParamType(method) (AddIndexExec.scala:368); that throws "Unsupported index method: ivf_pq" for IVF (:1069). There is no setVectorIndexParams empty-index path. So whitelisting train alone merely moves the failure from parse-time to commit-time.

This contradicts the docs: train is a documented Common Option in a section that "applies to all index methods" (create-index.md:45), and create-index.md:323 states "train = false is supported for all index methods … and IVF_* vector methods"; the class doc says "deferred training … Supported for all index types" (AddIndexExec.scala:67).

Complete fix — pick one: (1) truly support it — whitelist the SparkOnlyOptions (AddIndexExec.scala:1181) on the IVF parse path and give commitEmptyIndex a vector-aware path (build VectorIndexParams, set train(false) on the IndexOptions); or (2) correct the docs to state deferred training is unsupported for IVF_* and reject it up front with a clear message. Either way, add IVF tests for train=false and an explicit train=true.

Side note: the Layer-1 whitelist also rejects build_mode / rows_per_range for IVF, but those are documented as BTree-only options (create-index.md:60), so that rejection is a latent option-handling inconsistency (num_segments is whitelisted in CommonKeys, the rest of SparkOnlyOptions are not), not a doc contradiction. (Minor: buildScalarIndexParamType uses bare method.toLowerCase at :1065 without Locale.ROOT, unlike buildIndexType at :1051.)

Change-Id: I003a524dd4b4ab5e17002fbbe1d6d9d0d6f1c94e
@summaryzb

Copy link
Copy Markdown
Contributor Author

Lance's distributed index commit path silently produces incorrect IVF_SQ / IVF_HNSW_SQ, limit the segment num of SQ training to one, this will be a followup

@LuciferYang LuciferYang left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

cc @sezruby @hamersaw @fangbo for further review

@fangbo

fangbo commented Jun 26, 2026

Copy link
Copy Markdown
Collaborator

+1 , Create job !

By the way, would it be possible to split create-index.md into create-scalar-index.md and create-vector-index.md to ensure that each document have a more focused scope ?

Change-Id: I0ebba5d7a9585fc97692a82aa000fd911d51f50c
@summaryzb

Copy link
Copy Markdown
Contributor Author

+1 , Create job !

By the way, would it be possible to split create-index.md into create-scalar-index.md and create-vector-index.md to ensure that each document have a more focused scope ?

Thanks for reviewing, create-index.md is splited

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support build IVF index distributively in Spark

4 participants