Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
Expand All @@ -44,8 +43,12 @@
import org.apache.iceberg.util.StructProjection;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class);

private final List<DataFile> completedDataFiles = Lists.newArrayList();
private final List<DeleteFile> completedDeleteFiles = Lists.newArrayList();
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
Expand Down Expand Up @@ -345,12 +348,17 @@ private void closeCurrent() throws IOException {
currentWriter.close();

if (currentRows == 0L) {
try {
io.deleteFile(currentFile.encryptingOutputFile());
} catch (UncheckedIOException e) {
// the file may not have been created, and it isn't worth failing the job to clean up,
// skip deleting
}
// the file may not have been created or cannot be deleted, and it isn't worth failing
// the job to clean up, skip deleting
Tasks.foreach(currentFile.encryptingOutputFile())
.suppressFailureWhenFinished()
.onFailure(
(file, exc) ->
LOG.warn(
"Failed to delete the uncommitted empty file during writer clean up: {}",
file,
exc))
.run(io::deleteFile);
} else {
complete(currentWriter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.connect.data.ErrorTolerance;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.schema-force-optional";
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String ERROR_TOLERANCE = "errors.tolerance";
private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
Expand All @@ -94,6 +97,9 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String NAME_PROP = "name";
private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";

private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString();
private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false";

private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
Expand Down Expand Up @@ -177,6 +183,18 @@ private static ConfigDef newConfigDef() {
DEFAULT_CATALOG_NAME,
Importance.MEDIUM,
"Iceberg catalog name");
configDef.define(
ERROR_TOLERANCE,
ConfigDef.Type.STRING,
DEFAULT_ERROR_TOLERANCE,
Importance.MEDIUM,
"Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.");
configDef.define(
ERROR_LOG_INCLUDE_MESSAGES,
ConfigDef.Type.BOOLEAN,
DEFAULT_ERROR_LOG_INCLUDE_MESSAGES,
Importance.MEDIUM,
"If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported.");
configDef.define(
CONTROL_TOPIC_PROP,
ConfigDef.Type.STRING,
Expand Down Expand Up @@ -434,6 +452,14 @@ public JsonConverter jsonConverter() {
return jsonConverter;
}

/**
 * Returns the configured {@code errors.tolerance} value as a raw string. Defaults to the string
 * form of {@code ErrorTolerance.NONE}, meaning any record-level failure fails the task; callers
 * compare it case-insensitively against {@code ErrorTolerance} constants.
 */
public String errorTolerance() {
return getString(ERROR_TOLERANCE);
}

/**
 * Returns the configured {@code errors.log.include.messages} flag: whether the contents of a
 * failed record are included in error messages/logs. Defaults to {@code false}.
 */
public boolean errorLogIncludeMessages() {
return getBoolean(ERROR_LOG_INCLUDE_MESSAGES);
}

@VisibleForTesting
static boolean checkClassName(String className) {
return (className.matches(".*\\.ConnectDistributed.*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ private void startWorker() {
if (null == this.worker) {
LOG.info("Starting commit worker");
SinkWriter sinkWriter = new SinkWriter(catalog, config);
sinkWriter.setReporter(context.errantRecordReporter());
worker = new Worker(config, clientFactory, sinkWriter, context);
worker.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect.data;

import java.util.Locale;

/** Error-handling behavior for the sink connector, mirroring Kafka Connect's errors.tolerance. */
public enum ErrorTolerance {

  /** Tolerate no errors. */
  NONE,

  /** Tolerate all errors. */
  ALL;

  // Lowercase form of the constant name, computed once per constant.
  private final String configValue;

  ErrorTolerance() {
    this.configValue = name().toLowerCase(Locale.ROOT);
  }

  /**
   * Returns the lowercase configuration spelling of this tolerance level, e.g. {@code "none"} or
   * {@code "all"}.
   */
  public String value() {
    return configValue;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,34 @@ private void initNewWriter() {

@Override
public void write(SinkRecord record) {
Record row = null;
try {
// ignore tombstones...
if (record.value() != null) {
Record row = convertToRow(record);
writer.write(row);
row = convertToRow(record);
}
} catch (Exception e) {
String recordData =
this.config.errorLogIncludeMessages()
? String.format(", record: %s", record.value().toString())
: "";
throw new DataException(
String.format(
Locale.ROOT,
"An error occurred converting record, topic: %s, partition, %d, offset: %d",
"An error occurred converting record, topic: %s, partition, %d, offset: %d%s",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset()),
record.kafkaOffset(),
recordData),
e);
}
if (row != null) {
try {
writer.write(row);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

private Record convertToRow(SinkRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinkWriter {

private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class);

private final IcebergSinkConfig config;
private final IcebergWriterFactory writerFactory;
private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;
private ErrantRecordReporter reporter;

public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
this.config = config;
Expand All @@ -47,6 +55,10 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
this.sourceOffsets = Maps.newHashMap();
}

/**
 * Sets the Kafka Connect errant record reporter used to report records that fail conversion.
 * May be null (e.g. when the runtime does not provide one); reporting is then skipped.
 */
public void setReporter(ErrantRecordReporter reporter) {
this.reporter = reporter;
}

/** Closes all per-table record writers held by this sink writer. */
public void close() {
writers.values().forEach(RecordWriter::close);
}
Expand All @@ -65,7 +77,23 @@ public SinkWriterResult completeWrite() {
}

public void save(Collection<SinkRecord> sinkRecords) {
sinkRecords.forEach(this::save);
for (SinkRecord record : sinkRecords) {
try {
this.save(record);
} catch (DataException ex) {
if (this.reporter != null) {
this.reporter.report(record, ex);
}
if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) {
LOG.error(
"Data exception encountered while saving record but tolerated due to error tolerance settings. "
+ "To change this behavior, set 'errors.tolerance' to 'none':",
ex);
} else {
throw ex;
}
}
}
}

private void save(SinkRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
Expand All @@ -44,10 +45,13 @@
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class SinkWriterTest {

Expand Down Expand Up @@ -169,6 +173,11 @@ public void testDynamicNoRoute() {

/** Convenience overload: runs the sink-writer scenario without an errant record reporter. */
private List<IcebergWriterResult> sinkWriterTest(
Map<String, Object> value, IcebergSinkConfig config) {
return sinkWriterTest(value, config, null);
}

private List<IcebergWriterResult> sinkWriterTest(
Map<String, Object> value, IcebergSinkConfig config, ErrantRecordReporter reporter) {
IcebergWriterResult writeResult =
new IcebergWriterResult(
TableIdentifier.parse(TABLE_NAME),
Expand All @@ -182,7 +191,7 @@ private List<IcebergWriterResult> sinkWriterTest(
when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);

SinkWriter sinkWriter = new SinkWriter(catalog, config);

sinkWriter.setReporter(reporter);
// save a record
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
SinkRecord rec =
Expand All @@ -207,4 +216,72 @@ private List<IcebergWriterResult> sinkWriterTest(

return result.writerResults();
}

@Test
public void testErrorToleranceAll() {
  // Tolerance ALL with record logging enabled: bad records are skipped, not fatal.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString());
  when(config.errorLogIncludeMessages()).thenReturn(true);

  // A valid record is written and produces one writer result.
  Map<String, Object> value = ImmutableMap.of("id", 1);
  List<IcebergWriterResult> goodResults = sinkWriterTest(value, config);
  assertThat(goodResults).hasSize(1);

  // A record that fails conversion is tolerated; the task still completes its write.
  Map<String, Object> badValue = ImmutableMap.of("id", "abc");
  List<IcebergWriterResult> badResults = sinkWriterTest(badValue, config);
  assertThat(badResults).hasSize(1);
}

@Test
public void testErrorToleranceNone() {
  // Tolerance NONE: a conversion failure must propagate as a DataException.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString());

  Map<String, Object> invalidValue = ImmutableMap.of("id", "abc");
  assertThatThrownBy(() -> sinkWriterTest(invalidValue, config))
      .isInstanceOf(DataException.class)
      .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100");
}

@Test
public void testErrorToleranceNoneErrorLogIncludeMessages() {
  // Tolerance NONE with record logging on: the DataException carries the failing
  // record's contents and preserves the original cause in its stack trace.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString());
  when(config.errorLogIncludeMessages()).thenReturn(true);

  Map<String, Object> invalidValue = ImmutableMap.of("id", "abc");
  assertThatThrownBy(() -> sinkWriterTest(invalidValue, config))
      .isInstanceOf(DataException.class)
      .hasMessage(
          "An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}")
      .hasStackTraceContaining(
          "Caused by: java.lang.NumberFormatException: For input string: \"abc\"\n");
}

@Test
public void testErrantRecordReporter() {
  // With tolerance ALL, a failed record should be routed to the errant record
  // reporter instead of failing the task.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString());

  // A plain mock is sufficient: Mockito mocks return null by default, so the
  // previous explicit `when(reporter.report(...)).then(inv -> null)` stubbing was
  // redundant (and would be rejected under strict-stubs settings).
  ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);

  Map<String, Object> badValue = ImmutableMap.of("id", "abc");
  List<IcebergWriterResult> writerResults = sinkWriterTest(badValue, config, reporter);
  assertThat(writerResults).hasSize(1);

  // The bad record must have been reported exactly once.
  Mockito.verify(reporter, Mockito.times(1)).report(any(), any());
}
}