
Fix stale DataWritten handling by separating commits into distinct RowDeltas #15710

Open
koodin9 wants to merge 12 commits into apache:main from
koodin9:fix/stale-datawritten-separate-rowdelta

Conversation

@koodin9 koodin9 commented Mar 21, 2026

Summary

After a partial commit (timeout), late-arriving DataWritten events from the previous
commit cycle are added to the next cycle's commitBuffer. The current implementation
groups all envelopes by table only (tableCommitMap()), so stale and current data files
end up in the same RowDelta. Since all files in a single RowDelta receive the same
sequence number, equality deletes fail to apply (data_seq < delete_seq is false),
resulting in duplicate rows.

This PR fixes the issue by separating envelopes by commitId and committing each group
in a distinct RowDelta, ensuring stale data files get a lower sequence number than
the current cycle's equality deletes.

Root Cause

  1. Coordinator times out waiting for all workers → partial commit with available files
  2. commitConsumerOffsets() advances control topic offsets
  3. Late DataWritten from the timed-out worker arrives in the next process() call
  4. tableCommitMap() merges stale + current envelopes into one RowDelta
  5. Same sequence number → equality deletes don't apply → duplicate rows
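The sequence-number rule behind step 5 can be illustrated with a minimal sketch (class and method names are hypothetical, not Iceberg APIs): an equality delete removes a row only if the row's data file carries a strictly lower sequence number.

```java
// Minimal model of Iceberg's equality-delete visibility rule:
// a delete at sequence D removes a data row only if the row's
// data file has sequence S with S < D.
public class SeqRuleDemo {

    static boolean deleteApplies(long dataSeq, long deleteSeq) {
        return dataSeq < deleteSeq;
    }

    public static void main(String[] args) {
        // Stale + current files merged into one RowDelta: same seq.
        long sharedSeq = 5;
        System.out.println(deleteApplies(sharedSeq, sharedSeq)); // false -> duplicate rows survive
        // Separate RowDeltas: stale data gets 5, current delete gets 6.
        System.out.println(deleteApplies(5, 6)); // true -> delete applies
    }
}
```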

Changes

  • CommitState.java: Replace tableCommitMap() with tableCommitMaps() that returns
    List<Map<TableIdentifier, List<Envelope>>>, separated by commitId.
    Stale commitIds are ordered first (via LinkedHashMap insertion order),
    current commitId is always last.
  • Coordinator.java: doCommit() iterates over the list and commits each batch
    in a separate RowDelta. offsetsJson and vtts are only stored on the last batch
    to prevent lastCommittedOffsetsForTable() from filtering out subsequent batches.
    Null guards added to commitToTable().
  • TestCoordinatorPartialCommit.java: New regression test that simulates the
    partial commit scenario and verifies that the stale snapshot's sequence number is
    strictly less than the current snapshot's sequence number.
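The grouping described in the first bullet can be sketched as follows. This is a simplified illustration, not the actual patch: `Envelope` is a stand-in record and table identifiers are plain strings, but the ordering trick (LinkedHashMap insertion order for stale groups, current commitId moved to the end) matches the description above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hedged sketch of tableCommitMaps(): one map per commitId, stale
// groups first, the current commitId always last, so earlier groups
// are committed first and receive lower Iceberg sequence numbers.
public class CommitGrouping {
    record Envelope(UUID commitId, String table) {}

    static List<Map<String, List<Envelope>>> tableCommitMaps(
            List<Envelope> buffer, UUID currentCommitId) {
        // LinkedHashMap preserves insertion order: stale envelopes were
        // buffered before current ones, so their groups come out first.
        Map<UUID, Map<String, List<Envelope>>> byCommit = new LinkedHashMap<>();
        for (Envelope e : buffer) {
            byCommit.computeIfAbsent(e.commitId(), k -> new LinkedHashMap<>())
                    .computeIfAbsent(e.table(), k -> new ArrayList<>())
                    .add(e);
        }
        // Move the current commitId to the end regardless of arrival order.
        Map<String, List<Envelope>> current = byCommit.remove(currentCommitId);
        List<Map<String, List<Envelope>>> result = new ArrayList<>(byCommit.values());
        if (current != null) {
            result.add(current);
        }
        return result;
    }
}
```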

Why not clear the buffer? (re: #15651)

An alternative approach is to discard stale events in startNewCommit(). However, this
causes data loss in the partial commit scenario: after doCommit(true) succeeds,
consumer offsets are committed, so the timed-out worker will not re-send its
DataWritten. Clearing the buffer discards data that cannot be recovered.

Test Plan

  • New unit test TestCoordinatorPartialCommit — verifies separate RowDeltas
    produce strictly increasing sequence numbers
  • Existing Coordinator and integration tests pass
  • Production-like stress test: 4,398,000 rows with CDC (INSERT/UPDATE) under
    aggressive partial commit settings (commit.interval-ms=10000,
    commit.timeout-ms=1000). Verified zero data loss and no duplicate rows
    via Trino row count comparison against source MySQL.

This PR was ported from a fix originally developed on the https://github.com/databricks/iceberg-kafka-connect codebase. Testing (including the 4.4M-row stress test) was performed against a deployment based on that connector. We also plan to run the same tests against the Iceberg-based connector.


t3hw commented Mar 22, 2026

Thanks for the PR, @koodin9 — and for the feedback on #15651 that identified the data loss scenario. That prompted a full redesign on my end. Your core approach here is exactly right, and I've built additional hardening on top of the same foundation. Would love to collaborate — here's what I've added:

  • Per-table group commits: tables in parallel, commitId groups sequential per table — stale failure only blocks its own table
  • Selective buffer draining: only successfully committed envelopes removed; failed groups retry next cycle
  • Error escalation: configurable blocking retries → failure policy (fail/non-blocking) → TTL eviction with orphaned file path logging
  • Per-group offsets: stale groups write their own envelope offsets (no null guards needed), preventing offset poisoning
  • Partial offset advancement: on partial success, consumer offsets advance to min uncommitted offset
  • JMX monitoring: CommitStateMXBean for stale group count, buffer size, eviction metrics
  • 8 new tests covering group ordering, selective removal, and failure scenarios
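The "partial offset advancement" bullet can be sketched like this (illustrative only; `PendingEnvelope` and `safeOffsets` are hypothetical names, not code from the branch): after a partial success, each partition's consumer offset advances only to the minimum offset among still-uncommitted envelopes, so a restart re-reads nothing that was already committed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of min-offset consumer advancement on partial success.
public class PartialOffsets {
    record PendingEnvelope(int partition, long offset) {}

    // For each partition, the safe commit position is the smallest
    // uncommitted offset; everything below it is already durable.
    static Map<Integer, Long> safeOffsets(List<PendingEnvelope> uncommitted) {
        Map<Integer, Long> min = new HashMap<>();
        for (PendingEnvelope e : uncommitted) {
            min.merge(e.partition(), e.offset(), Math::min);
        }
        return min;
    }
}
```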

Happy to push commits on your branch or open a stacked PR — whatever works best.


koodin9 commented Mar 22, 2026

@t3hw
I think a stacked PR works best! Since I'm still figuring out the full implementation, could you build it on top of my current branch? I'd like to keep my initial commits in the history and learn from your additions.

koodin-9 and others added 3 commits March 22, 2026 19:38
prevent duplicate rows without data loss

The previous fix (edef7f9) cleared `commitBuffer` in `startNewCommit()`
to prevent stale `DataWritten` events from producing same-sequence
equality deletes. However, this discards late events from workers that
finished after a partial commit (timeout) — those workers already
committed source offsets via `sendOffsetsToTransaction()`, so the data
is unrecoverable.

Instead, group events by commitId and commit each group as a separate
RowDelta with its own Iceberg sequence number. Stale groups are
committed first (lower seq), so equality deletes from newer groups
can apply to older data (`data.seq < delete.seq`).

Error handling uses a three-stage escalation:
1. Blocking retries (configurable, default 3) preserve ordering
2. Failure policy (`fail` stops the connector, `non-blocking` proceeds
   with ordering inversion risk)
3. TTL eviction (default 1 hour) with ERROR log of orphaned file paths

Other changes:
- Per-group offset computation prevents stale snapshots from filtering
  out current envelopes (offset poisoning fix)
- Min-offset consumer advancement on partial success bounds
  re-consumption on restart
- JMX MBean (`CommitStateMXBean`) exposes buffer metrics
- Three new configs: `stale-ttl-ms`, `stale-max-blocking-retries`,
  `stale-failure-policy`
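The three-stage escalation above can be sketched as a single decision function. This is a hedged illustration under the defaults quoted in the commit message (`stale-max-blocking-retries=3`, `stale-ttl-ms` of one hour, `stale-failure-policy` of `fail` or `non-blocking`); the class, the `Runnable` commit callback, and the return convention are hypothetical stand-ins.

```java
// Sketch of the three-stage stale-group escalation:
// 1) blocking retries, 2) failure policy, 3) TTL eviction.
public class StaleEscalation {
    enum FailurePolicy { FAIL, NON_BLOCKING }

    static final int MAX_BLOCKING_RETRIES = 3;        // stale-max-blocking-retries
    static final long STALE_TTL_MS = 60 * 60 * 1000L; // stale-ttl-ms (1 hour)

    /** Returns true if the stale group committed, false if skipped or evicted. */
    static boolean commitStaleGroup(Runnable commit, FailurePolicy policy, long ageMs) {
        // Stage 3: TTL eviction — group too old; log orphaned files and drop it.
        if (ageMs > STALE_TTL_MS) {
            System.err.println("ERROR: evicting stale group; orphaned file paths logged");
            return false;
        }
        // Stage 1: blocking retries preserve commit ordering.
        for (int attempt = 1; attempt <= MAX_BLOCKING_RETRIES; attempt++) {
            try {
                commit.run();
                return true;
            } catch (RuntimeException e) {
                // swallow and retry
            }
        }
        // Stage 2: failure policy decides whether to stop the connector.
        if (policy == FailurePolicy.FAIL) {
            throw new IllegalStateException("stale group commit failed; stopping connector");
        }
        return false; // NON_BLOCKING: proceed, accepting ordering-inversion risk
    }
}
```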

t3hw commented Mar 22, 2026

https://github.com/koodin9/iceberg/pull/2/changes
Feel free to review

Add production hardening for stale DataWritten recovery
@github-actions github-actions bot added the docs label Mar 25, 2026
