Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions flink-table/flink-table-api-java-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${project.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,44 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.datagen.table.types.RowDataGenerator;
import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.legacy.table.sources.StreamTableSource;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.connector.datagen.table.types.SequenceGeneratorFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import javax.annotation.Nullable;

/** A {@link StreamTableSource} that emits generated data rows. */
/**
* A {@link ScanTableSource} that emits generated row data via the FLIP-27 {@link
* DataGeneratorSource}. Each per-field {@link GeneratorFunction} is invoked with the current
* sequence index to assemble a {@link RowData}.
*/
@Internal
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {

private final DataGenerator<?>[] fieldGenerators;
private final GeneratorFunction<Long, ?>[] fieldGenerators;
private final String tableName;
private final DataType rowDataType;
private final long rowsPerSecond;
private Long numberOfRows;
private @Nullable Long numberOfRows;
private final @Nullable Integer parallelism;

public DataGenTableSource(
DataGenerator<?>[] fieldGenerators,
GeneratorFunction<Long, ?>[] fieldGenerators,
String tableName,
DataType rowDataType,
long rowsPerSecond,
Long numberOfRows,
Integer parallelism) {
@Nullable Long numberOfRows,
@Nullable Integer parallelism) {
this.fieldGenerators = fieldGenerators;
this.tableName = tableName;
this.rowDataType = rowDataType;
Expand All @@ -62,16 +68,61 @@ public DataGenTableSource(

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    // The FLIP-27 DataGeneratorSource derives its boundedness from the record
    // count passed at construction, so no explicit isBounded flag is needed here.
    TypeInformation<RowData> typeInfo = context.createTypeInformation(rowDataType);
    return SourceProvider.of(createSource(typeInfo), parallelism);
}

@VisibleForTesting
public DataGeneratorSource<RowData> createSource() {
public DataGeneratorSource<RowData> createSource(TypeInformation<RowData> typeInfo) {
return new DataGeneratorSource<>(
new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0),
rowsPerSecond,
numberOfRows);
buildRowGenerator(),
computeEffectiveCount(),
RateLimiterStrategy.perSecond(rowsPerSecond),
typeInfo);
}

/**
 * Creates the generator function that maps a sequence index to one {@link RowData} by
 * delegating to the configured per-field generators. Exposed so tests can drive the
 * row-generation logic without a running source.
 */
@VisibleForTesting
public GeneratorFunction<Long, RowData> buildRowGenerator() {
    final java.util.List<String> fieldNames = DataType.getFieldNames(rowDataType);
    return new RowDataGenerator(fieldGenerators, fieldNames, 0);
}

/**
 * Determines the record bound handed to {@link DataGeneratorSource}. An explicitly
 * configured {@code numberOfRows} takes precedence; without it the legacy semantic is
 * preserved — the source halts when the shortest sequence field is exhausted — and an
 * effectively unbounded {@link Long#MAX_VALUE} is used when no sequence fields exist.
 */
@VisibleForTesting
public long computeEffectiveCount() {
    if (numberOfRows != null) {
        return numberOfRows;
    }
    // Track the smallest sequence-field range; null means none seen yet.
    Long smallestSequence = null;
    for (GeneratorFunction<Long, ?> fieldGenerator : fieldGenerators) {
        if (!(fieldGenerator instanceof SequenceGeneratorFunction)) {
            continue;
        }
        long total = ((SequenceGeneratorFunction<?>) fieldGenerator).getTotalCount();
        smallestSequence = smallestSequence == null ? total : Math.min(smallestSequence, total);
    }
    return smallestSequence == null ? Long.MAX_VALUE : smallestSequence;
}

/**
 * Returns the configured per-field generator functions, one per physical column.
 *
 * <p>NOTE(review): this exposes the internal array without a defensive copy; test
 * callers must not mutate it.
 */
@VisibleForTesting
public GeneratorFunction<Long, ?>[] getFieldGenerators() {
return fieldGenerators;
}

/** Returns the configured source parallelism, or {@code null} if none was set. */
@VisibleForTesting
public @Nullable Integer getParallelism() {
return parallelism;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
Expand Down Expand Up @@ -86,7 +86,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
context.getCatalogTable().getOptions().forEach(options::setString);

DataType rowDataType = context.getPhysicalRowDataType();
DataGenerator<?>[] fieldGenerators = new DataGenerator[DataType.getFieldCount(rowDataType)];
@SuppressWarnings({"unchecked"})
GeneratorFunction<Long, ?>[] fieldGenerators =
new GeneratorFunction[DataType.getFieldCount(rowDataType)];
Set<ConfigOption<?>> optionalOptions = new HashSet<>();

List<String> fieldNames = DataType.getFieldNames(rowDataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.flink.connector.datagen.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LogicalType;
Expand Down Expand Up @@ -53,13 +51,12 @@ protected DataGenVisitorBase(String name, ReadableConfig config) {

@Override
public DataGeneratorContainer visit(DateType dateType) {
    // DATE values are represented internally as days since the epoch (int).
    return DataGeneratorContainer.of(timeGenerator(() -> (int) LocalDate.now().toEpochDay()));
}

@Override
public DataGeneratorContainer visit(TimeType timeType) {
    // TIME values are represented internally as milliseconds of the day (int).
    return DataGeneratorContainer.of(timeGenerator(() -> LocalTime.now().get(MILLI_OF_DAY)));
}

@Override
Expand All @@ -69,25 +66,7 @@ protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {

/** A {@link Supplier} that is also {@link Serializable}, so generators can ship with tasks. */
private interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

private abstract static class TimeGenerator<T> implements DataGenerator<T> {

public static <T> TimeGenerator<T> of(SerializableSupplier<T> supplier) {
return new TimeGenerator<T>() {
@Override
public T next() {
return supplier.get();
}
};
}

@Override
public void open(
String name, FunctionInitializationContext context, RuntimeContext runtimeContext)
throws Exception {}

@Override
public boolean hasNext() {
return true;
}
/**
 * Adapts a side-effect-free time supplier into a {@link GeneratorFunction} that
 * ignores the incoming sequence index and simply returns the supplier's value.
 */
private static <T> GeneratorFunction<Long, T> timeGenerator(SerializableSupplier<T> supplier) {
    return index -> supplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,32 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Container class for wrapping a {@link DataGenerator} with its configuration options. */
/** Container class for wrapping a {@link GeneratorFunction} with its configuration options. */
@Internal
public class DataGeneratorContainer {

private final DataGenerator generator;
private final GeneratorFunction<Long, ?> generator;

/** Generator config options, for validation. */
private final Set<ConfigOption<?>> options;

/** Private: use {@link #of} to construct instances with validated option sets. */
private DataGeneratorContainer(
        GeneratorFunction<Long, ?> generator, Set<ConfigOption<?>> options) {
    this.generator = generator;
    this.options = options;
}

public static DataGeneratorContainer of(DataGenerator generator, ConfigOption<?>... options) {
public static DataGeneratorContainer of(
GeneratorFunction<Long, ?> generator, ConfigOption<?>... options) {
return new DataGeneratorContainer(generator, new HashSet<>(Arrays.asList(options)));
}

public DataGenerator getGenerator() {
public GeneratorFunction<Long, ?> getGenerator() {
return generator;
}

Expand Down
Loading