
Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode for in-batch deduplication#14797

Open
t3hw wants to merge 12 commits into apache:main from nanit:cdc-support-with-DV

Conversation

@t3hw

@t3hw t3hw commented Dec 8, 2025

Introduce Delta Writer functionality for both unpartitioned and partitioned tables, enabling CDC and upsert modes. Enhance configuration options for CDC fields, upsert mode, and DV usage.

Inspired by #12070
Resolves #10842

@bryanck

edit:
"using DVs for CDC" - DVs only help for in-batch deduplication.
Out of batch deletes/updates fall back to equality deletes.

Partitioning the table is highly recommended, periodically compacting the table when using CDC mode is mandatory.

Contributor

@bryanck bryanck left a comment


Thanks for the PR.

As you may know, the original (non-Apache) sink had delta writer support that relied on equality deletes. When the sink was contributed to this project, the community decided it was best to remove that functionality, as using it can result in severely degraded performance. This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues.

We should revisit those discussions and resolve those concerns before we proceed with this.

@t3hw
Author

t3hw commented Dec 8, 2025

Thanks for the PR.

As you may know, the original (non-Apache) sink had delta writer support that relied on equality deletes. When the sink was contributed to this project, the community decided it was best to remove that functionality, as using it can result in severely degraded performance. This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues.

We should revisit those discussions and resolve those concerns before we proceed with this.

Thanks for the reply!
I followed the Flink Sink implementation and attempted to port it into Kafka Connect.
Would it be viable to use the connector plugin for CDC with DV mode enabled?
Also, should the Iceberg table properties be set to merge-on-read for this use case?

@hpcnt-stewart

hpcnt-stewart commented Dec 12, 2025

I believe this feature is absolutely essential.

@SeongJuMoon

SeongJuMoon commented Dec 12, 2025

Thanks for contributing to the Iceberg ecosystem.
In my view, utilising the Kafka sink connector could readily facilitate efforts to establish SCD Type 1. Whilst this functionality may prove difficult to implement due to various internal community considerations, such as potential degradation in write and read performance, I ultimately hope it will be integrated so that diverse users can further benefit from Iceberg's advantages.

@t3hw
Author

t3hw commented Dec 12, 2025

For what it's worth, I tested it under a modest CDC load, and it seems to be working fine.
I can make another commit that locks the CDC mode to use DV only, but that would make the connector incompatible with v2 tables.
Regardless, the exception thrown when the table is a v2 table makes it clear that using DV mode is what causes the failure. At this point, users can decide for themselves how they want to proceed.

@t3hw t3hw requested a review from bryanck December 15, 2025 09:53
@rajansadasivan

rajansadasivan commented Dec 17, 2025

We did a test with this PR branch, with the following properties in the Kafka Connect sink connector on AWS:
iceberg.tables.use-dv: "true"
iceberg.tables.write-props.format-version: "3"

We found the table was created as version 2 in the metadata file. So is this upsert & DV support for version 2 tables only?
The upsert mode for insert/update/delete worked as expected for all tables with PK=id.

@t3hw
Author

t3hw commented Dec 17, 2025

We did a test with this PR branch, with the following properties in the Kafka Connect sink connector on AWS:
iceberg.tables.use-dv: "true"
iceberg.tables.write-props.format-version: "3"

We found the table was created as version 2 in the metadata file. So is this upsert & DV support for version 2 tables only?
The upsert mode for insert/update/delete worked as expected for all tables with PK=id.

Yeah, it's a limitation. I probably should have made a note about it above, but it is somewhat out of scope for this PR.

What I ended up doing is writing some code that interfaces with the Iceberg catalog outside of Kafka Connect and initializes the table ahead of time with the correct format version.
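For reference, a sketch of that pre-creation step. The property keys are standard Iceberg table properties; the catalog call in the comment is an assumption about the setup (catalog type, namespace, table name) and is not the PR's code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: pre-create the table outside Kafka Connect so it starts at
// format v3 (required for deletion vectors). Only the properties map is
// concrete here; the commented catalog call is illustrative.
public class TableInit {
  static Map<String, String> v3CdcProperties() {
    Map<String, String> props = new HashMap<>();
    props.put("format-version", "3");               // enables DVs
    props.put("write.delete.mode", "merge-on-read");
    props.put("write.update.mode", "merge-on-read");
    props.put("write.merge.mode", "merge-on-read");
    return props;
  }

  // With an org.apache.iceberg.catalog.Catalog in hand (REST, Glue, etc.),
  // the table could then be created before starting the connector, e.g.:
  //
  //   catalog.createTable(
  //       TableIdentifier.of("db", "events"),  // hypothetical identifier
  //       schema, spec, v3CdcProperties());
}
```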

@nalawadechinmay

nalawadechinmay commented Jan 6, 2026

@bryanck @t3hw Great to see the progress here! Our team is looking to use this connector for a project that requires Iceberg sink connector upsert/CDC support. Is there any guidance on whether the implementation in this PR will be accommodated in a later release? Any insight into the timeline or priority would be very helpful for our planning.

@karankk007

karankk007 commented Jan 6, 2026

@bryanck @t3hw Great progress here! We are looking to use the Apache Iceberg sink connector, and we need sink connector upsert functionality.

Could you please clarify whether there are plans to support or merge this functionality in the future? If this is on the roadmap, we can proceed with this approach. Otherwise, we may need to consider alternative solutions, such as using Flink.

It would be helpful to understand the expected direction or future of this PR. Thanks for the guidance.

@ashokcoupa

@bryanck +1 in strong support of this PR.

The addition of Delta Writer and upsert/CDC support in the Kafka Connect sink unlocks important production use cases. It significantly simplifies incremental and CDC-based data pipelines.

Many users, including our team, are looking to adopt Iceberg for CDC/upsert workflows, and having this merged into main would meaningfully improve adoption while reducing long-term maintenance and workarounds.

We’ve also tested this PR under a sustained load of ~100 TPS for about 30 minutes and observed a stable lag in the range of 7–10 minutes. Notably, the lag did not scale proportionally with load, which is a promising signal for production readiness.

Thanks for the thoughtful work on this, could the community consider moving this change forward in the near future? We’d be excited to see it included in an upcoming release.

@supalp

supalp commented Jan 6, 2026

@bryanck @t3hw and community,

We need the ability to have updates merged, and this PR solves that. I understand the review and merge can take time, so I'm not asking to rush it, but we would really benefit from feedback from the community on:

  1. Does this capability align with the roadmap for Kafka Connect?
  2. If not, what are the major roadblocks?

We are blocked on our implementation journey pending this PR's merge, hence asking whether this is confirmed to be on the roadmap or not. We appreciate your urgent attention and everything you and the other contributors do.

Thank you!

@hladush

hladush commented Jan 9, 2026

@bryanck could you review this and, if you’re comfortable, approve?

We’re seeing growing interest from teams that manage their own Kafka infrastructure to use this sink. While our Data Platform teams can achieve similar outcomes with Spark Streaming or the older Iceberg sink (https://github.com/databricks/iceberg-kafka-connect), I believe the current Iceberg sink simplifies day-to-day work for a lot of acquisition teams.
You noted: "This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues." It's a valid point and totally makes sense. In my view, the main gap is documentation, something we can fix. Strong docs and examples should reduce support load significantly. The alternatives either require additional infrastructure (e.g., Flink) or are less straightforward for teams to operate (Spark).
If you’re open to it, I’ll add a docs package (setup, config, examples, and a troubleshooting guide) as part of this change. Happy to discuss.

@bryanck
Contributor

bryanck commented Jan 9, 2026

@hladush This is something the Iceberg community has decided, including PMC members, so we'd need to get the community on board in order to proceed. You can raise this as a topic on the dev list (again) or in a community sync if you want.

@hladush

hladush commented Jan 13, 2026

@bryanck thanks a lot for your response; could you please share any doc on how to do that?

@supalp

supalp commented Jan 13, 2026

@hladush - thanks for the voice of support for this change. We would love to partner and add our perspective on why this is an important change for consideration by the PMC, and on getting this added to the dev list as suggested by @bryanck.

@t3hw t3hw changed the title Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode for in-batch deduplication Jan 13, 2026
@t3hw
Author

t3hw commented Jan 13, 2026

Added a small clarification to the PR description:

"using DVs for CDC" - DVs only help for in-batch deduplication.
Out of batch deletes/updates fall back to equality deletes.
Partitioning the table is highly recommended, periodically compacting the table when using CDC mode is mandatory.

@t3hw
Author

t3hw commented Jan 13, 2026

Pasting my comment from another discussion about this PR:

Original commenter said:
The primary issue with using DVs for this is identifying the position of a deleted row in the dataset. If you receive a delete record for an existing row, you need to scan the table to locate the file+position in order to create the DV. That results in lots of scan behavior and data materialization on the worker, which really isn't practical/scalable.
That is the main issue that equality deletes address, but equality deletes also introduce a significant maintenance burden, and that maintenance isn't available in KC natively. This means that in order to use this approach with KC, you also need Spark or equivalent to aggressively maintain the deletes.
Based on the PR, I'm not clear how these issues are addressed

And the response:
My implementation of BaseDeltaWriter inherits the insertedRowMap from the BaseEqualityDeltaWriter class. It deletes using the DV Writer by default, while falling back to Equality Deletes if the key was not found. The map is cleared when the batch is closed and the file gets written.
I believe RisingWave is using a similar approach.

If this gets accepted, the documentation should be updated to let users know that compaction is highly recommended.
The implementation includes PartitionedDeltaWriter which routes records to partition-specific writers. This means equality deletes are scoped to their partition - query engines only need to scan files within that partition, not the entire table. For well-partitioned tables (e.g., by date), this significantly reduces the read-time cost of equality deletes.
Another approach would be to keep a separate index of the id-field->parquet file, but running compactions is required regardless, so this is kind of a wasted effort as it would need to be replicated, persisted, or managed outside of kafka connect, and it would lose its state and have to be re-built once a compaction is triggered.
I agree the PR description was misleading about "using DVs for CDC" - DVs only help for in-batch deduplication. I'll update the description to be clearer.

Given that equality deletes + compaction is how Flink handles CDC (and is the industry standard), would this approach be acceptable if we:

  1. Update the PR description to accurately describe the behavior
  2. Document that periodic compaction is required for production use
  3. Recommend proper table partitioning to minimize equality delete scan overhead
  4. Provide example compaction commands in the docs
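The partition scoping described above (equality deletes confined to one partition's files) can be sketched roughly as follows. All names here (PartitionedRouterSketch, PartitionWriter) are invented for illustration; the PR's PartitionedDeltaWriter routes on Iceberg partition keys rather than a string function:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch, not the PR's code: each record is routed to a
// per-partition writer, so any equality delete it emits is scoped to that
// partition's data files instead of the entire table.
public class PartitionedRouterSketch {
  static class PartitionWriter {
    final List<String> equalityDeletes = new ArrayList<>();
    void delete(String key) { equalityDeletes.add(key); }
  }

  final Map<String, PartitionWriter> writers = new HashMap<>();
  final Function<String, String> partitionOf; // e.g. key -> day bucket

  PartitionedRouterSketch(Function<String, String> partitionOf) {
    this.partitionOf = partitionOf;
  }

  void delete(String key) {
    // The delete lands only in its own partition's writer, so readers
    // applying it scan that partition's files, not the whole table.
    writers.computeIfAbsent(partitionOf.apply(key), p -> new PartitionWriter())
        .delete(key);
  }
}
```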

@t3hw t3hw force-pushed the cdc-support-with-DV branch from 6f410bd to 6e5d469 Compare January 20, 2026 14:02
@t3hw
Author

t3hw commented Jan 28, 2026

Will it merge multiple records with the same id within one commit interval? In my testing, when the job ran for some hours, there were duplicate records when multiple records with the same id appeared in one commit interval using the Databricks Kafka Iceberg connector.

it should

@rainerschamm

rainerschamm commented Feb 13, 2026

@t3hw is it possible to put your code into a new repo? I want to compile it and test the CDC upsert.

Heya, I merged this into 1.9.2 (and 1.10.1) manually and it works great for me. We really need this change, and I will probably maintain a patch file until it's merged in properly.

You can checkout 1.9.2 and git apply the following:

cdc-upsert-1.9.2.patch

@LiorV10

LiorV10 commented Feb 25, 2026

Will this PR be included in the next release?

@t3hw
Author

t3hw commented Feb 26, 2026

Will this PR be included in the next release?

I started a thread on the Kafka Connect Iceberg Slack channel about a month ago, but it didn't gain much traction. You are more than welcome to assist in the request to merge this feature.

Currently the only option is to compile the branch yourself and drop it into the Kafka Connect plugin folder.

@rainerschamm

Will this PR be included in the next release?

I started a thread on the Kafka Connect Iceberg Slack channel about a month ago, but it didn't gain much traction. You are more than welcome to assist in the request to merge this feature.

Currently the only option is to compile the branch yourself and drop it into the Kafka Connect plugin folder.

@bryanck We have been testing a manually patched 1.10.1 version of the Iceberg connector for a few weeks now, and it performs perfectly for 100+ million rows of data. No performance issues (using AWS S3 Tables). Can you please bring this issue up again in the next community meeting? Also, it seems strange to us that the Flink sink was merged with upsert support using almost identical code (see org.apache.iceberg.flink.sink.BaseDeltaTaskWriter).

@rainerschamm

@t3hw @rainerschamm In my testing there are still duplicated records if records are updated frequently. My commit time is 3 minutes. Below are two records updated within one minute, and they are the only two duplicated records in the table. There are 434192 records in the table with 434191 distinct id records.

updated_at

2026-03-05 03:37:58.685000 2026-03-05 03:37:59.076000

Hmm, we have not seen any duplicates yet in our tests but we only tested it in this setup:

  • no partitioning
  • merge-on-read
  • 5 minute commit

...
iceberg.tables.auto-create-props.write.delete.mode: merge-on-read
iceberg.tables.auto-create-props.write.merge.mode: merge-on-read
iceberg.tables.auto-create-props.write.update.mode: merge-on-read
...

Also we make sure all identifier fields are strictly non-null in the resulting iceberg table schema.

@t3hw
Author

t3hw commented Mar 5, 2026

I could not reproduce this duplication, but I have an assumption.
Updates are broken down into two separate Delete>Insert operations.
The Iceberg format does not enforce unique primary keys... Perhaps there could be an edge case where a record is inserted and, in the next commit interval, it is updated immediately after the previous snapshot is written, somehow leading the engine to miss the delete operation and resulting in two consecutive inserts.

Again, this is just an assumption. There are many moving parts, and I couldn't reproduce this issue in my integration tests.

@rainerschamm

@t3hw @rainerschamm In my testing there are still duplicated records if records are updated frequently. My commit time is 3 minutes. Below are two records updated within one minute, and they are the only two duplicated records in the table. There are 434192 records in the table with 434191 distinct id records.

updated_at

2026-03-05 03:37:58.685000 2026-03-05 03:37:59.076000

Hmm, we have not seen any duplicates yet in our tests but we only tested it in this setup:

  • no partitioning
  • merge-on-read
  • 5 minute commit

... iceberg.tables.auto-create-props.write.delete.mode: merge-on-read iceberg.tables.auto-create-props.write.merge.mode: merge-on-read iceberg.tables.auto-create-props.write.update.mode: merge-on-read ...
Also we make sure all identifier fields are strictly non-null in the resulting iceberg table schema.

@rainerschamm Do you mind sharing the complete sink properties? Let me check if anything is wrong with my config. It doesn't always produce duplicates; it only adds a couple of duplicate records every day. I have no idea how to troubleshoot it.

I can't provide the aws s3tables ones since they are for a customer, but here are the ones we use for demos.

I hit these tables quite hard with random inserts and updates in postgres and they generated no duplicates in iceberg.

debezium-source-connector.yaml
iceberg-sink-connector.yaml

@adp2201

adp2201 commented Mar 12, 2026

Thanks for continuing this work — the implementation and discussion here are very helpful.

Given the mixed reports (works for some setups, occasional duplicates for others), could we tighten the merge criteria around a clear correctness contract before merge?

Specifically, it would help to have:

  1. A documented behavior matrix for CDC/upsert mode (DV path vs equality-delete fallback, MOR/COW expectations, partitioned vs unpartitioned),
  2. A deterministic integration test (or test matrix) that reproduces rapid consecutive updates to the same key across commit boundaries and validates no duplicate live rows,
  3. Explicit operational requirements in docs (required table props, compaction cadence, non-null identifier constraints, and known limitations).

That would make it much easier for users to adopt safely and for maintainers to evaluate long-term support risk.

@github-actions github-actions bot added the build label Mar 15, 2026
@t3hw t3hw force-pushed the cdc-support-with-DV branch 2 times, most recently from 91275aa to 6a47746 Compare March 16, 2026 00:52
@t3hw
Author

t3hw commented Mar 16, 2026

@jerryzhujing can you share the logs of your Kafka Connect service?
Do you see warning messages in the logs, such as these?

Received commit response when no commit in progress, this can happen during recovery.
Received commit ready when no commit in progress, this can happen during recovery.

edit: PR #15651 might fix it. I have created this branch that combines both the CDC change and the fix. I'll run some tests, but the issue is not easily reproducible, so it may take some time to verify that the fix is working as intended.

cross-batch duplicates

In upsert mode, INSERT did not call `deleteKey()` before `write()`,
unlike Flink's `BaseDeltaTaskWriter` (established in PR apache#2863, refined
in PR apache#4364). This caused a "shielding" bug: when INSERT and UPDATE
for the same key arrived in the same batch, INSERT's `write()`
populated `insertedRowMap`, causing UPDATE's `deleteKey()` to take the
position-delete path instead of emitting the equality delete needed to
remove the prior batch's row.

The fix adds `deleteKey(keyProjection.wrap(row))` before `write()` for
INSERT when upsert is enabled - matching Flink's behavior. This
produces an equality delete for every INSERT in upsert mode, even for
genuinely new keys (no-op at read time, resolved by compaction). This
is the same trade-off Flink makes.

Adds table-level integration tests (`TestCDCDeltaWriterTableLevel`)
that commit to real Iceberg tables and read back row data, following
the Flink `TestDeltaTaskWriter` pattern. These tests verify that
equality deletes actually mask prior-batch rows - something the
existing mock-based unit tests could not validate.

Also updates documentation with a delete behavior matrix, exactly-once
semantics explanation, and operational guidance for CDC mode.
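A minimal model of the ordering fix in the commit message above, using invented names (UpsertWriterSketch and its fields); it only mimics the writer's bookkeeping, not actual file or DV writing:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the "deleteKey() before write()" fix. In upsert
// mode, every INSERT first calls deleteKey(key): if the key was written
// earlier in the same batch this yields a position delete (a DV in v3);
// otherwise it yields an equality delete that masks any row from a prior
// batch. Without that call, an in-batch INSERT "shields" a later UPDATE
// from emitting the equality delete, leaving the prior batch's row live.
public class UpsertWriterSketch {
  final Map<String, Long> insertedRowMap = new HashMap<>();
  final List<Long> positionDeletes = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();
  long nextPos = 0;

  void deleteKey(String key) {
    Long prev = insertedRowMap.remove(key);
    if (prev != null) positionDeletes.add(prev); // same batch: position path
    else equalityDeletes.add(key);               // prior batch: equality path
  }

  void upsert(String key) {
    deleteKey(key);                  // the fix: delete before every write
    insertedRowMap.put(key, nextPos++);
  }
}
```

The first upsert for a key always emits an equality delete, even for genuinely new keys; that is the read-time no-op resolved by compaction, the same trade-off the commit message attributes to Flink.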
@t3hw t3hw force-pushed the cdc-support-with-DV branch from e087308 to 08deade Compare March 16, 2026 11:56
@t3hw
Author

t3hw commented Mar 16, 2026

@adp2201 I have added a couple of new sections to the docs in the previous commit; you can see the diff here.

Hopefully these address some of the concerns you have raised.

@adp2201

adp2201 commented Mar 17, 2026

Thanks for the follow-up and for adding the docs in 08deadee81d0.

I went through the latest diff and had two things I wanted to flag before we call this done:

  1. In Operation.fromString, unknown CDC op values currently fall back to INSERT. That feels risky for data correctness (silent bad writes) vs failing fast with a clear error.
  2. The docs around format version read a bit inconsistent (> 2 in one place, but V2 behavior is described/tested elsewhere). Could we tighten that wording so users know exactly what is supported?

Also, given the duplicate/recovery discussion (and #15651), do we want this PR rebased on that fix, or should we add a deterministic regression test here to prove recovery behavior is safe?

If you agree, I can help with a small follow-up commit for the validation + docs wording cleanup.

@t3hw
Author

t3hw commented Mar 17, 2026

@jerryzhujing

@t3hw It supports both COW and MOR?

I haven't tested it with COW, but the nature of large-scale streaming and microbatches would probably work much better with MOR + compaction cycles.

@adp2201

In Operation.fromString, unknown CDC op values currently fall back to INSERT. That feels risky for data correctness (silent bad writes) vs failing fast with a clear error.

This could be an issue, but I was basing it off the previous implementation (this code is missing from #12070, but was previously defined in the Tabular implementation).

Other than that, I've made a small change to the docs.

@kiikoh

kiikoh commented Mar 17, 2026

Thanks everyone for taking on this work, it's been a long time coming.

  1. In Operation.fromString, unknown CDC op values currently fall back to INSERT. That feels risky for data correctness (silent bad writes) vs failing fast with a clear error.

Regarding this behavior: it will occur when Debezium does an initial or incremental/ad-hoc snapshot. The operation will appear as an "r".

The desired behavior after a snapshot may be an upsert instead of an insert. Probably depends on the usage.

@adp2201

adp2201 commented Mar 17, 2026

Thanks, this context is super helpful. I’m still getting familiar with this area, so take this as a suggestion rather than a strong stance.

Given @kiikoh’s note about Debezium snapshots using r, would it make sense to make op parsing a bit more explicit, something like:

  • I/C/R -> INSERT
  • U -> UPDATE
  • D -> DELETE
  • anything else -> clear error

My thinking is this keeps expected CDC behavior while avoiding silent inserts for truly unknown values. If that sounds reasonable, I’m happy to help add a small test and doc note.
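That mapping could look roughly like the following sketch (a hypothetical enum; the PR's actual Operation class and the final set of accepted values may differ):

```java
// Hypothetical sketch of the stricter op parsing proposed above: Debezium
// snapshot reads ("r") map to INSERT alongside "c"/"i", and anything
// unrecognized fails fast instead of silently becoming an insert.
public enum Operation {
  INSERT, UPDATE, DELETE;

  public static Operation fromString(String op) {
    switch (op == null ? "" : op.toLowerCase()) {
      case "i": case "c": case "r": return INSERT; // create/read/insert
      case "u": return UPDATE;
      case "d": return DELETE;
      default:
        throw new IllegalArgumentException("Unknown CDC operation: " + op);
    }
  }
}
```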

@t3hw
Author

t3hw commented Mar 20, 2026

Regarding the duplications, @koodin9 wrote a wonderful comment on my other PR (#15651) explaining the issue and why this fix can lead to a loss of data. More work must be done there before it is safe for use in production.
In the meantime, it is possible to use the Debezium ts_ms field to deduplicate the rows at the query level.
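Deduplicating on ts_ms at query time means keeping, per key, the row with the greatest ts_ms (in SQL, something like a ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts_ms DESC) = 1 filter). A rough sketch of the same logic, with a hypothetical Row shape:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record shape for illustration: keep the latest ts_ms per id,
// mirroring what a ROW_NUMBER window filter would do at query time.
public class TsMsDedup {
  record Row(String id, long tsMs, String payload) {}

  static Collection<Row> latestPerId(List<Row> rows) {
    Map<String, Row> latest = new HashMap<>();
    for (Row r : rows) {
      // merge keeps the existing row (a) unless the new row (b) is newer
      latest.merge(r.id(), r, (a, b) -> a.tsMs() >= b.tsMs() ? a : b);
    }
    return latest.values();
  }
}
```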

@t3hw
Author

t3hw commented Mar 20, 2026

Regarding the duplications, @koodin9 wrote a wonderful comment on my other PR (#15651) explaining the issue and why this fix can lead to a loss of data. More work must be done there before it is safe for use in production. In the meantime, it is possible to use the Debezium ts_ms field to deduplicate the rows at the query level.

@t3hw In my testing, it is actually missing some data

@jerryzhujing I have pushed the cdc-and-recovery-fix branch with a newer version of the duplicates fix. I'll be running some more tests later in the week, but it's available if you'd like to test it as well.

@t3hw
Author

t3hw commented Mar 25, 2026

@jerryzhujing can you share the logs of your Kafka Connect service? Do you see warning messages in the logs, such as these?

Received commit response when no commit in progress, this can happen during recovery.
Received commit ready when no commit in progress, this can happen during recovery.

edit: PR #15651 might fix it. I have created this branch that combines both the CDC change and the fix. I'll run some tests, but the issue is not easily reproducible, so it may take some time to verify that the fix is working as intended.

The cdc-and-recovery-fix branch has been updated to include @koodin9's fix for de-duplication; see PRs #15710 and #15651.

@rainerschamm

Hi all, I have been trying to follow the message thread and have looked at some of the other PRs mentioned. They seem like valid concerns in their own right, but relate at best tangentially to the original purpose of the PR.

The original PR introduces at its core:

  • The BaseDeltaWriter (in a very similar way to Flink's BaseDeltaTaskWriter)
  • Its two derived classes, PartitionedDeltaWriter and UnpartitionedDeltaWriter
  • Some config changes and some basic utility changes

Unless I missed something, it seems to me that none of the concerns (in the Coordinator/commit buffer) are related to any of this code; they should be addressed on their own, since they probably cause similar issues with the current append writer.

We have been testing both versions of the delta writer, partitioned and unpartitioned, with both v2 and v3 (i.e., with DV enabled) formats for weeks now with no issues.

I am just worried that we are conflating issues a bit here. Let's find any remaining problems with the code in the PR and then merge.



Development

Successfully merging this pull request may close these issues.

Kafka Connect: Add delta writer support