diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index e580efc635726a..488fd823dd35e9 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -19,11 +19,13 @@ package org.apache.flink.table.api.bridge.java; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; @@ -88,7 +90,12 @@ public interface StreamTableEnvironment extends TableEnvironment { * TableEnvironment}. */ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) { - return create(executionEnvironment, EnvironmentSettings.newInstance().build()); + final EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance(); + if (executionEnvironment.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + == RuntimeExecutionMode.BATCH) { + settingsBuilder.inBatchMode(); + } + return create(executionEnvironment, settingsBuilder.build()); } /** diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala index a0d3df3338ae50..4f81118c292aed 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala @@ -18,8 +18,10 @@ package org.apache.flink.table.api.bridge.scala import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.configuration.ExecutionOptions import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.{TableEnvironment, _} @@ -833,7 +835,14 @@ object StreamTableEnvironment { * The Scala [[StreamExecutionEnvironment]] of the [[TableEnvironment]]. */ def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment = { - create(executionEnvironment, EnvironmentSettings.newInstance().build) + val settingsBuilder = EnvironmentSettings.newInstance() + if ( + executionEnvironment.getConfiguration.get(ExecutionOptions.RUNTIME_MODE) == + RuntimeExecutionMode.BATCH + ) { + settingsBuilder.inBatchMode() + } + create(executionEnvironment, settingsBuilder.build) } /** diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/StreamTableEnvironmentRuntimeModeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/StreamTableEnvironmentRuntimeModeTest.java new file mode 100644 index 00000000000000..99dbcba266fcac --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/StreamTableEnvironmentRuntimeModeTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.table; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that {@link StreamTableEnvironment#create(StreamExecutionEnvironment)} inherits the runtime + * execution mode from the given {@link StreamExecutionEnvironment} instead of always defaulting to + * streaming (FLINK-39014). + */ +class StreamTableEnvironmentRuntimeModeTest { + + @Test + void testCreateInheritsBatchRuntimeMode() { + final Configuration configuration = new Configuration(); + configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + assertThat(tEnv.getConfig().get(ExecutionOptions.RUNTIME_MODE)) + .isEqualTo(RuntimeExecutionMode.BATCH); + } + + @Test + void testCreateDefaultsToStreamingRuntimeMode() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + assertThat(tEnv.getConfig().get(ExecutionOptions.RUNTIME_MODE)) + .isEqualTo(RuntimeExecutionMode.STREAMING); + } +}