Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.ExecutorTemplate;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
Expand Down Expand Up @@ -63,173 +62,222 @@
public class JdbcExecutor extends ExecutorTemplate {

private final JdbcPlatformTemplate platform;

private final Connection connection;

private final Logger logger = LogManager.getLogger(this.getClass());

private final FunctionCompiler functionCompiler = new FunctionCompiler();

public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {
super(job.getCrossPlatformExecutor());
this.platform = platform;
this.connection = this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection();
this.connection = this.platform
.createDatabaseDescriptor(job.getConfiguration())
.createJdbcConnection();
}

@Override
public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) {
final Tuple2<String, SqlQueryChannel.Instance> pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
public void execute(
final ExecutionStage stage,
final OptimizationContext optimizationContext,
final ExecutionState executionState) {

final Tuple2<String, SqlQueryChannel.Instance> pair =
JdbcExecutor.createSqlQuery(stage, optimizationContext, this);

final String query = pair.field0;
final SqlQueryChannel.Instance queryChannel = pair.field1;

queryChannel.setSqlQuery(query);

// Return the tipChannelInstance.
executionState.register(queryChannel);
}

/**
* Retrieves the follow-up {@link ExecutionTask} of the given {@code task}
* unless it is not comprising a
* {@link JdbcExecutionOperator} and/or not in the given {@link ExecutionStage}.
*
* @param task whose follow-up {@link ExecutionTask} is requested; should have
* a single follower
* @param stage in which the follow-up {@link ExecutionTask} should be
* @return the said follow-up {@link ExecutionTask} or {@code null} if none
* Retrieves the follow-up {@link ExecutionTask} of the given {@code task}.
* Now implemented safely to avoid crashes when consumers are missing or multiple.
*/
private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) {
assert task.getNumOuputChannels() == 1;
private static ExecutionTask findJdbcExecutionOperatorTaskInStage(
final ExecutionTask task,
final ExecutionStage stage) {

if (task.getNumOuputChannels() != 1) {
return null;
}

final Channel outputChannel = task.getOutputChannel(0);
final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer
: null;
final Collection<ExecutionTask> consumers = outputChannel.getConsumers();

if (consumers == null || consumers.size() != 1) {
return null;
}

final ExecutionTask consumer = consumers.iterator().next();

if (consumer.getStage() == stage
&& consumer.getOperator() instanceof JdbcExecutionOperator) {
return consumer;
}

return null;
}

/**
* Instantiates the outbound {@link SqlQueryChannel} of an
* {@link ExecutionTask}.
*
* @param task whose outbound {@link SqlQueryChannel} should be
* instantiated
* @param optimizationContext provides information about the
* {@link ExecutionTask}
* @return the {@link SqlQueryChannel.Instance}
* Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}.
*/
private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
final OptimizationContext optimizationContext, final JdbcExecutor jdbcExecutor) {
private static SqlQueryChannel.Instance instantiateOutboundChannel(
final ExecutionTask task,
final OptimizationContext optimizationContext,
final JdbcExecutor jdbcExecutor) {

assert task.getNumOuputChannels() == 1 : String.format("Illegal task: %s.", task);
assert task.getOutputChannel(0) instanceof SqlQueryChannel : String.format("Illegal task: %s.", task);
assert task.getOutputChannel(0) instanceof SqlQueryChannel
: String.format("Illegal task: %s.", task);

final SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0);
final OptimizationContext.OperatorContext operatorContext = optimizationContext
.getOperatorContext(task.getOperator());

final OptimizationContext.OperatorContext operatorContext =
optimizationContext.getOperatorContext(task.getOperator());

return outputChannel.createInstance(jdbcExecutor, operatorContext, 0);
}

/**
* Instantiates the outbound {@link SqlQueryChannel} of an
* {@link ExecutionTask}.
*
* @param task whose outbound {@link SqlQueryChannel}
* should be instantiated
* @param optimizationContext provides information about the
* {@link ExecutionTask}
* @param predecessorChannelInstance preceeding {@link SqlQueryChannel.Instance}
* to keep track of lineage
* @return the {@link SqlQueryChannel.Instance}
*/
private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
private static SqlQueryChannel.Instance instantiateOutboundChannel(
final ExecutionTask task,
final OptimizationContext optimizationContext,
final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) {
final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);
final SqlQueryChannel.Instance predecessorChannelInstance,
final JdbcExecutor jdbcExecutor) {

final SqlQueryChannel.Instance newInstance =
JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);

newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage());

return newInstance;
}

