Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,6 @@ public static boolean isRateLimitingEnabled(@Nullable Double rateLimitRecordsPer
return rateLimitRecordsPerSecond != null && rateLimitRecordsPerSecond > 0;
}

/**
 * Determines the intended downstream parallelism.
 *
 * <p>Uses {@code scan.parallelism} when it is configured with a positive value; otherwise
 * falls back to the parallelism reported by the execution environment. A non-positive
 * value (for example a {@code -1} "unknown" sentinel) is treated as "not configured" so it
 * is never handed to downstream operators as their parallelism.
 *
 * @param execEnv the stream execution environment
 * @return the intended parallelism for downstream operators
 */
private int getIntendedParallelism(StreamExecutionEnvironment execEnv) {
    // Only a strictly positive scan.parallelism counts as an explicit override.
    if (scanParallelism != null && scanParallelism > 0) {
        return scanParallelism;
    }
    return execEnv.getParallelism();
}

/**
* Backwards-compatible constructor that accepts int[] projections.
* Converts them to int[][] format internally.
Expand Down Expand Up @@ -449,41 +438,42 @@ public DataStream<RowData> produceDataStream(
DataStreamSource<RowData> sourceStream =
execEnv.fromSource(
pscSource, watermarkStrategy, "PscSource-" + tableIdentifier);

// Source parallelism is determined by partition count (Flink's default for Kafka-like sources)
// We do NOT set it explicitly even if scan.parallelism is configured, because:
// - A Kafka source can only have as many active subtasks as there are partitions
// - Setting higher parallelism would create idle subtasks
// - Instead, we use rescale() to redistribute data to the intended downstream parallelism


DataStream<RowData> resultStream = sourceStream;

// Determine the intended downstream parallelism for rate limiting
// This is scan.parallelism if set, otherwise global default parallelism
int intendedParallelism = getIntendedParallelism(execEnv);

// Apply rescale FIRST if enabled
// This redistributes data from source parallelism (= partition count) to intended parallelism
// Ensures all downstream subtasks (including rate limiters) receive traffic
LOG.info("Rescale logic " + scanParallelism + "::" + enableRescale + "::" + execEnv.getParallelism());
if (enableRescale) {
resultStream = resultStream.rescale();
if(scanParallelism != null && scanParallelism > 0 ) {
Comment thread
ashish47108 marked this conversation as resolved.
int sourceParallelism = Math.min(scanParallelism, execEnv.getParallelism());
Comment thread
ashish47108 marked this conversation as resolved.
sourceStream.setParallelism(sourceParallelism);
LOG.info("Rescale enabled: set source parallelism to {} "
+ "(partition count / effective parallelism): {}, "
+ "job parallelism: {}",
sourceParallelism, scanParallelism, execEnv.getParallelism());
} else {
LOG.info("Rescale enabled but could not determine partition count or "
+ "from effective parallelism, "
+ "source will use job default parallelism: {}",
execEnv.getParallelism());
}
}

// Apply rate limiting AFTER rescale if configured
// Rate limiter parallelism must match the actual parallelism of incoming data:
// - If rescale enabled: use intendedParallelism (all subtasks are active after rescale)
// - If rescale disabled: use source parallelism (rate limiter stays with source)
else
{
LOG.info("Rescale disabled: source will use job default "
+ "parallelism = {}", execEnv.getParallelism());
}

if (isRateLimitingEnabled(rateLimitRecordsPerSecond)) {
int rateLimiterParallelism = enableRescale ? intendedParallelism : sourceStream.getParallelism();

String rateLimiterOperatorName = "PscRateLimit-" + tableIdentifier;
resultStream = resultStream
.map(new PscRateLimitMap<>(rateLimitRecordsPerSecond))
.setParallelism(rateLimiterParallelism)
.setParallelism(sourceStream.getParallelism())
.name(rateLimiterOperatorName)
.uid(rateLimiterOperatorName);
}

if (enableRescale) {
resultStream = resultStream.rescale();
}

// Prefer explicit user-provided UID prefix if present; otherwise rely on provider context.
if (sourceUidPrefix != null) {
final String trimmedPrefix = sourceUidPrefix.trim();
Expand Down Expand Up @@ -961,7 +951,7 @@ protected PscSource<RowData> createPscSource(
offsetResetConfig = getResetStrategy(offsetResetConfig);
pscSourceBuilder.setStartingOffsets(
OffsetsInitializer.committedOffsets(offsetResetConfig));
LOG.info("Setting starting offsets to committed offsets with reset strategy: {}", offsetResetConfig);
LOG.info("####>>>Setting starting offsets to committed offsets with reset strategy: {}", offsetResetConfig);
break;
case SPECIFIC_OFFSETS:
Map<TopicUriPartition, Long> offsets = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateDeliveryGuarantee;
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSinkOptions;
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateTableSourceOptions;
import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale;
import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.getEffectiveSourceParallelism;

/**
* Factory for creating configured instances of {@link PscDynamicSource} and {@link
Expand Down Expand Up @@ -220,22 +220,24 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

final boolean shouldRescale =tableOptions.get(SCAN_ENABLE_RESCALE);
// Read scan.parallelism configuration
final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);

// Log scan parallelism configuration if set
if (scanParallelism != null) {
LOG.info("scan.parallelism configured: {} (will override partition-based parallelism)",
scanParallelism);
}

// Determine if rescale should be applied based on scan.parallelism (if set) or global default
final boolean shouldRescale = shouldApplyRescale(
tableOptions,
final Integer effectiveParallelism = shouldRescale ?
getEffectiveSourceParallelism(
context.getConfiguration(),
getSourceTopicUris(tableOptions),
properties,
scanParallelism);
scanParallelism)
: -1;


// Log the effective source parallelism when rescale is enabled and a positive value was resolved
if (shouldRescale && effectiveParallelism > 0 ) {
LOG.info("Psc will use parallelism={} for source operator",
effectiveParallelism);
}

// Get rate limit configuration
final Double rateLimitRecordsPerSecond = tableOptions.getOptional(SCAN_RATE_LIMIT).orElse(null);
Expand Down Expand Up @@ -266,7 +268,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null),
shouldRescale,
rateLimitRecordsPerSecond,
scanParallelism);
effectiveParallelism);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.pinterest.psc.metadata.TopicUriMetadata;
import com.pinterest.psc.metadata.client.PscMetadataClient;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -91,77 +92,60 @@ static synchronized void resetProvider() {
}

/**
* Determines whether rescale() should be applied based on:
* 1. scan.enable-rescale flag must be true
* 2. Effective parallelism (scan.parallelism or global default) > partition count
*
* <p>When scan.parallelism is set, it takes precedence over global default for
* rescale decision logic. This ensures rescale is only applied when the user's
* intended source parallelism exceeds partition count.
* Determines the effective parallelism for the source by walking the following
* fallback chain and returning the first source that yields a positive value:
* <ol>
* <li>{@code scan.parallelism} (table-level override) &mdash; used if non-null and not -1</li>
* <li>{@code table.exec.resource.default-parallelism} &mdash; used if set and not -1</li>
* <li>Kafka partition count for the source topics (queried via PSC metadata client)</li>
* </ol>
*
* @param tableOptions User's table configuration options
* @param globalConfig Global Flink configuration
* @param topicUris List of topic URIs to query for partition counts
* @param pscProperties PSC properties for metadata client connection
* <p>If none of the above produces a positive value (e.g. the partition-count query
* fails), this method returns {@code -1} to signal "unknown".
*
* @param globalConfig Global Flink configuration (read for table.exec.resource.default-parallelism)
* @param topicUris List of topic URIs used to query partition count
* @param pscProperties PSC properties for metadata client connection
* @param scanParallelism Optional explicit scan.parallelism configuration
* @return true if rescale should be applied, false otherwise
* @return Effective parallelism, or -1 if it cannot be determined
*/
public static boolean shouldApplyRescale(
ReadableConfig tableOptions,
public static int getEffectiveSourceParallelism(
ReadableConfig globalConfig,
List<String> topicUris,
Properties pscProperties,
@Nullable Integer scanParallelism) {

// First check if rescale is enabled by user
if (!tableOptions.get(SCAN_ENABLE_RESCALE)) {
return false;
}

// Determine effective parallelism: scan.parallelism takes precedence over global default
Integer effectiveParallelism;
String parallelismSource;

// 1) scan.parallelism
if (scanParallelism != null && scanParallelism > 0) {
effectiveParallelism = scanParallelism;
parallelismSource = PscConnectorOptions.SCAN_PARALLELISM.key();
} else {
effectiveParallelism = globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
parallelismSource = ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key();
LOG.info("Effective source parallelism = {} (source: {})",
scanParallelism, PscConnectorOptions.SCAN_PARALLELISM.key());
return scanParallelism;
}

// If parallelism is not set or invalid, cannot determine
if (effectiveParallelism == null || effectiveParallelism <= 0) {
LOG.info("scan.enable-rescale is true, but effective parallelism from {} is {} (not set or invalid). " +
"Rescale will not be applied.",
parallelismSource, effectiveParallelism);
return false;

// 2) table.exec.resource.default-parallelism
Integer tableExecParallelism =
globalConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
if (tableExecParallelism != null && tableExecParallelism > 0) {
LOG.info("Effective source parallelism = {} (source: {})",
tableExecParallelism,
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key());
return tableExecParallelism;
}

// Query partition count using the configured provider (mockable for testing)
// 3) Kafka partition count
int partitionCount = partitionCountProvider.getPartitionCount(topicUris, pscProperties);

// If partition count couldn't be determined, don't apply rescale (fail-safe)
if (partitionCount <= 0) {
LOG.warn("scan.enable-rescale is true, but partition count could not be determined. " +
"Rescale will not be applied.");
return false;
if (partitionCount > 0) {
LOG.info("Effective source parallelism = {} (source: kafka partition count)",
partitionCount);
return partitionCount;
}

// Apply rescale only if effective parallelism exceeds partition count
boolean shouldRescale = effectiveParallelism > partitionCount;

if (shouldRescale) {
LOG.info("Applying rescale(): {} ({}) > partition count ({}). " +
"Data will be redistributed to fully utilize downstream operators.",
parallelismSource, effectiveParallelism, partitionCount);
} else {
LOG.info("Skipping rescale(): {} ({}) <= partition count ({}). " +
"No shuffle needed as source parallelism matches or is less than partition count.",
parallelismSource, effectiveParallelism, partitionCount);
}

return shouldRescale;
// 4) Unknown
LOG.warn("Could not determine effective source parallelism: {} is unset/-1, {} is unset/-1, " +
"and partition count could not be retrieved. Returning -1.",
PscConnectorOptions.SCAN_PARALLELISM.key(),
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key());
return -1;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateScanBoundedMode;
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateConsumerClientOptions;
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.validateProducerClientOptions;
import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.shouldApplyRescale;
import static com.pinterest.flink.streaming.connectors.psc.table.PscTableCommonUtils.getEffectiveSourceParallelism;

/** Upsert-Psc factory. */
public class UpsertPscDynamicTableFactory
Expand Down Expand Up @@ -161,22 +161,24 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final PscConnectorOptionsUtil.BoundedOptions boundedOptions = getBoundedOptions(tableOptions);

final boolean shouldRescale =tableOptions.get(SCAN_ENABLE_RESCALE);
// Read scan.parallelism configuration
final Integer scanParallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);

// Log scan parallelism configuration if set
if (scanParallelism != null) {
LOG.info("scan.parallelism configured: {} (will override partition-based parallelism)",
scanParallelism);
}

// Determine if rescale should be applied based on scan.parallelism (if set) or global default
final boolean shouldRescale = shouldApplyRescale(
tableOptions,
final Integer effectiveParallelism = shouldRescale ?
getEffectiveSourceParallelism(
context.getConfiguration(),
getSourceTopicUris(tableOptions),
properties,
scanParallelism);
scanParallelism)
: -1;


// Log the effective source parallelism when rescale is enabled and a positive value was resolved
if (shouldRescale && effectiveParallelism > 0 ) {
LOG.info("Psc will use parallelism={} for source operator",
effectiveParallelism);
}

// Get rate limit configuration
final Double rateLimitRecordsPerSecond = tableOptions.getOptional(SCAN_RATE_LIMIT).orElse(null);
Expand Down Expand Up @@ -208,7 +210,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions.getOptional(SOURCE_UID_PREFIX).orElse(null),
shouldRescale,
rateLimitRecordsPerSecond,
scanParallelism);
effectiveParallelism);
}

@Override
Expand Down
Loading
Loading