Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions connectors/pinot-connector/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright © 2026 DataSQRL (contact@datasqrl.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.datasqrl.flinkrunner</groupId>
<artifactId>connectors</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>pinot-connector</artifactId>
<name>Apache Pinot Connector</name>

<properties>
<pinot.version>1.3.0</pinot.version>
</properties>

<dependencies>
<!-- Provided by Flink runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Pinot Flink connector — uses Flink 2.x Sink<T> API from Pinot 1.3.0+ -->
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-flink-connector</artifactId>
<version>${pinot.version}</version>
<exclusions>
<!-- Flink is provided at runtime; exclude transitive copies -->
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- Pinot pulls in log4j-slf4j2-impl (the Log4j binding for SLF4J 2.x); the DataSQRL enforcer requires SLF4J 1.x, so exclude the 2.x binding -->
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</exclusion>
<!-- Pinot's Calcite is older than Flink's; exclude so Flink's version wins -->
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- AutoService generates META-INF/services at compile time -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>

<!-- Pin jackson-core to match jackson-databind 2.18.2 from pinot-spi.
helix-core 1.3.1 pulls in jackson-core 2.12.6 which lacks StreamReadConstraints
(added in 2.15), causing NoClassDefFoundError during segment flush. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.18.2</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
<path>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>${auto.service.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>

<!-- Chronicle Core (used by pinot-segment-spi for off-heap buffers) accesses
JDK internals via reflection. These opens are required on JDK 17+. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
--add-opens java.base/java.io=ALL-UNNAMED
--add-opens java.base/java.nio=ALL-UNNAMED
--add-opens java.base/sun.nio.ch=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright © 2026 DataSQRL (contact@datasqrl.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasqrl.flinkrunner.connector.pinot;

import com.google.auto.service.AutoService;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;

/**
 * Flink {@link DynamicTableSinkFactory} registered (via {@code AutoService}) under the
 * {@code pinot} connector identifier.
 *
 * <p>Validates the declared table options and hands the resolved values to
 * {@link PinotDynamicTableSink}. Typical FlinkSQL / SQRL declaration:
 *
 * <pre>{@code
 * CREATE TABLE PinotSink (
 *   user_id BIGINT,
 *   event_ts TIMESTAMP_LTZ(3),
 *   action STRING
 * ) WITH (
 *   'connector' = 'pinot',
 *   'controller.url' = 'http://pinot-controller:9000',
 *   'table.name' = 'user_events_OFFLINE',
 *   'segment.flush.rows' = '200000'
 * );
 *
 * EXPORT user_events -> PinotSink;
 * }</pre>
 */
@AutoService(Factory.class)
public class PinotDynamicTableFactory implements DynamicTableSinkFactory {

  /** Connector name used in the {@code 'connector' = 'pinot'} table option. */
  public static final String IDENTIFIER = "pinot";

  @Override
  public String factoryIdentifier() {
    return IDENTIFIER;
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Set.of(PinotOptions.CONTROLLER_URL, PinotOptions.TABLE_NAME);
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Set.of(PinotOptions.SEGMENT_FLUSH_ROWS);
  }

  @Override
  public DynamicTableSink createDynamicTableSink(Context context) {
    var factoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
    // Rejects unknown options and missing required ones before we read anything.
    factoryHelper.validate();

    var config = factoryHelper.getOptions();
    var controllerUrl = config.get(PinotOptions.CONTROLLER_URL);
    var tableName = config.get(PinotOptions.TABLE_NAME);
    var flushRows = config.get(PinotOptions.SEGMENT_FLUSH_ROWS);

    return new PinotDynamicTableSink(
        controllerUrl, tableName, flushRows, context.getPhysicalRowDataType());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright © 2026 DataSQRL (contact@datasqrl.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasqrl.flinkrunner.connector.pinot;

import java.util.Collections;
import java.util.HashMap;
import lombok.RequiredArgsConstructor;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

/**
 * Planner-facing {@link DynamicTableSink} describing the Pinot OFFLINE sink.
 *
 * <p>At runtime the sink builds segments locally (staged under {@link #DEFAULT_OUTPUT_DIR})
 * and pushes them to the Pinot controller at {@code controllerUrl}. Insert-only: batch
 * segment builds cannot retract or update rows.
 */
@RequiredArgsConstructor
public class PinotDynamicTableSink implements DynamicTableSink {

  /** Local staging directory for segment files before they are pushed to the controller. */
  private static final String DEFAULT_OUTPUT_DIR = "/tmp/pinotoutput";

  private final String controllerUrl;
  private final String tableName;
  private final long segmentFlushRows;
  private final DataType physicalRowDataType;

  @Override
  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Only INSERT is supported; the requested mode is intentionally ignored.
    return ChangelogMode.insertOnly();
  }

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    var names = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
    var types = DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]);

    return SinkV2Provider.of(
        new PinotRowDataSink(
            buildTableConfig(),
            buildPinotSchema(names, types),
            segmentFlushRows,
            new PinotRowDataConverter(names, types)));
  }

  @Override
  public DynamicTableSink copy() {
    return new PinotDynamicTableSink(
        controllerUrl, tableName, segmentFlushRows, physicalRowDataType);
  }

  @Override
  public String asSummaryString() {
    return "Pinot[" + tableName + "]";
  }

  /**
   * Derives a Pinot schema mirroring the Flink physical row type, one single-value
   * dimension per field. NOTE(review): the schema name is set to {@code tableName}
   * verbatim — if callers pass a type-suffixed name (e.g. {@code user_events_OFFLINE}),
   * confirm Pinot accepts the suffix in the schema name.
   */
  private Schema buildPinotSchema(String[] names, DataType[] types) {
    var schemaBuilder = new Schema.SchemaBuilder().setSchemaName(tableName);
    for (int idx = 0; idx < names.length; idx++) {
      var pinotType = toPinotDataType(types[idx].getLogicalType());
      schemaBuilder.addSingleValueDimension(names[idx], pinotType);
    }
    return schemaBuilder.build();
  }

  /** Builds an OFFLINE table config with APPEND/HOURLY batch ingestion pushing to the controller. */
  private TableConfig buildTableConfig() {
    var batchProps = new HashMap<String, String>();
    batchProps.put(BatchConfigProperties.OUTPUT_DIR_URI, DEFAULT_OUTPUT_DIR);
    batchProps.put(BatchConfigProperties.PUSH_CONTROLLER_URI, controllerUrl);

    var batchIngestion =
        new BatchIngestionConfig(Collections.singletonList(batchProps), "APPEND", "HOURLY");
    var ingestion = new IngestionConfig();
    ingestion.setBatchIngestionConfig(batchIngestion);

    return new TableConfigBuilder(TableType.OFFLINE)
        .setTableName(tableName)
        .setIngestionConfig(ingestion)
        .build();
  }

  /**
   * Maps a Flink logical type root to the Pinot storage type.
   *
   * @throws UnsupportedOperationException for type roots with no Pinot equivalent
   */
  private FieldSpec.DataType toPinotDataType(LogicalType type) {
    return switch (type.getTypeRoot()) {
      case BOOLEAN -> FieldSpec.DataType.BOOLEAN;
      case TINYINT, SMALLINT, INTEGER, DATE, TIME_WITHOUT_TIME_ZONE -> FieldSpec.DataType.INT;
      case BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE ->
          FieldSpec.DataType.LONG;
      case FLOAT -> FieldSpec.DataType.FLOAT;
      // NOTE(review): DECIMAL stored as DOUBLE loses precision beyond ~15-16 digits —
      // confirm acceptable for the expected scale, or map to STRING/BIG_DECIMAL.
      case DOUBLE, DECIMAL -> FieldSpec.DataType.DOUBLE;
      case CHAR, VARCHAR -> FieldSpec.DataType.STRING;
      case BINARY, VARBINARY -> FieldSpec.DataType.BYTES;
      default ->
          throw new UnsupportedOperationException(
              "No Pinot type mapping for Flink type: " + type.getTypeRoot());
    };
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2026 DataSQRL (contact@datasqrl.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasqrl.flinkrunner.connector.pinot;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Configuration options recognized by the {@code pinot} connector. Not instantiable. */
public class PinotOptions {

  private PinotOptions() {
    // static option holder only
  }

  /** Required. Base URL of the Pinot controller that receives segment pushes. */
  public static final ConfigOption<String> CONTROLLER_URL =
      ConfigOptions.key("controller.url")
          .stringType()
          .noDefaultValue()
          .withDescription("Pinot controller base URL, e.g. http://localhost:9000");

  /** Required. Name of the target OFFLINE table. */
  public static final ConfigOption<String> TABLE_NAME =
      ConfigOptions.key("table.name")
          .stringType()
          .noDefaultValue()
          .withDescription("Target Pinot OFFLINE table name");

  /** Optional. Segment row threshold; defaults to 500,000 rows. */
  public static final ConfigOption<Long> SEGMENT_FLUSH_ROWS =
      ConfigOptions.key("segment.flush.rows")
          .longType()
          .defaultValue(500_000L)
          .withDescription("Max rows buffered per segment before it is flushed and uploaded");
}
Loading
Loading