From f97932de4bc23913acc3b55b4a2bfd7258248a6f Mon Sep 17 00:00:00 2001
From: Deep Patel
Date: Fri, 8 May 2026 16:31:34 -0700
Subject: [PATCH] feat: add Apache Pinot OFFLINE segment sink connector
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implements a Flink 2.x DynamicTableSink that writes RowData records into
Apache Pinot OFFLINE segments via FlinkSegmentWriter + SegmentUploaderDefault,
without requiring Pinot 1.6.0+ SNAPSHOT artifacts.

- PinotDynamicTableFactory: registers the 'pinot' connector with
  controller.url, table.name, and segment.flush.rows options
- PinotRowDataSink: at-least-once Sink with async segment upload; flushes on
  checkpoint (flush()) and when the segmentFlushRows threshold is reached
- PinotRowDataConverter: maps Flink RowData fields to Pinot GenericRow,
  converting TIMESTAMP_LTZ → epoch millis and DECIMAL → double
- Integration test (PinotSinkFlinkTest): Flink minicluster + 4 Testcontainers
  (ZK + Controller + Broker + Server); verifies end-to-end segment upload

Dependency notes in pom.xml:
- Excludes org.apache.calcite:* from pinot-flink-connector (avoids
  SqlTypeName.VARIANT conflict with Flink 2.2's newer Calcite)
- Pins jackson-core 2.18.2 (helix-core pulls in 2.12.6, which lacks the
  StreamReadConstraints needed by jackson-databind 2.18.2)
- Adds --add-opens for Chronicle Core's JDK-internal reflection on JDK 17+

Co-Authored-By: Claude Sonnet 4.6
Signed-off-by: Deep Patel
---
 connectors/pinot-connector/pom.xml                 | 151 +++++++++
 .../pinot/PinotDynamicTableFactory.java            |  78 +++++
 .../pinot/PinotDynamicTableSink.java               | 114 +++++++
 .../connector/pinot/PinotOptions.java              |  43 +++
 .../pinot/PinotRowDataConverter.java               | 104 ++++++
 .../connector/pinot/PinotRowDataSink.java          | 220 +++++++++++++
 .../pinot/PinotRowDataConverterTest.java           | 118 +++++++
 .../connector/pinot/PinotSinkFlinkTest.java        | 297 ++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |   8 +
 connectors/pom.xml                                 |   1 +
 10 files changed, 1134 insertions(+)
 create mode 100644 connectors/pinot-connector/pom.xml
 create mode 100644 connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableFactory.java
 create mode 100644 connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableSink.java
 create mode 100644 connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotOptions.java
 create mode 100644 connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverter.java
 create mode 100644 connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataSink.java
 create mode 100644 connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverterTest.java
 create mode 100644 connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotSinkFlinkTest.java
 create mode 100644 connectors/pinot-connector/src/test/resources/log4j2-test.properties

diff --git a/connectors/pinot-connector/pom.xml b/connectors/pinot-connector/pom.xml
new file mode 100644
index 0000000..a4d4486
--- /dev/null
+++ b/connectors/pinot-connector/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.datasqrl.flinkrunner</groupId>
+    <artifactId>connectors</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pinot-connector</artifactId>
+  <name>Apache Pinot Connector</name>
+
+  <properties>
+    <pinot.version>1.3.0</pinot.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java-bridge</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
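+    <!--
+      pinot-flink-connector bundles its own Flink and Calcite; both are excluded below.
+      Flink classes must come from the runtime, and Pinot's Calcite clashes with the newer
+      Calcite in Flink 2.2 (SqlTypeName.VARIANT conflict, see commit message). The
+      log4j-slf4j2-impl binding is excluded to keep a single SLF4J backend on the classpath.
+    -->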
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-flink-connector</artifactId>
+      <version>${pinot.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j2-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.calcite</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.18.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_2.12</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <annotationProcessorPaths>
+            <path>
+              <groupId>org.projectlombok</groupId>
+              <artifactId>lombok</artifactId>
+              <version>${lombok.version}</version>
+            </path>
+            <path>
+              <groupId>com.google.auto.service</groupId>
+              <artifactId>auto-service</artifactId>
+              <version>${auto.service.version}</version>
+            </path>
+          </annotationProcessorPaths>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>--add-opens java.base/java.lang=ALL-UNNAMED
+            --add-opens java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens java.base/java.io=ALL-UNNAMED
+            --add-opens java.base/java.nio=ALL-UNNAMED
+            --add-opens java.base/sun.nio.ch=ALL-UNNAMED</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableFactory.java b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableFactory.java
new file mode 100644
index 0000000..c150caa
--- /dev/null
+++ b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.connector.pinot;
+
+import com.google.auto.service.AutoService;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+/**
+ * Flink Table factory for the {@code pinot} connector identifier.
+ *
+ * <p>Example SQRL / FlinkSQL usage:
+ *
+ * <pre>{@code
+ * CREATE TABLE PinotSink (
+ *   user_id   BIGINT,
+ *   event_ts  TIMESTAMP_LTZ(3),
+ *   action    STRING
+ * ) WITH (
+ *   'connector'           = 'pinot',
+ *   'controller.url'      = 'http://pinot-controller:9000',
+ *   'table.name'          = 'user_events_OFFLINE',
+ *   'segment.flush.rows'  = '200000'
+ * );
+ *
+ * EXPORT user_events -> PinotSink;
+ * }</pre>
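+ *
+ * <p>{@code controller.url} and {@code table.name} are required; {@code segment.flush.rows} is
+ * optional and defaults to 500,000 (see {@link PinotOptions#SEGMENT_FLUSH_ROWS}).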
+ */
+@AutoService(Factory.class)
+public class PinotDynamicTableFactory implements DynamicTableSinkFactory {
+
+  public static final String IDENTIFIER = "pinot";
+
+  @Override
+  public String factoryIdentifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Set.of(PinotOptions.CONTROLLER_URL, PinotOptions.TABLE_NAME);
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Set.of(PinotOptions.SEGMENT_FLUSH_ROWS);
+  }
+
+  @Override
+  public DynamicTableSink createDynamicTableSink(Context context) {
+    var helper = FactoryUtil.createTableFactoryHelper(this, context);
+    helper.validate();
+    var options = helper.getOptions();
+
+    return new PinotDynamicTableSink(
+        options.get(PinotOptions.CONTROLLER_URL),
+        options.get(PinotOptions.TABLE_NAME),
+        options.get(PinotOptions.SEGMENT_FLUSH_ROWS),
+        context.getPhysicalRowDataType());
+  }
+}
diff --git a/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableSink.java b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableSink.java
new file mode 100644
index 0000000..a3c089c
--- /dev/null
+++ b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotDynamicTableSink.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.connector.pinot;
+
+import java.util.Collections;
+import java.util.HashMap;
+import lombok.RequiredArgsConstructor;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+/** Planner-facing description of the Pinot OFFLINE sink. */
+@RequiredArgsConstructor
+public class PinotDynamicTableSink implements DynamicTableSink {
+
+  private static final String DEFAULT_OUTPUT_DIR = "/tmp/pinotoutput";
+
+  private final String controllerUrl;
+  private final String tableName;
+  private final long segmentFlushRows;
+  private final DataType physicalRowDataType;
+
+  @Override
+  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+    return ChangelogMode.insertOnly();
+  }
+
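+  /**
+   * Derives the Pinot {@link Schema} and {@link TableConfig} here on the planner side, so the
+   * runtime writer only carries serializable Pinot config objects plus the field converter.
+   */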
+  @Override
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    var fieldNames = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+    var fieldDataTypes = DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]);
+
+    var converter = new PinotRowDataConverter(fieldNames, fieldDataTypes);
+    var schema = buildPinotSchema(fieldNames, fieldDataTypes);
+    var tableConfig = buildTableConfig();
+
+    return SinkV2Provider.of(
+        new PinotRowDataSink(tableConfig, schema, segmentFlushRows, converter));
+  }
+
+  @Override
+  public DynamicTableSink copy() {
+    return new PinotDynamicTableSink(
+        controllerUrl, tableName, segmentFlushRows, physicalRowDataType);
+  }
+
+  @Override
+  public String asSummaryString() {
+    return "Pinot[" + tableName + "]";
+  }
+
+  private Schema buildPinotSchema(String[] fieldNames, DataType[] fieldDataTypes) {
+    var builder = new Schema.SchemaBuilder().setSchemaName(tableName);
+    for (int i = 0; i < fieldNames.length; i++) {
+      builder.addSingleValueDimension(
+          fieldNames[i], toPinotDataType(fieldDataTypes[i].getLogicalType()));
+    }
+    return builder.build();
+  }
+
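+  /**
+   * Builds a minimal OFFLINE table config: APPEND/HOURLY batch ingestion, segments staged under
+   * {@code /tmp/pinotoutput} and pushed to the controller. Roughly equivalent JSON (a sketch; key
+   * names as in the integration test's REST payload):
+   *
+   * <pre>{@code
+   * "ingestionConfig": {
+   *   "batchIngestionConfig": {
+   *     "segmentIngestionType": "APPEND",
+   *     "segmentIngestionFrequency": "HOURLY",
+   *     "batchConfigMaps": [
+   *       { "outputDirURI": "/tmp/pinotoutput", "push.controllerUri": "<controller.url>" }
+   *     ]
+   *   }
+   * }
+   * }</pre>
+   */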
+  private TableConfig buildTableConfig() {
+    var batchConfigMap = new HashMap<String, String>();
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, DEFAULT_OUTPUT_DIR);
+    batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, controllerUrl);
+
+    var ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigMap), "APPEND", "HOURLY"));
+
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(tableName)
+        .setIngestionConfig(ingestionConfig)
+        .build();
+  }
+
+  private FieldSpec.DataType toPinotDataType(LogicalType type) {
+    return switch (type.getTypeRoot()) {
+      case BOOLEAN -> FieldSpec.DataType.BOOLEAN;
+      case TINYINT, SMALLINT, INTEGER, DATE, TIME_WITHOUT_TIME_ZONE -> FieldSpec.DataType.INT;
+      case BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE ->
+          FieldSpec.DataType.LONG;
+      case FLOAT -> FieldSpec.DataType.FLOAT;
+      case DOUBLE, DECIMAL -> FieldSpec.DataType.DOUBLE;
+      case CHAR, VARCHAR -> FieldSpec.DataType.STRING;
+      case BINARY, VARBINARY -> FieldSpec.DataType.BYTES;
+      default ->
+          throw new UnsupportedOperationException(
+              "No Pinot type mapping for Flink type: " + type.getTypeRoot());
+    };
+  }
+}
diff --git a/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotOptions.java b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotOptions.java
new file mode 100644
index 0000000..d35cf4f
--- /dev/null
+++ b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.connector.pinot;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PinotOptions {
+
+  public static final ConfigOption<String> CONTROLLER_URL =
+      ConfigOptions.key("controller.url")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Pinot controller base URL, e.g. http://localhost:9000");
+
+  public static final ConfigOption<String> TABLE_NAME =
+      ConfigOptions.key("table.name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Target Pinot OFFLINE table name");
+
+  public static final ConfigOption<Long> SEGMENT_FLUSH_ROWS =
+      ConfigOptions.key("segment.flush.rows")
+          .longType()
+          .defaultValue(500_000L)
+          .withDescription("Max rows buffered per segment before it is flushed and uploaded");
+}
diff --git a/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverter.java b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverter.java
new file mode 100644
index 0000000..e77fdfc
--- /dev/null
+++ b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverter.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.connector.pinot;
+
+import java.io.Serializable;
+import lombok.RequiredArgsConstructor;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+/**
+ * Converts a Flink {@link RowData} (internal binary format) into a Pinot {@link GenericRow}.
+ *
+ * <p>Field getters are transient because {@link RowData.FieldGetter} is not serializable. They are
+ * rebuilt lazily on first use after deserialization.
+ *
+ * <p>Type mapping (Flink internal → Pinot / Java):
+ *
+ * <pre>
+ *  BOOLEAN                        → Boolean
+ *  TINYINT / SMALLINT / INTEGER   → Integer
+ *  BIGINT                         → Long
+ *  FLOAT                          → Float
+ *  DOUBLE                         → Double
+ *  DECIMAL                        → Double  (precision may be lost for very large decimals)
+ *  CHAR / VARCHAR                 → String
+ *  DATE                           → Integer (days since epoch)
+ *  TIME_WITHOUT_TIME_ZONE         → Integer (millis since midnight)
+ *  TIMESTAMP / TIMESTAMP_LTZ      → Long    (epoch millis)
+ *  BINARY / VARBINARY             → byte[]
+ * 
+ */ +@RequiredArgsConstructor +public class PinotRowDataConverter implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String[] fieldNames; + private final DataType[] fieldDataTypes; + + private transient RowData.FieldGetter[] fieldGetters; + private transient LogicalType[] logicalTypes; + + private void ensureInitialized() { + if (fieldGetters != null) { + return; + } + logicalTypes = new LogicalType[fieldDataTypes.length]; + fieldGetters = new RowData.FieldGetter[fieldDataTypes.length]; + for (int i = 0; i < fieldDataTypes.length; i++) { + logicalTypes[i] = fieldDataTypes[i].getLogicalType(); + fieldGetters[i] = RowData.createFieldGetter(logicalTypes[i], i); + } + } + + public GenericRow convert(RowData element) { + ensureInitialized(); + var row = new GenericRow(); + for (int i = 0; i < fieldNames.length; i++) { + row.putValue( + fieldNames[i], toExternal(fieldGetters[i].getFieldOrNull(element), logicalTypes[i])); + } + return row; + } + + private Object toExternal(Object internal, LogicalType type) { + if (internal == null) { + return null; + } + return switch (type.getTypeRoot()) { + case CHAR, VARCHAR -> internal.toString(); + case BOOLEAN -> internal; + case TINYINT -> ((Number) internal).byteValue(); + case SMALLINT -> ((Number) internal).shortValue(); + case INTEGER, DATE, TIME_WITHOUT_TIME_ZONE -> ((Number) internal).intValue(); + case BIGINT -> ((Number) internal).longValue(); + case FLOAT -> ((Number) internal).floatValue(); + case DOUBLE -> ((Number) internal).doubleValue(); + case DECIMAL -> ((DecimalData) internal).toBigDecimal().doubleValue(); + case TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE -> + ((TimestampData) internal).getMillisecond(); + case BINARY, VARBINARY -> (byte[]) internal; + default -> + throw new UnsupportedOperationException( + "No Pinot type mapping for Flink type: " + type.getTypeRoot()); + }; + } +} diff --git a/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataSink.java b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataSink.java new file mode 100644 index 0000000..769a4e5 --- /dev/null +++ b/connectors/pinot-connector/src/main/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataSink.java @@ -0,0 +1,220 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.flinkrunner.connector.pinot; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.table.data.RowData; +import org.apache.pinot.connector.flink.sink.FlinkSegmentWriter; +import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flink 2.x {@link Sink} that writes {@link RowData} records into Apache Pinot OFFLINE segments. + * + *

Mirrors the logic of {@code PinotSink} from pinot-flink-connector (available in Pinot 1.6.0+), + * implemented here against the 1.3.0 APIs so no SNAPSHOT dependency is required. + * + *

Delivery guarantee: at-least-once. Segments are uploaded on {@code flush()} (called at + * every Flink checkpoint) and when the row buffer reaches {@code segmentFlushRows}. A task restart + * between upload and checkpoint acknowledgement may re-send rows; enable Pinot deduplication if + * exactly-once is required. + */ +@RequiredArgsConstructor +public class PinotRowDataSink implements Sink { + + private static final int EXECUTOR_POOL_SIZE = 5; + private static final long EXECUTOR_SHUTDOWN_WAIT_MS = 3_000; + + private final TableConfig tableConfig; + private final Schema schema; + private final long segmentFlushRows; + private final PinotRowDataConverter converter; + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + return new Writer(context); + } + + private final class Writer implements SinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(Writer.class); + + private final SegmentWriter segmentWriter; + private final SegmentUploader segmentUploader; + private final ExecutorService executor; + private final List> pendingUploads = new ArrayList<>(); + private long rowCount; + + private Writer(WriterInitContext context) throws IOException { + executor = Executors.newFixedThreadPool(EXECUTOR_POOL_SIZE); + SegmentWriter sw = null; + SegmentUploader su = null; + try { + sw = + new FlinkSegmentWriter( + context.getTaskInfo().getIndexOfThisSubtask(), context.metricGroup()); + sw.init(tableConfig, schema); + su = new SegmentUploaderDefault(); + su.init(tableConfig); + segmentWriter = sw; + segmentUploader = su; + LOG.info("Opened Pinot sink for table {}", tableConfig.getTableName()); + } catch (Exception e) { + executor.shutdownNow(); + closeQuietly(sw); + throw new IOException("Failed to initialise Pinot sink writer", e); + } + } + + @Override + public void write(RowData element, Context context) throws IOException, InterruptedException { + drainCompleted(); + try { + segmentWriter.collect(converter.convert(element)); + } catch (Exception e) { + throw new IOException("Failed to collect row into Pinot segment buffer", e); + } + if (++rowCount >= segmentFlushRows) { + flushAsync(); + } + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + if (rowCount > 0) { + flushAsync(); + } + awaitAllUploads(); + } + + @Override + public void close() throws Exception { + Exception failure = null; + try { + flush(true); + } catch (Exception e) { + restoreInterrupt(e); + failure = e; + } finally { + try { + shutdownExecutor(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (failure == null) failure = e; + else failure.addSuppressed(e); + } + try { + segmentWriter.close(); + } catch (Exception e) { + if (failure == null) failure = e; + else failure.addSuppressed(e); + } + } + if (failure != null) throw failure; + } + + private void flushAsync() throws IOException { + final URI segmentUri; + try { + segmentUri = segmentWriter.flush(); + } catch (Exception e) { + throw new IOException("Failed to flush Pinot segment", e); + } + rowCount = 0; + pendingUploads.add( + executor.submit( + () -> { + try { + segmentUploader.uploadSegment(segmentUri, null); + } catch (Exception e) { + throw new RuntimeException("Failed to upload Pinot segment " + segmentUri, e); + } + })); + } + + private void drainCompleted() throws IOException, InterruptedException { + Iterator> it = pendingUploads.iterator(); + while (it.hasNext()) { + Future f = it.next(); + if (f.isDone()) { + await(f); + 
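+    // Uploads run on a small fixed pool so segment building is not blocked on controller
+    // round-trips; completed futures are reaped opportunistically in write() via drainCompleted()
+    // and settled in flush()/close() via awaitAllUploads().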
+    private void flushAsync() throws IOException {
+      final URI segmentUri;
+      try {
+        segmentUri = segmentWriter.flush();
+      } catch (Exception e) {
+        throw new IOException("Failed to flush Pinot segment", e);
+      }
+      rowCount = 0;
+      pendingUploads.add(
+          executor.submit(
+              () -> {
+                try {
+                  segmentUploader.uploadSegment(segmentUri, null);
+                } catch (Exception e) {
+                  throw new RuntimeException("Failed to upload Pinot segment " + segmentUri, e);
+                }
+              }));
+    }
+
+    private void drainCompleted() throws IOException, InterruptedException {
+      Iterator<Future<?>> it = pendingUploads.iterator();
+      while (it.hasNext()) {
+        Future<?> f = it.next();
+        if (f.isDone()) {
+          await(f);
+          it.remove();
+        }
+      }
+    }
+
+    private void awaitAllUploads() throws IOException, InterruptedException {
+      for (Future<?> f : pendingUploads) {
+        await(f);
+      }
+      pendingUploads.clear();
+    }
+
+    private void await(Future<?> f) throws IOException, InterruptedException {
+      try {
+        f.get();
+      } catch (ExecutionException e) {
+        Throwable cause =
+            e.getCause() instanceof RuntimeException && e.getCause().getCause() != null
+                ? e.getCause().getCause()
+                : e.getCause();
+        if (cause instanceof IOException) throw (IOException) cause;
+        if (cause instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw (InterruptedException) cause;
+        }
+        throw new IOException("Pinot segment upload failed", cause);
+      }
+    }
+
+    private void shutdownExecutor() throws InterruptedException {
+      executor.shutdown();
+      if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) {
+        executor.shutdownNow();
+      }
+    }
+
+    private void restoreInterrupt(Exception e) {
+      if (e instanceof InterruptedException) Thread.currentThread().interrupt();
+    }
+
+    private void closeQuietly(SegmentWriter sw) {
+      if (sw == null) return;
+      try {
+        sw.close();
+      } catch (Exception ignored) {
+      }
+    }
+  }
+}
diff --git a/connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverterTest.java b/connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverterTest.java
new file mode 100644
index 0000000..2f89d29
--- /dev/null
+++ b/connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotRowDataConverterTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.connector.pinot;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
+
+class PinotRowDataConverterTest {
+
+  @Test
+  void convertsPrimitiveTypes() {
+    var fieldNames = new String[] {"b", "i", "l", "f", "d", "s"};
+    var fieldTypes =
+        new DataType[] {
+          DataTypes.BOOLEAN(),
+          DataTypes.INT(),
+          DataTypes.BIGINT(),
+          DataTypes.FLOAT(),
+          DataTypes.DOUBLE(),
+          DataTypes.STRING()
+        };
+
+    var row =
+        GenericRowData.ofKind(
+            RowKind.INSERT, true, 42, 100L, 1.5f, 3.14, StringData.fromString("hello"));
+    var converter = new PinotRowDataConverter(fieldNames, fieldTypes);
+    var result = converter.convert(row);
+
+    assertThat(result.getValue("b")).isEqualTo(true);
+    assertThat(result.getValue("i")).isEqualTo(42);
+    assertThat(result.getValue("l")).isEqualTo(100L);
+    assertThat(result.getValue("f")).isEqualTo(1.5f);
+    assertThat(result.getValue("d")).isEqualTo(3.14);
+    assertThat(result.getValue("s")).isEqualTo("hello");
+  }
+
+  @Test
+  void convertsTimestampToEpochMillis() {
+    var fieldNames = new String[] {"ts"};
+    var fieldTypes = new DataType[] {DataTypes.TIMESTAMP_LTZ(3)};
+
+    var row =
+        GenericRowData.ofKind(RowKind.INSERT, TimestampData.fromEpochMillis(1_700_000_000_000L));
+    var converter = new PinotRowDataConverter(fieldNames, fieldTypes);
+    var result = converter.convert(row);
+
+    assertThat(result.getValue("ts")).isEqualTo(1_700_000_000_000L);
+  }
+
+  @Test
+  void convertsDecimalToDouble() {
+    var fieldNames = new String[] {"amount"};
+    var fieldTypes = new DataType[] {DataTypes.DECIMAL(10, 2)};
+
+    var row =
+        GenericRowData.ofKind(
+            RowKind.INSERT, DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
+    var converter = new PinotRowDataConverter(fieldNames, fieldTypes);
+    var result = converter.convert(row);
+
+    assertThat((Double) result.getValue("amount")).isEqualTo(123.45);
+  }
+
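+  // Additional example (not in the original test set): DATE is carried internally as an int
+  // count of days since epoch and should pass through unchanged.
+  @Test
+  void convertsDateToEpochDays() {
+    var fieldNames = new String[] {"d"};
+    var fieldTypes = new DataType[] {DataTypes.DATE()};
+
+    var row = GenericRowData.ofKind(RowKind.INSERT, 19_000); // ≈ 2022-01-08
+    var converter = new PinotRowDataConverter(fieldNames, fieldTypes);
+
+    assertThat(converter.convert(row).getValue("d")).isEqualTo(19_000);
+  }
+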
+ */ +package com.datasqrl.flinkrunner.connector.pinot; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.junit.jupiter.api.Test; + +class PinotRowDataConverterTest { + + @Test + void convertsPrimitiveTypes() { + var fieldNames = new String[] {"b", "i", "l", "f", "d", "s"}; + var fieldTypes = + new DataType[] { + DataTypes.BOOLEAN(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.STRING() + }; + + var row = + GenericRowData.ofKind( + RowKind.INSERT, true, 42, 100L, 1.5f, 3.14, StringData.fromString("hello")); + var converter = new PinotRowDataConverter(fieldNames, fieldTypes); + var result = converter.convert(row); + + assertThat(result.getValue("b")).isEqualTo(true); + assertThat(result.getValue("i")).isEqualTo(42); + assertThat(result.getValue("l")).isEqualTo(100L); + assertThat(result.getValue("f")).isEqualTo(1.5f); + assertThat(result.getValue("d")).isEqualTo(3.14); + assertThat(result.getValue("s")).isEqualTo("hello"); + } + + @Test + void convertsTimestampToEpochMillis() { + var fieldNames = new String[] {"ts"}; + var fieldTypes = new DataType[] {DataTypes.TIMESTAMP_LTZ(3)}; + + var row = + GenericRowData.ofKind(RowKind.INSERT, TimestampData.fromEpochMillis(1_700_000_000_000L)); + var converter = new PinotRowDataConverter(fieldNames, fieldTypes); + var result = converter.convert(row); + + assertThat(result.getValue("ts")).isEqualTo(1_700_000_000_000L); + } + + @Test + void convertsDecimalToDouble() { + var fieldNames = new String[] {"amount"}; + var fieldTypes = new DataType[] {DataTypes.DECIMAL(10, 2)}; + + var row = + GenericRowData.ofKind( + RowKind.INSERT, DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + var converter = new PinotRowDataConverter(fieldNames, fieldTypes); + var result = converter.convert(row); + + assertThat((Double) result.getValue("amount")).isEqualTo(123.45); + } + + @Test + void propagatesNullValues() { + var fieldNames = new String[] {"id", "name"}; + var fieldTypes = new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}; + + var row = GenericRowData.ofKind(RowKind.INSERT, 1L, null); + var converter = new PinotRowDataConverter(fieldNames, fieldTypes); + var result = converter.convert(row); + + assertThat(result.getValue("id")).isEqualTo(1L); + assertThat(result.getValue("name")).isNull(); + } + + @Test + void reinitializesAfterDeserialization() throws Exception { + var fieldNames = new String[] {"val"}; + var fieldTypes = new DataType[] {DataTypes.INT()}; + var converter = new PinotRowDataConverter(fieldNames, fieldTypes); + + // Simulate deserialization by cloning through Java serialization + var baos = new java.io.ByteArrayOutputStream(); + new java.io.ObjectOutputStream(baos).writeObject(converter); + var deserialized = + (PinotRowDataConverter) + new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(baos.toByteArray())) + .readObject(); + + var row = GenericRowData.ofKind(RowKind.INSERT, 99); + var result = deserialized.convert(row); + + assertThat(result.getValue("val")).isEqualTo(99); + } +} diff --git 
diff --git a/connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotSinkFlinkTest.java b/connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotSinkFlinkTest.java
new file mode 100644
index 0000000..d9c2406
--- /dev/null
+++ b/connectors/pinot-connector/src/test/java/com/datasqrl/flinkrunner/connector/pinot/PinotSinkFlinkTest.java
@@ -0,0 +1,297 @@
+/*
+ * Copyright © 2026 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.flinkrunner.connector.pinot;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpClient.Version;
+import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.time.Duration;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+/**
+ * Integration test: Flink minicluster → Pinot OFFLINE segment sink.
+ *
+ * <p>Uses four containers (ZooKeeper + Controller + Broker + Server). Broker and server are needed
+ * so that the controller's table-creation validator finds instances in {@code DefaultTenant}.
+ * OFFLINE segment push itself only talks to the controller; broker/server are not contacted during
+ * the write path.
+ *
+ * <p>All HTTP calls to Pinot use HTTP/1.1 explicitly: Pinot's Grizzly server does not support h2c
+ * upgrade, and Java's default {@code HttpClient} (HTTP_2) stalls the server-side connection read
+ * while negotiating the upgrade, causing a 30-second idle timeout on every POST.
+ */
+@Slf4j
+@ExtendWith(MiniClusterExtension.class)
+class PinotSinkFlinkTest {
+
+  private static final String PINOT_IMAGE = "apachepinot/pinot:1.3.0";
+  private static final int CONTROLLER_PORT = 9000;
+  private static final String TABLE_NAME = "test_events";
+
+  private static final Network NETWORK = Network.newNetwork();
+  private static GenericContainer<?> ZOOKEEPER;
+  private static GenericContainer<?> CONTROLLER;
+  private static GenericContainer<?> BROKER;
+  private static GenericContainer<?> SERVER;
+  private static String controllerUrl;
+
+  @BeforeAll
+  static void startContainers() throws Exception {
+    ZOOKEEPER =
+        new GenericContainer<>("zookeeper:3.8")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("zookeeper")
+            .withExposedPorts(2181)
+            // Wait until PrepRequestProcessor starts — that is when ZK is truly ready
+            // to process client writes, not just listening on the port.
+            .waitingFor(
+                Wait.forLogMessage(".*PrepRequestProcessor.*started.*\\n", 1)
+                    .withStartupTimeout(Duration.ofMinutes(1)));
+    ZOOKEEPER.start();
+
+    CONTROLLER =
+        new GenericContainer<>(PINOT_IMAGE)
+            .withNetwork(NETWORK)
+            .withCommand(
+                "StartController",
+                "-controllerPort",
+                String.valueOf(CONTROLLER_PORT),
+                "-zkAddress",
+                "zookeeper:2181",
+                "-dataDir",
+                "/tmp/pinotdata")
+            .withExposedPorts(CONTROLLER_PORT)
+            .withLogConsumer(frame -> System.out.print("CTRL| " + frame.getUtf8String()))
+            // /health returns 200 only after the lead-controller resource is assigned in Helix.
+            .waitingFor(
+                Wait.forHttp("/health")
+                    .forPort(CONTROLLER_PORT)
+                    .withStartupTimeout(Duration.ofMinutes(3)));
+    CONTROLLER.start();
+
+    // Broker and server register themselves via ZK; no fake instance registration needed.
+    BROKER =
+        new GenericContainer<>(PINOT_IMAGE)
+            .withNetwork(NETWORK)
+            .withCommand("StartBroker", "-zkAddress", "zookeeper:2181")
+            .withLogConsumer(frame -> System.out.print("BRKR| " + frame.getUtf8String()));
+    BROKER.start();
+
+    SERVER =
+        new GenericContainer<>(PINOT_IMAGE)
+            .withNetwork(NETWORK)
+            .withCommand("StartServer", "-zkAddress", "zookeeper:2181")
+            .withLogConsumer(frame -> System.out.print("SRVR| " + frame.getUtf8String()));
+    SERVER.start();
+
+    controllerUrl =
+        "http://" + CONTROLLER.getHost() + ":" + CONTROLLER.getMappedPort(CONTROLLER_PORT);
+
+    // MiniCluster has NOT started yet (instance-level extension), so Helix can stabilise
+    // without CPU competition. Wait for cluster readiness, then create schema+table.
+    awaitClusterReady(controllerUrl);
+    createPinotTable(controllerUrl);
+  }
+
+  @AfterAll
+  static void stopContainers() {
+    if (SERVER != null) SERVER.stop();
+    if (BROKER != null) BROKER.stop();
+    if (CONTROLLER != null) CONTROLLER.stop();
+    if (ZOOKEEPER != null) ZOOKEEPER.stop();
+    NETWORK.close();
+  }
+
+  @Test
+  void writesRowsToOfflineTable() throws Exception {
+    var env = StreamExecutionEnvironment.getExecutionEnvironment();
+    var tEnv = StreamTableEnvironment.create(env);
+
+    tEnv.executeSql(
+        "CREATE TABLE datagen_source ("
+            + " user_id BIGINT,"
+            + " action STRING,"
+            + " event_ts TIMESTAMP_LTZ(3)"
+            + ") WITH ("
+            + " 'connector' = 'datagen',"
+            + " 'number-of-rows' = '20'"
+            + ")");
+
+    tEnv.executeSql(
+        "CREATE TABLE pinot_sink ("
+            + " user_id BIGINT,"
+            + " action STRING,"
+            + " event_ts TIMESTAMP_LTZ(3)"
+            + ") WITH ("
+            + " 'connector' = 'pinot',"
+            + " 'controller.url' = '"
+            + controllerUrl
+            + "',"
+            + " 'table.name' = '"
+            + TABLE_NAME
+            + "',"
+            + " 'segment.flush.rows' = '10'"
+            + ")");
+
+    var result = tEnv.executeSql("INSERT INTO pinot_sink SELECT * FROM datagen_source");
+    result.await();
+
+    assertThat(result.getResultKind()).isEqualTo(ResultKind.SUCCESS_WITH_CONTENT);
+
+    // Verify at least one segment was uploaded to the controller.
+    var segments = fetchSegments(controllerUrl);
+    log.info("Segments in Pinot table {}: {}", TABLE_NAME, segments);
+    assertThat(segments).contains(TABLE_NAME);
+  }
+
+  /**
+   * Two-phase wait: (1) instances visible in /instances, (2) broker+server assigned to
+   * DefaultTenant in Helix. Both checks use HTTP/1.1 to avoid h2c-upgrade stalls with Pinot's
+   * Grizzly server.
+   */
+  private static void awaitClusterReady(String url) throws Exception {
+    // Force HTTP/1.1: Pinot's Grizzly HTTP/1.1 server does not support h2c upgrade, and Java's
+    // default HttpClient (HTTP_2) can stall the server-side connection read when issuing POSTs.
+    var http = HttpClient.newBuilder().version(Version.HTTP_1_1).build();
+    long deadline = System.currentTimeMillis() + Duration.ofMinutes(3).toMillis();
+
+    // Phase 1: broker and server appear in /instances.
+    while (System.currentTimeMillis() < deadline) {
+      var resp =
+          http.send(
+              HttpRequest.newBuilder().uri(URI.create(url + "/instances")).GET().build(),
+              BodyHandlers.ofString());
+      String body = resp.body();
+      if (resp.statusCode() == 200 && body.contains("Broker_") && body.contains("Server_")) {
+        log.info("Phase 1 done — Pinot instances visible");
+        break;
+      }
+      log.info("Phase 1: waiting for instances, body={}", body);
+      Thread.sleep(5_000);
+    }
+
+    // Phase 2: DefaultTenant has broker + server assigned in Helix.
+    // Response: {"BrokerInstances":["Broker_..."],"ServerInstances":["Server_..."],...}
+    while (System.currentTimeMillis() < deadline) {
+      var resp =
+          http.send(
+              HttpRequest.newBuilder()
                  .uri(URI.create(url + "/tenants/DefaultTenant/metadata"))
+                  .GET()
+                  .build(),
+              BodyHandlers.ofString());
+      String body = resp.body();
+      log.info("Phase 2: tenant metadata={}", body);
+      if (resp.statusCode() == 200 && body.contains("Broker_") && body.contains("Server_")) {
+        log.info("Phase 2 done — DefaultTenant fully assigned");
+        return;
+      }
+      Thread.sleep(5_000);
+    }
+    throw new AssertionError("Pinot cluster did not become ready within 3 minutes");
+  }
+
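+  // Note: event_ts is declared LONG in the Pinot schema below because PinotRowDataConverter
+  // emits TIMESTAMP_LTZ values as epoch millis.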
+  /** Creates the Pinot schema and OFFLINE table via the controller REST API. */
+  private static void createPinotTable(String url) throws Exception {
+    var http = HttpClient.newBuilder().version(Version.HTTP_1_1).build();
+
+    var schema =
+        """
+        {
+          "schemaName": "%s",
+          "dimensionFieldSpecs": [
+            { "name": "user_id", "dataType": "LONG" },
+            { "name": "action", "dataType": "STRING" },
+            { "name": "event_ts", "dataType": "LONG" }
+          ]
+        }
+        """
+            .formatted(TABLE_NAME);
+
+    var schemaResp =
+        http.send(
+            HttpRequest.newBuilder()
+                .uri(URI.create(url + "/schemas"))
+                .header("Content-Type", "application/json")
+                .POST(BodyPublishers.ofString(schema))
+                .build(),
+            BodyHandlers.ofString());
+    log.info("Create schema HTTP {}: {}", schemaResp.statusCode(), schemaResp.body());
+    assertThat(schemaResp.statusCode()).isLessThan(300);
+
+    var tableConfig =
+        """
+        {
+          "tableName": "%s",
+          "tableType": "OFFLINE",
+          "segmentsConfig": {
+            "replication": "1",
+            "schemaName": "%s"
+          },
+          "ingestionConfig": {
+            "batchIngestionConfig": {
+              "segmentIngestionType": "APPEND",
+              "segmentIngestionFrequency": "HOURLY",
+              "batchConfigMaps": [
+                {
+                  "outputDirURI": "/tmp/pinotoutput",
+                  "push.controllerUri": "%s"
+                }
+              ]
+            }
+          },
+          "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant" },
+          "tableIndexConfig": { "loadMode": "MMAP" },
+          "metadata": {}
+        }
+        """
+            .formatted(TABLE_NAME, TABLE_NAME, url);
+
+    var tableResp =
+        http.send(
+            HttpRequest.newBuilder()
+                .uri(URI.create(url + "/tables"))
+                .header("Content-Type", "application/json")
+                .POST(BodyPublishers.ofString(tableConfig))
+                .build(),
+            BodyHandlers.ofString());
+    log.info("Create table HTTP {}: {}", tableResp.statusCode(), tableResp.body());
+    assertThat(tableResp.statusCode()).isLessThan(300);
+  }
+
+  private static String fetchSegments(String url) throws Exception {
+    var http = HttpClient.newBuilder().version(Version.HTTP_1_1).build();
+    var request =
+        HttpRequest.newBuilder().uri(URI.create(url + "/segments/" + TABLE_NAME)).GET().build();
+    return http.send(request, BodyHandlers.ofString()).body();
+  }
+}
diff --git a/connectors/pinot-connector/src/test/resources/log4j2-test.properties b/connectors/pinot-connector/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..2d91a37
--- /dev/null
+++ b/connectors/pinot-connector/src/test/resources/log4j2-test.properties
@@ -0,0 +1,8 @@
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/connectors/pom.xml b/connectors/pom.xml
index aba84ea..b291c31 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -32,6 +32,7 @@
     <module>kafka-safe-connector</module>
     <module>postgresql-connector</module>
+    <module>pinot-connector</module>