
feat: support Spark Structured Streaming writes #564

Open
LuciferYang wants to merge 1 commit into lance-format:main from LuciferYang:feat/structured-streaming-write

LuciferYang commented May 27, 2026

Collaborator

Closes #246.

Summary

Adds a Structured Streaming sink for Lance. Each non-empty micro-batch issues one Lance transaction stamped with lance.streaming.queryId and lance.streaming.epochId in its transaction properties; replay dedupe scans recent transaction history via DatasetDelta.listTransactions for a matching (queryId, epochId) pair and skips the commit on a hit.
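
The stamp-and-skip flow described above can be sketched in a few lines of Python. This is only an in-memory illustration, not the sink's code: `FakeTable` and `commit_epoch` are hypothetical names, the transaction history is a plain list instead of `DatasetDelta.listTransactions`, and storing the epoch id as a string is an assumption.

```python
from dataclasses import dataclass, field

# Property keys named in this PR description.
QUERY_ID_KEY = "lance.streaming.queryId"
EPOCH_ID_KEY = "lance.streaming.epochId"


@dataclass
class FakeTable:
    """Hypothetical stand-in for a Lance dataset: one property dict per transaction."""

    transactions: list = field(default_factory=list)

    def commit_epoch(self, query_id: str, epoch_id: int, row_count: int) -> bool:
        """Commit one micro-batch; return False when no transaction is issued."""
        if row_count == 0:
            return False  # empty append epoch: nothing to commit
        for props in self.transactions:
            same_query = props.get(QUERY_ID_KEY) == query_id
            same_epoch = props.get(EPOCH_ID_KEY) == str(epoch_id)
            if same_query and same_epoch:
                return False  # replayed epoch: skip the commit
        # Stamp the identity inside the single transaction for this epoch.
        self.transactions.append(
            {QUERY_ID_KEY: query_id, EPOCH_ID_KEY: str(epoch_id)}
        )
        return True
```

Replaying `("q1", 0)` against the same table returns `False` and leaves the history unchanged, while a different query id with the same epoch commits normally.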

SparkWriteBuilder implements SupportsStreamingUpdateAsAppend and LanceDataset advertises TableCapability.STREAMING_WRITE (on both the plain and blob-v2 capability sets). Append / Complete / Update output modes flow through the existing V2 plumbing. CTAS and staged commits are rejected at toStreaming() with a clear error.

streamingQueryId is a required option: it is the idempotency key for the dedupe scan, and it must be globally unique across concurrent queries on the same table. lance.streaming.dedupe.lookback.versions (default 100, max 10,000) caps the per-commit scan depth.
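
As a rough illustration of the option contract, the validation could look like the sketch below. The function name `parse_streaming_options` is hypothetical, the lower bound of 1 is an assumption (the description states only the default and the maximum), and Python's `ValueError` stands in for the `IllegalArgumentException` the Java code throws.

```python
DEFAULT_LOOKBACK = 100
MAX_LOOKBACK = 10_000
MIN_LOOKBACK = 1  # assumption: the description does not state a lower bound

LOOKBACK_KEY = "lance.streaming.dedupe.lookback.versions"


def parse_streaming_options(options: dict) -> tuple:
    """Return (query_id, lookback) or raise ValueError on invalid input."""
    raw_id = options.get("streamingQueryId")
    # Trim once; the trimmed value is what gets stamped on each commit.
    query_id = raw_id.strip() if raw_id is not None else ""
    if not query_id:
        raise ValueError("streamingQueryId is required and must not be blank")

    raw_lookback = options.get(LOOKBACK_KEY)
    if raw_lookback is None:
        return query_id, DEFAULT_LOOKBACK
    try:
        lookback = int(raw_lookback)
    except ValueError as exc:
        raise ValueError(f"{LOOKBACK_KEY} must be an integer") from exc
    if not MIN_LOOKBACK <= lookback <= MAX_LOOKBACK:
        raise ValueError(
            f"{LOOKBACK_KEY} must be in [{MIN_LOOKBACK}, {MAX_LOOKBACK}]"
        )
    return query_id, lookback
```

Rejecting a whitespace-only id up front matters because the id is the dedupe key: a blank key shared by two queries would make them suppress each other's epochs.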

Output modes & empty epochs

| Spark mode | Lance behavior |
| --- | --- |
| append (default) | one Append per epoch |
| complete | one Overwrite per epoch (truncate + write the full result) |
| update | append-only via SupportsStreamingUpdateAsAppend (delta rows appended, not merged) |

The streaming overwrite signal is driven solely by Spark's output mode (truncate()), never by a write_mode option — so a stray write_mode=overwrite cannot turn an Append stream into a per-epoch truncate.

Empty epochs: an empty append epoch issues no transaction; an empty complete epoch still commits one empty Overwrite to truncate the table to the new (empty) result (skipping it would leave the prior epoch's rows on disk).
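
The mode and empty-epoch rules reduce to a small decision function. The sketch below is illustrative only: `plan_epoch` is a hypothetical name, and treating an empty update epoch like an empty append epoch is an assumption drawn from update being append-only.

```python
from typing import Optional


def plan_epoch(output_mode: str, row_count: int) -> Optional[str]:
    """Return the Lance operation for one epoch, or None for no transaction.

    There is deliberately no write_mode parameter: only Spark's output mode
    decides whether an epoch truncates the table.
    """
    mode = output_mode.lower()
    if mode == "complete":
        # Always commit, even when empty, so the previous result is truncated.
        return "Overwrite"
    if mode in ("append", "update"):
        # Empty epochs commit nothing (assumed for update as well as append).
        return "Append" if row_count > 0 else None
    raise ValueError(f"unsupported output mode: {output_mode}")
```

For example, `plan_epoch("complete", 0)` still yields `"Overwrite"`, which is what removes the prior epoch's rows when the new result is empty.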

Why one transaction per epoch

PR #399 used a two-transaction Append + UpdateConfig design, which doubled manifest growth and per-epoch latency at scale. Stamping the identity inside the Append moves the dedupe signal into transaction history and keeps writes to one manifest update per epoch.

Why the sink doesn't pin the dataset version

LanceBatchWrite pins dataset.version() in its constructor because it commits exactly once. Streaming reuses one sink across many epochs, so a pinned version would go stale immediately: the dedupe scan would point at the wrong range and the transaction's readVersion would lag. Every commit opens at the current latest instead, and toStreaming() strips any pinned version to enforce this. testMultipleEpochsOnSameSinkAdvanceVersionMonotonically is the regression test for this path.
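
A toy model makes the staleness concrete. Both helpers below are hypothetical and assume each committed epoch advances the table by exactly one version; the only name taken from the PR is the `version` option that toStreaming() strips.

```python
def strip_pinned_version(options: dict) -> dict:
    """Return a copy of the write options without a pinned `version`."""
    return {k: v for k, v in options.items() if k != "version"}


def read_versions(options: dict, start_version: int, epochs: int) -> list:
    """Version each epoch's transaction would read from, one commit per epoch."""
    latest = start_version
    seen = []
    for _ in range(epochs):
        pinned = options.get("version")
        # A pinned version never moves; an unpinned sink reopens at latest.
        seen.append(int(pinned) if pinned is not None else latest)
        latest += 1  # the commit for this epoch creates a new table version
    return seen
```

With `{"version": "7"}` the read version stays at 7 for every epoch, whereas the stripped options follow the table as it advances (7, 8, 9 for three epochs).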

Robustness

  • Vended credentials. The streaming commit merges namespace-vended initialStorageOptions (from describeTable) into the CommitBuilder write params, mirroring LanceBatchWrite, so writes to credential-vending namespace / object-store tables authenticate.
  • Bounded at-least-once. The dedupe scan covers only the last lookback versions, so a duplicate can slip through when lookback or more unrelated commits land between the original commit and its replay. If a version inside the window is removed by VACUUM/cleanup (making listTransactions throw), the scan logs a WARN and proceeds under the same bounded at-least-once guarantee rather than permanently wedging the query.
  • streamingQueryId is trimmed once and used verbatim as the stamped key (no whitespace asymmetry).
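
The bounded window and the cleanup fallback can be sketched as follows. This is a simplified model, not the real scan: `already_committed` and `VersionCleanedUp` are hypothetical names, the history is a dict from version number to transaction properties (with `None` marking a cleaned-up version), and the exact window boundaries are an assumption.

```python
import logging

log = logging.getLogger("streaming_dedupe_sketch")

QUERY_ID_KEY = "lance.streaming.queryId"
EPOCH_ID_KEY = "lance.streaming.epochId"


class VersionCleanedUp(Exception):
    """Hypothetical stand-in for listTransactions failing on a removed version."""


def already_committed(history, latest_version, lookback, query_id, epoch_id):
    """Scan at most `lookback` recent versions for a matching (queryId, epochId)."""
    oldest = max(1, latest_version - lookback + 1)
    try:
        for version in range(latest_version, oldest - 1, -1):
            props = history.get(version)
            if props is None:
                raise VersionCleanedUp(version)
            if (props.get(QUERY_ID_KEY) == query_id
                    and props.get(EPOCH_ID_KEY) == str(epoch_id)):
                return True
    except VersionCleanedUp as exc:
        # Degrade to at-least-once instead of failing the query forever.
        log.warning("dedupe scan hit cleaned-up version %s; proceeding", exc)
        return False
    return False
```

In this model a replay is caught only while its original commit is still inside the window, so raising the lookback trades scan cost for a longer dedupe horizon.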

Compatibility

Supported on Spark 3.4 / 3.5 / 4.0 / 4.1. On 3.4, StreamingWrite.useCommitCoordinator is absent from the interface so the override is inert there; Lance commits each epoch atomically via CommitBuilder, making the commit coordinator immaterial on every version. A concrete TestStreamingWrite runs the shared suite on each version.

Tests

  • BaseStreamingWriteTest (runs on 3.4/3.5/4.0/4.1) — 10 cases: append happy-path, missing & whitespace streamingQueryId rejection, empty-epoch no-op, real non-empty replay dedupe, lookback-window expiry, multi-epoch monotonic version, overwrite replace + empty truncate, and write_mode=overwrite-does-not-truncate-append.
  • LanceSparkWriteOptionsTest — lookback default / round-trip / bounds and non-numeric parse-error cases.
  • SparkWriteTest: toStreaming returns LanceStreamingWrite when configured, throws IllegalArgumentException without streamingQueryId, rejects staged commits.

Local verification: make lint clean; full base + Spark 3.5 suite green (464 + 1064, 0 failures); streaming tests 10/10 on Spark 3.4 and 3.5.

Out of scope

  • Streaming reads (MicroBatchStream) — follow-up.
  • Row-level UPDATE / DELETE on the streaming path.
  • Blob v2 source-context copy-through (a batch-only optimization) does not fire on the streaming path; direct blob-data streaming works.
  • In-memory dedupe cache to skip the per-commit DatasetDelta scan.

Docs at docs/src/streaming.md (wired into the site nav).

@github-actions github-actions Bot added the enhancement New feature or request label May 27, 2026
@LuciferYang LuciferYang force-pushed the feat/structured-streaming-write branch from 9c57066 to 29a0fcd Compare June 2, 2026 06:09
@LuciferYang LuciferYang marked this pull request as ready for review June 2, 2026 06:10
@LuciferYang LuciferYang force-pushed the feat/structured-streaming-write branch 2 times, most recently from c5b059f to 54c47a2 Compare June 2, 2026 06:27
Comment thread pom.xml Outdated
@LuciferYang LuciferYang marked this pull request as draft June 2, 2026 08:53
@LuciferYang LuciferYang force-pushed the feat/structured-streaming-write branch 3 times, most recently from 5629631 to 61a2775 Compare June 25, 2026 15:32
@LuciferYang LuciferYang marked this pull request as ready for review June 25, 2026 15:55
Closes lance-format#246.

Adds a Spark Structured Streaming sink for Lance. Each non-empty
micro-batch produces a single Lance transaction stamped with
(streamingQueryId, epochId) in its transaction properties; replay
dedupe scans recent transaction history via
DatasetDelta.listTransactions for an existing pair and skips the
commit if it finds one.

Append, Complete, and Update output modes are routed through
SparkWriteBuilder (also implementing SupportsStreamingUpdateAsAppend).
Complete maps to a Lance Overwrite per epoch; Update is append-only
per Spark's marker contract. The streaming overwrite signal is driven
solely by Spark's output mode (truncate()), never by a write_mode
option, so a stray write_mode=overwrite cannot turn an Append stream
into a per-epoch truncate. CTAS / staged-commit flows are rejected
with an actionable error since per-epoch commits are incompatible with
the single-shot staged commit cadence.

Empty epochs: an empty Append epoch issues no transaction; an empty
Complete (overwrite) epoch still commits one empty Overwrite to
truncate the table to the new (empty) result.

User surface:

- `streamingQueryId` (required) — globally unique idempotency key,
  trimmed and used verbatim as the stamped key. Two queries sharing
  the same id would dedupe each other's epochs.
- `lance.streaming.dedupe.lookback.versions` (default 100, max 10000)
  — how many recent versions the dedupe scan reads on every commit.
  Raise on high-churn tables; lower to cut per-commit scan cost.

Transaction-property keys `lance.streaming.queryId` and
`lance.streaming.epochId` are stamped on every commit and are part of
the stability contract — external tooling can read them straight from
Lance transaction history.

Robustness:

- The streaming commit merges namespace-vended credentials
  (initialStorageOptions) into the CommitBuilder write params, matching
  the batch path, so writes to credential-vending namespace /
  object-store tables authenticate correctly.
- The sink never pins the dataset version: toStreaming() strips any
  pinned `version`, so every epoch opens at the current latest and the
  dedupe scan window and the transaction's readVersion both reflect
  on-disk reality.
- If a version inside the lookback window is removed by VACUUM/cleanup,
  the dedupe scan degrades to the documented bounded at-least-once
  fallback (logs a WARN and proceeds) rather than wedging the query.

Compatibility / non-goals:

- Supported on Spark 3.4+ (3.4 / 3.5 / 4.0 / 4.1). On 3.4,
  StreamingWrite.useCommitCoordinator is absent from the interface, so
  it is inert there; Lance commits each epoch atomically via
  CommitBuilder, making the coordinator immaterial on every version.
- Uses the DatasetDelta JNI binding (lance-format/lance#6963); the pom
  is on lance.version 8.0.0-beta.9.
- Streaming reads (MicroBatchStream) are not implemented yet — tracked
  separately.
- Row-level UPDATE/DELETE via position-delta is not exposed on the
  streaming path.
- Blob v2 tables can stream (STREAMING_WRITE capability), but the
  batch-only blob source-context copy-through optimization does not
  fire on the streaming path.
- The target Lance table must exist before the query starts; the sink
  does not auto-create.

Test coverage:

- BaseStreamingWriteTest (runs on Spark 3.4/3.5/4.0/4.1): 10 cases —
  append happy path, missing/whitespace streamingQueryId rejection,
  empty-epoch no-op, real non-empty replay dedupe, lookback-window
  expiry, multi-epoch monotonic version, overwrite replace + empty
  truncate, and write_mode=overwrite-does-not-truncate-append.
- LanceSparkWriteOptionsTest: lookback default / round-trip / bounds
  and non-numeric parse-error cases.
- SparkWriteTest: toStreaming returns LanceStreamingWrite when
  streamingQueryId is provided, throws IllegalArgumentException
  without it, rejects staged commits.
- integration-tests/test_lance_spark.py (docker, parameterized over
  local / MinIO / Azurite / Glue): writeStream append round-trip and
  dedupe-on-replay through the bundled jar against a real object store —
  the credential-bearing path the JUnit dir-catalog suite cannot reach.

User-facing doc at docs/src/streaming.md (wired into the nav) covers
semantics, output modes, the exactly-once contract, the bounded
at-least-once fallback (lookback window + VACUUM), and OPTIMIZE cadence.
@LuciferYang LuciferYang force-pushed the feat/structured-streaming-write branch from 61a2775 to cad1d35 Compare June 26, 2026 02:56
@LuciferYang

Collaborator Author

cc @hamersaw @jackye1995 @fangbo

@fangbo

fangbo commented Jun 26, 2026

Collaborator

@LuciferYang I'm not familiar with Spark Structured Streaming. I'm very sorry that I cannot review this PR.

Development

Successfully merging this pull request may close these issues.

Support Spark structured streaming

2 participants