Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode for in-batch deduplication#14797
t3hw wants to merge 12 commits into apache:main
Conversation
bryanck left a comment
Thanks for the PR.
As you may know, the original (non-Apache) sink had delta writer support that relied on equality deletes. When the sink was contributed to this project, the community decided it was best to remove that functionality, as using it can result in severely degraded performance. This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues.
We should revisit those discussions and resolve those concerns before we proceed with this.
Thanks for the reply!

I believe this feature is absolutely essential.

Thanks for contributing to the Iceberg ecosystem.

For what it's worth, I tested it under a modest CDC load, and it seems to be working fine.
We did a test with this PR branch, with properties set in Kafka Connect for the sink connector on AWS. We found the table was created as format version 2 in the metadata file. So is this upsert & DV support for version 2 tables only?
Yeah, it's a limitation. I should probably have made a note about it above, but it is kind of out of scope for this PR. What I ended up doing is writing some code that interfaces with the Iceberg catalog outside of Kafka Connect and initializes the table ahead of time with the correct format version.
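For anyone hitting the same limitation, one way to pre-create the table with the required format version before starting the connector is to set the `format-version` table property at creation time. A sketch as Spark SQL (catalog, table, and column names here are placeholders; any catalog API that sets table properties works equally well):

```sql
-- Pre-create the Iceberg table as format version 2 so the sink
-- does not auto-create it with an unsupported format version.
-- 'my_catalog', 'db.events', and the columns are illustrative only.
CREATE TABLE my_catalog.db.events (
  id BIGINT,
  payload STRING
) USING iceberg
TBLPROPERTIES ('format-version' = '2');
```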
@bryanck @t3hw Great to see the progress here! Our team is looking to use this connector for a project that requires Iceberg sink connector upsert/CDC support. Is there any guidance on whether the implementation in this PR will be included in a later release? Any insight into the timeline or priority would be very helpful for our planning.
@bryanck @t3hw Great progress here! We are looking to use the Apache Iceberg sink connector, and we need upsert functionality in the sink. Could you please clarify whether there are plans to support or merge this functionality in the future? If this is on the roadmap, we can proceed with this approach. Otherwise, we may need to consider alternative solutions, such as using Flink. It would be helpful to understand the expected direction or future of this PR. Thanks for the guidance.
@bryanck +1 in strong support of this PR. The addition of the Delta Writer and upsert/CDC support in the Kafka Connect sink unlocks important production use cases. It significantly simplifies incremental and CDC-based data pipelines. Many users, including our team, are looking to adopt Iceberg for CDC/upsert workflows, and having this merged into main would meaningfully improve adoption while reducing long-term maintenance and workarounds. We've also tested this PR under a sustained load of ~100 TPS for about 30 minutes and observed a stable lag in the range of 7–10 minutes. Notably, the lag did not scale proportionally with load, which is a promising signal for production readiness. Thanks for the thoughtful work on this. Could the community consider moving this change forward in the near future? We'd be excited to see it included in an upcoming release.
We need the ability to have updates merged, and this PR solves that. I understand the review and merge can take time, so we're not asking to rush it, but we would really benefit from feedback from the community on:
We are blocked on our implementation journey pending this PR's merge, hence asking whether this is confirmed to be on the roadmap or not. Appreciate your urgent attention and everything you and other contributors do. Thank you!
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaWriter.java
@bryanck could you review this and, if you're comfortable, approve? We're seeing growing interest from teams that manage their own Kafka infrastructure in using this sink. While our Data Platform teams can achieve similar outcomes with Spark Streaming or the older Iceberg sink (https://github.com/databricks/iceberg-kafka-connect), I believe the current Iceberg sink simplifies day-to-day work for a lot of acquisition teams.
@hladush This is something the Iceberg community has decided, including PMC members, so we'd need to get the community on board in order to proceed. You can raise this as a topic on the dev list (again) or in a community sync if you want. |
…the CdcConstants from the SMT classes.
@bryanck thanks a lot for your response. Could you please share any docs on how to do that?
added a small clarification to the PR description:
Pasting my comment from another discussion about this PR: Original commenter said: And the response: If this gets accepted, the documentation should be updated to let users know that compaction is highly recommended. Given that equality deletes + compaction is how Flink handles CDC (and is the industry standard), would this approach be acceptable if we:
Force-pushed 6f410bd to 6e5d469
it should
Heya, I merged this into 1.9.2 (and 1.10.1) manually and it works great for me. We really need this change, and I will probably maintain a patch file until it's merged in properly. You can check out 1.9.2 and git apply the following:
Will this PR be included in the next release?
I started a thread on the Kafka Connect Iceberg Slack channel about a month ago, but it didn't gain much traction. You are more than welcome to assist in the request to merge this feature. Currently the only option is to compile the branch yourself and drop it into the Kafka Connect plugin folder.
@bryanck We have been testing a manually patched 1.10.1 version of the Iceberg connector for a few weeks now, and it performs perfectly for 100+ million rows of data. No performance issues (using AWS S3 Tables). Can you please bring this issue up again in the next community meeting? Also, it seems strange to us that the Flink sink was merged with upsert support using almost identical code (see org.apache.iceberg.flink.sink.BaseDeltaTaskWriter).
Hmm, we have not seen any duplicates yet in our tests, but we only tested it in this setup:
... Also, we make sure all identifier fields are strictly non-null in the resulting Iceberg table schema.
I could not reproduce this duplication, but I have an assumption. Again, this is just an assumption; there are many moving parts, and I couldn't reproduce this issue in my integration tests.
I can't provide the AWS S3 Tables ones since they are for a customer, but here are the ones we use for demos. I hit these tables quite hard with random inserts and updates in Postgres, and they generated no duplicates in Iceberg.
Thanks for continuing this work — the implementation and discussion here are very helpful. Given the mixed reports (works for some setups, occasional duplicates for others), could we tighten the merge criteria around a clear correctness contract before merge? Specifically, it would help to have:
That would make it much easier for users to adopt safely and for maintainers to evaluate long-term support risk. |
Force-pushed 91275aa to 6a47746
@jerryzhujing can you share the logs of your Kafka Connect service? edit: PR #15651 might fix it. I have created this branch that combines both the CDC change and the fix. I'll run some tests, but the issue is not easily reproducible, so it may take some time to verify that the fix is working as intended.
cross-batch duplicates

In upsert mode, INSERT did not call `deleteKey()` before `write()`, unlike Flink's `BaseDeltaTaskWriter` (established in PR apache#2863, refined in PR apache#4364). This caused a "shielding" bug: when INSERT and UPDATE for the same key arrived in the same batch, INSERT's `write()` populated `insertedRowMap`, causing UPDATE's `deleteKey()` to take the position-delete path instead of emitting the equality delete needed to remove the prior batch's row.

The fix adds `deleteKey(keyProjection.wrap(row))` before `write()` for INSERT when upsert is enabled, matching Flink's behavior. This produces an equality delete for every INSERT in upsert mode, even for genuinely new keys (a no-op at read time, resolved by compaction). This is the same trade-off Flink makes.

Adds table-level integration tests (`TestCDCDeltaWriterTableLevel`) that commit to real Iceberg tables and read back row data, following the Flink `TestDeltaTaskWriter` pattern. These tests verify that equality deletes actually mask prior-batch rows, something the existing mock-based unit tests could not validate.

Also updates documentation with a delete behavior matrix, exactly-once semantics explanation, and operational guidance for CDC mode.
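The shielding bug and the fix can be illustrated with a small in-memory model. This is a hypothetical simplification, not the PR's actual classes; only `insertedRowMap` and the position-vs-equality split follow the description above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the delta writer's per-batch logic.
class DeltaWriterSketch {
  // Rows written in the *current* batch, keyed by identifier fields.
  final Map<String, Integer> insertedRowMap = new HashMap<>();
  final List<String> positionDeletes = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();
  final List<String> dataRows = new ArrayList<>();
  final boolean upsertFix; // true: INSERT calls deleteKey() first (the fix)

  DeltaWriterSketch(boolean upsertFix) {
    this.upsertFix = upsertFix;
  }

  void write(String key) {
    insertedRowMap.put(key, dataRows.size());
    dataRows.add(key);
  }

  // Position delete if the key was already written in this batch,
  // otherwise an equality delete that can mask rows from earlier batches.
  void deleteKey(String key) {
    if (insertedRowMap.remove(key) != null) {
      positionDeletes.add(key);
    } else {
      equalityDeletes.add(key);
    }
  }

  void insert(String key) {
    if (upsertFix) {
      deleteKey(key); // emits an equality delete even for a "new" key
    }
    write(key);
  }

  void update(String key) {
    deleteKey(key);
    write(key);
  }

  public static void main(String[] args) {
    // INSERT then UPDATE for the same key, which also exists in a prior batch.
    DeltaWriterSketch buggy = new DeltaWriterSketch(false);
    buggy.insert("k1");
    buggy.update("k1");
    // No equality delete: the prior batch's row for k1 is never masked.
    System.out.println("buggy equality deletes: " + buggy.equalityDeletes);

    DeltaWriterSketch fixed = new DeltaWriterSketch(true);
    fixed.insert("k1");
    fixed.update("k1");
    // INSERT's equality delete masks the prior batch's row.
    System.out.println("fixed equality deletes: " + fixed.equalityDeletes);
  }
}
```

Running the `main` method shows the buggy path emitting only a position delete (the INSERT "shields" the UPDATE), while the fixed path emits the cross-batch equality delete as well.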
Force-pushed e087308 to 08deade
Thanks for the follow-up and for adding the docs. I went through the latest diff and had two things I wanted to flag before we call this done:
Also, given the duplicate/recovery discussion: if you agree, I can help with a small follow-up commit for the validation + docs wording cleanup.
I haven't tested it with COW, but the nature of large-scale streaming and microbatches would probably work much better with MOR + compaction cycles.
This could be an issue, but I was basing it off the previous implementation (this code is missing from #12070, but was previously defined in the Tabular implementation). Other than that, I've made a small change to the docs.
Thanks everyone for taking on this work, it's been a long time coming.
In regards to this behavior, this will occur when debezium does an initial or incremental / ad-hoc snapshot. The operation will appear as an "r". The desired behavior after a snapshot may be an upsert instead of an insert. Probably depends on the usage. |
Thanks, this context is super helpful. I’m still getting familiar with this area, so take this as a suggestion rather than a strong stance, given @kiikoh’s note about Debezium snapshots using the "r" operation.
My thinking is this keeps expected CDC behavior while avoiding silent inserts for truly unknown values. If that sounds reasonable, I’m happy to help add a small test and doc note. |
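One way to picture the suggestion is a small routing table from Debezium op codes onto writer actions; this is an illustrative sketch, not the connector's actual API (class, enum, and method names are hypothetical):

```java
// Hypothetical routing of Debezium op codes onto writer actions:
// "c" = create, "u" = update, "d" = delete, "r" = snapshot read.
class CdcOpRouter {
  enum Action { UPSERT, DELETE, FAIL }

  static Action route(String op) {
    switch (op) {
      case "c":
      case "u":
      case "r": // treat snapshot reads as upserts so re-snapshots don't duplicate
        return Action.UPSERT;
      case "d":
        return Action.DELETE;
      default: // unknown op codes fail loudly instead of silently inserting
        return Action.FAIL;
    }
  }

  public static void main(String[] args) {
    System.out.println(route("r")); // UPSERT
    System.out.println(route("x")); // FAIL
  }
}
```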
Regarding the duplications, @koodin9 wrote a wonderful comment on my other PR (#15651) explaining the issue, and why this fix can lead to a loss of data. More work must be done there before it is safe for use in production.
@jerryzhujing I have pushed the cdc-and-recovery-fix branch with a newer version of the duplicates fix. I'll be running some more tests later in the week, but it's available if you'd like to test it as well.
The cdc-and-recovery-fix branch has been updated to include @koodin9 's fix for de-duplication, see PRs #15710/#15651 |
Hi all, I have been trying to follow the message thread and looked at some of the other PRs mentioned. They seem like valid concerns in their own right, but relate at best tangentially to the original purpose of the PR. The original PR introduces at its core:
Unless I missed something, it seems to me that none of the concerns (in the Coordinator/Commit buffer) are related to any of this code, and they should be addressed on their own, since they probably cause similar issues with the current append writer. We have been testing both versions of the delta writer, unpartitioned and partitioned, with both v2 and v3 (i.e., with DV enabled) formats for weeks now with no issues. I am just worried that we are conflating issues a bit here. Let's find any remaining problems with the code in the PR and then merge.
Introduce Delta Writer functionality for both unpartitioned and partitioned tables, enabling CDC and upsert modes. Enhance configuration options for CDC fields, upsert mode, and DV usage.
Inspired by #12070
Resolves #10842
@bryanck
edit:
"using DVs for CDC" - DVs only help for in-batch deduplication.
Out of batch deletes/updates fall back to equality deletes.
Partitioning the table is highly recommended; periodically compacting the table when using CDC mode is mandatory.
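Since compaction is mandatory in CDC mode, a periodic maintenance job along these lines may help. This is a sketch using Iceberg's Spark SQL procedures; `my_catalog` and `db.events` are placeholders, and the right schedule and options depend on your workload:

```sql
-- Rewrite data files, applying accumulated equality deletes.
CALL my_catalog.system.rewrite_data_files(table => 'db.events');

-- Optionally compact the delete files themselves as well.
CALL my_catalog.system.rewrite_position_delete_files(table => 'db.events');
```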