diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 0834c7156a9c..0edf3662a18a 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.Map; import org.apache.iceberg.DataFile; @@ -44,8 +43,12 @@ import org.apache.iceberg.util.StructProjection; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class BaseTaskWriter implements TaskWriter { + private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class); + private final List completedDataFiles = Lists.newArrayList(); private final List completedDeleteFiles = Lists.newArrayList(); private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); @@ -345,12 +348,17 @@ private void closeCurrent() throws IOException { currentWriter.close(); if (currentRows == 0L) { - try { - io.deleteFile(currentFile.encryptingOutputFile()); - } catch (UncheckedIOException e) { - // the file may not have been created, and it isn't worth failing the job to clean up, - // skip deleting - } + // the file may not have been created or cannot be deleted, and it isn't worth failing + // the job to clean up, skip deleting + Tasks.foreach(currentFile.encryptingOutputFile()) + .suppressFailureWhenFinished() + .onFailure( + (file, exc) -> + LOG.warn( + "Failed to delete the uncommitted empty file during writer clean up: {}", + file, + exc)) + .run(io::deleteFile); } else { complete(currentWriter); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index a4e15932f1a3..b98628beeb5c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.connect.data.ErrorTolerance; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -79,6 +80,8 @@ public class IcebergSinkConfig extends AbstractConfig { "iceberg.tables.schema-force-optional"; private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = "iceberg.tables.schema-case-insensitive"; + private static final String ERROR_TOLERANCE = "errors.tolerance"; + private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages"; private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix"; private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; @@ -94,6 +97,9 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String NAME_PROP = "name"; private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; + private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString(); + private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false"; + private static final String DEFAULT_CATALOG_NAME = "iceberg"; private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-"; @@ -177,6 +183,18 @@ private static ConfigDef newConfigDef() { DEFAULT_CATALOG_NAME, Importance.MEDIUM, "Iceberg catalog name"); + configDef.define( + ERROR_TOLERANCE, + ConfigDef.Type.STRING, + DEFAULT_ERROR_TOLERANCE, + Importance.MEDIUM, + "Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records."); + configDef.define( + ERROR_LOG_INCLUDE_MESSAGES, + ConfigDef.Type.BOOLEAN, + DEFAULT_ERROR_LOG_INCLUDE_MESSAGES, + Importance.MEDIUM, + "If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported."); configDef.define( CONTROL_TOPIC_PROP, ConfigDef.Type.STRING, @@ -434,6 +452,14 @@ public JsonConverter jsonConverter() { return jsonConverter; } + public String errorTolerance() { + return getString(ERROR_TOLERANCE); + } + + public boolean errorLogIncludeMessages() { + return getBoolean(ERROR_LOG_INCLUDE_MESSAGES); + } + @VisibleForTesting static boolean checkClassName(String className) { return (className.matches(".*\\.ConnectDistributed.*") diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 853a049f6bec..cc44d8170251 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -166,6 +166,7 @@ private void startWorker() { if (null == this.worker) { LOG.info("Starting commit worker"); SinkWriter sinkWriter = new SinkWriter(catalog, config); + sinkWriter.setReporter(context.errantRecordReporter()); worker = new Worker(config, clientFactory, sinkWriter, context); worker.start(); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java new file mode 100644 index 000000000000..cc4cc2599479 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.Locale; + +public enum ErrorTolerance { + + /** Tolerate no errors. */ + NONE, + + /** Tolerate all errors. */ + ALL; + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index b5be5b3a0047..a0a7d60b366b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -57,22 +57,34 @@ private void initNewWriter() { @Override public void write(SinkRecord record) { + Record row = null; try { // ignore tombstones... if (record.value() != null) { - Record row = convertToRow(record); - writer.write(row); + row = convertToRow(record); } } catch (Exception e) { + String recordData = + this.config.errorLogIncludeMessages() + ? String.format(", record: %s", record.value().toString()) + : ""; throw new DataException( String.format( Locale.ROOT, - "An error occurred converting record, topic: %s, partition, %d, offset: %d", + "An error occurred converting record, topic: %s, partition, %d, offset: %d%s", record.topic(), record.kafkaPartition(), - record.kafkaOffset()), + record.kafkaOffset(), + recordData), e); } + if (row != null) { + try { + writer.write(row); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } private Record convertToRow(SinkRecord record) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index f81155e13777..6fe0fcfd6558 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -32,13 +32,21 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class); + private final IcebergSinkConfig config; private final IcebergWriterFactory writerFactory; private final Map writers; private final Map sourceOffsets; + private ErrantRecordReporter reporter; public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.config = config; @@ -47,6 +55,10 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.sourceOffsets = Maps.newHashMap(); } + public void setReporter(ErrantRecordReporter reporter) { + this.reporter = reporter; + } + public void close() { writers.values().forEach(RecordWriter::close); } @@ -65,7 +77,23 @@ public SinkWriterResult completeWrite() { } public void save(Collection sinkRecords) { - sinkRecords.forEach(this::save); + for (SinkRecord record : sinkRecords) { + try { + this.save(record); + } catch (DataException ex) { + if (this.reporter != null) { + this.reporter.report(record, ex); + } + if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) { + LOG.error( + "Data exception encountered while saving record but tolerated due to error tolerance settings. " + + "To change this behavior, set 'errors.tolerance' to 'none':", + ex); + } else { + throw ex; + } + } + } } private void save(SinkRecord record) { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java index 4a17b926fc56..f7a3a955a1c4 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -44,10 +45,13 @@ import org.apache.iceberg.types.Types; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class SinkWriterTest { @@ -169,6 +173,11 @@ public void testDynamicNoRoute() { private List sinkWriterTest( Map value, IcebergSinkConfig config) { + return sinkWriterTest(value, config, null); + } + + private List sinkWriterTest( + Map value, IcebergSinkConfig config, ErrantRecordReporter reporter) { IcebergWriterResult writeResult = new IcebergWriterResult( TableIdentifier.parse(TABLE_NAME), @@ -182,7 +191,7 @@ private List sinkWriterTest( when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer); SinkWriter sinkWriter = new SinkWriter(catalog, config); - + sinkWriter.setReporter(reporter); // save a record Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); SinkRecord rec = @@ -207,4 +216,72 @@ private List sinkWriterTest( return result.writerResults(); } + + @Test + public void testErrorToleranceAll() { + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + Map value = ImmutableMap.of("id", 1); + List writerResults = sinkWriterTest(value, config); + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config); + assertThat(writerResults1.size()).isEqualTo(1); + } + + @Test + public void testErrorToleranceNone() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100"); + } + + @Test + public void testErrorToleranceNoneErrorLogIncludeMessages() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasStackTraceContaining( + "Caused by: java.lang.NumberFormatException: For input string: \"abc\"\n") + .hasMessage( + "An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}"); + } + + @Test + public void testErrantRecordReporter() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + + ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); + when(reporter.report(any(), any())) + .then( + invocation -> { + return null; + }); + + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config, reporter); + assertThat(writerResults1.size()).isEqualTo(1); + + // Verify report function was called once + Mockito.verify(reporter, Mockito.times(1)).report(any(), any()); + } }