From 15ce74ea8f933fe82f166d3a3c2c9860695739f2 Mon Sep 17 00:00:00 2001
From: Devendra Parhate
Date: Thu, 8 May 2025 13:32:47 +0530
Subject: [PATCH 1/5] Implement error handling mechanism for data exception

---
 .../iceberg/connect/IcebergSinkConfig.java    | 26 ++++++++++++++
 .../iceberg/connect/data/ErrorTolerance.java  | 34 +++++++++++++++++++
 .../iceberg/connect/data/IcebergWriter.java   | 34 ++++++++++++++-----
 3 files changed, 85 insertions(+), 9 deletions(-)
 create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index 9650ce16270c..678bf3b0f2c0 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -28,6 +28,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.connect.data.ErrorTolerance;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -79,6 +80,8 @@ public class IcebergSinkConfig extends AbstractConfig {
       "iceberg.tables.schema-force-optional";
   private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
       "iceberg.tables.schema-case-insensitive";
+  private static final String ERROR_TOLERANCE = "errors.tolerance";
+  private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages";
   private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
   private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix";
   private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
@@ -95,6 +98,9 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String TASK_ID = "task.id";
   private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";
 
+  private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString();
+  private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false";
+
   private static final String DEFAULT_CATALOG_NAME = "iceberg";
   private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
   public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
@@ -181,6 +187,18 @@ private static ConfigDef newConfigDef() {
         DEFAULT_CATALOG_NAME,
         Importance.MEDIUM,
         "Iceberg catalog name");
+    configDef.define(
+        ERROR_TOLERANCE,
+        ConfigDef.Type.STRING,
+        DEFAULT_ERROR_TOLERANCE,
+        Importance.MEDIUM,
+        "Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.");
+    configDef.define(
+        ERROR_LOG_INCLUDE_MESSAGES,
+        ConfigDef.Type.BOOLEAN,
+        DEFAULT_ERROR_LOG_INCLUDE_MESSAGES,
+        Importance.MEDIUM,
+        "If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported.");
     configDef.define(
         CONTROL_TOPIC_PROP,
         ConfigDef.Type.STRING,
@@ -452,6 +470,14 @@ public JsonConverter jsonConverter() {
     return jsonConverter;
   }
 
+  public String errorTolerance() {
+    return getString(ERROR_TOLERANCE);
+  }
+
+  public boolean errorLogIncludeMessages() {
+    return getBoolean(ERROR_LOG_INCLUDE_MESSAGES);
+  }
+
   @VisibleForTesting
   static boolean checkClassName(String className) {
     return (className.matches(".*\\.ConnectDistributed.*")
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java
new file mode 100644
index 000000000000..cc4cc2599479
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.util.Locale;
+
+public enum ErrorTolerance {
+
+  /** Tolerate no errors. */
+  NONE,
+
+  /** Tolerate all errors. */
+  ALL;
+
+  public String value() {
+    return name().toLowerCase(Locale.ROOT);
+  }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
index b5be5b3a0047..32ff698e7b76 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
@@ -32,8 +32,13 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class IcebergWriter implements RecordWriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class);
+
   private final Table table;
   private final String tableName;
   private final IcebergSinkConfig config;
@@ -56,7 +61,7 @@ private void initNewWriter() {
   }
 
   @Override
-  public void write(SinkRecord record) {
+  public void write(SinkRecord record) throws DataException {
     try {
       // ignore tombstones...
       if (record.value() != null) {
@@ -64,14 +69,25 @@ public void write(SinkRecord record) {
         writer.write(row);
       }
     } catch (Exception e) {
-      throw new DataException(
-          String.format(
-              Locale.ROOT,
-              "An error occurred converting record, topic: %s, partition, %d, offset: %d",
-              record.topic(),
-              record.kafkaPartition(),
-              record.kafkaOffset()),
-          e);
+      String recordData = "";
+      if (this.config.errorLogIncludeMessages()) {
+        recordData = String.format(", record: %s", record.value().toString());
+      }
+      DataException ex =
+          new DataException(
+              String.format(
+                  Locale.ROOT,
+                  "topic: %s, partition, %d, offset: %d %s",
+                  record.topic(),
+                  record.kafkaPartition(),
+                  record.kafkaOffset(),
+                  recordData),
+              e);
+      if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) {
+        LOG.error("An error occurred converting record...", ex);
+      } else {
+        throw ex;
+      }
     }
   }
 
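
For reference, a minimal standalone-style connector configuration exercising the two properties added above might look like the following sketch; the connector name, topic, table, and other settings are illustrative placeholders, not part of this patch:

    name=iceberg-sink-example
    connector.class=org.apache.iceberg.connect.IcebergSinkConnector
    topics=events
    iceberg.tables=db.events
    # skip records that fail conversion instead of failing the task
    errors.tolerance=all
    # include the failing record contents in the logged error details
    errors.log.include.messages=true
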
From 9a5a98bced6688cace2ea7e8b061a8993f6c3274 Mon Sep 17 00:00:00 2001
From: Devendra Parhate
Date: Wed, 17 Dec 2025 12:47:22 +0530
Subject: [PATCH 2/5] Update DLQ implementation to use errant reporter

---
 .../iceberg/connect/channel/CommitterImpl.java |  1 +
 .../iceberg/connect/data/IcebergWriter.java    | 48 +++++++++----------
 .../iceberg/connect/data/SinkWriter.java       | 30 +++++++++++++-
 3 files changed, 52 insertions(+), 27 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
index 1314988404e3..1f68b112905a 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -178,6 +178,7 @@ private void startWorker() {
     if (null == this.worker) {
       LOG.info("Starting commit worker");
       SinkWriter sinkWriter = new SinkWriter(catalog, config);
+      sinkWriter.setReporter(context.errantRecordReporter());
       worker = new Worker(config, clientFactory, sinkWriter, context);
       worker.start();
     }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
index 32ff698e7b76..a0a7d60b366b 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
@@ -32,13 +32,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 class IcebergWriter implements RecordWriter {
-
-  private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class);
-
   private final Table table;
   private final String tableName;
   private final IcebergSinkConfig config;
@@ -61,32 +56,33 @@ private void initNewWriter() {
   }
 
   @Override
-  public void write(SinkRecord record) throws DataException {
+  public void write(SinkRecord record) {
+    Record row = null;
     try {
       // ignore tombstones...
       if (record.value() != null) {
-        Record row = convertToRow(record);
-        writer.write(row);
+        row = convertToRow(record);
       }
     } catch (Exception e) {
-      String recordData = "";
-      if (this.config.errorLogIncludeMessages()) {
-        recordData = String.format(", record: %s", record.value().toString());
-      }
-      DataException ex =
-          new DataException(
-              String.format(
-                  Locale.ROOT,
-                  "topic: %s, partition, %d, offset: %d %s",
-                  record.topic(),
-                  record.kafkaPartition(),
-                  record.kafkaOffset(),
-                  recordData),
-              e);
-      if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) {
-        LOG.error("An error occurred converting record...", ex);
-      } else {
-        throw ex;
+      String recordData =
+          this.config.errorLogIncludeMessages()
+              ? String.format(", record: %s", record.value().toString())
+              : "";
+      throw new DataException(
+          String.format(
+              Locale.ROOT,
+              "An error occurred converting record, topic: %s, partition, %d, offset: %d%s",
+              record.topic(),
+              record.kafkaPartition(),
+              record.kafkaOffset(),
+              recordData),
+          e);
+    }
+    if (row != null) {
+      try {
+        writer.write(row);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
       }
     }
   }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
index f81155e13777..6fe0fcfd6558 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
@@ -32,13 +32,21 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SinkWriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class);
+
   private final IcebergSinkConfig config;
   private final IcebergWriterFactory writerFactory;
   private final Map<String, RecordWriter> writers;
   private final Map<TopicPartition, Offset> sourceOffsets;
+  private ErrantRecordReporter reporter;
 
   public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
     this.config = config;
@@ -47,6 +55,10 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
     this.sourceOffsets = Maps.newHashMap();
   }
 
+  public void setReporter(ErrantRecordReporter reporter) {
+    this.reporter = reporter;
+  }
+
   public void close() {
     writers.values().forEach(RecordWriter::close);
   }
@@ -65,7 +77,23 @@ public SinkWriterResult completeWrite() {
   }
 
   public void save(Collection<SinkRecord> sinkRecords) {
-    sinkRecords.forEach(this::save);
+    for (SinkRecord record : sinkRecords) {
+      try {
+        this.save(record);
+      } catch (DataException ex) {
+        if (this.reporter != null) {
+          this.reporter.report(record, ex);
+        }
+        if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) {
+          LOG.error(
+              "Data exception encountered while saving record but tolerated due to error tolerance settings. "
+                  + "To change this behavior, set 'errors.tolerance' to 'none':",
+              ex);
+        } else {
+          throw ex;
+        }
+      }
+    }
   }
 
   private void save(SinkRecord record) {
From ecb2ae0af1534d664f36c9235e8cb5c942001ef7 Mon Sep 17 00:00:00 2001
From: Devendra Parhate
Date: Wed, 17 Dec 2025 13:45:00 +0530
Subject: [PATCH 3/5] Add tests

---
 .../iceberg/connect/data/TestSinkWriter.java | 82 ++++++++++++++++++-
 1 file changed, 80 insertions(+), 2 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
index a14ebcab7336..b98c0b59e737 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
@@ -20,6 +20,7 @@
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -44,10 +45,13 @@
 import org.apache.iceberg.types.Types;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestSinkWriter {
 
@@ -168,7 +172,12 @@ public void testDynamicNoRoute() {
   }
 
   private List<IcebergWriterResult> sinkWriterTest(
-      Map<String, Object> value, IcebergSinkConfig config) {
+        Map<String, Object> value, IcebergSinkConfig config) {
+    return sinkWriterTest(value, config, null);
+  }
+
+  private List<IcebergWriterResult> sinkWriterTest(
+      Map<String, Object> value, IcebergSinkConfig config, ErrantRecordReporter reporter) {
     IcebergWriterResult writeResult =
         new IcebergWriterResult(
             TableIdentifier.parse(TABLE_NAME),
@@ -182,7 +191,7 @@ private List<IcebergWriterResult> sinkWriterTest(
     when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);
 
     SinkWriter sinkWriter = new SinkWriter(catalog, config);
-
+    sinkWriter.setReporter(reporter);
     // save a record
     Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
     SinkRecord rec =
@@ -207,4 +216,73 @@ private List<IcebergWriterResult> sinkWriterTest(
 
     return result.writerResults();
   }
+
+  @Test
+  public void testErrorToleranceAll() {
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString());
+    when(config.errorLogIncludeMessages()).thenReturn(true);
+
+
+    Map<String, Object> value = ImmutableMap.of("id", 1);
+    List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
+    Map<String, Object> badValue = ImmutableMap.of("id", "abc");
+    List<IcebergWriterResult> writerResults1 = sinkWriterTest(badValue, config);
+    assertThat(writerResults1.size()).isEqualTo(1);
+  }
+
+  @Test
+  public void testErrorToleranceNone() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString());
+
+    Map<String, Object> badValue = ImmutableMap.of("id", "abc");
+    assertThatThrownBy(() -> sinkWriterTest(badValue, config))
+        .isInstanceOf(DataException.class)
+        .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100");
+  }
+
+  @Test
+  public void testErrorToleranceNoneErrorLogIncludeMessages() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString());
+    when(config.errorLogIncludeMessages()).thenReturn(true);
+
+    Map<String, Object> badValue = ImmutableMap.of("id", "abc");
+    assertThatThrownBy(() -> sinkWriterTest(badValue, config))
+        .isInstanceOf(DataException.class)
+        .hasStackTraceContaining(
+            "Caused by: java.lang.NumberFormatException: For input string: \"abc\"\n")
+        .hasMessage(
+            "An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}");
+  }
+
+  @Test
+  public void testErrantRecordReporter() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString());
+
+    ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
+    when(reporter.report(any(), any()))
+        .then(
+            invocation -> {
+              return null;
+            });
+
+    Map<String, Object> badValue = ImmutableMap.of("id", "abc");
+    List<IcebergWriterResult> writerResults1 = sinkWriterTest(badValue, config, reporter);
+    assertThat(writerResults1.size()).isEqualTo(1);
+
+    // Verify report function was called once
+    Mockito.verify(reporter, Mockito.times(1)).report(any(), any());
+  }
 }

From f7fd17bc90d4a17cdafccb0a74be2ff4fefddd87 Mon Sep 17 00:00:00 2001
From: Devendra Parhate
Date: Wed, 17 Dec 2025 13:52:52 +0530
Subject: [PATCH 4/5] Add tests

---
 .../java/org/apache/iceberg/connect/data/TestSinkWriter.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
index b98c0b59e737..f709a31aa911 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
@@ -172,7 +172,7 @@ public void testDynamicNoRoute() {
   }
 
   private List<IcebergWriterResult> sinkWriterTest(
-        Map<String, Object> value, IcebergSinkConfig config) {
+      Map<String, Object> value, IcebergSinkConfig config) {
     return sinkWriterTest(value, config, null);
   }
 
@@ -226,7 +226,6 @@ public void testErrorToleranceAll() {
     when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString());
     when(config.errorLogIncludeMessages()).thenReturn(true);
 
-
     Map<String, Object> value = ImmutableMap.of("id", 1);
     List<IcebergWriterResult> writerResults = sinkWriterTest(value, config);
     Map<String, Object> badValue = ImmutableMap.of("id", "abc");

From ff02a8efc9899b9b4e75f24199af0b9383ad827c Mon Sep 17 00:00:00 2001
From: Devendra Parhate
Date: Wed, 17 Dec 2025 14:31:02 +0530
Subject: [PATCH 5/5] Add tests

---
 .../java/org/apache/iceberg/connect/data/TestSinkWriter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
index f709a31aa911..8df2551fbce7 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
@@ -20,7 +20,7 @@
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;