From dca055e4e321dfcf2a80d5ed89c2c3c548d62afa Mon Sep 17 00:00:00 2001 From: Dennis-Mircea Ciupitu Date: Fri, 1 May 2026 20:43:08 +0300 Subject: [PATCH] [FLINK-39588][flink-table][connector/datagen] Migrate the DataGen table connector off deprecated source APIs --- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 82 -------- .../flink-table-api-java-bridge/pom.xml | 6 + .../datagen/table/DataGenTableSource.java | 83 ++++++-- .../table/DataGenTableSourceFactory.java | 6 +- .../datagen/table/DataGenVisitorBase.java | 31 +-- .../datagen/table/DataGeneratorContainer.java | 15 +- .../datagen/table/RandomGeneratorVisitor.java | 66 +++--- .../table/SequenceGeneratorVisitor.java | 58 ++---- .../table/types/DataGeneratorMapper.java | 34 ++-- .../types/DecimalDataRandomGenerator.java | 20 +- .../table/types/RandomGeneratorFunction.java | 190 ++++++++++++++++++ .../datagen/table/types/RowDataGenerator.java | 47 ++--- .../types/SequenceGeneratorFunction.java | 169 ++++++++++++++++ .../types/DecimalDataRandomGeneratorTest.java | 4 +- .../DataGenTableSourceFactoryTest.java | 156 ++++++-------- 15 files changed, 597 insertions(+), 370 deletions(-) create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RandomGeneratorFunction.java create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/SequenceGeneratorFunction.java diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index e8e2bdbd5cdd8..8cd134c1a03e1 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -1,13 +1,4 @@ -Class implements interface in (DataGenVisitorBase.java:0) Class extends class in (DataGenVisitorBase.java:0) -Class extends class in (RandomGeneratorVisitor.java:0) -Class extends class in (RandomGeneratorVisitor.java:0) -Class extends class in (RandomGeneratorVisitor.java:0) -Class extends class in (SequenceGeneratorVisitor.java:0) -Class extends class in (SequenceGeneratorVisitor.java:0) -Class implements interface in (DataGeneratorMapper.java:0) -Class implements interface in (DecimalDataRandomGenerator.java:0) -Class implements interface in (RowDataGenerator.java:0) Class implements interface in (FileSink.java:0) Class has generic interface , org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest>> with type argument depending on in (CompactCoordinator.java:0) Class extends class in (CompactCoordinatorFactory.java:0) @@ -40,19 +31,7 @@ Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method in (DataGeneratorSource.java:141) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, org.apache.flink.api.common.typeinfo.TypeInformation)> has parameter of type in (DataGeneratorSource.java:0) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has parameter of type in (GeneratorSourceReaderFactory.java:0) -Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, java.lang.Integer)> depends on component type in (DataGenTableSource.java:0) -Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, java.lang.Integer)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0) Constructor (java.lang.String, org.apache.flink.configuration.ReadableConfig)> calls constructor ()> in (DataGenVisitorBase.java:49) -Constructor (org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, java.util.Set)> has parameter of type in (DataGeneratorContainer.java:0) -Constructor (int)> calls constructor ()> in (RandomGeneratorVisitor.java:461) -Constructor (java.time.Duration)> calls constructor ()> in (RandomGeneratorVisitor.java:476) -Constructor (int)> calls constructor ()> in (RandomGeneratorVisitor.java:491) -Constructor (long, long)> calls constructor (long, long)> in (SequenceGeneratorVisitor.java:207) -Constructor (long, long)> calls constructor (long, long)> in (SequenceGeneratorVisitor.java:216) -Constructor (org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, org.apache.flink.util.function.SerializableFunction, float)> has generic parameter type > with type argument depending on in (DataGeneratorMapper.java:0) -Constructor (org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, org.apache.flink.util.function.SerializableFunction, float)> has parameter of type in (DataGeneratorMapper.java:0) -Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.util.List, float)> depends on component type in (RowDataGenerator.java:0) -Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.util.List, float)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (RowDataGenerator.java:0) Constructor (org.apache.flink.core.fs.Path, long, org.apache.flink.api.common.serialization.BulkWriter$Factory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner, org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy, org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type in (FileSink.java:0) Constructor (org.apache.flink.core.fs.Path, org.apache.flink.api.common.serialization.BulkWriter$Factory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls method in (FileSink.java:548) Constructor (org.apache.flink.core.fs.Path, long, org.apache.flink.api.common.serialization.Encoder, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type in (FileSink.java:0) @@ -132,12 +111,6 @@ Constructor (org.apache.flink.table.connector.sink.DynamicTableSink$DataStructureConverter, java.lang.String, boolean)> calls constructor (java.lang.String, boolean)> in (PrintTableSinkFactory.java:173) Constructor (org.apache.flink.table.connector.sink.DynamicTableSink$DataStructureConverter, java.lang.String, boolean)> calls constructor ()> in (PrintTableSinkFactory.java:171) Field has type in (GeneratorSourceReaderFactory.java:0) -Field depends on component type in (DataGenTableSource.java:0) -Field has type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0) -Field has type in (DataGeneratorContainer.java:0) -Field has type in (DataGeneratorMapper.java:0) -Field depends on component type in (RowDataGenerator.java:0) -Field has type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (RowDataGenerator.java:0) Field has type in (FileSink.java:0) Field has type in (FileSink.java:0) Field has type in (FileSinkCommittable.java:0) @@ -200,63 +173,8 @@ Method is annotated with in (DataGeneratorSource.java:0) Method calls constructor (org.apache.flink.api.connector.source.SourceReader, org.apache.flink.api.connector.source.util.ratelimit.RateLimiter)> in (GeneratorSourceReaderFactory.java:63) Method calls method in (GeneratorSourceReaderFactory.java:62) -Method calls constructor (org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, long, java.lang.Long)> in (DataGenTableSource.java:72) -Method has return type in (DataGenTableSource.java:0) Method is annotated with in (DataGenTableSource.java:0) -Method calls method in (DataGenTableSource.java:66) -Method has return type in (DataGeneratorContainer.java:0) -Method has parameter of type <[Lorg.apache.flink.configuration.ConfigOption;> in (DataGeneratorContainer.java:0) -Method has parameter of type in (DataGeneratorContainer.java:0) -Method has return type in (RandomGeneratorVisitor.java:0) -Method has return type in (RandomGeneratorVisitor.java:0) -Method has return type in (RandomGeneratorVisitor.java:0) -Method calls method in (RandomGeneratorVisitor.java:352) -Method calls method in (RandomGeneratorVisitor.java:231) -Method calls method in (RandomGeneratorVisitor.java:232) -Method calls method in (RandomGeneratorVisitor.java:137) -Method calls method in (RandomGeneratorVisitor.java:137) -Method calls method in (RandomGeneratorVisitor.java:144) -Method calls method in (RandomGeneratorVisitor.java:300) -Method calls method in (RandomGeneratorVisitor.java:301) -Method calls method in (RandomGeneratorVisitor.java:257) -Method calls method in (RandomGeneratorVisitor.java:258) -Method calls method in (RandomGeneratorVisitor.java:244) -Method calls method in (RandomGeneratorVisitor.java:245) -Method calls method in (RandomGeneratorVisitor.java:218) -Method calls method in (RandomGeneratorVisitor.java:219) -Method calls method in (RandomGeneratorVisitor.java:338) -Method calls method in (RandomGeneratorVisitor.java:405) -Method calls method in (RandomGeneratorVisitor.java:372) -Method calls method in (RandomGeneratorVisitor.java:370) -Method calls method in (RandomGeneratorVisitor.java:204) -Method calls method in (RandomGeneratorVisitor.java:206) -Method calls method in (RandomGeneratorVisitor.java:314) -Method calls method in (RandomGeneratorVisitor.java:190) -Method calls method in (RandomGeneratorVisitor.java:192) -Method calls method in (RandomGeneratorVisitor.java:179) -Method calls method in (RandomGeneratorVisitor.java:158) -Method calls method in (RandomGeneratorVisitor.java:159) -Method calls method in (RandomGeneratorVisitor.java:287) -Method calls method in (RandomGeneratorVisitor.java:288) -Method calls method in (RandomGeneratorVisitor.java:326) Method calls method in (SequenceGeneratorVisitor.java:221) -Method has return type in (SequenceGeneratorVisitor.java:0) -Method has return type in (SequenceGeneratorVisitor.java:0) -Method calls method in (SequenceGeneratorVisitor.java:172) -Method calls method in (SequenceGeneratorVisitor.java:108) -Method calls method in (SequenceGeneratorVisitor.java:197) -Method calls method in (SequenceGeneratorVisitor.java:189) -Method calls method in (SequenceGeneratorVisitor.java:180) -Method calls method in (SequenceGeneratorVisitor.java:164) -Method calls method in (SequenceGeneratorVisitor.java:155) -Method calls method in (SequenceGeneratorVisitor.java:146) -Method calls method in (DataGeneratorMapper.java:55) -Method calls method in (DataGeneratorMapper.java:61) -Method calls method in (DataGeneratorMapper.java:50) -Method calls method in (RowDataGenerator.java:68) -Method calls method in (RowDataGenerator.java:80) -Method calls method in (RowDataGenerator.java:54) -Method calls method in (RowDataGenerator.java:61) Method has return type in (FileSink.java:0) Method calls constructor (org.apache.flink.core.fs.RecoverableWriter, org.apache.flink.api.common.serialization.BulkWriter$Factory)> in (FileSink.java:694) Method has return type in (FileSink.java:0) diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-java-bridge/pom.xml index 896052be64bae..a90148cad5ee0 100644 --- a/flink-table/flink-table-api-java-bridge/pom.xml +++ b/flink-table/flink-table-api-java-bridge/pom.xml @@ -53,6 +53,12 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-datagen + ${project.version} + + diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java index 3c00d2a1a043b..2d01462625cba 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java @@ -20,38 +20,44 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.datagen.table.types.RowDataGenerator; -import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider; -import org.apache.flink.legacy.table.sources.StreamTableSource; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; -import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; +import org.apache.flink.connector.datagen.table.types.SequenceGeneratorFunction; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import javax.annotation.Nullable; -/** A {@link StreamTableSource} that emits generated data rows. */ +/** + * A {@link ScanTableSource} that emits generated row data via the FLIP-27 {@link + * DataGeneratorSource}. Each per-field {@link GeneratorFunction} is invoked with the current + * sequence index to assemble a {@link RowData}. + */ @Internal public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown { - private final DataGenerator[] fieldGenerators; + private final GeneratorFunction[] fieldGenerators; private final String tableName; private final DataType rowDataType; private final long rowsPerSecond; - private Long numberOfRows; + private @Nullable Long numberOfRows; private final @Nullable Integer parallelism; public DataGenTableSource( - DataGenerator[] fieldGenerators, + GeneratorFunction[] fieldGenerators, String tableName, DataType rowDataType, long rowsPerSecond, - Long numberOfRows, - Integer parallelism) { + @Nullable Long numberOfRows, + @Nullable Integer parallelism) { this.fieldGenerators = fieldGenerators; this.tableName = tableName; this.rowDataType = rowDataType; @@ -62,16 +68,61 @@ public DataGenTableSource( @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { - boolean isBounded = numberOfRows != null; - return SourceFunctionProvider.of(createSource(), isBounded, parallelism); + TypeInformation typeInfo = context.createTypeInformation(rowDataType); + return SourceProvider.of(createSource(typeInfo), parallelism); } @VisibleForTesting - public DataGeneratorSource createSource() { + public DataGeneratorSource createSource(TypeInformation typeInfo) { return new DataGeneratorSource<>( - new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0), - rowsPerSecond, - numberOfRows); + buildRowGenerator(), + computeEffectiveCount(), + RateLimiterStrategy.perSecond(rowsPerSecond), + typeInfo); + } + + /** + * Returns the per-field generator function that produces a single {@link RowData} for a given + * sequence index. Exposed for tests that exercise the generation logic directly. + */ + @VisibleForTesting + public GeneratorFunction buildRowGenerator() { + return new RowDataGenerator(fieldGenerators, DataType.getFieldNames(rowDataType), 0); + } + + /** + * Computes the bound passed to {@link DataGeneratorSource}. When {@code numberOfRows} is + * configured we honor it; otherwise we cap the source at the smallest sequence-field range to + * preserve the legacy "halt when any sequence field is exhausted" semantic, falling back to + * {@link Long#MAX_VALUE} when no sequence fields are present. + */ + @VisibleForTesting + public long computeEffectiveCount() { + if (numberOfRows != null) { + return numberOfRows; + } + long minSequenceCount = Long.MAX_VALUE; + boolean hasSequence = false; + for (GeneratorFunction generator : fieldGenerators) { + if (generator instanceof SequenceGeneratorFunction) { + hasSequence = true; + long total = ((SequenceGeneratorFunction) generator).getTotalCount(); + if (total < minSequenceCount) { + minSequenceCount = total; + } + } + } + return hasSequence ? minSequenceCount : Long.MAX_VALUE; + } + + @VisibleForTesting + public GeneratorFunction[] getFieldGenerators() { + return fieldGenerators; + } + + @VisibleForTesting + public @Nullable Integer getParallelism() { + return parallelism; } @Override diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java index fc55764fddf02..f32db0413410b 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java @@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; @@ -86,7 +86,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { context.getCatalogTable().getOptions().forEach(options::setString); DataType rowDataType = context.getPhysicalRowDataType(); - DataGenerator[] fieldGenerators = new DataGenerator[DataType.getFieldCount(rowDataType)]; + @SuppressWarnings({"unchecked"}) + GeneratorFunction[] fieldGenerators = + new GeneratorFunction[DataType.getFieldCount(rowDataType)]; Set> optionalOptions = new HashSet<>(); List fieldNames = DataType.getFieldNames(rowDataType); diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java index 8fbc894e9966c..ebd5e55267ec3 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenVisitorBase.java @@ -19,10 +19,8 @@ package org.apache.flink.connector.datagen.table; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.LogicalType; @@ -53,13 +51,12 @@ protected DataGenVisitorBase(String name, ReadableConfig config) { @Override public DataGeneratorContainer visit(DateType dateType) { - return DataGeneratorContainer.of( - TimeGenerator.of(() -> (int) LocalDate.now().toEpochDay())); + return DataGeneratorContainer.of(timeGenerator(() -> (int) LocalDate.now().toEpochDay())); } @Override public DataGeneratorContainer visit(TimeType timeType) { - return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_DAY))); + return DataGeneratorContainer.of(timeGenerator(() -> LocalTime.now().get(MILLI_OF_DAY))); } @Override @@ -69,25 +66,7 @@ protected DataGeneratorContainer defaultMethod(LogicalType logicalType) { private interface SerializableSupplier extends Supplier, Serializable {} - private abstract static class TimeGenerator implements DataGenerator { - - public static TimeGenerator of(SerializableSupplier supplier) { - return new TimeGenerator() { - @Override - public T next() { - return supplier.get(); - } - }; - } - - @Override - public void open( - String name, FunctionInitializationContext context, RuntimeContext runtimeContext) - throws Exception {} - - @Override - public boolean hasNext() { - return true; - } + private static GeneratorFunction timeGenerator(SerializableSupplier supplier) { + return value -> supplier.get(); } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGeneratorContainer.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGeneratorContainer.java index 5c4e850170b03..baf4307409f34 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGeneratorContainer.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGeneratorContainer.java @@ -20,31 +20,32 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import java.util.Arrays; import java.util.HashSet; import java.util.Set; -/** Container class for wrapping a {@link DataGenerator with its configuration options}. */ +/** Container class for wrapping a {@link GeneratorFunction} with its configuration options. */ @Internal public class DataGeneratorContainer { - - private final DataGenerator generator; + private final GeneratorFunction generator; /** Generator config options, for validation. */ private final Set> options; - private DataGeneratorContainer(DataGenerator generator, Set> options) { + private DataGeneratorContainer( + GeneratorFunction generator, Set> options) { this.generator = generator; this.options = options; } - public static DataGeneratorContainer of(DataGenerator generator, ConfigOption... options) { + public static DataGeneratorContainer of( + GeneratorFunction generator, ConfigOption... options) { return new DataGeneratorContainer(generator, new HashSet<>(Arrays.asList(options))); } - public DataGenerator getGenerator() { + public GeneratorFunction getGenerator() { return generator; } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java index 4c3a399c11b91..7ae77b8211e0e 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java @@ -22,11 +22,11 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.datagen.table.types.DataGeneratorMapper; import org.apache.flink.connector.datagen.table.types.DecimalDataRandomGenerator; +import org.apache.flink.connector.datagen.table.types.RandomGeneratorFunction; import org.apache.flink.connector.datagen.table.types.RowDataGenerator; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; -import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; @@ -134,7 +134,7 @@ public RandomGeneratorVisitor(String name, ReadableConfig config) { public DataGeneratorContainer visit(BooleanType booleanType) { ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.booleanGenerator().withNullRate(config.get(nr)), nr); + RandomGeneratorFunction.booleanGenerator().withNullRate(config.get(nr)), nr); } @Override @@ -187,7 +187,7 @@ public DataGeneratorContainer visit(TinyIntType tinyIntType) { ConfigOption max = maxKey.intType().defaultValue((int) Byte.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.byteGenerator( + RandomGeneratorFunction.byteGenerator( config.get(min).byteValue(), config.get(max).byteValue()) .withNullRate(config.get(nr)), min, @@ -201,7 +201,7 @@ public DataGeneratorContainer visit(SmallIntType smallIntType) { ConfigOption max = maxKey.intType().defaultValue((int) Short.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.shortGenerator( + RandomGeneratorFunction.shortGenerator( config.get(min).shortValue(), config.get(max).shortValue()) .withNullRate(config.get(nr)), min, @@ -215,7 +215,7 @@ public DataGeneratorContainer visit(IntType integerType) { ConfigOption max = maxKey.intType().defaultValue(Integer.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.intGenerator(config.get(min), config.get(max)) + RandomGeneratorFunction.intGenerator(config.get(min), config.get(max)) .withNullRate(config.get(nr)), min, max, @@ -228,7 +228,7 @@ public DataGeneratorContainer visit(BigIntType bigIntType) { ConfigOption max = maxKey.longType().defaultValue(Long.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.longGenerator(config.get(min), config.get(max)) + RandomGeneratorFunction.longGenerator(config.get(min), config.get(max)) .withNullRate(config.get(nr)), min, max, @@ -241,7 +241,7 @@ public DataGeneratorContainer visit(FloatType floatType) { ConfigOption max = maxKey.floatType().defaultValue(Float.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.floatGenerator(config.get(min), config.get(max)) + RandomGeneratorFunction.floatGenerator(config.get(min), config.get(max)) .withNullRate(config.get(nr)), min, max, @@ -254,7 +254,7 @@ public DataGeneratorContainer visit(DoubleType doubleType) { ConfigOption max = maxKey.doubleType().defaultValue(Double.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.doubleGenerator(config.get(min), config.get(max)) + RandomGeneratorFunction.doubleGenerator(config.get(min), config.get(max)) .withNullRate(config.get(nr)), min, max, @@ -284,7 +284,7 @@ public DataGeneratorContainer visit(YearMonthIntervalType yearMonthIntervalType) ConfigOption max = maxKey.intType().defaultValue(120000); // Period max ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.intGenerator(config.get(min), config.get(max)) + RandomGeneratorFunction.intGenerator(config.get(min), config.get(max)) .withNullRate(config.get(nr)), min, max, @@ -297,7 +297,7 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) { ConfigOption max = maxKey.longType().defaultValue(Long.MAX_VALUE); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - RandomGenerator.longGenerator(config.get(min), config.get(max)) + RandomGeneratorFunction.longGenerator(config.get(min), config.get(max)) .withNullRate(config.get(nr)), min, max, @@ -348,8 +348,10 @@ public DataGeneratorContainer visit(ArrayType arrayType) { DataGeneratorContainer container = arrayType.getElementType().accept(new RandomGeneratorVisitor(fieldName, config)); ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); - DataGenerator generator = - RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption)); + GeneratorFunction generator = + RandomGeneratorFunction.arrayGenerator( + (GeneratorFunction) container.getGenerator(), + config.get(lenOption)); Set> options = container.getOptions(); options.add(nr); options.add(lenOption); @@ -366,10 +368,10 @@ public DataGeneratorContainer visit(MultisetType multisetType) { DataGeneratorContainer container = multisetType.getElementType().accept(new RandomGeneratorVisitor(fieldName, config)); - DataGenerator> mapGenerator = - RandomGenerator.mapGenerator( - container.getGenerator(), - RandomGenerator.intGenerator(0, 10), + GeneratorFunction> mapGenerator = + RandomGeneratorFunction.mapGenerator( + (GeneratorFunction) container.getGenerator(), + RandomGeneratorFunction.intGenerator(0, 10), config.get(lenOption)); Set> options = container.getOptions(); @@ -401,10 +403,10 @@ public DataGeneratorContainer visit(MapType mapType) { options.add(nr); options.add(lenOption); - DataGenerator> mapGenerator = - RandomGenerator.mapGenerator( - keyContainer.getGenerator(), - valContainer.getGenerator(), + GeneratorFunction> mapGenerator = + RandomGeneratorFunction.mapGenerator( + (GeneratorFunction) keyContainer.getGenerator(), + (GeneratorFunction) valContainer.getGenerator(), config.get(lenOption)); return DataGeneratorContainer.of( @@ -432,10 +434,10 @@ public DataGeneratorContainer visit(RowType rowType) { ConfigOption nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); fieldOptions.add(nr); - DataGenerator[] generators = + GeneratorFunction[] generators = fieldContainers.stream() .map(DataGeneratorContainer::getGenerator) - .toArray(DataGenerator[]::new); + .toArray(GeneratorFunction[]::new); return DataGeneratorContainer.of( new RowDataGenerator(generators, rowType.getFieldNames(), config.get(nr)), @@ -457,10 +459,10 @@ private ConfigOption getLengthOption(Supplier defaultLengthSup .defaultValue(defaultLengthSupplier.get()); } - private static RandomGenerator getRandomStringGenerator(int length) { - return new RandomGenerator() { + private static RandomGeneratorFunction getRandomStringGenerator(int length) { + return new RandomGeneratorFunction() { @Override - public StringData next() { + public StringData map(Long value) { if (nullRate == NULL_RATE_DEFAULT || ThreadLocalRandom.current().nextFloat() > nullRate) { int len = generateLength(length, varLen); @@ -471,11 +473,11 @@ public StringData next() { }; } - private static RandomGenerator getRandomPastTimestampGenerator( + private static RandomGeneratorFunction getRandomPastTimestampGenerator( Duration maxPast) { - return new RandomGenerator() { + return new RandomGeneratorFunction() { @Override - public TimestampData next() { + public TimestampData map(Long value) { if (nullRate == NULL_RATE_DEFAULT || ThreadLocalRandom.current().nextFloat() > nullRate) { long maxPastMillis = maxPast.toMillis(); @@ -487,10 +489,10 @@ public TimestampData next() { }; } - private static RandomGenerator getRandomBytesGenerator(int length) { - return new RandomGenerator() { + private static RandomGeneratorFunction getRandomBytesGenerator(int length) { + return new RandomGeneratorFunction() { @Override - public byte[] next() { + public byte[] map(Long value) { byte[] arr = new byte[generateLength(length, varLen)]; random.getRandomGenerator().nextBytes(arr); return arr; diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java index 7c7b6045186d7..734747861e239 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java @@ -22,10 +22,9 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; -import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator; +import org.apache.flink.connector.datagen.table.types.RandomGeneratorFunction; +import org.apache.flink.connector.datagen.table.types.SequenceGeneratorFunction; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.BooleanType; @@ -39,8 +38,6 @@ import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.shaded.guava33.com.google.common.primitives.Longs; - import static org.apache.flink.configuration.ConfigOptions.key; /** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */ @@ -105,13 +102,14 @@ public SequenceGeneratorVisitor(String name, ReadableConfig config) { @Override public DataGeneratorContainer visit(BooleanType booleanType) { - return DataGeneratorContainer.of(RandomGenerator.booleanGenerator()); + return DataGeneratorContainer.of(RandomGeneratorFunction.booleanGenerator()); } @Override public DataGeneratorContainer visit(CharType charType) { return DataGeneratorContainer.of( - getSequenceStringGenerator(config.get(longStart), config.get(longEnd)), + SequenceGeneratorFunction.stringDataGenerator( + config.get(longStart), config.get(longEnd)), longStart, longEnd); } @@ -119,7 +117,8 @@ public DataGeneratorContainer visit(CharType charType) { @Override public DataGeneratorContainer visit(VarCharType varCharType) { return DataGeneratorContainer.of( - getSequenceStringGenerator(config.get(longStart), config.get(longEnd)), + SequenceGeneratorFunction.stringDataGenerator( + config.get(longStart), config.get(longEnd)), longStart, longEnd); } @@ -127,7 +126,8 @@ public DataGeneratorContainer visit(VarCharType varCharType) { @Override public DataGeneratorContainer visit(BinaryType binaryType) { return DataGeneratorContainer.of( - getSequenceBytesGenerator(config.get(longStart), config.get(longEnd)), + SequenceGeneratorFunction.bytesGenerator( + config.get(longStart), config.get(longEnd)), longStart, longEnd); } @@ -135,7 +135,8 @@ public DataGeneratorContainer visit(BinaryType binaryType) { @Override public DataGeneratorContainer visit(VarBinaryType varBinaryType) { return DataGeneratorContainer.of( - getSequenceBytesGenerator(config.get(longStart), config.get(longEnd)), + SequenceGeneratorFunction.bytesGenerator( + config.get(longStart), config.get(longEnd)), longStart, longEnd); } @@ -143,7 +144,7 @@ public DataGeneratorContainer visit(VarBinaryType varBinaryType) { @Override public DataGeneratorContainer visit(TinyIntType tinyIntType) { return DataGeneratorContainer.of( - SequenceGenerator.byteGenerator( + SequenceGeneratorFunction.byteGenerator( config.get(intStart).byteValue(), config.get(intEnd).byteValue()), intStart, intEnd); @@ -152,7 +153,7 @@ public DataGeneratorContainer visit(TinyIntType tinyIntType) { @Override public DataGeneratorContainer visit(SmallIntType smallIntType) { return DataGeneratorContainer.of( - SequenceGenerator.shortGenerator( + SequenceGeneratorFunction.shortGenerator( config.get(intStart).shortValue(), config.get(intEnd).shortValue()), intStart, intEnd); @@ -161,7 +162,7 @@ public DataGeneratorContainer visit(SmallIntType smallIntType) { @Override public DataGeneratorContainer visit(IntType integerType) { return DataGeneratorContainer.of( - SequenceGenerator.intGenerator(config.get(intStart), config.get(intEnd)), + SequenceGeneratorFunction.intGenerator(config.get(intStart), config.get(intEnd)), intStart, intEnd); } @@ -169,7 +170,7 @@ public DataGeneratorContainer visit(IntType integerType) { @Override public DataGeneratorContainer visit(BigIntType bigIntType) { return DataGeneratorContainer.of( - SequenceGenerator.longGenerator(config.get(longStart), config.get(longEnd)), + SequenceGeneratorFunction.longGenerator(config.get(longStart), config.get(longEnd)), longStart, longEnd); } @@ -177,7 +178,7 @@ public DataGeneratorContainer visit(BigIntType bigIntType) { @Override public DataGeneratorContainer visit(FloatType floatType) { return DataGeneratorContainer.of( - SequenceGenerator.floatGenerator( + SequenceGeneratorFunction.floatGenerator( config.get(intStart).shortValue(), config.get(intEnd).shortValue()), intStart, intEnd); @@ -186,7 +187,7 @@ public DataGeneratorContainer visit(FloatType floatType) { @Override public DataGeneratorContainer visit(DoubleType doubleType) { return DataGeneratorContainer.of( - SequenceGenerator.doubleGenerator(config.get(intStart), config.get(intEnd)), + SequenceGeneratorFunction.doubleGenerator(config.get(intStart), config.get(intEnd)), intStart, intEnd); } @@ -194,7 +195,7 @@ public DataGeneratorContainer visit(DoubleType doubleType) { @Override public DataGeneratorContainer visit(DecimalType decimalType) { return DataGeneratorContainer.of( - SequenceGenerator.bigDecimalGenerator( + SequenceGeneratorFunction.bigDecimalGenerator( config.get(intStart), config.get(intEnd), decimalType.getPrecision(), @@ -202,27 +203,4 @@ public DataGeneratorContainer visit(DecimalType decimalType) { intStart, intEnd); } - - private static SequenceGenerator getSequenceStringGenerator(long start, long end) { - return new SequenceGenerator(start, end) { - @Override - public StringData next() { - return StringData.fromString(valuesToEmit.poll().toString()); - } - }; - } - - private static SequenceGenerator getSequenceBytesGenerator(long start, long end) { - return new SequenceGenerator(start, end) { - @Override - public byte[] next() { - Long value = valuesToEmit.poll(); - if (value != null) { - return Longs.toByteArray(value); - } else { - return new byte[0]; - } - } - }; - } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java index 12c8a8aae033d..9cb367d275723 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java @@ -19,46 +19,44 @@ package org.apache.flink.connector.datagen.table.types; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.util.function.SerializableFunction; import java.util.concurrent.ThreadLocalRandom; -/** Utility for mapping the output of a {@link DataGenerator}. */ +/** + * Maps the output of an upstream {@link GeneratorFunction} through a {@link SerializableFunction}. + */ @Internal -public class DataGeneratorMapper implements DataGenerator { +public class DataGeneratorMapper implements GeneratorFunction { + + private static final long serialVersionUID = 1L; - private final DataGenerator generator; + private final GeneratorFunction generator; private final SerializableFunction mapper; private final float nullRate; public DataGeneratorMapper( - DataGenerator generator, SerializableFunction mapper, float nullRate) { + GeneratorFunction generator, + SerializableFunction mapper, + float nullRate) { this.generator = generator; this.mapper = mapper; this.nullRate = nullRate; } @Override - public void open( - String name, FunctionInitializationContext context, RuntimeContext runtimeContext) - throws Exception { - generator.open(name, context, runtimeContext); - } - - @Override - public boolean hasNext() { - return generator.hasNext(); + public void open(SourceReaderContext readerContext) throws Exception { + generator.open(readerContext); } @Override - public B next() { + public B map(Long value) throws Exception { if (nullRate == 0f || ThreadLocalRandom.current().nextFloat() > nullRate) { - return mapper.apply(generator.next()); + return mapper.apply(generator.map(value)); } return null; } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGenerator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGenerator.java index 44088ed4d1e09..7349474ac3058 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGenerator.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGenerator.java @@ -19,9 +19,7 @@ package org.apache.flink.connector.datagen.table.types; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.table.data.DecimalData; import org.apache.flink.util.Preconditions; @@ -32,7 +30,9 @@ /** Generates random {@link DecimalData} values. */ @Internal -public class DecimalDataRandomGenerator implements DataGenerator { +public class DecimalDataRandomGenerator implements GeneratorFunction { + + private static final long serialVersionUID = 1L; private final int precision; @@ -57,17 +57,7 @@ public DecimalDataRandomGenerator( } @Override - public void open( - String name, FunctionInitializationContext context, RuntimeContext runtimeContext) - throws Exception {} - - @Override - public boolean hasNext() { - return true; - } - - @Override - public DecimalData next() { + public DecimalData map(Long value) { if (nullRate == 0f || ThreadLocalRandom.current().nextFloat() > nullRate) { BigDecimal decimal = new BigDecimal( diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RandomGeneratorFunction.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RandomGeneratorFunction.java new file mode 100644 index 0000000000000..20e7c06199013 --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RandomGeneratorFunction.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.datagen.table.types; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.util.CollectionUtil; + +import org.apache.commons.math3.random.RandomDataGenerator; + +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; + +/** + * A {@link GeneratorFunction} that produces random elements of type {@code T}. The index argument + * is ignored; randomness is sourced from a per-subtask {@link RandomDataGenerator} initialized in + * {@link #open(SourceReaderContext)}. + * + *

This is the FLIP-27 replacement for the legacy {@code + * org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator}. It preserves the legacy + * null-rate semantics: a value is produced when {@code nullRate < random.nextFloat()}, otherwise + * {@code null} is emitted. + */ +@Internal +public abstract class RandomGeneratorFunction implements GeneratorFunction { + + private static final long serialVersionUID = 1L; + + protected transient RandomDataGenerator random; + protected float nullRate; + protected boolean varLen; + + @Override + public void open(SourceReaderContext readerContext) throws Exception { + this.random = new RandomDataGenerator(); + } + + public RandomGeneratorFunction withNullRate(float nullRate) { + this.nullRate = nullRate; + return this; + } + + public RandomGeneratorFunction withVarLen(boolean varLen) { + this.varLen = varLen; + return this; + } + + protected T nextWithNullRate(Supplier supplier) { + if (nullRate == 0f || nullRate < ThreadLocalRandom.current().nextFloat()) { + return supplier.get(); + } + return null; + } + + public static RandomGeneratorFunction longGenerator(long min, long max) { + return new RandomGeneratorFunction() { + @Override + public Long map(Long value) { + return nextWithNullRate(() -> random.nextLong(min, max)); + } + }; + } + + public static RandomGeneratorFunction intGenerator(int min, int max) { + return new RandomGeneratorFunction() { + @Override + public Integer map(Long value) { + return nextWithNullRate(() -> random.nextInt(min, max)); + } + }; + } + + public static RandomGeneratorFunction shortGenerator(short min, short max) { + return new RandomGeneratorFunction() { + @Override + public Short map(Long value) { + return nextWithNullRate(() -> (short) random.nextInt(min, max)); + } + }; + } + + public static RandomGeneratorFunction byteGenerator(byte min, byte max) { + return new RandomGeneratorFunction() { + @Override + public Byte map(Long value) { + return nextWithNullRate(() -> (byte) random.nextInt(min, max)); + } + }; + } + + public static RandomGeneratorFunction floatGenerator(float min, float max) { + return new RandomGeneratorFunction() { + @Override + public Float map(Long value) { + return nextWithNullRate(() -> (float) random.nextUniform(min, max)); + } + }; + } + + public static RandomGeneratorFunction doubleGenerator(double min, double max) { + return new RandomGeneratorFunction() { + @Override + public Double map(Long value) { + return nextWithNullRate(() -> random.nextUniform(min, max)); + } + }; + } + + public static RandomGeneratorFunction stringGenerator(int len) { + return new RandomGeneratorFunction() { + @Override + public String map(Long value) { + return nextWithNullRate(() -> random.nextHexString(len)); + } + }; + } + + public static RandomGeneratorFunction booleanGenerator() { + return new RandomGeneratorFunction() { + @Override + public Boolean map(Long value) { + return nextWithNullRate(() -> random.nextInt(0, 1) == 0); + } + }; + } + + public static RandomGeneratorFunction arrayGenerator( + GeneratorFunction generator, int len) { + return new RandomGeneratorFunction() { + + @Override + public void open(SourceReaderContext readerContext) throws Exception { + super.open(readerContext); + generator.open(readerContext); + } + + @Override + public T[] map(Long value) throws Exception { + @SuppressWarnings("unchecked") + T[] array = (T[]) new Object[len]; + + for (int i = 0; i < len; i++) { + array[i] = generator.map(value); + } + + return array; + } + }; + } + + public static RandomGeneratorFunction> mapGenerator( + GeneratorFunction key, GeneratorFunction val, int size) { + return new RandomGeneratorFunction>() { + + @Override + public void open(SourceReaderContext readerContext) throws Exception { + super.open(readerContext); + key.open(readerContext); + val.open(readerContext); + } + + @Override + public Map map(Long value) throws Exception { + Map map = CollectionUtil.newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) { + map.put(key.map(value), val.map(value)); + } + return map; + } + }; + } +} diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RowDataGenerator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RowDataGenerator.java index 73bb3db2d8b8c..5f63645c862e2 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RowDataGenerator.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/RowDataGenerator.java @@ -19,65 +19,48 @@ package org.apache.flink.connector.datagen.table.types; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -/** Data generator for Flink's internal {@link RowData} type. */ +/** + * Composite {@link GeneratorFunction} that builds a {@link RowData} by invoking each per-field + * generator with the current sequence index. With probability {@code nullRate}, an entire row is + * replaced with {@code null}, matching the legacy behavior. + */ @Internal -public class RowDataGenerator implements DataGenerator { +public class RowDataGenerator implements GeneratorFunction { private static final long serialVersionUID = 1L; - private final DataGenerator[] fieldGenerators; + private final GeneratorFunction[] fieldGenerators; private final List fieldNames; private final float nullRate; public RowDataGenerator( - DataGenerator[] fieldGenerators, List fieldNames, float nullRate) { + GeneratorFunction[] fieldGenerators, List fieldNames, float nullRate) { this.fieldGenerators = fieldGenerators; this.fieldNames = fieldNames; this.nullRate = nullRate; } @Override - public void open( - String name, FunctionInitializationContext context, RuntimeContext runtimeContext) - throws Exception { - for (int i = 0; i < fieldGenerators.length; i++) { - fieldGenerators[i].open(fieldNames.get(i), context, runtimeContext); - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - for (DataGenerator generator : fieldGenerators) { - generator.snapshotState(context); - } - } - - @Override - public boolean hasNext() { - for (DataGenerator generator : fieldGenerators) { - if (!generator.hasNext()) { - return false; - } + public void open(SourceReaderContext readerContext) throws Exception { + for (GeneratorFunction generator : fieldGenerators) { + generator.open(readerContext); } - return true; } @Override - public RowData next() { + public RowData map(Long value) throws Exception { if (nullRate == 0f || ThreadLocalRandom.current().nextFloat() > nullRate) { GenericRowData row = new GenericRowData(fieldNames.size()); for (int i = 0; i < fieldGenerators.length; i++) { - row.setField(i, fieldGenerators[i].next()); + row.setField(i, fieldGenerators[i].map(value)); } return row; } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/SequenceGeneratorFunction.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/SequenceGeneratorFunction.java new file mode 100644 index 0000000000000..44044ab862bf5 --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/SequenceGeneratorFunction.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.datagen.table.types; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.table.data.StringData; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava33.com.google.common.primitives.Longs; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.math.RoundingMode; + +/** + * A {@link GeneratorFunction} that emits each value from a closed {@code [start, end]} interval + * exactly once when driven with indices in {@code [0, end - start]}. + * + *

This is the FLIP-27 replacement for the legacy {@code + * org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator}. Compared to the + * legacy implementation, the per-subtask checkpointed deque of remaining values is no longer + * needed: the {@code NumberSequenceSource} that drives the FLIP-27 {@code DataGeneratorSource} + * persists the remaining index range itself, and this function is a pure mapping {@code idx -> + * convert(start + idx)}. + * + *

The factory wiring guarantees that the source-level {@code count} never exceeds {@link + * #getTotalCount()}, so the {@code start + idx} computation never wraps past {@code end}. + * + * @param The produced element type. + */ +@Internal +public abstract class SequenceGeneratorFunction implements GeneratorFunction { + + private static final long serialVersionUID = 1L; + + private final long start; + private final long end; + + protected SequenceGeneratorFunction(long start, long end) { + Preconditions.checkArgument(end >= start, "end (%s) must be >= start (%s)", end, start); + this.start = start; + this.end = end; + } + + /** + * Returns the total number of distinct values produced by this generator, i.e. {@code end - + * start + 1}. + */ + public long getTotalCount() { + return end - start + 1; + } + + @Override + public final T map(Long value) { + return convert(start + value); + } + + /** Converts the produced {@code long} value to the desired output type. */ + protected abstract T convert(long value); + + public static SequenceGeneratorFunction longGenerator(long start, long end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected Long convert(long value) { + return value; + } + }; + } + + public static SequenceGeneratorFunction intGenerator(int start, int end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected Integer convert(long value) { + return (int) value; + } + }; + } + + public static SequenceGeneratorFunction shortGenerator(short start, short end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected Short convert(long value) { + return (short) value; + } + }; + } + + public static SequenceGeneratorFunction byteGenerator(byte start, byte end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected Byte convert(long value) { + return (byte) value; + } + }; + } + + public static SequenceGeneratorFunction floatGenerator(short start, short end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected Float convert(long value) { + return (float) value; + } + }; + } + + public static SequenceGeneratorFunction doubleGenerator(int start, int end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected Double convert(long value) { + return (double) value; + } + }; + } + + public static SequenceGeneratorFunction bigDecimalGenerator( + int start, int end, int precision, int scale) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected BigDecimal convert(long value) { + BigDecimal decimal = new BigDecimal((double) value, new MathContext(precision)); + return decimal.setScale(scale, RoundingMode.DOWN); + } + }; + } + + public static SequenceGeneratorFunction stringGenerator(long start, long end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected String convert(long value) { + return Long.toString(value); + } + }; + } + + public static SequenceGeneratorFunction stringDataGenerator(long start, long end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected StringData convert(long value) { + return StringData.fromString(Long.toString(value)); + } + }; + } + + public static SequenceGeneratorFunction bytesGenerator(long start, long end) { + return new SequenceGeneratorFunction(start, end) { + @Override + protected byte[] convert(long value) { + return Longs.toByteArray(value); + } + }; + } +} diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGeneratorTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGeneratorTest.java index ee4cb19b2a8b9..cd1336d23367c 100644 --- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGeneratorTest.java +++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/connector/datagen/table/types/DecimalDataRandomGeneratorTest.java @@ -37,7 +37,7 @@ void testGenerateDecimalValues() { new DecimalDataRandomGenerator( precision, scale, Double.MIN_VALUE, Double.MAX_VALUE, 0f); - DecimalData value = gen.next(); + DecimalData value = gen.map(0L); assertThat(value) .as("Null value for DECIMAL(" + precision + "," + scale + ")") .isNotNull(); @@ -97,7 +97,7 @@ void testMinMax() { DecimalDataRandomGenerator gen = new DecimalDataRandomGenerator( precision, scale, min.doubleValue(), max.doubleValue(), 0f); - DecimalData result = gen.next(); + DecimalData result = gen.map(0L); assertThat(result) .as("Null value for DECIMAL(" + precision + "," + scale + ")") diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java index 3025e59fc002c..76caf5ea8fa71 100644 --- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java +++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java @@ -18,37 +18,26 @@ package org.apache.flink.table.factories; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.datagen.table.DataGenConnectorOptions; import org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil; import org.apache.flink.connector.datagen.table.DataGenTableSource; import org.apache.flink.connector.datagen.table.DataGenTableSourceFactory; import org.apache.flink.connector.datagen.table.RandomGeneratorVisitor; -import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider; -import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; -import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSourceTest; -import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.connector.datagen.table.types.SequenceGeneratorFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.util.InstantiationUtil; import org.junit.jupiter.api.Test; import javax.annotation.Nullable; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -215,6 +204,8 @@ void testSource() throws Exception { List results = runGenerator(SCHEMA, descriptor); final long end = System.currentTimeMillis(); + // The two sequence fields use [50,60] (count 11) and [1,11] (count 11); both equal so the + // bound is 11. Without numberOfRows configured, the source halts at 11. assertThat(results).hasSize(11); for (int i = 0; i < results.size(); i++) { RowData row = results.get(i); @@ -414,6 +405,13 @@ void testFixedLengthDataType() throws Exception { "Custom length for fixed-length type (CHAR/BINARY) field 'f0' is not supported."); } + /** + * Drives the {@link DataGenTableSource}'s {@link GeneratorFunction} directly, exercising the + * factory wiring and per-field generator behavior end-to-end without needing a running cluster. + * Counts are bounded by {@link DataGenTableSource#computeEffectiveCount()} which preserves the + * legacy behavior of halting at the smallest configured sequence range when {@code + * number-of-rows} is not set. + */ private List runGenerator(ResolvedSchema schema, DescriptorProperties descriptor) throws Exception { DynamicTableSource source = createTableSource(schema, descriptor.asMap()); @@ -421,25 +419,24 @@ private List runGenerator(ResolvedSchema schema, DescriptorProperties d assertThat(source).isInstanceOf(DataGenTableSource.class); DataGenTableSource dataGenTableSource = (DataGenTableSource) source; - DataGeneratorSource gen = dataGenTableSource.createSource(); - - // test java serialization. - gen = InstantiationUtil.clone(gen); - - StreamSource> src = new StreamSource<>(gen); - AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); - testHarness.open(); - - TestContext ctx = new TestContext(); - - gen.run(ctx); - - return ctx.results; + GeneratorFunction rowGenerator = dataGenTableSource.buildRowGenerator(); + rowGenerator.open(stubReaderContext()); + + long count = dataGenTableSource.computeEffectiveCount(); + // Cap the number of generated rows to a sane upper bound when no explicit limit was + // configured (e.g. unbounded random-only sources). Tests that depend on a specific count + // always set NUMBER_OF_ROWS explicitly. + long limit = count == Long.MAX_VALUE ? 1_000L : count; + + List results = new ArrayList<>(); + for (long i = 0; i < limit; i++) { + results.add(rowGenerator.map(i)); + } + return results; } @Test - void testSequenceCheckpointRestore() throws Exception { + void testSequenceProducesEachValueExactlyOnce() throws Exception { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); descriptor.putString( @@ -455,24 +452,30 @@ void testSequenceCheckpointRestore() throws Exception { ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), descriptor.asMap()); - DataGenTableSource dataGenTableSource = (DataGenTableSource) dynamicTableSource; - DataGeneratorSource source = dataGenTableSource.createSource(); + DataGenTableSource source = (DataGenTableSource) dynamicTableSource; + + // The factory wires a SequenceGeneratorFunction[0..100] for f0; with no number-of-rows + // configured the effective bound matches the sequence range, so every value in [0,100] + // is produced exactly once when the source is driven across indices [0, count). + assertThat(source.computeEffectiveCount()).isEqualTo(101L); + assertThat(source.getFieldGenerators()[0]).isInstanceOf(SequenceGeneratorFunction.class); + assertThat(((SequenceGeneratorFunction) source.getFieldGenerators()[0]).getTotalCount()) + .isEqualTo(101L); - final int initElement = 0; - final int maxElement = 100; - final Set expectedOutput = new HashSet<>(); - for (long i = initElement; i <= maxElement; i++) { - expectedOutput.add(GenericRowData.of(i)); + List results = + runGenerator( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), descriptor); + + Set emitted = new HashSet<>(); + for (RowData row : results) { + emitted.add(row.getLong(0)); } - DataGeneratorSourceTest.innerTestDataGenCheckpointRestore( - () -> { - try { - return InstantiationUtil.clone(source); - } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException(e); - } - }, - expectedOutput); + Set expected = new HashSet<>(); + for (long i = 0; i <= 100; i++) { + expected.add(i); + } + assertThat(emitted).isEqualTo(expected); + assertThat(results).hasSize(101); } @Test @@ -643,13 +646,7 @@ void testWithParallelism() { assertThat(source).isInstanceOf(DataGenTableSource.class); DataGenTableSource dataGenTableSource = (DataGenTableSource) source; - ScanTableSource.ScanRuntimeProvider scanRuntimeProvider = - dataGenTableSource.getScanRuntimeProvider(new TestScanContext()); - assertThat(scanRuntimeProvider).isInstanceOf(SourceFunctionProvider.class); - - SourceFunctionProvider sourceFunctionProvider = - (SourceFunctionProvider) scanRuntimeProvider; - assertThat(sourceFunctionProvider.getParallelism()).hasValue(10); + assertThat(dataGenTableSource.getParallelism()).isEqualTo(10); } private void assertException( @@ -692,50 +689,13 @@ private void assertException( .satisfies(anyCauseMatches(ValidationException.class, expectedMessage)); } - private static class TestContext implements SourceFunction.SourceContext { - - private final Object lock = new Object(); - - private final List results = new ArrayList<>(); - - @Override - public void collect(RowData element) { - results.add(element); - } - - @Override - public void collectWithTimestamp(RowData element, long timestamp) {} - - @Override - public void emitWatermark(Watermark mark) {} - - @Override - public void markAsTemporarilyIdle() {} - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() {} - } - - private static class TestScanContext implements ScanTableSource.ScanContext { - @Override - public TypeInformation createTypeInformation(DataType producedDataType) { - return null; - } - - @Override - public TypeInformation createTypeInformation(LogicalType producedLogicalType) { - return null; - } - - @Override - public DynamicTableSource.DataStructureConverter createDataStructureConverter( - DataType producedDataType) { - return null; - } + /** + * Returns a {@link SourceReaderContext} suitable for opening generator functions in unit tests. + * The generator implementations only call {@code open} for one-time initialization (e.g. {@code + * RandomGeneratorFunction} instantiating its {@code RandomDataGenerator}); they do not invoke + * any methods on the context, so a {@code null}-returning stub is sufficient. + */ + private static SourceReaderContext stubReaderContext() { + return null; } }