From 1257a625b5c7147f3f1c449a3ddbd15417388844 Mon Sep 17 00:00:00 2001 From: Yanjun Qiu <153984347+qiuyanjun888@users.noreply.github.com> Date: Mon, 22 Jun 2026 20:57:15 +0800 Subject: [PATCH] [FLINK-39215][python] Clean up PythonDriver tmp dir on launch failure --- .../flink/client/python/PythonDriver.java | 20 ++++++++- .../flink/client/python/PythonDriverTest.java | 42 +++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java index 404c5585346ca..cc548510bab3d 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,19 +83,21 @@ public static void main(String[] args) throws Throwable { PythonEnvUtils.setGatewayServer(gatewayServer); PythonEnvUtils.PythonProcessShutdownHook shutdownHook = null; + Process pythonProcess = null; + String tmpDir = null; // commands which will be exec in python progress. final List commands = constructPythonCommands(pythonDriverOptions); try { // prepare the exec environment of python progress. - String tmpDir = + tmpDir = System.getProperty("java.io.tmpdir") + File.separator + "pyflink" + File.separator + UUID.randomUUID(); // start the python process. - Process pythonProcess = + pythonProcess = PythonEnvUtils.launchPy4jPythonClient( gatewayServer, config, @@ -132,6 +135,19 @@ public static void main(String[] args) throws Throwable { } catch (Throwable e) { LOG.error("Run python process failed", e); + if (shutdownHook == null) { + if (pythonProcess != null) { + new PythonEnvUtils.PythonProcessShutdownHook( + pythonProcess, gatewayServer, tmpDir) + .run(); + } else { + if (tmpDir != null) { + FileUtils.deleteDirectoryQuietly(new File(tmpDir)); + } + gatewayServer.shutdown(); + } + } + if (PythonEnvUtils.capturedJavaException != null) { throw PythonEnvUtils.capturedJavaException; } else { diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java index 6aebd7cc8ec49..f8e063bc57b49 100644 --- a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java +++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java @@ -18,18 +18,24 @@ package org.apache.flink.client.python; +import org.apache.flink.client.program.ProgramAbortException; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import py4j.GatewayServer; import java.io.IOException; import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link PythonDriver}. */ class PythonDriverTest { @@ -46,6 +52,42 @@ void testStartGatewayServer() throws ExecutionException, InterruptedException { } } + @Test + void testCleanupTmpDirWhenPythonClientLaunchFails(@TempDir Path tmpDir) throws IOException { + Path entryPointScript = tmpDir.resolve("entrypoint.py"); + Files.write(entryPointScript, new byte[0]); + Path pyflinkTmpDir = tmpDir.resolve("pyflink"); + String originalTmpDir = System.getProperty("java.io.tmpdir"); + + try { + System.setProperty("java.io.tmpdir", tmpDir.toString()); + + assertThatThrownBy( + () -> + PythonDriver.main( + new String[] { + "-py", + entryPointScript.toString(), + "-pyclientexec", + tmpDir.resolve("missing-python-executable") + .toString() + })) + .isInstanceOf(ProgramAbortException.class); + + if (Files.exists(pyflinkTmpDir)) { + try (Stream remainingTmpDirs = Files.list(pyflinkTmpDir)) { + assertThat(remainingTmpDirs).isEmpty(); + } + } + } finally { + if (originalTmpDir == null) { + System.clearProperty("java.io.tmpdir"); + } else { + System.setProperty("java.io.tmpdir", originalTmpDir); + } + } + } + @Test void testConstructCommandsWithEntryPointModule() { List args = new ArrayList<>();