feat: support Spark Structured Streaming writes#564
Open
LuciferYang wants to merge 1 commit into
Open
Conversation
9c57066 to
29a0fcd
Compare
c5b059f to
54c47a2
Compare
LuciferYang
commented
Jun 2, 2026
5629631 to
61a2775
Compare
Closes lance-format#246. Adds a Spark Structured Streaming sink for Lance. Each non-empty micro-batch produces a single Lance transaction stamped with (streamingQueryId, epochId) in its transaction properties; replay dedupe scans recent transaction history via DatasetDelta.listTransactions for an existing pair and skips the commit if it finds one. Append, Complete, and Update output modes are routed through SparkWriteBuilder (also implementing SupportsStreamingUpdateAsAppend). Complete maps to a Lance Overwrite per epoch; Update is append-only per Spark's marker contract. The streaming overwrite signal is driven solely by Spark's output mode (truncate()), never by a write_mode option, so a stray write_mode=overwrite cannot turn an Append stream into a per-epoch truncate. CTAS / staged-commit flows are rejected with an actionable error since per-epoch commits are incompatible with the single-shot staged commit cadence. Empty epochs: an empty Append epoch issues no transaction; an empty Complete (overwrite) epoch still commits one empty Overwrite to truncate the table to the new (empty) result. User surface: - `streamingQueryId` (required) — globally unique idempotency key, trimmed and used verbatim as the stamped key. Two queries sharing the same id would dedupe each other's epochs. - `lance.streaming.dedupe.lookback.versions` (default 100, max 10000) — how many recent versions the dedupe scan reads on every commit. Raise on high-churn tables; lower to cut per-commit scan cost. Transaction-property keys `lance.streaming.queryId` and `lance.streaming.epochId` are stamped on every commit and are part of the stability contract — external tooling can read them straight from Lance transaction history. Robustness: - The streaming commit merges namespace-vended credentials (initialStorageOptions) into the CommitBuilder write params, matching the batch path, so writes to credential-vending namespace / object-store tables authenticate correctly. - The sink never pins the dataset version: toStreaming() strips any pinned `version`, so every epoch opens at the current latest and the dedupe scan window and the transaction's readVersion both reflect on-disk reality. - If a version inside the lookback window is removed by VACUUM/cleanup, the dedupe scan degrades to the documented bounded at-least-once fallback (logs a WARN and proceeds) rather than wedging the query. Compatibility / non-goals: - Supported on Spark 3.4+ (3.4 / 3.5 / 4.0 / 4.1). On 3.4, StreamingWrite.useCommitCoordinator is absent from the interface, so it is inert there; Lance commits each epoch atomically via CommitBuilder, making the coordinator immaterial on every version. - Uses the DatasetDelta JNI binding (lance-format/lance#6963); the pom is on lance.version 8.0.0-beta.9. - Streaming reads (MicroBatchStream) are not implemented yet — tracked separately. - Row-level UPDATE/DELETE via position-delta is not exposed on the streaming path. - Blob v2 tables can stream (STREAMING_WRITE capability), but the batch-only blob source-context copy-through optimization does not fire on the streaming path. - The target Lance table must exist before the query starts; the sink does not auto-create. Test coverage: - BaseStreamingWriteTest (runs on Spark 3.4/3.5/4.0/4.1): 10 cases — append happy path, missing/whitespace streamingQueryId rejection, empty-epoch no-op, real non-empty replay dedupe, lookback-window expiry, multi-epoch monotonic version, overwrite replace + empty truncate, and write_mode=overwrite-does-not-truncate-append. - LanceSparkWriteOptionsTest: lookback default / round-trip / bounds and non-numeric parse-error cases. - SparkWriteTest: toStreaming returns LanceStreamingWrite when streamingQueryId is provided, throws IAE without it, rejects staged commits. - integration-tests/test_lance_spark.py (docker, parameterized over local / MinIO / Azurite / Glue): writeStream append round-trip and dedupe-on-replay through the bundled jar against a real object store — the credential-bearing path the JUnit dir-catalog suite cannot reach. User-facing doc at docs/src/streaming.md (wired into the nav) covers semantics, output modes, the exactly-once contract, the bounded at-least-once fallback (lookback window + VACUUM), and OPTIMIZE cadence.
61a2775 to
cad1d35
Compare
Collaborator
Author
Collaborator
|
@LuciferYang I'm not familiar with Spark Structured Streaming. I'm very sorry that I cannot review this PR. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #246.
Summary
Adds a Structured Streaming sink for Lance. Each non-empty micro-batch issues one Lance transaction stamped with
lance.streaming.queryIdandlance.streaming.epochIdin its transaction properties; replay dedupe scans recent transaction history viaDatasetDelta.listTransactionsfor a matching(queryId, epochId)pair and skips the commit on a hit.SparkWriteBuilderimplementsSupportsStreamingUpdateAsAppendandLanceDatasetadvertisesTableCapability.STREAMING_WRITE(on both the plain and blob-v2 capability sets). Append / Complete / Update output modes flow through the existing V2 plumbing. CTAS and staged commits are rejected attoStreaming()with a clear error.streamingQueryIdis a required option — the idempotency key for the dedupe scan; it must be globally unique across concurrent queries on the same table.lance.streaming.dedupe.lookback.versions(default 100, max 10 000) caps the per-commit scan depth.Output modes & empty epochs
append(default)Appendper epochcompleteOverwriteper epoch (truncate + write the full result)updateSupportsStreamingUpdateAsAppend(delta rows appended, not merged)The streaming overwrite signal is driven solely by Spark's output mode (
truncate()), never by awrite_modeoption — so a straywrite_mode=overwritecannot turn an Append stream into a per-epoch truncate.Empty epochs: an empty append epoch issues no transaction; an empty complete epoch still commits one empty
Overwriteto truncate the table to the new (empty) result (skipping it would leave the prior epoch's rows on disk).Why one transaction per epoch
PR #399 used a two-transaction Append + UpdateConfig design, which doubled manifest growth and per-epoch latency at scale. Stamping the identity inside the Append moves the dedupe signal into transaction history and keeps writes to one manifest update per epoch.
Why the sink doesn't pin the dataset version
LanceBatchWritepinsdataset.version()in its constructor because it commits exactly once. Streaming reuses one sink across many epochs, so a pinned version would go stale immediately — the dedupe scan would point at the wrong range and the transaction'sreadVersionwould lag. Every commit opens at the current latest instead, andtoStreaming()strips any pinnedversionto enforce this.testMultipleEpochsOnSameSinkAdvanceVersionMonotonicallyregresses this path.Robustness
initialStorageOptions(fromdescribeTable) into theCommitBuilderwrite params, mirroringLanceBatchWrite, so writes to credential-vending namespace / object-store tables authenticate.lookbackversions; a duplicate can slip through oncelookback-or-more unrelated commits intervene before a replay. If a version inside the window is removed byVACUUM/cleanup (makinglistTransactionsthrow), the scan degrades to that bounded at-least-once fallback (logs a WARN and proceeds) rather than permanently wedging the query.streamingQueryIdis trimmed once and used verbatim as the stamped key (no whitespace asymmetry).Compatibility
Supported on Spark 3.4 / 3.5 / 4.0 / 4.1. On 3.4,
StreamingWrite.useCommitCoordinatoris absent from the interface so the override is inert there; Lance commits each epoch atomically viaCommitBuilder, making the commit coordinator immaterial on every version. A concreteTestStreamingWriteruns the shared suite on each version.Tests
BaseStreamingWriteTest(runs on 3.4/3.5/4.0/4.1) — 10 cases: append happy-path, missing & whitespacestreamingQueryIdrejection, empty-epoch no-op, real non-empty replay dedupe, lookback-window expiry, multi-epoch monotonic version, overwrite replace + empty truncate, andwrite_mode=overwrite-does-not-truncate-append.LanceSparkWriteOptionsTest— lookback default / round-trip / bounds and non-numeric parse-error cases.SparkWriteTest—toStreamingreturnsLanceStreamingWritewhen configured, throwsIllegalArgumentExceptionwithoutstreamingQueryId, rejects staged commits.Local verification:
make lintclean; full base + Spark 3.5 suite green (464 + 1064, 0 failures); streaming tests 10/10 on Spark 3.4 and 3.5.Out of scope
MicroBatchStream) — follow-up.DatasetDeltascan.Docs at
docs/src/streaming.md(wired into the site nav).