diff --git a/wayang-platforms/wayang-generic-jdbc/pom.xml b/wayang-platforms/wayang-generic-jdbc/pom.xml
index 141c9434f..798748d4e 100644
--- a/wayang-platforms/wayang-generic-jdbc/pom.xml
+++ b/wayang-platforms/wayang-generic-jdbc/pom.xml
@@ -74,6 +74,13 @@
1.1.2-SNAPSHOT
compile
+
+
+ org.hsqldb
+ hsqldb
+ 2.7.2
+ test
+
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java
index 84cd9f37b..348de83cf 100644
--- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java
@@ -140,14 +140,20 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex
* @return the said follow-up {@link ExecutionTask} or {@code null} if none
*/
private ExecutionTask findGenericJdbcExecutionOperatorTaskInStage(ExecutionTask task, ExecutionStage stage) {
- assert task.getNumOuputChannels() == 1;
- final Channel outputChannel = task.getOutputChannel(0);
- final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
- return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator ?
- consumer :
- null;
+ assert task.getNumOuputChannels() == 1;
+ final Channel outputChannel = task.getOutputChannel(0);
+
+ if (outputChannel.getConsumers().isEmpty()) {
+ return null;
}
+ final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
+
+ return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator
+ ? consumer
+ : null;
+}
+
/**
* Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}.
*
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java
new file mode 100644
index 000000000..e65addefc
--- /dev/null
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.genericjdbc.mapping;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.JoinOperator;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.genericjdbc.operators.GenericJdbcJoinOperator;
+import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Mapping from {@link JoinOperator} to {@link GenericJdbcJoinOperator}.
+ */
+@SuppressWarnings("unchecked")
+public class JoinMapping implements Mapping {
+
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ GenericJdbcPlatform.getInstance()
+ ));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+
+ final OperatorPattern> operatorPattern =
+ new OperatorPattern<>(
+ "join",
+ new JoinOperator<>(null, null,
+ DataSetType.createDefault(Record.class),
+ DataSetType.createDefault(Record.class)),
+ false
+ );
+
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators(
+ (matchedOperator, epoch) ->
+ new GenericJdbcJoinOperator<>(matchedOperator).at(epoch)
+ );
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java
index 338942a3d..1bc00aacf 100644
--- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java
@@ -30,7 +30,8 @@ public class Mappings {
public static final Collection ALL = Arrays.asList(
new FilterMapping(),
- new ProjectionMapping()
+ new ProjectionMapping(),
+ new JoinMapping()
);
}
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java
new file mode 100644
index 000000000..8e074e10d
--- /dev/null
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.genericjdbc.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.JoinOperator;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
+
+/**
+ * Generic JDBC implementation of the {@link JoinOperator}.
+ */
+public class GenericJdbcJoinOperator
+ extends JdbcJoinOperator
+ implements GenericJdbcExecutionOperator {
+
+ public GenericJdbcJoinOperator(
+ TransformationDescriptor keyDescriptor0,
+ TransformationDescriptor keyDescriptor1) {
+ super(keyDescriptor0, keyDescriptor1);
+ }
+
+ public GenericJdbcJoinOperator(JoinOperator that) {
+ super(that);
+ }
+
+ @Override
+ protected GenericJdbcJoinOperator createCopy() {
+ return new GenericJdbcJoinOperator<>(this);
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java
index f9c62d9bf..f0db80e43 100644
--- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java
@@ -33,28 +33,29 @@
import java.sql.SQLException;
import java.util.List;
-
public class GenericJdbcTableSource extends JdbcTableSource implements GenericJdbcExecutionOperator {
+ /**
+ * Name of the JDBC configuration to use.
+ */
+ public String jdbcName;
+
/**
* Creates a new instance.
*
- * @see TableSource#TableSource(String, String...)
- * @param jdbcName on which table resides
- *
- *
+ * @param tableName the table to read from
+ * @param jdbcName the JDBC configuration name
+ * @param columnNames the columns to read
*/
-
- public String jdbcName;
- public GenericJdbcTableSource(String jdbcName, String tableName, String... columnNames) {
+ public GenericJdbcTableSource(String tableName, String... columnNames) {
super(tableName, columnNames);
- this.jdbcName = jdbcName;
+ this.jdbcName = "genericjdbc";
}
/**
* Copies an instance (exclusive of broadcasts).
*
- * @param that that should be copied
+ * @param that the instance that should be copied
*/
public GenericJdbcTableSource(GenericJdbcTableSource that) {
super(that);
@@ -66,24 +67,27 @@ public List getSupportedInputChannels(int index) {
throw new UnsupportedOperationException("This operator has no input channels.");
}
+ @Override
public CardinalityEstimator getCardinalityEstimator(int outputIndex) {
assert outputIndex == 0;
return new CardinalityEstimator() {
@Override
public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
- // see Job for StopWatch measurements
+
final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
"Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
);
- // Establish a DB connection.
try (Connection connection = GenericJdbcPlatform.getInstance()
- .createDatabaseDescriptor(optimizationContext.getConfiguration(),jdbcName)
+ .createDatabaseDescriptor(optimizationContext.getConfiguration(), jdbcName)
.createJdbcConnection()) {
- // Query the table cardinality.
- final String sql = String.format("SELECT count(*) FROM %s;", GenericJdbcTableSource.this.getTableName());
+ final String sql = String.format(
+ "SELECT count(*) FROM %s;", GenericJdbcTableSource.this.getTableName()
+ );
+
final ResultSet resultSet = connection.createStatement().executeQuery(sql);
+
if (!resultSet.next()) {
throw new SQLException("No query result for \"" + sql + "\".");
}
@@ -91,17 +95,17 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext, Car
return new CardinalityEstimate(cardinality, cardinality, 1d);
} catch (Exception e) {
+
LogManager.getLogger(this.getClass()).error(
"Could not estimate cardinality for {}.", GenericJdbcTableSource.this, e
);
- // If we could not load the cardinality, let's use a very conservative estimate.
return new CardinalityEstimate(10, 10000000, 0.9);
+
} finally {
timeMeasurement.stop();
}
}
};
}
-}
-
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java
index 4575a353f..320d2dce9 100644
--- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -49,27 +48,17 @@
import java.util.stream.StreamSupport;
/**
- * This {@link Operator} converts {@link SqlQueryChannel}s to {@link StreamChannel}s.
+ * Converts {@link SqlQueryChannel}s into {@link StreamChannel}s.
*/
-public class GenericSqlToStreamOperator extends UnaryToUnaryOperator implements JavaExecutionOperator, JsonSerializable {
+public class GenericSqlToStreamOperator extends UnaryToUnaryOperator
+ implements JavaExecutionOperator, JsonSerializable {
private final GenericJdbcPlatform jdbcPlatform;
- /**
- * Creates a new instance.
- *
- * @param jdbcPlatform from which the SQL data comes
- */
public GenericSqlToStreamOperator(GenericJdbcPlatform jdbcPlatform) {
this(jdbcPlatform, DataSetType.createDefault(Record.class));
}
- /**
- * Creates a new instance.
- *
- * @param jdbcPlatform from which the SQL data comes
- * @param dataSetType type of the {@link Record}s being transformed; see {@link RecordType}
- */
public GenericSqlToStreamOperator(GenericJdbcPlatform jdbcPlatform, DataSetType dataSetType) {
super(dataSetType, dataSetType, false);
this.jdbcPlatform = jdbcPlatform;
@@ -86,32 +75,49 @@ public Tuple, Collection> eval
ChannelInstance[] outputs,
JavaExecutor executor,
OptimizationContext.OperatorContext operatorContext) {
- // Cast the inputs and outputs.
+
final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance) inputs[0];
final StreamChannel.Instance output = (StreamChannel.Instance) outputs[0];
- GenericJdbcPlatform producerPlatform = (GenericJdbcPlatform) input.getChannel().getProducer().getPlatform();
+ GenericJdbcPlatform producerPlatform =
+ (GenericJdbcPlatform) input.getChannel().getProducer().getPlatform();
+
+ // Fix: safely resolve JDBC name
+ String jdbcName = input.getJdbcName();
+ if (jdbcName == null || jdbcName.trim().isEmpty()) {
+ jdbcName = producerPlatform.getPlatformId();
+ }
+
final Connection connection = producerPlatform
- .createDatabaseDescriptor(executor.getConfiguration(),input.getJdbcName())
+ .createDatabaseDescriptor(executor.getConfiguration(), jdbcName)
.createJdbcConnection();
Iterator resultSetIterator = new ResultSetIterator(connection, input.getSqlQuery());
- Spliterator resultSetSpliterator = Spliterators.spliteratorUnknownSize(resultSetIterator, 0);
- Stream resultSetStream = StreamSupport.stream(resultSetSpliterator, false);
+ Spliterator resultSetSpliterator =
+ Spliterators.spliteratorUnknownSize(resultSetIterator, 0);
+ Stream resultSetStream =
+ StreamSupport.stream(resultSetSpliterator, false);
output.accept(resultSetStream);
ExecutionLineageNode queryLineageNode = new ExecutionLineageNode(operatorContext);
- queryLineageNode.add(LoadProfileEstimators.createFromSpecification(
- String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()),
+ queryLineageNode.add(
+ LoadProfileEstimators.createFromSpecification(
+ String.format("wayang.%s.sqltostream.load.query",
+ this.jdbcPlatform.getPlatformId()),
executor.getConfiguration()
- ));
+ )
+ );
queryLineageNode.addPredecessor(input.getLineage());
+
ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext);
- outputLineageNode.add(LoadProfileEstimators.createFromSpecification(
- String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()),
- executor.getConfiguration()
- ));
+ outputLineageNode.add(
+ LoadProfileEstimators.createFromSpecification(
+ String.format("wayang.%s.sqltostream.load.output",
+ this.jdbcPlatform.getPlatformId()),
+ executor.getConfiguration()
+ )
+ );
output.getLineage().addPredecessor(outputLineageNode);
return queryLineageNode.collectAndMark();
@@ -119,7 +125,8 @@ public Tuple, Collection> eval
@Override
public List getSupportedInputChannels(int index) {
- return Collections.singletonList(this.jdbcPlatform.getGenericSqlQueryChannelDescriptor());
+ return Collections.singletonList(
+ this.jdbcPlatform.getGenericSqlQueryChannelDescriptor());
}
@Override
@@ -130,8 +137,10 @@ public List getSupportedOutputChannels(int index) {
@Override
public Collection getLoadProfileEstimatorConfigurationKeys() {
return Arrays.asList(
- String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()),
- String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId())
+ String.format("wayang.%s.sqltostream.load.query",
+ this.jdbcPlatform.getPlatformId()),
+ String.format("wayang.%s.sqltostream.load.output",
+ this.jdbcPlatform.getPlatformId())
);
}
@@ -140,27 +149,12 @@ public Collection getLoadProfileEstimatorConfigurationKeys() {
*/
private static class ResultSetIterator implements Iterator, AutoCloseable {
- /**
- * Keeps around the {@link ResultSet} of the SQL query.
- */
private ResultSet resultSet;
-
- /**
- * The next {@link Record} to be delivered via {@link #next()}.
- */
private Record next;
- /**
- * Creates a new instance.
- *
- * @param connection the JDBC connection on which to execute a SQL query
- * @param sqlQuery the SQL query
- */
ResultSetIterator(Connection connection, String sqlQuery) {
try {
- //connection.setAutoCommit(false);
Statement st = connection.createStatement();
- //st.setFetchSize(100000000);
this.resultSet = st.executeQuery(sqlQuery);
} catch (SQLException e) {
this.close();
@@ -169,26 +163,28 @@ private static class ResultSetIterator implements Iterator, AutoCloseabl
this.moveToNext();
}
- /**
- * Moves this instance to the next {@link Record}.
- */
private void moveToNext() {
try {
if (this.resultSet == null || !this.resultSet.next()) {
this.next = null;
this.close();
} else {
- final int recordWidth = this.resultSet.getMetaData().getColumnCount();
+ final int recordWidth =
+ this.resultSet.getMetaData().getColumnCount();
+
Object[] values = new Object[recordWidth];
+
for (int i = 0; i < recordWidth; i++) {
values[i] = this.resultSet.getObject(i + 1);
}
+
this.next = new Record(values);
}
} catch (SQLException e) {
this.next = null;
this.close();
- throw new WayangException("Exception while iterating the result set.", e);
+ throw new WayangException(
+ "Exception while iterating the result set.", e);
}
}
@@ -210,7 +206,8 @@ public void close() {
try {
this.resultSet.close();
} catch (Throwable t) {
- LogManager.getLogger(this.getClass()).error("Could not close result set.", t);
+ LogManager.getLogger(this.getClass())
+ .error("Could not close result set.", t);
} finally {
this.resultSet = null;
}
@@ -220,13 +217,15 @@ public void close() {
@Override
public WayangJsonObj toJson() {
- return new WayangJsonObj().put("platform", this.jdbcPlatform.getClass().getCanonicalName());
+ return new WayangJsonObj()
+ .put("platform", this.jdbcPlatform.getClass().getCanonicalName());
}
@SuppressWarnings("unused")
public static GenericSqlToStreamOperator fromJson(WayangJsonObj wayangJsonObj) {
final String platformClassName = wayangJsonObj.getString("platform");
- GenericJdbcPlatform jdbcPlatform = ReflectionUtils.evaluate(platformClassName + ".getInstance()");
+ GenericJdbcPlatform jdbcPlatform =
+ ReflectionUtils.evaluate(platformClassName + ".getInstance()");
return new GenericSqlToStreamOperator(jdbcPlatform);
}
-}
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java
index ff7e48c3b..99ec02ab8 100644
--- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java
+++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java
@@ -61,7 +61,7 @@ public Executor.Factory getExecutorFactory() {
@Override
public String getJdbcDriverClassName() {
- return "None";
+ return "org.hsqldb.jdbcDriver";
}
diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java
new file mode 100644
index 000000000..c0f22de98
--- /dev/null
+++ b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java
@@ -0,0 +1,122 @@
+package org.apache.wayang.genericjdbc.operators;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.Job;
+import org.apache.wayang.core.function.PredicateDescriptor;
+import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
+import org.apache.wayang.core.plan.executionplan.ExecutionStage;
+import org.apache.wayang.core.plan.executionplan.ExecutionTask;
+import org.apache.wayang.core.platform.CrossPlatformExecutor;
+import org.apache.wayang.core.profiling.NoInstrumentationStrategy;
+import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
+import org.apache.wayang.jdbc.channels.SqlQueryChannel;
+import org.apache.wayang.jdbc.execution.JdbcExecutor;
+import org.apache.wayang.jdbc.operators.SqlToStreamOperator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class GenericJdbcFilterOperatorTest {
+
+ @Test
+ void testFilterExecution() {
+
+ Configuration configuration = new Configuration();
+
+ Job job = mock(Job.class);
+ when(job.getConfiguration()).thenReturn(configuration);
+ when(job.getCrossPlatformExecutor())
+ .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy()));
+
+ GenericJdbcPlatform platform = GenericJdbcPlatform.getInstance();
+ SqlQueryChannel.Descriptor sqlChannelDescriptor =
+ platform.getGenericSqlQueryChannelDescriptor();
+
+ ExecutionStage sqlStage = mock(ExecutionStage.class);
+
+ GenericJdbcTableSource tableSource =
+ new GenericJdbcTableSource("T1", "A", "VAL1");
+
+ ExecutionTask tableSourceTask = new ExecutionTask(tableSource);
+ tableSourceTask.setOutputChannel(
+ 0,
+ new SqlQueryChannel(sqlChannelDescriptor, tableSource.getOutput(0))
+ );
+ tableSourceTask.setStage(sqlStage);
+
+ GenericJdbcFilterOperator filterOperator =
+ new GenericJdbcFilterOperator(
+ new PredicateDescriptor<>(
+ record -> (Integer) record.getField(0) > 0,
+ Record.class
+ ).withSqlImplementation("A > 0")
+ );
+
+ ExecutionTask filterTask = new ExecutionTask(filterOperator);
+
+ tableSourceTask.getOutputChannel(0).addConsumer(filterTask, 0);
+
+ filterTask.setOutputChannel(
+ 0,
+ new SqlQueryChannel(sqlChannelDescriptor, filterOperator.getOutput(0))
+ );
+ filterTask.setStage(sqlStage);
+
+ when(sqlStage.getStartTasks())
+ .thenReturn(Collections.singleton(tableSourceTask));
+
+ when(sqlStage.getTerminalTasks())
+ .thenReturn(Collections.singleton(filterTask));
+
+ ExecutionStage nextStage = mock(ExecutionStage.class);
+
+ SqlToStreamOperator sqlToStreamOperator =
+ new SqlToStreamOperator(platform);
+
+ ExecutionTask sqlToStreamTask =
+ new ExecutionTask(sqlToStreamOperator);
+
+ filterTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0);
+ sqlToStreamTask.setStage(nextStage);
+
+ JdbcExecutor executor = new JdbcExecutor(platform, job);
+
+ executor.execute(
+ sqlStage,
+ new DefaultOptimizationContext(job),
+ job.getCrossPlatformExecutor()
+ );
+
+ SqlQueryChannel.Instance sqlQueryChannelInstance =
+ (SqlQueryChannel.Instance)
+ job.getCrossPlatformExecutor()
+ .getChannelInstance(sqlToStreamTask.getInputChannel(0));
+
+ String sql = sqlQueryChannelInstance.getSqlQuery();
+
+ assertTrue(sql.contains("WHERE"));
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java
new file mode 100644
index 000000000..36b83775b
--- /dev/null
+++ b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java
@@ -0,0 +1,132 @@
+package org.apache.wayang.genericjdbc.operators;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.Job;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
+import org.apache.wayang.core.plan.executionplan.ExecutionStage;
+import org.apache.wayang.core.plan.executionplan.ExecutionTask;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.CrossPlatformExecutor;
+import org.apache.wayang.core.profiling.NoInstrumentationStrategy;
+import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
+import org.apache.wayang.jdbc.channels.SqlQueryChannel;
+import org.apache.wayang.jdbc.execution.JdbcExecutor;
+import org.apache.wayang.jdbc.operators.SqlToStreamOperator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class GenericJdbcJoinOperatorTest {
+
+ @Test
+ void testJoinSqlGeneration() {
+
+ Configuration configuration = new Configuration();
+
+ Job job = mock(Job.class);
+ when(job.getConfiguration()).thenReturn(configuration);
+ when(job.getCrossPlatformExecutor())
+ .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy()));
+
+ GenericJdbcPlatform platform = GenericJdbcPlatform.getInstance();
+ SqlQueryChannel.Descriptor sqlChannelDescriptor =
+ platform.getGenericSqlQueryChannelDescriptor();
+
+ ExecutionStage sqlStage = mock(ExecutionStage.class);
+
+ GenericJdbcTableSource tableSourceA = new GenericJdbcTableSource("T1", "A", "VAL1");
+ GenericJdbcTableSource tableSourceB = new GenericJdbcTableSource("T2", "A", "VAL2");
+
+ ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA);
+ tableSourceATask.setOutputChannel(
+ 0,
+ new SqlQueryChannel(sqlChannelDescriptor, tableSourceA.getOutput(0))
+ );
+ tableSourceATask.setStage(sqlStage);
+
+ ExecutionTask tableSourceBTask = new ExecutionTask(tableSourceB);
+ tableSourceBTask.setOutputChannel(
+ 0,
+ new SqlQueryChannel(sqlChannelDescriptor, tableSourceB.getOutput(0))
+ );
+ tableSourceBTask.setStage(sqlStage);
+
+ ExecutionOperator joinOperator = new GenericJdbcJoinOperator(
+ new TransformationDescriptor(
+ record -> (Integer) record.getField(0),
+ Record.class,
+ Integer.class
+ ).withSqlImplementation("T1", "A"),
+ new TransformationDescriptor(
+ record -> (Integer) record.getField(0),
+ Record.class,
+ Integer.class
+ ).withSqlImplementation("T2", "A")
+ );
+
+ ExecutionTask joinTask = new ExecutionTask(joinOperator);
+
+ tableSourceATask.getOutputChannel(0).addConsumer(joinTask, 0);
+ tableSourceBTask.getOutputChannel(0).addConsumer(joinTask, 1);
+
+ joinTask.setOutputChannel(
+ 0,
+ new SqlQueryChannel(sqlChannelDescriptor, joinOperator.getOutput(0))
+ );
+ joinTask.setStage(sqlStage);
+
+ when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask));
+ when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(joinTask));
+
+ // Add SqlToStreamOperator stage (required by JdbcExecutor)
+ ExecutionStage nextStage = mock(ExecutionStage.class);
+
+ SqlToStreamOperator sqlToStreamOperator = new SqlToStreamOperator(platform);
+ ExecutionTask sqlToStreamTask = new ExecutionTask(sqlToStreamOperator);
+
+ joinTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0);
+ sqlToStreamTask.setStage(nextStage);
+
+ JdbcExecutor executor = new JdbcExecutor(platform, job);
+
+ executor.execute(
+ sqlStage,
+ new DefaultOptimizationContext(job),
+ job.getCrossPlatformExecutor()
+ );
+
+ SqlQueryChannel.Instance sqlQueryChannelInstance =
+ (SqlQueryChannel.Instance)
+ job.getCrossPlatformExecutor()
+ .getChannelInstance(sqlToStreamTask.getInputChannel(0));
+
+ String sql = sqlQueryChannelInstance.getSqlQuery();
+
+ assertTrue(sql.contains("JOIN"));
+ assertTrue(sql.contains("ON"));
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties b/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties
new file mode 100644
index 000000000..5492a0237
--- /dev/null
+++ b/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+wayang.T1.jdbc.url=jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1
+wayang.T1.jdbc.user=SA
+wayang.T1.jdbc.password=
+wayang.T1.jdbc.driverName=org.hsqldb.jdbcDriver
+
+wayang.T2.jdbc.url=jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1
+wayang.T2.jdbc.user=SA
+wayang.T2.jdbc.password=
+wayang.T2.jdbc.driverName=org.hsqldb.jdbcDriver
+
+wayang.genericjdbc.jdbc.url=jdbc:hsqldb:mem:wayang_test_db_filter;DB_CLOSE_DELAY=-1
+wayang.genericjdbc.jdbc.user=SA
+wayang.genericjdbc.jdbc.password=
+wayang.genericjdbc.jdbc.driverName=org.hsqldb.jdbcDriver
\ No newline at end of file
diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java
index f7a9d7c5a..efa88f5fd 100644
--- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java
+++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java
@@ -1,3 +1,4 @@
+package org.apache.wayang.jdbc.execution;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -15,9 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.wayang.jdbc.execution;
-
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.TableSource;
@@ -28,19 +26,13 @@
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.platform.ExecutionState;
-import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.ExecutorTemplate;
import org.apache.wayang.core.platform.Platform;
-import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
-import org.apache.wayang.jdbc.operators.JdbcExecutionOperator;
-import org.apache.wayang.jdbc.operators.JdbcFilterOperator;
-import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
-import org.apache.wayang.jdbc.operators.JdbcProjectionOperator;
-import org.apache.wayang.jdbc.operators.JdbcTableSource;
+import org.apache.wayang.jdbc.operators.*;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -57,17 +49,11 @@
import java.util.Set;
import java.util.stream.Collectors;
-/**
- * {@link Executor} implementation for the {@link JdbcPlatformTemplate}.
- */
public class JdbcExecutor extends ExecutorTemplate {
private final JdbcPlatformTemplate platform;
-
private final Connection connection;
-
private final Logger logger = LogManager.getLogger(this.getClass());
-
private final FunctionCompiler functionCompiler = new FunctionCompiler();
public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {
@@ -77,150 +63,166 @@ public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {
}
@Override
- public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) {
- final Tuple2 pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
+ public void execute(final ExecutionStage stage,
+ final OptimizationContext optimizationContext,
+ final ExecutionState executionState) {
+
+ final Tuple2 pair =
+ JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
+
final String query = pair.field0;
final SqlQueryChannel.Instance queryChannel = pair.field1;
queryChannel.setSqlQuery(query);
-
- // Return the tipChannelInstance.
executionState.register(queryChannel);
}
/**
- * Retrieves the follow-up {@link ExecutionTask} of the given {@code task}
- * unless it is not comprising a
- * {@link JdbcExecutionOperator} and/or not in the given {@link ExecutionStage}.
- *
- * @param task whose follow-up {@link ExecutionTask} is requested; should have
- * a single follower
- * @param stage in which the follow-up {@link ExecutionTask} should be
- * @return the said follow-up {@link ExecutionTask} or {@code null} if none
+ * Safe version (removes WayangCollections.getSingle crash)
*/
- private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) {
+ private static ExecutionTask findJdbcExecutionOperatorTaskInStage(
+ final ExecutionTask task,
+ final ExecutionStage stage) {
+
assert task.getNumOuputChannels() == 1;
+
final Channel outputChannel = task.getOutputChannel(0);
- final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
- return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer
+
+ if (outputChannel.getConsumers().size() != 1) {
+ return null;
+ }
+
+ final ExecutionTask consumer = outputChannel.getConsumers().iterator().next();
+
+ return consumer.getStage() == stage &&
+ consumer.getOperator() instanceof JdbcExecutionOperator
+ ? consumer
: null;
}
- /**
- * Instantiates the outbound {@link SqlQueryChannel} of an
- * {@link ExecutionTask}.
- *
- * @param task whose outbound {@link SqlQueryChannel} should be
- * instantiated
- * @param optimizationContext provides information about the
- * {@link ExecutionTask}
- * @return the {@link SqlQueryChannel.Instance}
- */
- private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
- final OptimizationContext optimizationContext, final JdbcExecutor jdbcExecutor) {
- assert task.getNumOuputChannels() == 1 : String.format("Illegal task: %s.", task);
- assert task.getOutputChannel(0) instanceof SqlQueryChannel : String.format("Illegal task: %s.", task);
+ private static SqlQueryChannel.Instance instantiateOutboundChannel(
+ final ExecutionTask task,
+ final OptimizationContext optimizationContext,
+ final JdbcExecutor jdbcExecutor) {
+
+ assert task.getNumOuputChannels() == 1;
+ assert task.getOutputChannel(0) instanceof SqlQueryChannel;
final SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0);
- final OptimizationContext.OperatorContext operatorContext = optimizationContext
- .getOperatorContext(task.getOperator());
+
+ final OptimizationContext.OperatorContext operatorContext =
+ optimizationContext.getOperatorContext(task.getOperator());
+
return outputChannel.createInstance(jdbcExecutor, operatorContext, 0);
}
- /**
- * Instantiates the outbound {@link SqlQueryChannel} of an
- * {@link ExecutionTask}.
- *
- * @param task whose outbound {@link SqlQueryChannel}
- * should be instantiated
- * @param optimizationContext provides information about the
- * {@link ExecutionTask}
- * @param predecessorChannelInstance preceeding {@link SqlQueryChannel.Instance}
- * to keep track of lineage
- * @return the {@link SqlQueryChannel.Instance}
- */
- private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
+ private static SqlQueryChannel.Instance instantiateOutboundChannel(
+ final ExecutionTask task,
final OptimizationContext optimizationContext,
- final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) {
- final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);
+ final SqlQueryChannel.Instance predecessorChannelInstance,
+ final JdbcExecutor jdbcExecutor) {
+
+ final SqlQueryChannel.Instance newInstance =
+ instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);
+
newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage());
+
return newInstance;
}
/**
- * Creates a query channel and the sql statement
- *
- * @param stage
- * @param context
- * @return a tuple containing the sql statement
+ * Main SQL builder
*/
- protected static Tuple2 createSqlQuery(final ExecutionStage stage,
- final OptimizationContext context, final JdbcExecutor jdbcExecutor) {
+ protected static Tuple2 createSqlQuery(
+ final ExecutionStage stage,
+ final OptimizationContext context,
+ final JdbcExecutor jdbcExecutor) {
+
final Collection> startTasks = stage.getStartTasks();
final Collection> termTasks = stage.getTerminalTasks();
- // Verify that we can handle this instance.
- assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported";
- final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
- assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
- final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
- assert startTask.getOperator() instanceof TableSource
- : "Invalid JDBC stage: Start task has to be a TableSource";
+ if (startTasks.isEmpty()) {
+ throw new WayangException("Invalid jdbc stage: no sources found");
+ }
+
+ final ExecutionTask startTask = (ExecutionTask) startTasks.iterator().next();
+
+ if (termTasks.size() != 1) {
+ throw new WayangException("Invalid JDBC stage: multiple terminal tasks not supported.");
+ }
- // Extract the different types of ExecutionOperators from the stage.
final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator();
- SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);
+
+ SqlQueryChannel.Instance tipChannelInstance =
+ instantiateOutboundChannel(startTask, context, jdbcExecutor);
+
final Collection filterTasks = new ArrayList<>(4);
JdbcProjectionOperator projectionTask = null;
final Collection> joinTasks = new ArrayList<>();
+
final Set allTasks = stage.getAllTasks();
- assert allTasks.size() <= 3;
- ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);
+
+ ExecutionTask nextTask =
+ findJdbcExecutionOperatorTaskInStage(startTask, stage);
+
while (nextTask != null) {
- // Evaluate the nextTask.
- if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) {
+
+ if (nextTask.getOperator() instanceof JdbcFilterOperator filterOperator) {
filterTasks.add(filterOperator);
} else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) {
- assert projectionTask == null; // Allow one projection operator per stage for now.
projectionTask = projectionOperator;
} else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) {
joinTasks.add(joinOperator);
} else {
- throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
+ throw new WayangException("Unsupported JDBC execution task " + nextTask);
}
- // Move the tipChannelInstance.
- tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor);
+ tipChannelInstance =
+ instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor);
- // Go to the next nextTask.
- nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage);
+ nextTask =
+ findJdbcExecutionOperatorTaskInStage(nextTask, stage);
}
- // Create the SQL query.
- final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
+ final StringBuilder query =
+ createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
+
return new Tuple2<>(query.toString(), tipChannelInstance);
}
- public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp,
- final Collection filterTasks, JdbcProjectionOperator projectionTask,
+ public static StringBuilder createSqlString(
+ final JdbcExecutor jdbcExecutor,
+ final JdbcTableSource tableOp,
+ final Collection filterTasks,
+ JdbcProjectionOperator projectionTask,
final Collection> joinTasks) {
- final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
- final Collection conditions = filterTasks.stream()
- .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
- .collect(Collectors.toList());
- final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
- final Collection joins = joinTasks.stream()
- .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
- .collect(Collectors.toList());
+
+ final String tableName =
+ tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
+
+ final Collection conditions =
+ filterTasks.stream()
+ .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
+ .collect(Collectors.toList());
+
+ final String projection =
+ projectionTask == null
+ ? "*"
+ : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
+
+ final Collection joins =
+ joinTasks.stream()
+ .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
+ .collect(Collectors.toList());
final StringBuilder sb = new StringBuilder(1000);
+
sb.append("SELECT ").append(projection).append(" FROM ").append(tableName);
- if (!joins.isEmpty()) {
- final String separator = " ";
- for (final String join : joins) {
- sb.append(separator).append(join);
- }
+
+ for (final String join : joins) {
+ sb.append(" ").append(join);
}
+
if (!conditions.isEmpty()) {
sb.append(" WHERE ");
String separator = "";
@@ -229,7 +231,9 @@ public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, fin
separator = " AND ";
}
}
+
sb.append(';');
+
return sb;
}
@@ -238,7 +242,7 @@ public void dispose() {
try {
this.connection.close();
} catch (final SQLException e) {
- this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e);
+ this.logger.error("Could not close JDBC connection correctly.", e);
}
}
@@ -247,27 +251,30 @@ public Platform getPlatform() {
return this.platform;
}
- private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs)
+ private void saveResult(final FileChannel.Instance outputFileChannelInstance,
+ final ResultSet rs)
throws IOException, SQLException {
- // Output results.
- final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
- try (final OutputStreamWriter writer = new OutputStreamWriter(
- outFs.create(outputFileChannelInstance.getSinglePath()))) {
+
+ final FileSystem outFs =
+ FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
+
+ try (final OutputStreamWriter writer =
+ new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) {
+
while (rs.next()) {
- // System.out.println(rs.getInt(1) + " " + rs.getString(2));
+
final ResultSetMetaData rsmd = rs.getMetaData();
+
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
writer.write(rs.getString(i));
- if (i < rsmd.getColumnCount()) {
- writer.write('\t');
- }
- }
- if (!rs.isLast()) {
- writer.write('\n');
+ if (i < rsmd.getColumnCount()) writer.write('\t');
}
+
+ if (!rs.isLast()) writer.write('\n');
}
+
} catch (final UncheckedIOException e) {
throw e.getCause();
}
}
-}
+}
\ No newline at end of file