From dfa48fe484e5ed4b15e8ddcba6ee98395daddedd Mon Sep 17 00:00:00 2001 From: Mohit Bisht Date: Mon, 2 Mar 2026 18:19:59 +0530 Subject: [PATCH 1/4] Wayang-707 Add Generic JDBC Join operator --- .../genericjdbc/mapping/JoinMapping.java | 70 +++++++++++++++++++ .../wayang/genericjdbc/mapping/Mappings.java | 3 +- .../operators/GenericJdbcJoinOperator.java | 47 +++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java create mode 100644 wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java new file mode 100644 index 000000000..e65addefc --- /dev/null +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/JoinMapping.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.genericjdbc.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.genericjdbc.operators.GenericJdbcJoinOperator; +import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link JoinOperator} to {@link GenericJdbcJoinOperator}. + */ +@SuppressWarnings("unchecked") +public class JoinMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + GenericJdbcPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + + final OperatorPattern> operatorPattern = + new OperatorPattern<>( + "join", + new JoinOperator<>(null, null, + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class)), + false + ); + + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> + new GenericJdbcJoinOperator<>(matchedOperator).at(epoch) + ); + } +} \ No newline at end of file diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java index 338942a3d..1bc00aacf 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java @@ -30,7 +30,8 @@ public class Mappings { public static final Collection ALL = Arrays.asList( new FilterMapping(), - new ProjectionMapping() + new ProjectionMapping(), + new JoinMapping() ); } diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java new file mode 100644 index 000000000..8e074e10d --- /dev/null +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.genericjdbc.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcJoinOperator; + +/** + * Generic JDBC implementation of the {@link JoinOperator}. + */ +public class GenericJdbcJoinOperator + extends JdbcJoinOperator + implements GenericJdbcExecutionOperator { + + public GenericJdbcJoinOperator( + TransformationDescriptor keyDescriptor0, + TransformationDescriptor keyDescriptor1) { + super(keyDescriptor0, keyDescriptor1); + } + + public GenericJdbcJoinOperator(JoinOperator that) { + super(that); + } + + @Override + protected GenericJdbcJoinOperator createCopy() { + return new GenericJdbcJoinOperator<>(this); + } +} \ No newline at end of file From cf057f9f321ac2d60ceb79e8d36592bb1e2508ac Mon Sep 17 00:00:00 2001 From: Mohit Bisht Date: Mon, 2 Mar 2026 20:28:31 +0530 Subject: [PATCH 2/4] Add unit test for GenericJdbcJoinOperator --- .../GenericJdbcJoinOperatorTest.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java new file mode 100644 index 000000000..daa563015 --- /dev/null +++ b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java @@ -0,0 +1,50 @@ +package org.apache.wayang.genericjdbc.operators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class GenericJdbcJoinOperatorTest { + + @Test + public void testOperatorCreation() { + + TransformationDescriptor keyExtractor1 = + new TransformationDescriptor<>( + (Record record) -> (Integer) record.getField(0), + Record.class, + Integer.class + ); + + TransformationDescriptor keyExtractor2 = + new TransformationDescriptor<>( + (Record record) -> (Integer) record.getField(0), + Record.class, + Integer.class + ); + + GenericJdbcJoinOperator operator = + new GenericJdbcJoinOperator<>(keyExtractor1, keyExtractor2); + + assertNotNull(operator); + } +} \ No newline at end of file From 7e7870dd07b4c19b7c92e7649440f290455c5ae0 Mon Sep 17 00:00:00 2001 From: Mohit Bisht Date: Wed, 4 Mar 2026 14:23:06 +0530 Subject: [PATCH 3/4] Fix GenericJdbcTableSource jdbcName handling causing incorrect configuration lookup --- wayang-platforms/wayang-generic-jdbc/pom.xml | 7 ++ .../operators/GenericJdbcTableSource.java | 53 ++++++----- .../GenericJdbcJoinOperatorTest.java | 88 ++++++++++++++++--- .../src/test/resources/wayang.properties | 25 ++++++ 4 files changed, 139 insertions(+), 34 deletions(-) create mode 100644 wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties diff --git a/wayang-platforms/wayang-generic-jdbc/pom.xml b/wayang-platforms/wayang-generic-jdbc/pom.xml index 141c9434f..798748d4e 100644 --- a/wayang-platforms/wayang-generic-jdbc/pom.xml +++ b/wayang-platforms/wayang-generic-jdbc/pom.xml @@ -74,6 +74,13 @@ 1.1.2-SNAPSHOT compile + + + org.hsqldb + hsqldb + 2.7.2 + test + diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java index f9c62d9bf..dffb4f228 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java @@ -33,28 +33,29 @@ import java.sql.SQLException; import java.util.List; - public class GenericJdbcTableSource extends JdbcTableSource implements GenericJdbcExecutionOperator { + /** + * Name of the JDBC configuration to use. + */ + public String jdbcName; + /** * Creates a new instance. * - * @see TableSource#TableSource(String, String...) - * @param jdbcName on which table resides - * - * + * @param tableName the table to read from + * @param jdbcName the JDBC configuration name + * @param columnNames the columns to read */ - - public String jdbcName; - public GenericJdbcTableSource(String jdbcName, String tableName, String... columnNames) { + public GenericJdbcTableSource(String tableName, String... columnNames) { super(tableName, columnNames); - this.jdbcName = jdbcName; + this.jdbcName = "genericjdbc"; } /** * Copies an instance (exclusive of broadcasts). * - * @param that that should be copied + * @param that the instance that should be copied */ public GenericJdbcTableSource(GenericJdbcTableSource that) { super(that); @@ -66,42 +67,54 @@ public List getSupportedInputChannels(int index) { throw new UnsupportedOperationException("This operator has no input channels."); } + @Override public CardinalityEstimator getCardinalityEstimator(int outputIndex) { assert outputIndex == 0; + return new CardinalityEstimator() { @Override public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) { - // see Job for StopWatch measurements + final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start( - "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities" + "Optimization", + "Cardinality&Load Estimation", + "Push Estimation", + "Estimate source cardinalities" ); - // Establish a DB connection. try (Connection connection = GenericJdbcPlatform.getInstance() - .createDatabaseDescriptor(optimizationContext.getConfiguration(),jdbcName) + .createDatabaseDescriptor(optimizationContext.getConfiguration(), jdbcName) .createJdbcConnection()) { - // Query the table cardinality. - final String sql = String.format("SELECT count(*) FROM %s;", GenericJdbcTableSource.this.getTableName()); + final String sql = String.format( + "SELECT count(*) FROM %s;", + GenericJdbcTableSource.this.getTableName() + ); + final ResultSet resultSet = connection.createStatement().executeQuery(sql); + if (!resultSet.next()) { throw new SQLException("No query result for \"" + sql + "\"."); } + long cardinality = resultSet.getLong(1); + return new CardinalityEstimate(cardinality, cardinality, 1d); } catch (Exception e) { + LogManager.getLogger(this.getClass()).error( - "Could not estimate cardinality for {}.", GenericJdbcTableSource.this, e + "Could not estimate cardinality for {}.", + GenericJdbcTableSource.this, + e ); - // If we could not load the cardinality, let's use a very conservative estimate. return new CardinalityEstimate(10, 10000000, 0.9); + } finally { timeMeasurement.stop(); } } }; } -} - +} \ No newline at end of file diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java index daa563015..02a3ea72a 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java +++ b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java @@ -1,5 +1,4 @@ package org.apache.wayang.genericjdbc.operators; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -16,35 +15,96 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.basic.operators.LocalCallbackSink; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.WayangContext; import org.apache.wayang.core.function.TransformationDescriptor; -import org.junit.Test; +import org.apache.wayang.core.plan.wayangplan.WayangPlan; +import org.apache.wayang.genericjdbc.GenericJdbc; +import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform; +import org.apache.wayang.java.platform.JavaPlatform; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; -import static org.junit.Assert.*; +import static org.junit.jupiter.api.Assertions.assertEquals; -public class GenericJdbcJoinOperatorTest { +class GenericJdbcJoinOperatorTest { @Test - public void testOperatorCreation() { + void testJoinExecution() throws Exception { - TransformationDescriptor keyExtractor1 = + String url = "jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1"; + + Class.forName("org.hsqldb.jdbcDriver"); + + try (Connection conn = DriverManager.getConnection(url, "SA", "")) { + Statement stmt = conn.createStatement(); + + stmt.execute("CREATE TABLE T1 (A INT, VAL1 VARCHAR(20));"); + stmt.execute("INSERT INTO T1 VALUES (1, 'Apache');"); + + stmt.execute("CREATE TABLE T2 (A INT, VAL2 INT);"); + stmt.execute("INSERT INTO T2 VALUES (1, 2026);"); + } + + Configuration config = new Configuration(); + config.setProperty("wayang.genericjdbc.jdbc.url", url); + config.setProperty("wayang.genericjdbc.jdbc.user", "SA"); + config.setProperty("wayang.genericjdbc.jdbc.password", ""); + config.setProperty("wayang.genericjdbc.jdbc.driverName", "org.hsqldb.jdbcDriver"); + + GenericJdbcTableSource source1 = new GenericJdbcTableSource("T1", "A", "VAL1"); + GenericJdbcTableSource source2 = new GenericJdbcTableSource("T2", "A", "VAL2"); + + TransformationDescriptor keyExtractor0 = new TransformationDescriptor<>( - (Record record) -> (Integer) record.getField(0), + r -> (Integer) r.getField(0), Record.class, Integer.class - ); + ).withSqlImplementation("T1", "A"); - TransformationDescriptor keyExtractor2 = + TransformationDescriptor keyExtractor1 = new TransformationDescriptor<>( - (Record record) -> (Integer) record.getField(0), + r -> (Integer) r.getField(0), Record.class, Integer.class + ).withSqlImplementation("T2", "A"); + + JoinOperator join = + new JoinOperator<>(keyExtractor0, keyExtractor1); + + List> results = new ArrayList<>(); + + @SuppressWarnings("unchecked") + LocalCallbackSink> sink = + LocalCallbackSink.createCollectingSink( + results, + (Class>) (Class) Tuple2.class ); - GenericJdbcJoinOperator operator = - new GenericJdbcJoinOperator<>(keyExtractor1, keyExtractor2); + source1.addTargetPlatform(GenericJdbcPlatform.getInstance()); + source2.addTargetPlatform(GenericJdbcPlatform.getInstance()); + join.addTargetPlatform(JavaPlatform.getInstance()); + sink.addTargetPlatform(JavaPlatform.getInstance()); + + source1.connectTo(0, join, 0); + source2.connectTo(0, join, 1); + join.connectTo(0, sink, 0); + + WayangContext ctx = new WayangContext(config) + .with(GenericJdbc.plugin()) + .with(org.apache.wayang.java.Java.basicPlugin()); + + ctx.execute(new WayangPlan(sink)); - assertNotNull(operator); + assertEquals(1, results.size()); } } \ No newline at end of file diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties b/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties new file mode 100644 index 000000000..d1907cfb1 --- /dev/null +++ b/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +wayang.T1.jdbc.url=jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1 +wayang.T1.jdbc.user=SA +wayang.T1.jdbc.password= +wayang.T1.jdbc.driverName=org.hsqldb.jdbcDriver + +wayang.T2.jdbc.url=jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1 +wayang.T2.jdbc.user=SA +wayang.T2.jdbc.password= +wayang.T2.jdbc.driverName=org.hsqldb.jdbcDriver \ No newline at end of file From 96319fc9f9f932428c5bc5bba336a24b0954bc01 Mon Sep 17 00:00:00 2001 From: Mohit Bisht Date: Thu, 5 Mar 2026 00:10:00 +0530 Subject: [PATCH 4/4] Fix GenericJdbcJoinOperatorTest and GenericJdbcFilterOperatorTest SQL execution pipeline --- .../execution/GenericJdbcExecutor.java | 18 +- .../operators/GenericJdbcTableSource.java | 15 +- .../operators/GenericSqlToStreamOperator.java | 119 +++++---- .../platform/GenericJdbcPlatform.java | 2 +- .../GenericJdbcFilterOperatorTest.java | 122 +++++++++ .../GenericJdbcJoinOperatorTest.java | 156 ++++++----- .../src/test/resources/wayang.properties | 7 +- .../wayang/jdbc/execution/JdbcExecutor.java | 245 +++++++++--------- 8 files changed, 418 insertions(+), 266 deletions(-) create mode 100644 wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java index 84cd9f37b..348de83cf 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java @@ -140,14 +140,20 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex * @return the said follow-up {@link ExecutionTask} or {@code null} if none */ private ExecutionTask findGenericJdbcExecutionOperatorTaskInStage(ExecutionTask task, ExecutionStage stage) { - assert task.getNumOuputChannels() == 1; - final Channel outputChannel = task.getOutputChannel(0); - final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); - return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator ? - consumer : - null; + assert task.getNumOuputChannels() == 1; + final Channel outputChannel = task.getOutputChannel(0); + + if (outputChannel.getConsumers().isEmpty()) { + return null; } + final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); + + return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator + ? consumer + : null; +} + /** * Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}. * diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java index dffb4f228..f0db80e43 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java @@ -70,16 +70,12 @@ public List getSupportedInputChannels(int index) { @Override public CardinalityEstimator getCardinalityEstimator(int outputIndex) { assert outputIndex == 0; - return new CardinalityEstimator() { @Override public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) { final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start( - "Optimization", - "Cardinality&Load Estimation", - "Push Estimation", - "Estimate source cardinalities" + "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities" ); try (Connection connection = GenericJdbcPlatform.getInstance() @@ -87,8 +83,7 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext, Car .createJdbcConnection()) { final String sql = String.format( - "SELECT count(*) FROM %s;", - GenericJdbcTableSource.this.getTableName() + "SELECT count(*) FROM %s;", GenericJdbcTableSource.this.getTableName() ); final ResultSet resultSet = connection.createStatement().executeQuery(sql); @@ -96,17 +91,13 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext, Car if (!resultSet.next()) { throw new SQLException("No query result for \"" + sql + "\"."); } - long cardinality = resultSet.getLong(1); - return new CardinalityEstimate(cardinality, cardinality, 1d); } catch (Exception e) { LogManager.getLogger(this.getClass()).error( - "Could not estimate cardinality for {}.", - GenericJdbcTableSource.this, - e + "Could not estimate cardinality for {}.", GenericJdbcTableSource.this, e ); return new CardinalityEstimate(10, 10000000, 0.9); diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java index 4575a353f..320d2dce9 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java @@ -1,13 +1,12 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -49,27 +48,17 @@ import java.util.stream.StreamSupport; /** - * This {@link Operator} converts {@link SqlQueryChannel}s to {@link StreamChannel}s. + * Converts {@link SqlQueryChannel}s into {@link StreamChannel}s. */ -public class GenericSqlToStreamOperator extends UnaryToUnaryOperator implements JavaExecutionOperator, JsonSerializable { +public class GenericSqlToStreamOperator extends UnaryToUnaryOperator + implements JavaExecutionOperator, JsonSerializable { private final GenericJdbcPlatform jdbcPlatform; - /** - * Creates a new instance. - * - * @param jdbcPlatform from which the SQL data comes - */ public GenericSqlToStreamOperator(GenericJdbcPlatform jdbcPlatform) { this(jdbcPlatform, DataSetType.createDefault(Record.class)); } - /** - * Creates a new instance. - * - * @param jdbcPlatform from which the SQL data comes - * @param dataSetType type of the {@link Record}s being transformed; see {@link RecordType} - */ public GenericSqlToStreamOperator(GenericJdbcPlatform jdbcPlatform, DataSetType dataSetType) { super(dataSetType, dataSetType, false); this.jdbcPlatform = jdbcPlatform; @@ -86,32 +75,49 @@ public Tuple, Collection> eval ChannelInstance[] outputs, JavaExecutor executor, OptimizationContext.OperatorContext operatorContext) { - // Cast the inputs and outputs. + final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance) inputs[0]; final StreamChannel.Instance output = (StreamChannel.Instance) outputs[0]; - GenericJdbcPlatform producerPlatform = (GenericJdbcPlatform) input.getChannel().getProducer().getPlatform(); + GenericJdbcPlatform producerPlatform = + (GenericJdbcPlatform) input.getChannel().getProducer().getPlatform(); + + // Fix: safely resolve JDBC name + String jdbcName = input.getJdbcName(); + if (jdbcName == null || jdbcName.trim().isEmpty()) { + jdbcName = producerPlatform.getPlatformId(); + } + final Connection connection = producerPlatform - .createDatabaseDescriptor(executor.getConfiguration(),input.getJdbcName()) + .createDatabaseDescriptor(executor.getConfiguration(), jdbcName) .createJdbcConnection(); Iterator resultSetIterator = new ResultSetIterator(connection, input.getSqlQuery()); - Spliterator resultSetSpliterator = Spliterators.spliteratorUnknownSize(resultSetIterator, 0); - Stream resultSetStream = StreamSupport.stream(resultSetSpliterator, false); + Spliterator resultSetSpliterator = + Spliterators.spliteratorUnknownSize(resultSetIterator, 0); + Stream resultSetStream = + StreamSupport.stream(resultSetSpliterator, false); output.accept(resultSetStream); ExecutionLineageNode queryLineageNode = new ExecutionLineageNode(operatorContext); - queryLineageNode.add(LoadProfileEstimators.createFromSpecification( - String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()), + queryLineageNode.add( + LoadProfileEstimators.createFromSpecification( + String.format("wayang.%s.sqltostream.load.query", + this.jdbcPlatform.getPlatformId()), executor.getConfiguration() - )); + ) + ); queryLineageNode.addPredecessor(input.getLineage()); + ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext); - outputLineageNode.add(LoadProfileEstimators.createFromSpecification( - String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()), - executor.getConfiguration() - )); + outputLineageNode.add( + LoadProfileEstimators.createFromSpecification( + String.format("wayang.%s.sqltostream.load.output", + this.jdbcPlatform.getPlatformId()), + executor.getConfiguration() + ) + ); output.getLineage().addPredecessor(outputLineageNode); return queryLineageNode.collectAndMark(); @@ -119,7 +125,8 @@ public Tuple, Collection> eval @Override public List getSupportedInputChannels(int index) { - return Collections.singletonList(this.jdbcPlatform.getGenericSqlQueryChannelDescriptor()); + return Collections.singletonList( + this.jdbcPlatform.getGenericSqlQueryChannelDescriptor()); } @Override @@ -130,8 +137,10 @@ public List getSupportedOutputChannels(int index) { @Override public Collection getLoadProfileEstimatorConfigurationKeys() { return Arrays.asList( - String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()), - String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()) + String.format("wayang.%s.sqltostream.load.query", + this.jdbcPlatform.getPlatformId()), + String.format("wayang.%s.sqltostream.load.output", + this.jdbcPlatform.getPlatformId()) ); } @@ -140,27 +149,12 @@ public Collection getLoadProfileEstimatorConfigurationKeys() { */ private static class ResultSetIterator implements Iterator, AutoCloseable { - /** - * Keeps around the {@link ResultSet} of the SQL query. - */ private ResultSet resultSet; - - /** - * The next {@link Record} to be delivered via {@link #next()}. - */ private Record next; - /** - * Creates a new instance. - * - * @param connection the JDBC connection on which to execute a SQL query - * @param sqlQuery the SQL query - */ ResultSetIterator(Connection connection, String sqlQuery) { try { - //connection.setAutoCommit(false); Statement st = connection.createStatement(); - //st.setFetchSize(100000000); this.resultSet = st.executeQuery(sqlQuery); } catch (SQLException e) { this.close(); @@ -169,26 +163,28 @@ private static class ResultSetIterator implements Iterator, AutoCloseabl this.moveToNext(); } - /** - * Moves this instance to the next {@link Record}. - */ private void moveToNext() { try { if (this.resultSet == null || !this.resultSet.next()) { this.next = null; this.close(); } else { - final int recordWidth = this.resultSet.getMetaData().getColumnCount(); + final int recordWidth = + this.resultSet.getMetaData().getColumnCount(); + Object[] values = new Object[recordWidth]; + for (int i = 0; i < recordWidth; i++) { values[i] = this.resultSet.getObject(i + 1); } + this.next = new Record(values); } } catch (SQLException e) { this.next = null; this.close(); - throw new WayangException("Exception while iterating the result set.", e); + throw new WayangException( + "Exception while iterating the result set.", e); } } @@ -210,7 +206,8 @@ public void close() { try { this.resultSet.close(); } catch (Throwable t) { - LogManager.getLogger(this.getClass()).error("Could not close result set.", t); + LogManager.getLogger(this.getClass()) + .error("Could not close result set.", t); } finally { this.resultSet = null; } @@ -220,13 +217,15 @@ public void close() { @Override public WayangJsonObj toJson() { - return new WayangJsonObj().put("platform", this.jdbcPlatform.getClass().getCanonicalName()); + return new WayangJsonObj() + .put("platform", this.jdbcPlatform.getClass().getCanonicalName()); } @SuppressWarnings("unused") public static GenericSqlToStreamOperator fromJson(WayangJsonObj wayangJsonObj) { final String platformClassName = wayangJsonObj.getString("platform"); - GenericJdbcPlatform jdbcPlatform = ReflectionUtils.evaluate(platformClassName + ".getInstance()"); + GenericJdbcPlatform jdbcPlatform = + ReflectionUtils.evaluate(platformClassName + ".getInstance()"); return new GenericSqlToStreamOperator(jdbcPlatform); } -} +} \ No newline at end of file diff --git a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java index ff7e48c3b..99ec02ab8 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java +++ b/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/platform/GenericJdbcPlatform.java @@ -61,7 +61,7 @@ public Executor.Factory getExecutorFactory() { @Override public String getJdbcDriverClassName() { - return "None"; + return "org.hsqldb.jdbcDriver"; } diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java new file mode 100644 index 000000000..c0f22de98 --- /dev/null +++ b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcFilterOperatorTest.java @@ -0,0 +1,122 @@ +package org.apache.wayang.genericjdbc.operators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.Job; +import org.apache.wayang.core.function.PredicateDescriptor; +import org.apache.wayang.core.optimizer.DefaultOptimizationContext; +import org.apache.wayang.core.plan.executionplan.ExecutionStage; +import org.apache.wayang.core.plan.executionplan.ExecutionTask; +import org.apache.wayang.core.platform.CrossPlatformExecutor; +import org.apache.wayang.core.profiling.NoInstrumentationStrategy; +import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform; +import org.apache.wayang.jdbc.channels.SqlQueryChannel; +import org.apache.wayang.jdbc.execution.JdbcExecutor; +import org.apache.wayang.jdbc.operators.SqlToStreamOperator; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class GenericJdbcFilterOperatorTest { + + @Test + void testFilterExecution() { + + Configuration configuration = new Configuration(); + + Job job = mock(Job.class); + when(job.getConfiguration()).thenReturn(configuration); + when(job.getCrossPlatformExecutor()) + .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy())); + + GenericJdbcPlatform platform = GenericJdbcPlatform.getInstance(); + SqlQueryChannel.Descriptor sqlChannelDescriptor = + platform.getGenericSqlQueryChannelDescriptor(); + + ExecutionStage sqlStage = mock(ExecutionStage.class); + + GenericJdbcTableSource tableSource = + new GenericJdbcTableSource("T1", "A", "VAL1"); + + ExecutionTask tableSourceTask = new ExecutionTask(tableSource); + tableSourceTask.setOutputChannel( + 0, + new SqlQueryChannel(sqlChannelDescriptor, tableSource.getOutput(0)) + ); + tableSourceTask.setStage(sqlStage); + + GenericJdbcFilterOperator filterOperator = + new GenericJdbcFilterOperator( + new PredicateDescriptor<>( + record -> (Integer) record.getField(0) > 0, + Record.class + ).withSqlImplementation("A > 0") + ); + + ExecutionTask filterTask = new ExecutionTask(filterOperator); + + tableSourceTask.getOutputChannel(0).addConsumer(filterTask, 0); + + filterTask.setOutputChannel( + 0, + new SqlQueryChannel(sqlChannelDescriptor, filterOperator.getOutput(0)) + ); + filterTask.setStage(sqlStage); + + when(sqlStage.getStartTasks()) + .thenReturn(Collections.singleton(tableSourceTask)); + + when(sqlStage.getTerminalTasks()) + .thenReturn(Collections.singleton(filterTask)); + + ExecutionStage nextStage = mock(ExecutionStage.class); + + SqlToStreamOperator sqlToStreamOperator = + new SqlToStreamOperator(platform); + + ExecutionTask sqlToStreamTask = + new ExecutionTask(sqlToStreamOperator); + + filterTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0); + sqlToStreamTask.setStage(nextStage); + + JdbcExecutor executor = new JdbcExecutor(platform, job); + + executor.execute( + sqlStage, + new DefaultOptimizationContext(job), + job.getCrossPlatformExecutor() + ); + + SqlQueryChannel.Instance sqlQueryChannelInstance = + (SqlQueryChannel.Instance) + job.getCrossPlatformExecutor() + .getChannelInstance(sqlToStreamTask.getInputChannel(0)); + + String sql = sqlQueryChannelInstance.getSqlQuery(); + + assertTrue(sql.contains("WHERE")); + } +} \ No newline at end of file diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java index 02a3ea72a..36b83775b 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java +++ b/wayang-platforms/wayang-generic-jdbc/src/test/java/org/apache/wayang/genericjdbc/operators/GenericJdbcJoinOperatorTest.java @@ -1,13 +1,15 @@ package org.apache.wayang.genericjdbc.operators; + /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,96 +17,116 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + import org.apache.wayang.basic.data.Record; -import org.apache.wayang.basic.data.Tuple2; -import org.apache.wayang.basic.operators.JoinOperator; -import org.apache.wayang.basic.operators.LocalCallbackSink; import org.apache.wayang.core.api.Configuration; -import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.api.Job; import org.apache.wayang.core.function.TransformationDescriptor; -import org.apache.wayang.core.plan.wayangplan.WayangPlan; -import org.apache.wayang.genericjdbc.GenericJdbc; +import org.apache.wayang.core.optimizer.DefaultOptimizationContext; +import org.apache.wayang.core.plan.executionplan.ExecutionStage; +import org.apache.wayang.core.plan.executionplan.ExecutionTask; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.CrossPlatformExecutor; +import org.apache.wayang.core.profiling.NoInstrumentationStrategy; import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform; -import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.wayang.jdbc.channels.SqlQueryChannel; +import org.apache.wayang.jdbc.execution.JdbcExecutor; +import org.apache.wayang.jdbc.operators.SqlToStreamOperator; import org.junit.jupiter.api.Test; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class GenericJdbcJoinOperatorTest { @Test - void testJoinExecution() throws Exception { + void testJoinSqlGeneration() { - String url = "jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1"; + Configuration configuration = new Configuration(); - Class.forName("org.hsqldb.jdbcDriver"); + Job job = mock(Job.class); + when(job.getConfiguration()).thenReturn(configuration); + when(job.getCrossPlatformExecutor()) + .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy())); - try (Connection conn = DriverManager.getConnection(url, "SA", "")) { - Statement stmt = conn.createStatement(); + GenericJdbcPlatform platform = GenericJdbcPlatform.getInstance(); + SqlQueryChannel.Descriptor sqlChannelDescriptor = + platform.getGenericSqlQueryChannelDescriptor(); - stmt.execute("CREATE TABLE T1 (A INT, VAL1 VARCHAR(20));"); - stmt.execute("INSERT INTO T1 VALUES (1, 'Apache');"); + ExecutionStage sqlStage = mock(ExecutionStage.class); - stmt.execute("CREATE TABLE T2 (A INT, VAL2 INT);"); - stmt.execute("INSERT INTO T2 VALUES (1, 2026);"); - } + GenericJdbcTableSource tableSourceA = new GenericJdbcTableSource("T1", "A", "VAL1"); + GenericJdbcTableSource tableSourceB = new GenericJdbcTableSource("T2", "A", "VAL2"); - Configuration config = new Configuration(); - config.setProperty("wayang.genericjdbc.jdbc.url", url); - config.setProperty("wayang.genericjdbc.jdbc.user", "SA"); - config.setProperty("wayang.genericjdbc.jdbc.password", ""); - config.setProperty("wayang.genericjdbc.jdbc.driverName", "org.hsqldb.jdbcDriver"); + ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA); + tableSourceATask.setOutputChannel( + 0, + new SqlQueryChannel(sqlChannelDescriptor, tableSourceA.getOutput(0)) + ); + tableSourceATask.setStage(sqlStage); - GenericJdbcTableSource source1 = new GenericJdbcTableSource("T1", "A", "VAL1"); - GenericJdbcTableSource source2 = new GenericJdbcTableSource("T2", "A", "VAL2"); + ExecutionTask tableSourceBTask = new ExecutionTask(tableSourceB); + tableSourceBTask.setOutputChannel( + 0, + new SqlQueryChannel(sqlChannelDescriptor, tableSourceB.getOutput(0)) + ); + tableSourceBTask.setStage(sqlStage); - TransformationDescriptor keyExtractor0 = - new TransformationDescriptor<>( - r -> (Integer) r.getField(0), + ExecutionOperator joinOperator = new GenericJdbcJoinOperator( + new TransformationDescriptor( + record -> (Integer) record.getField(0), Record.class, Integer.class - ).withSqlImplementation("T1", "A"); - - TransformationDescriptor keyExtractor1 = - new TransformationDescriptor<>( - r -> (Integer) r.getField(0), + ).withSqlImplementation("T1", "A"), + new TransformationDescriptor( + record -> (Integer) record.getField(0), Record.class, Integer.class - ).withSqlImplementation("T2", "A"); + ).withSqlImplementation("T2", "A") + ); + + ExecutionTask joinTask = new ExecutionTask(joinOperator); + + tableSourceATask.getOutputChannel(0).addConsumer(joinTask, 0); + tableSourceBTask.getOutputChannel(0).addConsumer(joinTask, 1); + + joinTask.setOutputChannel( + 0, + new SqlQueryChannel(sqlChannelDescriptor, joinOperator.getOutput(0)) + ); + joinTask.setStage(sqlStage); + + when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask)); + when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(joinTask)); - JoinOperator join = - new JoinOperator<>(keyExtractor0, keyExtractor1); + // Add SqlToStreamOperator stage (required by JdbcExecutor) + ExecutionStage nextStage = mock(ExecutionStage.class); - List> results = new ArrayList<>(); + SqlToStreamOperator sqlToStreamOperator = new SqlToStreamOperator(platform); + ExecutionTask sqlToStreamTask = new ExecutionTask(sqlToStreamOperator); - @SuppressWarnings("unchecked") - LocalCallbackSink> sink = - LocalCallbackSink.createCollectingSink( - results, - (Class>) (Class) Tuple2.class - ); + joinTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0); + sqlToStreamTask.setStage(nextStage); - source1.addTargetPlatform(GenericJdbcPlatform.getInstance()); - source2.addTargetPlatform(GenericJdbcPlatform.getInstance()); - join.addTargetPlatform(JavaPlatform.getInstance()); - sink.addTargetPlatform(JavaPlatform.getInstance()); + JdbcExecutor executor = new JdbcExecutor(platform, job); - source1.connectTo(0, join, 0); - source2.connectTo(0, join, 1); - join.connectTo(0, sink, 0); + executor.execute( + sqlStage, + new DefaultOptimizationContext(job), + job.getCrossPlatformExecutor() + ); - WayangContext ctx = new WayangContext(config) - .with(GenericJdbc.plugin()) - .with(org.apache.wayang.java.Java.basicPlugin()); + SqlQueryChannel.Instance sqlQueryChannelInstance = + (SqlQueryChannel.Instance) + job.getCrossPlatformExecutor() + .getChannelInstance(sqlToStreamTask.getInputChannel(0)); - ctx.execute(new WayangPlan(sink)); + String sql = sqlQueryChannelInstance.getSqlQuery(); - assertEquals(1, results.size()); + assertTrue(sql.contains("JOIN")); + assertTrue(sql.contains("ON")); } } \ No newline at end of file diff --git a/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties b/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties index d1907cfb1..5492a0237 100644 --- a/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties +++ b/wayang-platforms/wayang-generic-jdbc/src/test/resources/wayang.properties @@ -22,4 +22,9 @@ wayang.T1.jdbc.driverName=org.hsqldb.jdbcDriver wayang.T2.jdbc.url=jdbc:hsqldb:mem:wayang_test_db;DB_CLOSE_DELAY=-1 wayang.T2.jdbc.user=SA wayang.T2.jdbc.password= -wayang.T2.jdbc.driverName=org.hsqldb.jdbcDriver \ No newline at end of file +wayang.T2.jdbc.driverName=org.hsqldb.jdbcDriver + +wayang.genericjdbc.jdbc.url=jdbc:hsqldb:mem:wayang_test_db_filter;DB_CLOSE_DELAY=-1 +wayang.genericjdbc.jdbc.user=SA +wayang.genericjdbc.jdbc.password= +wayang.genericjdbc.jdbc.driverName=org.hsqldb.jdbcDriver \ No newline at end of file diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index f7a9d7c5a..efa88f5fd 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -1,3 +1,4 @@ +package org.apache.wayang.jdbc.execution; /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -15,9 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.wayang.jdbc.execution; - import org.apache.wayang.basic.channels.FileChannel; import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.basic.operators.TableSource; @@ -28,19 +26,13 @@ import org.apache.wayang.core.plan.executionplan.ExecutionStage; import org.apache.wayang.core.plan.executionplan.ExecutionTask; import org.apache.wayang.core.platform.ExecutionState; -import org.apache.wayang.core.platform.Executor; import org.apache.wayang.core.platform.ExecutorTemplate; import org.apache.wayang.core.platform.Platform; -import org.apache.wayang.core.util.WayangCollections; import org.apache.wayang.core.util.fs.FileSystem; import org.apache.wayang.core.util.fs.FileSystems; import org.apache.wayang.jdbc.channels.SqlQueryChannel; import org.apache.wayang.jdbc.compiler.FunctionCompiler; -import org.apache.wayang.jdbc.operators.JdbcExecutionOperator; -import org.apache.wayang.jdbc.operators.JdbcFilterOperator; -import org.apache.wayang.jdbc.operators.JdbcJoinOperator; -import org.apache.wayang.jdbc.operators.JdbcProjectionOperator; -import org.apache.wayang.jdbc.operators.JdbcTableSource; +import org.apache.wayang.jdbc.operators.*; import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,17 +49,11 @@ import java.util.Set; import java.util.stream.Collectors; -/** - * {@link Executor} implementation for the {@link JdbcPlatformTemplate}. - */ public class JdbcExecutor extends ExecutorTemplate { private final JdbcPlatformTemplate platform; - private final Connection connection; - private final Logger logger = LogManager.getLogger(this.getClass()); - private final FunctionCompiler functionCompiler = new FunctionCompiler(); public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) { @@ -77,150 +63,166 @@ public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) { } @Override - public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) { - final Tuple2 pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this); + public void execute(final ExecutionStage stage, + final OptimizationContext optimizationContext, + final ExecutionState executionState) { + + final Tuple2 pair = + JdbcExecutor.createSqlQuery(stage, optimizationContext, this); + final String query = pair.field0; final SqlQueryChannel.Instance queryChannel = pair.field1; queryChannel.setSqlQuery(query); - - // Return the tipChannelInstance. executionState.register(queryChannel); } /** - * Retrieves the follow-up {@link ExecutionTask} of the given {@code task} - * unless it is not comprising a - * {@link JdbcExecutionOperator} and/or not in the given {@link ExecutionStage}. - * - * @param task whose follow-up {@link ExecutionTask} is requested; should have - * a single follower - * @param stage in which the follow-up {@link ExecutionTask} should be - * @return the said follow-up {@link ExecutionTask} or {@code null} if none + * Safe version (removes WayangCollections.getSingle crash) */ - private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) { + private static ExecutionTask findJdbcExecutionOperatorTaskInStage( + final ExecutionTask task, + final ExecutionStage stage) { + assert task.getNumOuputChannels() == 1; + final Channel outputChannel = task.getOutputChannel(0); - final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); - return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer + + if (outputChannel.getConsumers().size() != 1) { + return null; + } + + final ExecutionTask consumer = outputChannel.getConsumers().iterator().next(); + + return consumer.getStage() == stage && + consumer.getOperator() instanceof JdbcExecutionOperator + ? consumer : null; } - /** - * Instantiates the outbound {@link SqlQueryChannel} of an - * {@link ExecutionTask}. - * - * @param task whose outbound {@link SqlQueryChannel} should be - * instantiated - * @param optimizationContext provides information about the - * {@link ExecutionTask} - * @return the {@link SqlQueryChannel.Instance} - */ - private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, - final OptimizationContext optimizationContext, final JdbcExecutor jdbcExecutor) { - assert task.getNumOuputChannels() == 1 : String.format("Illegal task: %s.", task); - assert task.getOutputChannel(0) instanceof SqlQueryChannel : String.format("Illegal task: %s.", task); + private static SqlQueryChannel.Instance instantiateOutboundChannel( + final ExecutionTask task, + final OptimizationContext optimizationContext, + final JdbcExecutor jdbcExecutor) { + + assert task.getNumOuputChannels() == 1; + assert task.getOutputChannel(0) instanceof SqlQueryChannel; final SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0); - final OptimizationContext.OperatorContext operatorContext = optimizationContext - .getOperatorContext(task.getOperator()); + + final OptimizationContext.OperatorContext operatorContext = + optimizationContext.getOperatorContext(task.getOperator()); + return outputChannel.createInstance(jdbcExecutor, operatorContext, 0); } - /** - * Instantiates the outbound {@link SqlQueryChannel} of an - * {@link ExecutionTask}. - * - * @param task whose outbound {@link SqlQueryChannel} - * should be instantiated - * @param optimizationContext provides information about the - * {@link ExecutionTask} - * @param predecessorChannelInstance preceeding {@link SqlQueryChannel.Instance} - * to keep track of lineage - * @return the {@link SqlQueryChannel.Instance} - */ - private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, + private static SqlQueryChannel.Instance instantiateOutboundChannel( + final ExecutionTask task, final OptimizationContext optimizationContext, - final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) { - final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); + final SqlQueryChannel.Instance predecessorChannelInstance, + final JdbcExecutor jdbcExecutor) { + + final SqlQueryChannel.Instance newInstance = + instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); + newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage()); + return newInstance; } /** - * Creates a query channel and the sql statement - * - * @param stage - * @param context - * @return a tuple containing the sql statement + * Main SQL builder */ - protected static Tuple2 createSqlQuery(final ExecutionStage stage, - final OptimizationContext context, final JdbcExecutor jdbcExecutor) { + protected static Tuple2 createSqlQuery( + final ExecutionStage stage, + final OptimizationContext context, + final JdbcExecutor jdbcExecutor) { + final Collection startTasks = stage.getStartTasks(); final Collection termTasks = stage.getTerminalTasks(); - // Verify that we can handle this instance. - assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported"; - final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; - assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; - final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; - assert startTask.getOperator() instanceof TableSource - : "Invalid JDBC stage: Start task has to be a TableSource"; + if (startTasks.isEmpty()) { + throw new WayangException("Invalid jdbc stage: no sources found"); + } + + final ExecutionTask startTask = (ExecutionTask) startTasks.iterator().next(); + + if (termTasks.size() != 1) { + throw new WayangException("Invalid JDBC stage: multiple terminal tasks not supported."); + } - // Extract the different types of ExecutionOperators from the stage. final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator(); - SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor); + + SqlQueryChannel.Instance tipChannelInstance = + instantiateOutboundChannel(startTask, context, jdbcExecutor); + final Collection filterTasks = new ArrayList<>(4); JdbcProjectionOperator projectionTask = null; final Collection> joinTasks = new ArrayList<>(); + final Set allTasks = stage.getAllTasks(); - assert allTasks.size() <= 3; - ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); + + ExecutionTask nextTask = + findJdbcExecutionOperatorTaskInStage(startTask, stage); + while (nextTask != null) { - // Evaluate the nextTask. - if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) { + + if (nextTask.getOperator() instanceof JdbcFilterOperator filterOperator) { filterTasks.add(filterOperator); } else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) { - assert projectionTask == null; // Allow one projection operator per stage for now. projectionTask = projectionOperator; } else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) { joinTasks.add(joinOperator); } else { - throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); + throw new WayangException("Unsupported JDBC execution task " + nextTask); } - // Move the tipChannelInstance. - tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor); + tipChannelInstance = + instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor); - // Go to the next nextTask. - nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage); + nextTask = + findJdbcExecutionOperatorTaskInStage(nextTask, stage); } - // Create the SQL query. - final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + final StringBuilder query = + createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + return new Tuple2<>(query.toString(), tipChannelInstance); } - public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp, - final Collection filterTasks, JdbcProjectionOperator projectionTask, + public static StringBuilder createSqlString( + final JdbcExecutor jdbcExecutor, + final JdbcTableSource tableOp, + final Collection filterTasks, + JdbcProjectionOperator projectionTask, final Collection> joinTasks) { - final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); - final Collection conditions = filterTasks.stream() - .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) - .collect(Collectors.toList()); - final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); - final Collection joins = joinTasks.stream() - .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) - .collect(Collectors.toList()); + + final String tableName = + tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + + final Collection conditions = + filterTasks.stream() + .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) + .collect(Collectors.toList()); + + final String projection = + projectionTask == null + ? "*" + : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + + final Collection joins = + joinTasks.stream() + .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) + .collect(Collectors.toList()); final StringBuilder sb = new StringBuilder(1000); + sb.append("SELECT ").append(projection).append(" FROM ").append(tableName); - if (!joins.isEmpty()) { - final String separator = " "; - for (final String join : joins) { - sb.append(separator).append(join); - } + + for (final String join : joins) { + sb.append(" ").append(join); } + if (!conditions.isEmpty()) { sb.append(" WHERE "); String separator = ""; @@ -229,7 +231,9 @@ public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, fin separator = " AND "; } } + sb.append(';'); + return sb; } @@ -238,7 +242,7 @@ public void dispose() { try { this.connection.close(); } catch (final SQLException e) { - this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e); + this.logger.error("Could not close JDBC connection correctly.", e); } } @@ -247,27 +251,30 @@ public Platform getPlatform() { return this.platform; } - private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs) + private void saveResult(final FileChannel.Instance outputFileChannelInstance, + final ResultSet rs) throws IOException, SQLException { - // Output results. - final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); - try (final OutputStreamWriter writer = new OutputStreamWriter( - outFs.create(outputFileChannelInstance.getSinglePath()))) { + + final FileSystem outFs = + FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); + + try (final OutputStreamWriter writer = + new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) { + while (rs.next()) { - // System.out.println(rs.getInt(1) + " " + rs.getString(2)); + final ResultSetMetaData rsmd = rs.getMetaData(); + for (int i = 1; i <= rsmd.getColumnCount(); i++) { writer.write(rs.getString(i)); - if (i < rsmd.getColumnCount()) { - writer.write('\t'); - } - } - if (!rs.isLast()) { - writer.write('\n'); + if (i < rsmd.getColumnCount()) writer.write('\t'); } + + if (!rs.isLast()) writer.write('\n'); } + } catch (final UncheckedIOException e) { throw e.getCause(); } } -} +} \ No newline at end of file