From a390f0e5271ed178dac0fbf3e964a4da1e8b7aad Mon Sep 17 00:00:00 2001 From: Mohit Bisht Date: Thu, 5 Mar 2026 13:39:47 +0530 Subject: [PATCH] Improve robustness of JdbcExecutor by handling multiple or missing consumers safely --- .../wayang/jdbc/execution/JdbcExecutor.java | 232 +++++++++++------- 1 file changed, 145 insertions(+), 87 deletions(-) diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index f7a9d7c5a..943dc9d5d 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -31,7 +31,6 @@ import org.apache.wayang.core.platform.Executor; import org.apache.wayang.core.platform.ExecutorTemplate; import org.apache.wayang.core.platform.Platform; -import org.apache.wayang.core.util.WayangCollections; import org.apache.wayang.core.util.fs.FileSystem; import org.apache.wayang.core.util.fs.FileSystems; import org.apache.wayang.jdbc.channels.SqlQueryChannel; @@ -63,173 +62,222 @@ public class JdbcExecutor extends ExecutorTemplate { private final JdbcPlatformTemplate platform; - private final Connection connection; - private final Logger logger = LogManager.getLogger(this.getClass()); - private final FunctionCompiler functionCompiler = new FunctionCompiler(); public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) { super(job.getCrossPlatformExecutor()); this.platform = platform; - this.connection = this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection(); + this.connection = this.platform + .createDatabaseDescriptor(job.getConfiguration()) + .createJdbcConnection(); } @Override - public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) { - final Tuple2 pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this); + public void execute( + final ExecutionStage stage, + final OptimizationContext optimizationContext, + final ExecutionState executionState) { + + final Tuple2 pair = + JdbcExecutor.createSqlQuery(stage, optimizationContext, this); + final String query = pair.field0; final SqlQueryChannel.Instance queryChannel = pair.field1; queryChannel.setSqlQuery(query); - // Return the tipChannelInstance. executionState.register(queryChannel); } /** - * Retrieves the follow-up {@link ExecutionTask} of the given {@code task} - * unless it is not comprising a - * {@link JdbcExecutionOperator} and/or not in the given {@link ExecutionStage}. - * - * @param task whose follow-up {@link ExecutionTask} is requested; should have - * a single follower - * @param stage in which the follow-up {@link ExecutionTask} should be - * @return the said follow-up {@link ExecutionTask} or {@code null} if none + * Retrieves the follow-up {@link ExecutionTask} of the given {@code task}. + * Now implemented safely to avoid crashes when consumers are missing or multiple. */ - private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) { - assert task.getNumOuputChannels() == 1; + private static ExecutionTask findJdbcExecutionOperatorTaskInStage( + final ExecutionTask task, + final ExecutionStage stage) { + + if (task.getNumOuputChannels() != 1) { + return null; + } + final Channel outputChannel = task.getOutputChannel(0); - final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); - return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer - : null; + final Collection consumers = outputChannel.getConsumers(); + + if (consumers == null || consumers.size() != 1) { + return null; + } + + final ExecutionTask consumer = consumers.iterator().next(); + + if (consumer.getStage() == stage + && consumer.getOperator() instanceof JdbcExecutionOperator) { + return consumer; + } + + return null; } /** - * Instantiates the outbound {@link SqlQueryChannel} of an - * {@link ExecutionTask}. - * - * @param task whose outbound {@link SqlQueryChannel} should be - * instantiated - * @param optimizationContext provides information about the - * {@link ExecutionTask} - * @return the {@link SqlQueryChannel.Instance} + * Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}. */ - private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, - final OptimizationContext optimizationContext, final JdbcExecutor jdbcExecutor) { + private static SqlQueryChannel.Instance instantiateOutboundChannel( + final ExecutionTask task, + final OptimizationContext optimizationContext, + final JdbcExecutor jdbcExecutor) { + assert task.getNumOuputChannels() == 1 : String.format("Illegal task: %s.", task); - assert task.getOutputChannel(0) instanceof SqlQueryChannel : String.format("Illegal task: %s.", task); + assert task.getOutputChannel(0) instanceof SqlQueryChannel + : String.format("Illegal task: %s.", task); final SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0); - final OptimizationContext.OperatorContext operatorContext = optimizationContext - .getOperatorContext(task.getOperator()); + + final OptimizationContext.OperatorContext operatorContext = + optimizationContext.getOperatorContext(task.getOperator()); + return outputChannel.createInstance(jdbcExecutor, operatorContext, 0); } - /** - * Instantiates the outbound {@link SqlQueryChannel} of an - * {@link ExecutionTask}. - * - * @param task whose outbound {@link SqlQueryChannel} - * should be instantiated - * @param optimizationContext provides information about the - * {@link ExecutionTask} - * @param predecessorChannelInstance preceeding {@link SqlQueryChannel.Instance} - * to keep track of lineage - * @return the {@link SqlQueryChannel.Instance} - */ - private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, + private static SqlQueryChannel.Instance instantiateOutboundChannel( + final ExecutionTask task, final OptimizationContext optimizationContext, - final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) { - final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); + final SqlQueryChannel.Instance predecessorChannelInstance, + final JdbcExecutor jdbcExecutor) { + + final SqlQueryChannel.Instance newInstance = + JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); + newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage()); + return newInstance; } /** - * Creates a query channel and the sql statement - * - * @param stage - * @param context - * @return a tuple containing the sql statement + * Creates the SQL query for the stage. */ - protected static Tuple2 createSqlQuery(final ExecutionStage stage, - final OptimizationContext context, final JdbcExecutor jdbcExecutor) { + protected static Tuple2 createSqlQuery( + final ExecutionStage stage, + final OptimizationContext context, + final JdbcExecutor jdbcExecutor) { + final Collection startTasks = stage.getStartTasks(); final Collection termTasks = stage.getTerminalTasks(); - // Verify that we can handle this instance. - assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported"; + assert startTasks.size() == 1 : + "Invalid jdbc stage: multiple sources are not currently supported"; + final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; - assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; + + assert termTasks.size() == 1 : + "Invalid JDBC stage: multiple terminal tasks are not currently supported."; + final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; - assert startTask.getOperator() instanceof TableSource - : "Invalid JDBC stage: Start task has to be a TableSource"; - // Extract the different types of ExecutionOperators from the stage. + assert startTask.getOperator() instanceof TableSource : + "Invalid JDBC stage: Start task has to be a TableSource"; + final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator(); - SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor); + + SqlQueryChannel.Instance tipChannelInstance = + JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor); + final Collection filterTasks = new ArrayList<>(4); JdbcProjectionOperator projectionTask = null; final Collection> joinTasks = new ArrayList<>(); + final Set allTasks = stage.getAllTasks(); assert allTasks.size() <= 3; - ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); + + ExecutionTask nextTask = + JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); + while (nextTask != null) { - // Evaluate the nextTask. + if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) { filterTasks.add(filterOperator); + } else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) { - assert projectionTask == null; // Allow one projection operator per stage for now. + assert projectionTask == null; projectionTask = projectionOperator; + } else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) { joinTasks.add(joinOperator); + } else { - throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); + throw new WayangException( + String.format("Unsupported JDBC execution task %s", nextTask)); } - // Move the tipChannelInstance. - tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor); + tipChannelInstance = + JdbcExecutor.instantiateOutboundChannel( + nextTask, + context, + tipChannelInstance, + jdbcExecutor + ); - // Go to the next nextTask. - nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage); + nextTask = + JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage); } - // Create the SQL query. - final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + final StringBuilder query = + createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + return new Tuple2<>(query.toString(), tipChannelInstance); } - public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp, - final Collection filterTasks, JdbcProjectionOperator projectionTask, + public static StringBuilder createSqlString( + final JdbcExecutor jdbcExecutor, + final JdbcTableSource tableOp, + final Collection filterTasks, + JdbcProjectionOperator projectionTask, final Collection> joinTasks) { - final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + + final String tableName = + tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + final Collection conditions = filterTasks.stream() .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) .collect(Collectors.toList()); - final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + + final String projection = + projectionTask == null + ? "*" + : projectionTask.createSqlClause( + jdbcExecutor.connection, + jdbcExecutor.functionCompiler + ); + final Collection joins = joinTasks.stream() .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) .collect(Collectors.toList()); final StringBuilder sb = new StringBuilder(1000); + sb.append("SELECT ").append(projection).append(" FROM ").append(tableName); + if (!joins.isEmpty()) { - final String separator = " "; for (final String join : joins) { - sb.append(separator).append(join); + sb.append(" ").append(join); } } + if (!conditions.isEmpty()) { + sb.append(" WHERE "); + String separator = ""; + for (final String condition : conditions) { sb.append(separator).append(condition); separator = " AND "; } } + sb.append(';'); + return sb; } @@ -238,7 +286,7 @@ public void dispose() { try { this.connection.close(); } catch (final SQLException e) { - this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e); + this.logger.error("Could not close JDBC connection correctly.", e); } } @@ -247,27 +295,37 @@ public Platform getPlatform() { return this.platform; } - private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs) + private void saveResult( + final FileChannel.Instance outputFileChannelInstance, + final ResultSet rs) throws IOException, SQLException { - // Output results. - final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); - try (final OutputStreamWriter writer = new OutputStreamWriter( - outFs.create(outputFileChannelInstance.getSinglePath()))) { + + final FileSystem outFs = + FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); + + try (final OutputStreamWriter writer = + new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) { + while (rs.next()) { - // System.out.println(rs.getInt(1) + " " + rs.getString(2)); + final ResultSetMetaData rsmd = rs.getMetaData(); + for (int i = 1; i <= rsmd.getColumnCount(); i++) { + writer.write(rs.getString(i)); + if (i < rsmd.getColumnCount()) { writer.write('\t'); } } + if (!rs.isLast()) { writer.write('\n'); } } + } catch (final UncheckedIOException e) { throw e.getCause(); } } -} +} \ No newline at end of file