Skip to content

[FLINK-39014][table] Fix the conversion to relational algebra issue in Batch Mode#28500

Open
argoyal2212 wants to merge 1 commit into
apache:masterfrom
argoyal2212:FLINK-39014
Open

[FLINK-39014][table] Fix the conversion to relational algebra issue in Batch Mode#28500
argoyal2212 wants to merge 1 commit into
apache:masterfrom
argoyal2212:FLINK-39014

Conversation

@argoyal2212

@argoyal2212 argoyal2212 commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

What is the purpose of the change

FLINK-39014
When creating a StreamTableEnvironment using StreamTableEnvironment.create(env) where the StreamExecutionEnvironment is configured with RUNTIME_MODE=BATCH, the planner was incorrectly created in STREAMING mode, causing a mismatch between execution mode and planner mode.

Brief change log

The fix ensures that EnvironmentSettings inherits the configuration from StreamExecutionEnvironment by calling .withConfiguration(executionEnvironment.getConfiguration()) when creating default environment settings. This maintains consistency between the execution mode and planner mode throughout the system.

  • StreamTableEnvironment.java: Modified create(StreamExecutionEnvironment) to pass the execution environment's configuration to EnvironmentSettings, ensuring the runtime mode is properly inherited
  • WatermarkExampleITCase.java: Added integration test to verify watermark handling works correctly in both batch and streaming execution modes

Verifying this change

This change is covered by new integration tests in WatermarkExampleITCase.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@argoyal2212

Copy link
Copy Markdown
Contributor Author

@dawidwys @twalthr can I get some help to review this small PR. Thanks!

@flinkbot

flinkbot commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@spuru9

spuru9 commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Hey @argoyal2212
Can you create a JIRA for this and make the title apt as per the cummunity guidlines?

@argoyal2212 argoyal2212 changed the title Fix the conversion to relational algebra issue in Batch Mode [FLINK-39014][table] Fix the conversion to relational algebra issue in Batch Mode Jun 22, 2026

@spuru9 spuru9 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have some comments.

Also, The JIRA's "Fix Versions" (1.16.3 / 1.20.3) and "Components" (Deployment/K8s/YARN) look wrong.
Can you fix those as well.

executionEnvironment,
EnvironmentSettings.newInstance()
.withConfiguration(
(org.apache.flink.configuration.Configuration)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executionEnvironment.getConfiguration() returns ReadableConfig, not Configuration, and the source of that method explicitly warns against this cast:

// Note to implementers:
// In theory, you can cast the return value of this method to Configuration and perform
// mutations. In practice, this could cause side effects. A better approach is to implement
// the ReadableConfig interface and create a layered configuration.

Could be something like:

    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
        final EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
        if (executionEnvironment.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                == RuntimeExecutionMode.BATCH) {
            settingsBuilder.inBatchMode();
        }
        return create(executionEnvironment, settingsBuilder.build());
    }

Can you check this.

*/
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
return create(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scala bridge has the identical bug — create(executionEnvironment, EnvironmentSettings.newInstance().build) ignores the env's runtime mode. A user hitting FLINK-39014 from Scala won't be fixed by this PR. Worth fixing in this PR (or at least noting in the JIRA as a follow-up) so the two bridges stay consistent.

* limitations under the License.
*/

package org.apache.flink.table.examples.java.basics;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks misplaced. flink-examples-table/.../basics tests cover specific example programs — there's no WatermarkExample, and this test is really exercising StreamTableEnvironment.create. It most probably belongs next to the code it covers (e.g. an ITCase under flink-table-api-java-bridge).

Comment on lines +61 to +62
configuration.setString(
ExecutionOptions.RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.toString());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Use the typed config setter instead of setString(opt.key(), enum.toString())

Suggested change
configuration.setString(
ExecutionOptions.RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.toString());
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

Similarly for other similar statements.

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jun 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants