diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java index 150d690..83a793f 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java @@ -16,6 +16,7 @@ package org.apache.flink.streaming.connectors.kafka.table; import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; +import com.google.auto.service.AutoService; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.java.tuple.Tuple2; @@ -37,6 +38,7 @@ import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; @@ -82,6 +84,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode; /** Upsert-Kafka factory. */ +@AutoService(Factory.class) public class SafeUpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {