From ae8039fe0150b344f50a2e7abda99a7126985904 Mon Sep 17 00:00:00 2001 From: Ashish Jhaveri Date: Mon, 11 May 2026 14:21:35 -0500 Subject: [PATCH 1/5] Implement rescale change --- .../streaming/connectors/psc/table/PscTableCommonUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index 558d379..30aa360 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -252,6 +252,8 @@ private static int getTopicPartitionCount(List topicUris, Properties psc private PscTableCommonUtils() { throw new UnsupportedOperationException("Utility class should not be instantiated"); } + + public static int getEffectiveSourceParallelism(){} } From d3bcf6b2932cae025624b6b263360697cc43294a Mon Sep 17 00:00:00 2001 From: Ashish Jhaveri Date: Mon, 11 May 2026 14:45:18 -0500 Subject: [PATCH 2/5] Implement source operator rescaling based on scan.parallelism, table.exec.resource.default-parallelism and kafka partition count values --- .../psc/table/PscDynamicSource.java | 55 +++++----- .../psc/table/PscDynamicTableFactory.java | 26 ++--- .../psc/table/PscTableCommonUtils.java | 62 ++++++++++- .../psc/table/PscTableCommonUtilsTest.java | 102 ++++++++++++++++++ 4 files changed, 204 insertions(+), 41 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index d3cb44d..fc169e5 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ 
b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -449,41 +449,44 @@ public DataStream produceDataStream( DataStreamSource sourceStream = execEnv.fromSource( pscSource, watermarkStrategy, "PscSource-" + tableIdentifier); - - // Source parallelism is determined by partition count (Flink's default for Kafka-like sources) - // We do NOT set it explicitly even if scan.parallelism is configured, because: - // - A Kafka source can only have as many active subtasks as there are partitions - // - Setting higher parallelism would create idle subtasks - // - Instead, we use rescale() to redistribute data to the intended downstream parallelism - + DataStream resultStream = sourceStream; - - // Determine the intended downstream parallelism for rate limiting - // This is scan.parallelism if set, otherwise global default parallelism - int intendedParallelism = getIntendedParallelism(execEnv); - - // Apply rescale FIRST if enabled - // This redistributes data from source parallelism (= partition count) to intended parallelism - // Ensures all downstream subtasks (including rate limiters) receive traffic + LOG.info("Rescale logic " + scanParallelism + "::" + enableRescale + "::" + execEnv.getParallelism()); if (enableRescale) { - resultStream = resultStream.rescale(); + if(scanParallelism != null && scanParallelism > 0 ) { + int sourceParallelism = Math.min(scanParallelism, execEnv.getParallelism()); + sourceStream.setParallelism(sourceParallelism); + LOG.info("Rescale enabled: set source parallelism to {} " + + "(partition count / effective parallelism): {}, " + + "job parallelism: {}", + sourceParallelism, scanParallelism, execEnv.getParallelism()); + } else { + LOG.info("Rescale enabled but could not determine partition count or " + + "from effective parallelism, " + + "source will use job default parallelism: {}", + execEnv.getParallelism()); + } + } + else + { + LOG.info("Rescale disabled: source will use job default " + + 
"parallelism = {}", execEnv.getParallelism()); } - - // Apply rate limiting AFTER rescale if configured - // Rate limiter parallelism must match the actual parallelism of incoming data: - // - If rescale enabled: use intendedParallelism (all subtasks are active after rescale) - // - If rescale disabled: use source parallelism (rate limiter stays with source) + + LOG.info("@@@@>>>enableRescale ::" + enableRescale); + if (isRateLimitingEnabled(rateLimitRecordsPerSecond)) { - int rateLimiterParallelism = enableRescale ? intendedParallelism : sourceStream.getParallelism(); - String rateLimiterOperatorName = "PscRateLimit-" + tableIdentifier; resultStream = resultStream .map(new PscRateLimitMap<>(rateLimitRecordsPerSecond)) - .setParallelism(rateLimiterParallelism) + .setParallelism(sourceStream.getParallelism()) .name(rateLimiterOperatorName) .uid(rateLimiterOperatorName); } - + if (enableRescale) { + resultStream = resultStream.rescale(); + } + // Prefer explicit user-provided UID prefix if present; otherwise rely on provider context. 
if (sourceUidPrefix != null) { final String trimmedPrefix = sourceUidPrefix.trim(); @@ -961,7 +964,7 @@ protected PscSource createPscSource( offsetResetConfig = getResetStrategy(offsetResetConfig); pscSourceBuilder.setStartingOffsets( OffsetsInitializer.committedOffsets(offsetResetConfig)); - LOG.info("Setting starting offsets to committed offsets with reset strategy: {}", offsetResetConfig); + LOG.info("####>>>Setting starting offsets to committed offsets with reset strategy: {}", offsetResetConfig); break; case SPECIFIC_OFFSETS: Map offsets = new HashMap<>(); diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index c5a25cf..56fa1d7 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -99,7 +99,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateDeliveryGuarantee; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSinkOptions; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSourceOptions; -import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale; +import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.getEffectiveSourceParallelism; /** * Factory for creating configured instances of {@link PscDynamicSource} and {@link @@ -220,22 +220,24 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final boolean shouldRescale =tableOptions.get(SCAN_ENABLE_RESCALE); // Read scan.parallelism configuration 
final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); - // Log scan parallelism configuration if set - if (scanParallelism != null) { - LOG.info("scan.parallelism configured: {} (will override partition-based parallelism)", - scanParallelism); - } - - // Determine if rescale should be applied based on scan.parallelism (if set) or global default - final boolean shouldRescale = shouldApplyRescale( - tableOptions, + final Integer effectiveParallelism = shouldRescale ? + getEffectiveSourceParallelism( context.getConfiguration(), getSourceTopicUris(tableOptions), properties, - scanParallelism); + scanParallelism) + : -1; + + + // Log scan parallelism configuration if set + if (shouldRescale && effectiveParallelism > 0 ) { + LOG.info("Psc will use parallelism={} for source operator", + effectiveParallelism); + } // Get rate limit configuration final Double rateLimitRecordsPerSecond = tableOptions.getOptional(SCAN_RATE_LIMIT).orElse(null); @@ -266,7 +268,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), shouldRescale, rateLimitRecordsPerSecond, - scanParallelism); + effectiveParallelism); } @Override diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index 30aa360..40ed6fe 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -24,6 +24,7 @@ import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ReadableConfig; import 
org.apache.flink.table.api.config.ExecutionConfigOptions; import org.slf4j.Logger; @@ -112,7 +113,7 @@ public static boolean shouldApplyRescale( List topicUris, Properties pscProperties, @Nullable Integer scanParallelism) { - + // First check if rescale is enabled by user if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { return false; @@ -164,6 +165,63 @@ public static boolean shouldApplyRescale( return shouldRescale; } + /** + * Determines the effective parallelism for the source by walking the following + * fallback chain and returning the first source that yields a positive value: + *
<ol>
+     *   <li>{@code scan.parallelism} (table-level override) — used if non-null and not -1</li>
+     *   <li>{@code table.exec.resource.default-parallelism} — used if set and not -1</li>
+     *   <li>Kafka partition count for the source topics (queried via PSC metadata client)</li>
+     * </ol>
+     *
+     * <p>
If none of the above produces a positive value (e.g. the partition-count query + * fails), this method returns {@code -1} to signal "unknown". + * + * @param globalConfig Global Flink configuration (read for table.exec.resource.default-parallelism) + * @param topicUris List of topic URIs used to query partition count + * @param pscProperties PSC properties for metadata client connection + * @param scanParallelism Optional explicit scan.parallelism configuration + * @return Effective parallelism, or -1 if it cannot be determined + */ + public static int getEffectiveSourceParallelism( + ReadableConfig globalConfig, + List topicUris, + Properties pscProperties, + @Nullable Integer scanParallelism) { + + // 1) scan.parallelism + if (scanParallelism != null && scanParallelism > 0) { + LOG.info("Effective source parallelism = {} (source: {})", + scanParallelism, PscConnectorOptions.SCAN_PARALLELISM.key()); + return scanParallelism; + } + + // 2) table.exec.resource.default-parallelism + Integer tableExecParallelism = + globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + if (tableExecParallelism != null && tableExecParallelism > 0) { + LOG.info("Effective source parallelism = {} (source: {})", + tableExecParallelism, + ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key()); + return tableExecParallelism; + } + + // 3) Kafka partition count + int partitionCount = partitionCountProvider.getPartitionCount(topicUris, pscProperties); + if (partitionCount > 0) { + LOG.info("Effective source parallelism = {} (source: kafka partition count)", + partitionCount); + return partitionCount; + } + + // 4) Unknown + LOG.warn("Could not determine effective source parallelism: {} is unset/-1, {} is unset/-1, " + + "and partition count could not be retrieved. 
Returning -1.", + PscConnectorOptions.SCAN_PARALLELISM.key(), + ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key()); + return -1; + } + /** * Queries the minimum partition count across all specified topic URIs. * @@ -252,8 +310,6 @@ private static int getTopicPartitionCount(List topicUris, Properties psc private PscTableCommonUtils() { throw new UnsupportedOperationException("Utility class should not be instantiated"); } - - public static int getEffectiveSourceParallelism(){} } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java index c709e64..615a6ab 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java @@ -361,6 +361,108 @@ public void testProviderResetRestoresDefaultBehavior() { assertThat(resultAfterReset).isFalse(); // partition count = -1, fail-safe } + // ============================================ + // Tests for getEffectiveSourceParallelism() + // Precedence: scan.parallelism > table.exec.resource.default-parallelism > kafka partition count + // ============================================ + + @Test + public void testEffectiveParallelismFromScanParallelism() { + // Given: scan.parallelism = 12, others would also resolve but should be ignored + globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); + PscTableCommonUtils.setProviderForTest((topicUris, props) -> 7); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, 12); + + // Then: scan.parallelism wins + assertThat(parallelism).isEqualTo(12); + } + + @Test + public void testEffectiveParallelismFallsThroughWhenScanParallelismIsNull() { + // Given: 
scan.parallelism not set; table.exec set to 4; partition count = 7 + globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); + PscTableCommonUtils.setProviderForTest((topicUris, props) -> 7); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, null); + + // Then: table.exec wins (4) + assertThat(parallelism).isEqualTo(4); + } + + @Test + public void testEffectiveParallelismFallsThroughWhenScanParallelismIsMinusOne() { + // Given: scan.parallelism = -1 (unset sentinel) + globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); + PscTableCommonUtils.setProviderForTest((topicUris, props) -> 7); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, -1); + + // Then: -1 is treated as unset; table.exec wins (4) + assertThat(parallelism).isEqualTo(4); + } + + @Test + public void testEffectiveParallelismFallsThroughToPartitionCount() { + // Given: scan.parallelism unset; table.exec = -1; partition count = 7 + globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, -1); + PscTableCommonUtils.setProviderForTest((topicUris, props) -> 7); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, null); + + // Then: kafka partition count is used + assertThat(parallelism).isEqualTo(7); + } + + @Test + public void testEffectiveParallelismFallsThroughToPartitionCountWhenTableExecIsUnset() { + // Given: scan.parallelism unset; table.exec not configured at all (returns null/default) + // Note: globalConfig has no value for TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM + PscTableCommonUtils.setProviderForTest((topicUris, props) -> 9); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, null); + + // Then: kafka partition count is 
used + assertThat(parallelism).isEqualTo(9); + } + + @Test + public void testEffectiveParallelismReturnsMinusOneWhenAllSourcesFail() { + // Given: scan.parallelism unset; table.exec = -1; partition count provider returns -1 + globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, -1); + PscTableCommonUtils.setProviderForTest((topicUris, props) -> -1); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, null); + + // Then: -1 (unknown) + assertThat(parallelism).isEqualTo(-1); + } + + @Test + public void testEffectiveParallelismReturnsMinusOneWhenPartitionCountIsZero() { + // Given: scan.parallelism null; table.exec unset; partition count provider returns 0 + PscTableCommonUtils.setProviderForTest((topicUris, props) -> 0); + + // When + int parallelism = PscTableCommonUtils.getEffectiveSourceParallelism( + globalConfig, topicUris, pscProperties, null); + + // Then: 0 is invalid → -1 (unknown) + assertThat(parallelism).isEqualTo(-1); + } + // ============================================ // Tests validating PscMetadataClient configuration fix // ============================================ From 79695dbeb25309dd30f5d62c947c654c2e2882d3 Mon Sep 17 00:00:00 2001 From: Ashish Jhaveri Date: Mon, 11 May 2026 15:01:13 -0500 Subject: [PATCH 3/5] Removed obsolete code --- .../psc/table/PscTableCommonUtils.java | 74 ----- .../table/UpsertPscDynamicTableFactory.java | 26 +- .../psc/table/PscTableCommonUtilsTest.java | 296 ------------------ 3 files changed, 14 insertions(+), 382 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java index 40ed6fe..6f457c2 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java +++ 
b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtils.java @@ -91,80 +91,6 @@ static synchronized void resetProvider() { partitionCountProvider = PscTableCommonUtils::getTopicPartitionCount; } - /** - * Determines whether rescale() should be applied based on: - * 1. scan.enable-rescale flag must be true - * 2. Effective parallelism (scan.parallelism or global default) > partition count - * - *

When scan.parallelism is set, it takes precedence over global default for - * rescale decision logic. This ensures rescale is only applied when the user's - * intended source parallelism exceeds partition count. - * - * @param tableOptions User's table configuration options - * @param globalConfig Global Flink configuration - * @param topicUris List of topic URIs to query for partition counts - * @param pscProperties PSC properties for metadata client connection - * @param scanParallelism Optional explicit scan.parallelism configuration - * @return true if rescale should be applied, false otherwise - */ - public static boolean shouldApplyRescale( - ReadableConfig tableOptions, - ReadableConfig globalConfig, - List topicUris, - Properties pscProperties, - @Nullable Integer scanParallelism) { - - // First check if rescale is enabled by user - if (!tableOptions.get(SCAN_ENABLE_RESCALE)) { - return false; - } - - // Determine effective parallelism: scan.parallelism takes precedence over global default - Integer effectiveParallelism; - String parallelismSource; - - if (scanParallelism != null && scanParallelism > 0) { - effectiveParallelism = scanParallelism; - parallelismSource = PscConnectorOptions.SCAN_PARALLELISM.key(); - } else { - effectiveParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); - parallelismSource = ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(); - } - - // If parallelism is not set or invalid, cannot determine - if (effectiveParallelism == null || effectiveParallelism <= 0) { - LOG.info("scan.enable-rescale is true, but effective parallelism from {} is {} (not set or invalid). 
" + - "Rescale will not be applied.", - parallelismSource, effectiveParallelism); - return false; - } - - // Query partition count using the configured provider (mockable for testing) - int partitionCount = partitionCountProvider.getPartitionCount(topicUris, pscProperties); - - // If partition count couldn't be determined, don't apply rescale (fail-safe) - if (partitionCount <= 0) { - LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " + - "Rescale will not be applied."); - return false; - } - - // Apply rescale only if effective parallelism exceeds partition count - boolean shouldRescale = effectiveParallelism > partitionCount; - - if (shouldRescale) { - LOG.info("Applying rescale(): {} ({}) > partition count ({}). " + - "Data will be redistributed to fully utilize downstream operators.", - parallelismSource, effectiveParallelism, partitionCount); - } else { - LOG.info("Skipping rescale(): {} ({}) <= partition count ({}). " + - "No shuffle needed as source parallelism matches or is less than partition count.", - parallelismSource, effectiveParallelism, partitionCount); - } - - return shouldRescale; - } - /** * Determines the effective parallelism for the source by walking the following * fallback chain and returning the first source that yields a positive value: diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index 0805d0a..ccae228 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -85,7 +85,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateScanBoundedMode; import static 
com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateConsumerClientOptions; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateProducerClientOptions; -import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale; +import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.getEffectiveSourceParallelism; /** Upsert-Psc factory. */ public class UpsertPscDynamicTableFactory @@ -161,22 +161,24 @@ public DynamicTableSource createDynamicTableSource(Context context) { final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + final boolean shouldRescale =tableOptions.get(SCAN_ENABLE_RESCALE); // Read scan.parallelism configuration final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); - // Log scan parallelism configuration if set - if (scanParallelism != null) { - LOG.info("scan.parallelism configured: {} (will override partition-based parallelism)", - scanParallelism); - } - - // Determine if rescale should be applied based on scan.parallelism (if set) or global default - final boolean shouldRescale = shouldApplyRescale( - tableOptions, + final Integer effectiveParallelism = shouldRescale ? 
+ getEffectiveSourceParallelism( context.getConfiguration(), getSourceTopicUris(tableOptions), properties, - scanParallelism); + scanParallelism) + : -1; + + + // Log scan parallelism configuration if set + if (shouldRescale && effectiveParallelism > 0 ) { + LOG.info("Psc will use parallelism={} for source operator", + effectiveParallelism); + } // Get rate limit configuration final Double rateLimitRecordsPerSecond = tableOptions.getOptional(SCAN_RATE_LIMIT).orElse(null); @@ -208,7 +210,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null), shouldRescale, rateLimitRecordsPerSecond, - scanParallelism); + effectiveParallelism); } @Override diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java index 615a6ab..118af89 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableCommonUtilsTest.java @@ -65,302 +65,6 @@ public void tearDown() { PscTableCommonUtils.resetProvider(); } - // ============================================ - // Tests for rescale enabled flag checks - // ============================================ - - @Test - public void testShouldNotRescaleWhenDisabled() { - // Given: rescale is disabled - tableOptions.set(SCAN_ENABLE_RESCALE, false); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10); - - // When: shouldApplyRescale is called - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - - // Then: rescale is not applied - assertThat(result).isFalse(); - } - - // ============================================ - // Tests for scan.parallelism with mocked partition 
counts - // ============================================ - - @Test - public void testShouldRescaleWhenScanParallelismExceedsPartitionCount() { - // Given: scan.parallelism = 10, partition count = 5, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); // Should be ignored - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 5); - - // When: shouldApplyRescale is called with scan.parallelism = 10 - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 10); - - // Then: rescale is applied (10 > 5) - assertThat(result).isTrue(); - } - - @Test - public void testShouldNotRescaleWhenScanParallelismLessThanPartitionCount() { - // Given: scan.parallelism = 5, partition count = 20, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 100); // Should be ignored - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 20); - - // When: shouldApplyRescale is called with scan.parallelism = 5 - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 5); - - // Then: rescale is not applied (5 < 20) - assertThat(result).isFalse(); - } - - @Test - public void testShouldNotRescaleWhenScanParallelismEqualsPartitionCount() { - // Given: scan.parallelism = 10, partition count = 10, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 10); - - // When: shouldApplyRescale is called with scan.parallelism = 10 - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 10); - - // Then: rescale is not applied (10 == 10) - assertThat(result).isFalse(); - } - - // ============================================ - // Tests for global default 
parallelism with mocked partition counts - // ============================================ - - @Test - public void testShouldRescaleWhenGlobalParallelismExceedsPartitionCount() { - // Given: global parallelism = 50, partition count = 10, no scan.parallelism, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 50); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 10); - - // When: shouldApplyRescale is called with no scan.parallelism - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - - // Then: rescale is applied (50 > 10) - assertThat(result).isTrue(); - } - - @Test - public void testShouldNotRescaleWhenGlobalParallelismLessThanPartitionCount() { - // Given: global parallelism = 5, partition count = 20, no scan.parallelism, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 5); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 20); - - // When: shouldApplyRescale is called with no scan.parallelism - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - - // Then: rescale is not applied (5 < 20) - assertThat(result).isFalse(); - } - - // ============================================ - // Tests for invalid parallelism configurations - // ============================================ - - @Test - public void testShouldNotRescaleWhenScanParallelismIsZero() { - // Given: scan.parallelism = 0 (invalid), rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 5); - - // When: shouldApplyRescale is called with scan.parallelism = 0 - boolean result = 
PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 0); - - // Then: falls back to global parallelism, rescale is applied (10 > 5) - assertThat(result).isTrue(); - } - - @Test - public void testShouldNotRescaleWhenScanParallelismIsNegative() { - // Given: scan.parallelism = -1 (invalid), rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 5); - - // When: shouldApplyRescale is called with scan.parallelism = -1 - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, -1); - - // Then: falls back to global parallelism, rescale is applied (10 > 5) - assertThat(result).isTrue(); - } - - @Test - public void testShouldNotRescaleWhenNoParallelismConfigured() { - // Given: no scan.parallelism, no global parallelism, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - // globalConfig has no default parallelism set (returns null) - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 10); - - // When: shouldApplyRescale is called - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - - // Then: rescale is not applied (no valid parallelism to compare) - assertThat(result).isFalse(); - } - - // ============================================ - // Tests for partition count edge cases - // ============================================ - - @Test - public void testShouldNotRescaleWhenPartitionCountCannotBeDetermined() { - // Given: partition count = -1 (cannot be determined), rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> -1); - - // When: shouldApplyRescale is called 
- boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - - // Then: rescale is not applied (fail-safe behavior) - assertThat(result).isFalse(); - } - - @Test - public void testShouldNotRescaleWhenPartitionCountIsZero() { - // Given: partition count = 0 (invalid), rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 0); - - // When: shouldApplyRescale is called - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - - // Then: rescale is not applied (fail-safe behavior) - assertThat(result).isFalse(); - } - - // ============================================ - // Tests for scan.parallelism precedence - // ============================================ - - @Test - public void testScanParallelismTakesPrecedenceOverGlobalParallelism() { - // Given: scan.parallelism = 100, global parallelism = 5, partition count = 10, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 5); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 10); - - // When: shouldApplyRescale is called with scan.parallelism = 100 - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 100); - - // Then: rescale is applied based on scan.parallelism (100 > 10), not global (5 < 10) - assertThat(result).isTrue(); - } - - // ============================================ - // Tests for multi-topic scenarios - // ============================================ - - @Test - public void testShouldRescaleWithMultipleTopics() { - // Given: multiple topics, min partition count = 8, scan.parallelism = 20, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - 
List multipleTopics = Arrays.asList( - "plaintext:kafka:local:test-cluster:/topic1", - "plaintext:kafka:local:test-cluster:/topic2" - ); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 8); - - // When: shouldApplyRescale is called - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, multipleTopics, pscProperties, 20); - - // Then: rescale is applied (20 > 8) - assertThat(result).isTrue(); - } - - // ============================================ - // Tests for high parallelism scenarios - // ============================================ - - @Test - public void testShouldRescaleWithHighParallelismAndLowPartitionCount() { - // Given: scan.parallelism = 2000, partition count = 10, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 10); - - // When: shouldApplyRescale is called with high parallelism - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 2000); - - // Then: rescale is applied (2000 > 10) - assertThat(result).isTrue(); - } - - @Test - public void testShouldNotRescaleWithHighPartitionCountAndLowParallelism() { - // Given: scan.parallelism = 2, partition count = 1000, rescale enabled - tableOptions.set(SCAN_ENABLE_RESCALE, true); - - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 1000); - - // When: shouldApplyRescale is called - boolean result = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, 2); - - // Then: rescale is not applied (2 < 1000) - assertThat(result).isFalse(); - } - - // ============================================ - // Tests for provider reset mechanism - // ============================================ - - @Test - public void testProviderResetRestoresDefaultBehavior() { - // Given: custom provider is set - PscTableCommonUtils.setProviderForTest((topicUris, props) -> 42); - - 
tableOptions.set(SCAN_ENABLE_RESCALE, true); - globalConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 100); - - // Verify custom provider works - boolean resultWithMock = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - assertThat(resultWithMock).isTrue(); // 100 > 42 - - // When: provider is reset - PscTableCommonUtils.resetProvider(); - - // Then: default behavior is restored (returns -1 in unit test environment) - boolean resultAfterReset = PscTableCommonUtils.shouldApplyRescale( - tableOptions, globalConfig, topicUris, pscProperties, null); - assertThat(resultAfterReset).isFalse(); // partition count = -1, fail-safe - } - // ============================================ // Tests for getEffectiveSourceParallelism() // Precedence: scan.parallelism > table.exec.resource.default-parallelism > kafka partition count From 3b07b560a724b2e57f843a0084488b605f8cddb2 Mon Sep 17 00:00:00 2001 From: Ashish Jhaveri Date: Tue, 12 May 2026 11:19:58 -0500 Subject: [PATCH 4/5] Added Unit test coverage --- .../psc/table/PscDynamicTableFactoryTest.java | 610 +++++++++++++++--- .../UpsertPscDynamicTableFactoryTest.java | 212 +++++- 2 files changed, 723 insertions(+), 99 deletions(-) diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index 917cb38..e747e49 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -52,8 +52,12 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.table.api.DataTypes; +import 
org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.WatermarkSpec; @@ -1438,6 +1442,34 @@ private void addScanParallelismConfig(Map options, int paralleli options.put(PscConnectorOptions.SCAN_PARALLELISM.key(), String.valueOf(parallelism)); } + /** + * Builds a PSC dynamic table source using {@link FactoryUtil#createDynamicTableSource} with + * a caller-supplied global {@link Configuration}. {@link FactoryMocks#createTableSource} + * hard-codes {@code new Configuration()}, so this overload is needed when a test must + * exercise factory behavior that reads from the global config (e.g. when the factory calls + * {@link PscTableCommonUtils#getEffectiveSourceParallelism} and we need to set + * {@code table.exec.resource.default-parallelism}). 
+ */ + private DynamicTableSource createTableSourceWithGlobalConfig( + ResolvedSchema schema, + Map tableOptions, + Configuration globalConfig) { + return FactoryUtil.createDynamicTableSource( + null, + FactoryMocks.IDENTIFIER, + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(schema).build(), + "mock source", + Collections.emptyList(), + tableOptions), + schema), + Collections.emptyMap(), + globalConfig, + Thread.currentThread().getContextClassLoader(), + false); + } + @Test public void testOperatorChainingWithRateLimitOnly() { // When only rate limiting is enabled (rescale disabled), verify rate limiter is applied @@ -1469,36 +1501,50 @@ public void testOperatorChainingWithRateLimitOnly() { @Test public void testOperatorChainingWithRescaleAndRateLimit() { - // When both rescale and rate limiting are enabled, verify configuration + // Verifies the new operator chain when both rescale and rate limiting are enabled: + // Source -> PscRateLimit -> rescale (terminal PartitionTransformation) + // scan.parallelism (tier 1) determines the source/rate-limit parallelism, capped + // by the env parallelism in produceTransformationFromSource. 
final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), options -> { addRescaleConfig(options, true); addRateLimitConfig(options, 5000.0); - addScanParallelismConfig(options, 100); // Explicitly set higher than partition count + addScanParallelismConfig(options, 100); }); - + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Verify configuration is correctly set - assertThat(pscSource.rateLimitRecordsPerSecond).isNotNull(); + assertThat(pscSource.enableRescale).isTrue(); assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(5000.0); assertThat(pscSource.scanParallelism).isEqualTo(100); - - // Get transformation using helper method (reuses same source) - final Transformation transformation = produceTransformationFromSource(pscSource, 10); - - // The final transformation should be the rate limiter - assertThat(transformation).isNotNull(); - assertThat(transformation.getName()).contains("PscRateLimit"); - - // In test environment without real partitions, runtime rescale decision may differ - // The key verification is that configuration is properly passed through - // In production, with actual partition count > scan.parallelism, rescale would be applied + + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // Terminal is the rescale PartitionTransformation. + assertThat(terminal).isNotNull(); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + + // Terminal's input is the rate-limit operator. + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + + // The rate-limit operator wraps the Kafka source. 
+ assertThat(rateLimitOp.getInputs()).isNotEmpty(); + final Transformation sourceOp = rateLimitOp.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Source and rate-limit are both pinned to min(scanParallelism, env). + final int expectedSourceParallelism = + Math.min(pscSource.scanParallelism, envParallelism); + assertThat(sourceOp.getParallelism()).isEqualTo(expectedSourceParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(expectedSourceParallelism); } @Test @@ -1534,7 +1580,10 @@ public void testOperatorChainingWithRescaleWithoutRateLimit() { @Test public void testRateLimiterParallelismConfiguration() { - // Verify that scan.parallelism and rate limiting configurations are properly stored + // Verifies that scan.parallelism and rate-limit options are wired into the + // PscDynamicSource and that the rate-limit operator is inserted between the + // source and the rescale, with its parallelism matching the (capped) source + // parallelism. 
final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), @@ -1543,23 +1592,31 @@ public void testRateLimiterParallelismConfiguration() { addScanParallelismConfig(options, 200); addRateLimitConfig(options, 10000.0); }); - + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Verify all configuration values are correctly stored in the source + assertThat(pscSource.scanParallelism).isEqualTo(200); assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(10000.0); - - // Get transformation using helper method (reuses same source) - final Transformation transformation = produceTransformationFromSource(pscSource, 10); - - // Rate limiter operator should be present - assertThat(transformation).isNotNull(); - assertThat(transformation.getName()).contains("PscRateLimit"); - - // Note: See testRescaleAndRateLimitChain() and testSkipsRescaleWhenNotNeeded() - // for comprehensive partition count testing with mocked providers. + assertThat(pscSource.enableRescale).isTrue(); + + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // Terminal is the rescale PartitionTransformation; its input is the rate-limit map. + assertThat(terminal).isNotNull(); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + + // Rate-limit parallelism follows the upstream source parallelism, which is + // capped at min(scanParallelism, env). 
+ final int expectedSourceParallelism = + Math.min(pscSource.scanParallelism, envParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(expectedSourceParallelism); } @Test @@ -1603,13 +1660,21 @@ public void testRescaleCreatesPartitionTransformation() { @Test public void testRescaleAndRateLimitChain() { - // Verifies complete operator chain: Source → Rescale → RateLimit - + // Verifies the operator chain: Source -> RateLimit -> Rescale. + // Rescale is now applied AFTER the rate limiter, so the terminal transformation + // is a PartitionTransformation (the rescale), whose input is the rate-limit + // OneInputTransformation, whose input is the SourceTransformation. + // Also verifies that source/rate-limit parallelism is capped at env parallelism: + // sourceParallelism = min(scanParallelism, env.getParallelism()) = min(100, 10) = 10 + // rateLimiterParallelism = sourceStream.getParallelism() = 10 + try { - // Mock partition count = 20 + // Mock partition count = 20 (irrelevant to the gate now; rescale is gated only + // by scan.enable-rescale, but kept here so the resolver in PscTableCommonUtils + // is deterministic if it ever falls through to tier 3). PscTableCommonUtils.setProviderForTest((topicUris, props) -> 20); - - // Create source with scan.parallelism = 100 (> 20), rescale and rate limiting enabled + + // Create source with scan.parallelism = 100, rescale and rate limiting enabled final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), @@ -1618,42 +1683,66 @@ public void testRescaleAndRateLimitChain() { addScanParallelismConfig(options, 100); addRateLimitConfig(options, 5000.0); }); - + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Verify configuration + + // Verify configuration carried into the source. 
+ // (PscDynamicSource.scanParallelism now holds the effective parallelism + // resolved by the factory; with scan.parallelism set, tier 1 returns 100.) assertThat(pscSource.scanParallelism).isEqualTo(100); assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(5000.0); - - // Get transformation - should apply rescale since 100 > 20 - final Transformation transformation = produceTransformationFromSource(pscSource, 10); - - // The final transformation should be the rate limiter - assertThat(transformation).isNotNull(); - assertThat(transformation.getName()).contains("PscRateLimit"); - assertThat(transformation.getParallelism()).isEqualTo(100); - - // The input to rate limiter should be PartitionTransformation (rescale) - assertThat(transformation.getInputs()).isNotEmpty(); - Transformation inputTransformation = transformation.getInputs().get(0); - assertThat(inputTransformation).isInstanceOf(PartitionTransformation.class); + assertThat(pscSource.enableRescale).isTrue(); + + // Build the transformation with env parallelism = 10. + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // 1) Terminal is the rescale operator (PartitionTransformation). + assertThat(terminal).isNotNull(); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + + // 2) Terminal's input is the rate-limit map. + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + + // Rate-limit parallelism should match the (capped) source parallelism: + // min(scanParallelism=100, env=10) = 10. + final int expectedSourceParallelism = + Math.min(pscSource.scanParallelism, envParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(expectedSourceParallelism); + + // 3) The rate-limit map's input is the Kafka source. 
+ assertThat(rateLimitOp.getInputs()).isNotEmpty(); + final Transformation sourceOp = rateLimitOp.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Source operator parallelism should be capped at env parallelism. + assertThat(sourceOp.getParallelism()).isEqualTo(expectedSourceParallelism); } finally { PscTableCommonUtils.resetProvider(); } } @Test - public void testSkipsRescaleWhenNotNeeded() { - // Verifies rescale is NOT applied when scan.parallelism <= partition count - + public void testRescaleAlwaysAppliedWhenEnabled() { + // Verifies that rescale is applied whenever scan.enable-rescale=true, regardless + // of how scan.parallelism compares to partition count. The factory now passes + // shouldRescale through directly and computes the source parallelism via + // PscTableCommonUtils.getEffectiveSourceParallelism(), and PscDynamicSource + // unconditionally wraps the stream in rescale() when enableRescale is true. + try { - // Mock partition count = 50 + // Mock partition count = 50. In the OLD logic this would have caused + // shouldApplyRescale() to return false (since scan.parallelism 20 <= 50); + // in the NEW logic it must NOT prevent rescale from being applied. PscTableCommonUtils.setProviderForTest((topicUris, props) -> 50); - - // Create source with scan.parallelism = 20 (< 50), rescale enabled + + // scan.parallelism = 20, rescale enabled, no rate limiting. 
final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), @@ -1661,23 +1750,32 @@ public void testSkipsRescaleWhenNotNeeded() { addRescaleConfig(options, true); addScanParallelismConfig(options, 20); }); - + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Verify configuration + assertThat(pscSource.enableRescale).isTrue(); assertThat(pscSource.scanParallelism).isEqualTo(20); - - // Get transformation - should NOT apply rescale since 20 < 50 - final Transformation transformation = produceTransformationFromSource(pscSource, 10); - - // With scan.parallelism (20) < partition count (50), rescale is NOT applied - // The transformation should be SourceTransformation, not PartitionTransformation - assertThat(transformation).isNotNull(); - assertThat(transformation).isInstanceOf(SourceTransformation.class); - assertThat(transformation).isNotInstanceOf(PartitionTransformation.class); + + // Build the transformation with env parallelism = 10 (smaller than scan.parallelism). + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // 1) Terminal is the rescale operator (PartitionTransformation), NOT the bare source. + assertThat(terminal).isNotNull(); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + + // 2) The rescale's input is the Kafka SourceTransformation (no rate limiter in this scenario). + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // 3) Source operator parallelism is capped at min(scanParallelism, env). 
+ final int expectedSourceParallelism = + Math.min(pscSource.scanParallelism, envParallelism); + assertThat(sourceOp.getParallelism()).isEqualTo(expectedSourceParallelism); } finally { PscTableCommonUtils.resetProvider(); } @@ -1685,14 +1783,18 @@ public void testSkipsRescaleWhenNotNeeded() { @Test public void testRescaleAndRateLimitWithDifferentParallelism() { - // Verifies PartitionTransformation and correct rate limiter parallelism - // when scan.parallelism > partition count with both rescale and rate limiting - + // Verifies that when scan.parallelism is larger than env parallelism, both the + // source and the rate-limit operator are capped at env parallelism. The chain is: + // Source(parallelism=min(scanParallelism, env)) + // -> PscRateLimit(parallelism=sourceStream.getParallelism()) + // -> rescale (terminal PartitionTransformation) + try { - // Mock partition count = 15 + // Mock partition count = 15. In the new logic this is not consulted because + // scan.parallelism (tier 1) wins in getEffectiveSourceParallelism. PscTableCommonUtils.setProviderForTest((topicUris, props) -> 15); - - // Create source with scan.parallelism = 80 (> 15), rescale and rate limiting + + // scan.parallelism = 80 (> env parallelism = 10), rescale + rate limit enabled. 
final Map modifiedOptions = getModifiedOptions( getBasicSourceOptions(), @@ -1701,35 +1803,343 @@ public void testRescaleAndRateLimitWithDifferentParallelism() { addScanParallelismConfig(options, 80); addRateLimitConfig(options, 10000.0); }); - + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); assertThat(actualSource).isInstanceOf(PscDynamicSource.class); - + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; - - // Verify configuration assertThat(pscSource.scanParallelism).isEqualTo(80); assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(10000.0); assertThat(pscSource.enableRescale).isTrue(); - - // Get transformation with global parallelism = 10 (different from scan.parallelism) - final Transformation transformation = produceTransformationFromSource(pscSource, 10); - - // The final transformation should be the rate limiter - assertThat(transformation).isNotNull(); - assertThat(transformation.getName()).contains("PscRateLimit"); - // Rate limiter parallelism should be scan.parallelism (80), not global (10) - assertThat(transformation.getParallelism()).isEqualTo(80); - - // The input to rate limiter should be PartitionTransformation (rescale) - assertThat(transformation.getInputs()).isNotEmpty(); - Transformation inputTransformation = transformation.getInputs().get(0); - assertThat(inputTransformation).isInstanceOf(PartitionTransformation.class); + + // Build the transformation with env parallelism = 10 (smaller than scan.parallelism). + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // 1) Terminal is the rescale PartitionTransformation. + assertThat(terminal).isNotNull(); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + + // 2) Terminal's input is the rate-limit map. 
+ assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + + // Rate-limit and source are both capped at env parallelism (NOT 80). + final int expectedSourceParallelism = + Math.min(pscSource.scanParallelism, envParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(expectedSourceParallelism); + + // 3) The rate-limit map's input is the Kafka source, also at the capped parallelism. + assertThat(rateLimitOp.getInputs()).isNotEmpty(); + final Transformation sourceOp = rateLimitOp.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + assertThat(sourceOp.getParallelism()).isEqualTo(expectedSourceParallelism); } finally { PscTableCommonUtils.resetProvider(); } } + @Test + public void testSourceParallelismCappedByEnvParallelism() { + // When scan.parallelism > env parallelism, PscDynamicSource.produceDataStream() caps + // the source operator at env.getParallelism() so we never exceed the available slots. + // Chain (no rate limiter): Source(parallelism=env) -> rescale (PartitionTransformation). + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + addRescaleConfig(options, true); + addScanParallelismConfig(options, 80); + }); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + // scan.parallelism (tier 1) wins in getEffectiveSourceParallelism and is stored on the source. + assertThat(pscSource.scanParallelism).isEqualTo(80); + assertThat(pscSource.enableRescale).isTrue(); + + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // Terminal is the rescale PartitionTransformation; its input is the Kafka source. 
+ assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Cap kicks in: source parallelism = env (10), NOT scan.parallelism (80). + assertThat(sourceOp.getParallelism()).isEqualTo(envParallelism); + assertThat(sourceOp.getParallelism()).isLessThan(pscSource.scanParallelism); + } + + @Test + public void testSourceParallelismUsesScanWhenSmallerThanEnv() { + // When scan.parallelism < env parallelism, the cap does NOT change the value: + // source parallelism = scan.parallelism. This is the symmetric counterpart to + // testSourceParallelismCappedByEnvParallelism. + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + addRescaleConfig(options, true); + addScanParallelismConfig(options, 4); + }); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.scanParallelism).isEqualTo(4); + assertThat(pscSource.enableRescale).isTrue(); + + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // Terminal is the rescale PartitionTransformation; its input is the Kafka source. + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Source parallelism = scan.parallelism (4), since it is already <= env (10). 
+ assertThat(sourceOp.getParallelism()).isEqualTo(pscSource.scanParallelism); + assertThat(sourceOp.getParallelism()).isLessThan(envParallelism); + } + + @Test + public void testEffectiveParallelismFallsBackToPartitionCount() { + // With neither scan.parallelism nor table.exec.resource.default-parallelism set, + // PscTableCommonUtils.getEffectiveSourceParallelism() must fall through to the + // Kafka partition count (tier 3). The factory stores that resolved value on + // PscDynamicSource.scanParallelism, so we can assert it directly. + try { + final int partitionCount = 7; + PscTableCommonUtils.setProviderForTest((topicUris, props) -> partitionCount); + + // Only rescale enabled - no scan.parallelism, no rate limit. + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> addRescaleConfig(options, true)); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isTrue(); + // tier 1 (scan.parallelism) and tier 2 (table.exec.resource.default-parallelism) + // are unset, so the partition-count fallback wins. + assertThat(pscSource.scanParallelism).isEqualTo(partitionCount); + + // Verify the resolved value is applied to the source operator at stream-build time. 
+ final int envParallelism = 20; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + assertThat(sourceOp.getParallelism()) + .isEqualTo(Math.min(partitionCount, envParallelism)); + } finally { + PscTableCommonUtils.resetProvider(); + } + } + + @Test + public void testEffectiveParallelismFallsBackToTableExec() { + // Tier 2 of getEffectiveSourceParallelism(): when scan.parallelism is not set but + // table.exec.resource.default-parallelism is, the factory must resolve the source + // parallelism to that value AND must NOT fall through to the partition-count + // provider (tier 3). We inject a non-default value via a custom global Configuration + // and install a partition-count mock with a distinguishable value to assert + // tier 3 is never consulted. + try { + final int sentinelPartitionCount = 999; // distinct from the tier-2 value + PscTableCommonUtils.setProviderForTest((topicUris, props) -> sentinelPartitionCount); + + final int tableExecParallelism = 12; + final Configuration globalConfig = new Configuration(); + globalConfig.set( + ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, + tableExecParallelism); + + // Only rescale enabled. No scan.parallelism, no rate limit. 
+ final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> addRescaleConfig(options, true)); + + final DynamicTableSource actualSource = + createTableSourceWithGlobalConfig(SCHEMA, modifiedOptions, globalConfig); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isTrue(); + // tier 2 wins: resolved effective parallelism equals table.exec.resource.default-parallelism, + // NOT the partition-count sentinel. + assertThat(pscSource.scanParallelism).isEqualTo(tableExecParallelism); + assertThat(pscSource.scanParallelism).isNotEqualTo(sentinelPartitionCount); + + // The resolved value flows through to the source operator at stream-build time, + // capped by env parallelism. + final int envParallelism = 20; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + assertThat(sourceOp.getParallelism()) + .isEqualTo(Math.min(tableExecParallelism, envParallelism)); + } finally { + PscTableCommonUtils.resetProvider(); + } + } + + @Test + public void testRescaleWithUnknownEffectiveParallelism() { + // When all three tiers of getEffectiveSourceParallelism() fail to produce a positive + // value, the factory stores -1 on PscDynamicSource.scanParallelism. PscDynamicSource + // must still honor scan.enable-rescale=true (i.e. apply rescale()), but it must NOT + // call setParallelism() on the source operator -- the source inherits env parallelism + // unchanged. + try { + // Tier 3 failure: partition-count provider returns -1 (signals "could not determine"). 
+ PscTableCommonUtils.setProviderForTest((topicUris, props) -> -1); + + // No scan.parallelism (tier 1 unset). FactoryMocks.createTableSource passes an empty + // Configuration, so table.exec.resource.default-parallelism defaults to -1 (tier 2 + // unset). Combined with the tier-3 failure above, getEffectiveSourceParallelism() + // must return -1. + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> addRescaleConfig(options, true)); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isTrue(); + // All three tiers failed -> -1 ("unknown") is propagated to the source. + assertThat(pscSource.scanParallelism).isEqualTo(-1); + + // Build the transformation with env parallelism = 5. + final int envParallelism = 5; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // 1) Rescale is STILL applied (terminal is PartitionTransformation) because + // scan.enable-rescale=true is treated as authoritative -- the unknown + // effective parallelism only suppresses the explicit setParallelism call. + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + + // 2) The rescale's input is the bare SourceTransformation (no rate limiter). + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // 3) Source operator inherits env parallelism (no setParallelism call was made + // because scanParallelism <= 0). This is the key behavioral assertion for + // the unknown case: we do NOT pin the source to -1, we leave it as-is. 
+ assertThat(sourceOp.getParallelism()).isEqualTo(envParallelism); + } finally { + PscTableCommonUtils.resetProvider(); + } + } + + @Test + public void testRescaleAfterRateLimitOrder() { + // Pins the operator ordering established in PscDynamicSource.produceDataStream(): + // Source -> PscRateLimit (rate limiter) -> rescale (terminal) + // i.e. rescale is the OUTERMOST operator, applied AFTER the rate limiter, + // not before it. This guards against accidental reordering. + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + addRescaleConfig(options, true); + addRateLimitConfig(options, 1000.0); + addScanParallelismConfig(options, 20); + }); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isTrue(); + assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(1000.0); + assertThat(pscSource.scanParallelism).isEqualTo(20); + + final Transformation terminal = produceTransformationFromSource(pscSource, 10); + + // 1) Outermost / terminal op is the rescale. + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getName()).doesNotContain("PscRateLimit"); + + // 2) Immediately below rescale is the rate limiter (NOT the bare source). + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + assertThat(rateLimitOp).isNotInstanceOf(PartitionTransformation.class); + assertThat(rateLimitOp).isNotInstanceOf(SourceTransformation.class); + + // 3) Below the rate limiter is the Kafka source. This rules out the swapped + // ordering (Source -> rescale -> rate limiter), in which case the rate + // limiter's input would be a PartitionTransformation. 
+ assertThat(rateLimitOp.getInputs()).isNotEmpty(); + final Transformation sourceOp = rateLimitOp.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + } + + @Test + public void testRateLimiterParallelismMatchesSourceParallelism() { + // The rate limiter is chained via DataStream.map(...) on the source stream, so its + // parallelism must equal the source stream's parallelism (the capped value), + // never scan.parallelism directly when scan.parallelism > env parallelism. + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + addRescaleConfig(options, true); + addRateLimitConfig(options, 2000.0); + addScanParallelismConfig(options, 50); + }); + + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.scanParallelism).isEqualTo(50); + assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(2000.0); + + final int envParallelism = 8; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + + assertThat(rateLimitOp.getInputs()).isNotEmpty(); + final Transformation sourceOp = rateLimitOp.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Both must equal min(scanParallelism, env). The rate limiter parallelism is + // explicitly read from sourceStream.getParallelism() in PscDynamicSource, so it + // is locked to the source operator's parallelism. 
+ final int expectedParallelism = Math.min(pscSource.scanParallelism, envParallelism); + assertThat(sourceOp.getParallelism()).isEqualTo(expectedParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(expectedParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(sourceOp.getParallelism()); + } + @Test public void testTableSinkAutoCompleteSchemaRegistrySubject() { // only format @@ -2217,7 +2627,11 @@ private static PscDynamicSource createExpectedScanSource( null, false, null, // rateLimitRecordsPerSecond - null); // scanParallelism + -1); // scanParallelism: PscDynamicTableFactory passes + // `shouldRescale ? getEffectiveSourceParallelism(...) : -1`, so when + // rescale is disabled (the default for these expected-source builders) + // the actual source carries Integer(-1), not null. We mirror that here + // so PscDynamicSource.equals()'s scanParallelism check passes. } private static PscDynamicSink createExpectedSink( diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java index 2afa561..41ab741 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java @@ -43,6 +43,7 @@ import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.table.api.DataTypes; @@ -71,6 
+72,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.util.TestLogger; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -158,6 +160,16 @@ public class UpsertPscDynamicTableFactoryTest extends TestLogger { @Rule public ExpectedException thrown = ExpectedException.none(); + /** + * Reset partition count provider after each test to prevent test pollution. + * Tests that mock the provider via {@link PscTableCommonUtils#setProviderForTest} + * must not affect other tests. + */ + @After + public void tearDown() { + PscTableCommonUtils.resetProvider(); + } + @Test public void testTableSource() { final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); @@ -700,6 +712,194 @@ public void testExactlyOnceGuaranteeWithoutTransactionalIdPrefix() { createTableSink(SINK_SCHEMA, modifiedOptions); } + // -------------------------------------------------------------------------------------------- + // Rescale / parallelism tests for the upsert source + // -------------------------------------------------------------------------------------------- + + @Test + public void testUpsertSourceWithRescaleEnabledProducesPartitionTransformation() { + // With scan.enable-rescale=true on the upsert source, the produced stream must end + // in a PartitionTransformation (rescale()), and the upstream operator must be the + // bare Kafka source (no rate limiter set). 
+ final Map options = getModifiedOptions( + getFullSourceOptions(), + opts -> { + addRescaleConfig(opts, true); + addScanParallelismConfig(opts, 50); + }); + + final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, options); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isTrue(); + // scan.parallelism (tier 1) wins in getEffectiveSourceParallelism and is stored on + // the source by UpsertPscDynamicTableFactory. + assertThat(pscSource.scanParallelism).isEqualTo(50); + + final int envParallelism = 20; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + assertThat(sourceOp.getParallelism()) + .isEqualTo(Math.min(pscSource.scanParallelism, envParallelism)); + } + + @Test + public void testUpsertSourceWithRescaleAndRateLimitOrder() { + // Pins the operator ordering for the upsert source path: + // Source -> PscRateLimit -> rescale (terminal PartitionTransformation) + // This must match the non-upsert factory's behavior. 
+ final Map options = getModifiedOptions( + getFullSourceOptions(), + opts -> { + addRescaleConfig(opts, true); + addScanParallelismConfig(opts, 20); + addRateLimitConfig(opts, 1000.0); + }); + + final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, options); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isTrue(); + assertThat(pscSource.rateLimitRecordsPerSecond).isEqualTo(1000.0); + assertThat(pscSource.scanParallelism).isEqualTo(20); + + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // 1) Outermost / terminal op is the rescale. + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getName()).doesNotContain("PscRateLimit"); + + // 2) Immediately below rescale is the rate limiter (NOT the bare source). + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation rateLimitOp = terminal.getInputs().get(0); + assertThat(rateLimitOp.getName()).contains("PscRateLimit"); + + // 3) Below the rate limiter is the Kafka source. + assertThat(rateLimitOp.getInputs()).isNotEmpty(); + final Transformation sourceOp = rateLimitOp.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Source and rate-limit are both pinned to min(scanParallelism, env). 
+ final int expectedSourceParallelism = + Math.min(pscSource.scanParallelism, envParallelism); + assertThat(sourceOp.getParallelism()).isEqualTo(expectedSourceParallelism); + assertThat(rateLimitOp.getParallelism()).isEqualTo(expectedSourceParallelism); + } + + @Test + public void testUpsertSourceParallelismCappedByEnv() { + // When scan.parallelism (80) exceeds env.getParallelism() (10), the source + // operator parallelism must be capped at env parallelism (10) by + // PscDynamicSource.produceDataStream(), even though pscSource.scanParallelism + // continues to report the configured value (80). + final Map options = getModifiedOptions( + getFullSourceOptions(), + opts -> { + addRescaleConfig(opts, true); + addScanParallelismConfig(opts, 80); + }); + + final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, options); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + // Configured scan.parallelism is preserved on the source instance. + assertThat(pscSource.scanParallelism).isEqualTo(80); + assertThat(pscSource.enableRescale).isTrue(); + + final int envParallelism = 10; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + assertThat(terminal).isInstanceOf(PartitionTransformation.class); + assertThat(terminal.getInputs()).isNotEmpty(); + final Transformation sourceOp = terminal.getInputs().get(0); + assertThat(sourceOp).isInstanceOf(SourceTransformation.class); + + // Cap kicks in: source parallelism = env (10), NOT scan.parallelism (80). + assertThat(sourceOp.getParallelism()).isEqualTo(envParallelism); + assertThat(sourceOp.getParallelism()).isLessThan(pscSource.scanParallelism); + } + + @Test + public void testUpsertSourceWithRescaleDisabled() { + // With scan.enable-rescale=false the upsert factory must skip + // getEffectiveSourceParallelism() entirely and store -1 on the source. 
+ // No rescale() should be applied, and (in the absence of a rate limiter) the + // terminal transformation should be the bare SourceTransformation. + final Map options = getModifiedOptions( + getFullSourceOptions(), + opts -> { + addRescaleConfig(opts, false); + // scan.parallelism is set but should NOT be consulted because rescale=false. + addScanParallelismConfig(opts, 20); + }); + + final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, options); + assertThat(actualSource).isInstanceOf(PscDynamicSource.class); + + final PscDynamicSource pscSource = (PscDynamicSource) actualSource; + assertThat(pscSource.enableRescale).isFalse(); + // When rescale is disabled the factory short-circuits to -1 regardless of scan.parallelism. + assertThat(pscSource.scanParallelism).isEqualTo(-1); + + final int envParallelism = 5; + final Transformation terminal = + produceTransformationFromSource(pscSource, envParallelism); + + // No rescale, no rate limiter => terminal is the bare Kafka source. + assertThat(terminal).isInstanceOf(SourceTransformation.class); + assertThat(terminal).isNotInstanceOf(PartitionTransformation.class); + // Source inherits env parallelism (no explicit setParallelism call). + assertThat(terminal.getParallelism()).isEqualTo(envParallelism); + } + + /** + * Helper method to create a transformation from a {@link PscDynamicSource}. + * Reduces repetitive code in operator chaining tests. 
+ * + * @param pscSource the PSC dynamic source + * @param globalParallelism global parallelism for the execution environment + * @return the final transformation in the operator chain + */ + private Transformation produceTransformationFromSource( + PscDynamicSource pscSource, int globalParallelism) { + final ScanTableSource.ScanRuntimeProvider runtimeProvider = + pscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(runtimeProvider).isInstanceOf(DataStreamScanProvider.class); + + final DataStreamScanProvider dataStreamProvider = (DataStreamScanProvider) runtimeProvider; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.setParallelism(globalParallelism); + + return dataStreamProvider.produceDataStream(n -> Optional.empty(), env).getTransformation(); + } + + /** Puts the SCAN_ENABLE_RESCALE key into {@code options} with the flag's string value. */ + private static void addRescaleConfig(Map options, boolean enableRescale) { + options.put(PscConnectorOptions.SCAN_ENABLE_RESCALE.key(), String.valueOf(enableRescale)); + } + + /** Puts the SCAN_RATE_LIMIT key into {@code options} with the rate's string value. */ + private static void addRateLimitConfig(Map options, double rateLimit) { + options.put(PscConnectorOptions.SCAN_RATE_LIMIT.key(), String.valueOf(rateLimit)); + } + + /** Helper method to add scan.parallelism configuration to options map.
*/ + private static void addScanParallelismConfig(Map options, int parallelism) { + options.put(PscConnectorOptions.SCAN_PARALLELISM.key(), String.valueOf(parallelism)); + } + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -837,6 +1037,12 @@ private PscDynamicSource createExpectedScanSource( String keyPrefix, String topicUri, Properties properties) { + // NOTE: UpsertPscDynamicTableFactory now ALWAYS passes a non-null effectiveParallelism + // to PscDynamicSource (it computes `shouldRescale ? getEffectiveSourceParallelism(...) : -1`). + // The backward-compatible 16-arg PscDynamicSource ctor delegates with scanParallelism=null, + // which would no longer match the factory output. We therefore use the full ctor here and + // mirror the factory's defaults explicitly: sourceUidPrefix=null, enableRescale=false, + // rateLimitRecordsPerSecond=null, scanParallelism=-1. 
return new PscDynamicSource( producedDataType, keyDecodingFormat, @@ -854,7 +1060,11 @@ private PscDynamicSource createExpectedScanSource( Collections.emptyMap(), 0, true, - FactoryMocks.IDENTIFIER.asSummaryString()); + FactoryMocks.IDENTIFIER.asSummaryString(), + null, + false, + null, + -1); } private static PscDynamicSink createExpectedSink( From f48a72e5711b7e4cfed1f962b27c04e1c2993f8c Mon Sep 17 00:00:00 2001 From: Ashish Jhaveri Date: Fri, 15 May 2026 14:27:35 -0500 Subject: [PATCH 5/5] removed unused code --- .../connectors/psc/table/PscDynamicSource.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index fc169e5..7545c3c 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -225,17 +225,6 @@ public static boolean isRateLimitingEnabled(@Nullable Double rateLimitRecordsPer return rateLimitRecordsPerSecond != null && rateLimitRecordsPerSecond > 0; } - /** - * Determines the intended downstream parallelism. - * Uses scan.parallelism if configured, otherwise falls back to global default. - * - * @param execEnv the stream execution environment - * @return the intended parallelism for downstream operators - */ - private int getIntendedParallelism(StreamExecutionEnvironment execEnv) { - return scanParallelism != null ? scanParallelism : execEnv.getParallelism(); - } - /** * Backwards-compatible constructor that accepts int[] projections. * Converts them to int[][] format internally. 
@@ -473,8 +462,6 @@ public DataStream produceDataStream( + "parallelism = {}", execEnv.getParallelism()); } - LOG.info("@@@@>>>enableRescale ::" + enableRescale); - if (isRateLimitingEnabled(rateLimitRecordsPerSecond)) { String rateLimiterOperatorName = "PscRateLimit-" + tableIdentifier; resultStream = resultStream