/**
* Creates a query channel and the sql statement
*
* @param stage
* @param context
* @return a tuple containing the sql statement
* Creates the SQL query for the stage.
*/
protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(final ExecutionStage stage,
final OptimizationContext context, final JdbcExecutor jdbcExecutor) {
protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(
final ExecutionStage stage,
final OptimizationContext context,
final JdbcExecutor jdbcExecutor) {

final Collection<?> startTasks = stage.getStartTasks();
final Collection<?> termTasks = stage.getTerminalTasks();

// Verify that we can handle this instance.
assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported";
assert startTasks.size() == 1 :
"Invalid jdbc stage: multiple sources are not currently supported";

final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";

assert termTasks.size() == 1 :
"Invalid JDBC stage: multiple terminal tasks are not currently supported.";

final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
assert startTask.getOperator() instanceof TableSource
: "Invalid JDBC stage: Start task has to be a TableSource";

// Extract the different types of ExecutionOperators from the stage.
assert startTask.getOperator() instanceof TableSource :
"Invalid JDBC stage: Start task has to be a TableSource";

final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator();
SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);

SqlQueryChannel.Instance tipChannelInstance =
JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);

final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
JdbcProjectionOperator projectionTask = null;
final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();

final Set<ExecutionTask> allTasks = stage.getAllTasks();
assert allTasks.size() <= 3;
ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);

ExecutionTask nextTask =
JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);

while (nextTask != null) {
// Evaluate the nextTask.

if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) {
filterTasks.add(filterOperator);

} else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) {
assert projectionTask == null; // Allow one projection operator per stage for now.
assert projectionTask == null;
projectionTask = projectionOperator;

} else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) {
joinTasks.add(joinOperator);

} else {
throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
throw new WayangException(
String.format("Unsupported JDBC execution task %s", nextTask));
}

// Move the tipChannelInstance.
tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor);
tipChannelInstance =
JdbcExecutor.instantiateOutboundChannel(
nextTask,
context,
tipChannelInstance,
jdbcExecutor
);

// Go to the next nextTask.
nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage);
nextTask =
JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage);
}

// Create the SQL query.
final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
final StringBuilder query =
createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);

return new Tuple2<>(query.toString(), tipChannelInstance);
}

public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp,
final Collection<JdbcFilterOperator> filterTasks, JdbcProjectionOperator projectionTask,
public static StringBuilder createSqlString(
final JdbcExecutor jdbcExecutor,
final JdbcTableSource tableOp,
final Collection<JdbcFilterOperator> filterTasks,
JdbcProjectionOperator projectionTask,
final Collection<JdbcJoinOperator<?>> joinTasks) {
final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final String tableName =
tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final Collection<String> conditions = filterTasks.stream()
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
.collect(Collectors.toList());
final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final String projection =
projectionTask == null
? "*"
: projectionTask.createSqlClause(
jdbcExecutor.connection,
jdbcExecutor.functionCompiler
);

final Collection<String> joins = joinTasks.stream()
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
.collect(Collectors.toList());

final StringBuilder sb = new StringBuilder(1000);

sb.append("SELECT ").append(projection).append(" FROM ").append(tableName);

if (!joins.isEmpty()) {
final String separator = " ";
for (final String join : joins) {
sb.append(separator).append(join);
sb.append(" ").append(join);
}
}

if (!conditions.isEmpty()) {

sb.append(" WHERE ");

String separator = "";

for (final String condition : conditions) {
sb.append(separator).append(condition);
separator = " AND ";
}
}

sb.append(';');

return sb;
}

Expand All @@ -238,7 +286,7 @@ public void dispose() {
try {
this.connection.close();
} catch (final SQLException e) {
this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e);
this.logger.error("Could not close JDBC connection correctly.", e);
}
}

Expand All @@ -247,27 +295,37 @@ public Platform getPlatform() {
return this.platform;
}

private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs)
private void saveResult(
final FileChannel.Instance outputFileChannelInstance,
final ResultSet rs)
throws IOException, SQLException {
// Output results.
final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
try (final OutputStreamWriter writer = new OutputStreamWriter(
outFs.create(outputFileChannelInstance.getSinglePath()))) {

final FileSystem outFs =
FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();

try (final OutputStreamWriter writer =
new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) {

while (rs.next()) {
// System.out.println(rs.getInt(1) + " " + rs.getString(2));

final ResultSetMetaData rsmd = rs.getMetaData();

for (int i = 1; i <= rsmd.getColumnCount(); i++) {

writer.write(rs.getString(i));

if (i < rsmd.getColumnCount()) {
writer.write('\t');
}
}

if (!rs.isLast()) {
writer.write('\n');
}
}

} catch (final UncheckedIOException e) {
throw e.getCause();
}
}
}
}