From ad18c946553d0d673732f5dfe446d5b1b9fcc558 Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Tue, 16 Dec 2025 11:28:08 +0530 Subject: [PATCH 1/5] added fallback caching dir --- .../gobblin/yarn/GobblinYarnAppLauncher.java | 32 ++++++ .../yarn/GobblinYarnConfigurationKeys.java | 3 + .../yarn/GobblinYarnAppLauncherTest.java | 101 ++++++++++++++++++ 3 files changed, 136 insertions(+) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index d2f2180fff9..87fa4d7ff70 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -323,6 +323,7 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration try { config = addDynamicConfig(config); + config = addJarCachingConfig(config, this.fs); outputConfigToFile(config); } catch (SchemaRegistryException e) { throw new IOException(e); @@ -999,6 +1000,37 @@ private static Config addDynamicConfig(Config config) throws IOException { } } + private static Config addJarCachingConfig(Config config, FileSystem fs) throws IOException { + // Check if JAR_CACHE_DIR is configured and exists + if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) { + Path jarCacheDir = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)); + if (fs.exists(jarCacheDir)) { + // JAR_CACHE_DIR exists, nothing to do + return config; + } + LOGGER.warn("Configured JAR_CACHE_DIR does not exist: {}", jarCacheDir); + } + + // JAR_CACHE_DIR doesn't exist or not configured, try fallback + if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR)) { + Path fallbackDir = new Path(config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR)); + if (fs.exists(fallbackDir)) { + LOGGER.info("Using FALLBACK_JAR_CACHE_DIR: {}", fallbackDir); + config = config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, + ConfigValueFactory.fromAnyRef(fallbackDir.toString())); + return config; + } + LOGGER.warn("Configured FALLBACK_JAR_CACHE_DIR does not exist: {}", fallbackDir); + } + + // Neither JAR_CACHE_DIR nor FALLBACK_JAR_CACHE_DIR exist, disable jar caching + LOGGER.warn("Neither JAR_CACHE_DIR nor FALLBACK_JAR_CACHE_DIR exist, disabling jar caching"); + config = config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, + ConfigValueFactory.fromAnyRef(false)); + + return config; + } + /** * Write the config to the file specified with the config key {@value GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it * is configured. diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index 10bc9f97093..36c74845abd 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -55,6 +55,9 @@ public class GobblinYarnConfigurationKeys { public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; + public static final String FALLBACK_JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; + + public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list"; /** diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index 622c6d56479..28ec6c8c025 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -514,6 +514,107 @@ public String answer(InvocationOnMock invocation) { Assert.assertFalse(config.hasPath(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID)); } + @Test + public void testAddJarCachingConfig_JarCacheDirExists() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String jarCacheDir = "/test/jar/cache/dir"; + + // Mock: JAR_CACHE_DIR exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(true); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(jarCacheDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + + Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); + + // Should keep the original JAR_CACHE_DIR + Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), jarCacheDir); + Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + } + + @Test + public void testAddJarCachingConfig_JarCacheDirNotExistsFallbackExists() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String jarCacheDir = "/test/jar/cache/dir"; + String fallbackDir = "/test/fallback/dir"; + + // Mock: JAR_CACHE_DIR doesn't exist, but FALLBACK_JAR_CACHE_DIR exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + org.apache.hadoop.fs.Path path = invocation.getArgument(0); + return path.toString().equals(fallbackDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(jarCacheDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fallbackDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + + Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); + + // Should use fallback directory + Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), fallbackDir); + Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + } + + @Test + public void testAddJarCachingConfig_NeitherDirExists() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String jarCacheDir = "/test/jar/cache/dir"; + String fallbackDir = "/test/fallback/dir"; + + // Mock: Neither directory exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(jarCacheDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fallbackDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + + Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); + + // Should disable jar caching + Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + } + + @Test + public void testAddJarCachingConfig_NoDirConfiguredFallbackExists() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String fallbackDir = "/test/fallback/dir"; + + // Mock: Fallback directory exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(true); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fallbackDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + + Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); + + // Should use fallback directory + Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), fallbackDir); + Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + } + + @Test + public void testAddJarCachingConfig_NoDirConfigured() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + + // Mock: No directories exist + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + + Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); + + // Should disable jar caching + Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + } + /** * An application master for accessing protected fields in {@link GobblinApplicationMaster} * for testing. From 74175e6fe5068dfa9d456b54bcd66fa122b3bff1 Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Tue, 16 Dec 2025 11:59:37 +0530 Subject: [PATCH 2/5] addressed comments --- .../gobblin/yarn/GobblinYarnAppLauncher.java | 93 ++++++++++---- .../yarn/GobblinYarnConfigurationKeys.java | 6 +- .../yarn/GobblinYarnAppLauncherTest.java | 118 ++++++++++++------ 3 files changed, 156 insertions(+), 61 deletions(-) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 87fa4d7ff70..21323a50be0 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -1000,35 +1000,86 @@ private static Config addDynamicConfig(Config config) throws IOException { } } + /** + * Configures jar caching by validating root directories and setting JAR_CACHE_DIR. + * + *

This method validates that the configured jar cache root directory exists on the filesystem + * before enabling jar caching. It follows this logic:

+ *
    + *
  1. Check if jar caching is enabled, if not return config as-is
  2. + *
  3. Read JAR_CACHE_ROOT_DIR and JAR_CACHE_SUFFIX from config
  4. + *
  5. Check if JAR_CACHE_ROOT_DIR exists on filesystem (e.g., /user/${user.to.proxy})
  6. + *
  7. If it exists: Set JAR_CACHE_DIR = JAR_CACHE_ROOT_DIR + JAR_CACHE_SUFFIX
  8. + *
  9. If not: Try FALLBACK_JAR_CACHE_ROOT_DIR with same logic
  10. + *
  11. If neither exists: Disable jar caching by setting JAR_CACHE_ENABLED to false
  12. + *
+ * + *

This ensures that the base user directory exists before attempting to cache jars in nested + * subdirectories, preventing runtime failures from misconfigured paths.

+ * + * @param config the application configuration + * @param fs the filesystem to use for validation + * @return updated config with JAR_CACHE_DIR set or JAR_CACHE_ENABLED disabled + * @throws IOException if filesystem operations fail + */ private static Config addJarCachingConfig(Config config, FileSystem fs) throws IOException { - // Check if JAR_CACHE_DIR is configured and exists - if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) { - Path jarCacheDir = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)); - if (fs.exists(jarCacheDir)) { - // JAR_CACHE_DIR exists, nothing to do - return config; + // Skip validation if jar caching is not enabled + boolean jarCachingEnabled = ConfigUtils.getBoolean(config, + GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, + GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + + if (!jarCachingEnabled) { + LOGGER.info("Jar caching is not enabled, skipping jar cache directory validation"); + return config; + } + + String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ""); + + // Try primary root directory + if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) { + String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); + Config result = validateAndSetJarCacheDir(config, fs, rootDir, suffix, "JAR_CACHE_ROOT_DIR"); + if (result != null) { + return result; } - LOGGER.warn("Configured JAR_CACHE_DIR does not exist: {}", jarCacheDir); } - // JAR_CACHE_DIR doesn't exist or not configured, try fallback - if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR)) { - Path fallbackDir = new Path(config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR)); - if (fs.exists(fallbackDir)) { - LOGGER.info("Using FALLBACK_JAR_CACHE_DIR: {}", fallbackDir); - config = config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, - ConfigValueFactory.fromAnyRef(fallbackDir.toString())); - return config; + // Try fallback root directory + if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) { + String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); + Config result = validateAndSetJarCacheDir(config, fs, fallbackRootDir, suffix, "FALLBACK_JAR_CACHE_ROOT_DIR"); + if (result != null) { + return result; } - LOGGER.warn("Configured FALLBACK_JAR_CACHE_DIR does not exist: {}", fallbackDir); } - // Neither JAR_CACHE_DIR nor FALLBACK_JAR_CACHE_DIR exist, disable jar caching - LOGGER.warn("Neither JAR_CACHE_DIR nor FALLBACK_JAR_CACHE_DIR exist, disabling jar caching"); - config = config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, + // Neither root directory exists, disable jar caching + LOGGER.warn("No valid jar cache root directory found, disabling jar caching"); + return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(false)); - - return config; + } + + /** + * Validates if the root directory exists and sets JAR_CACHE_DIR if it does. + * + * @param config the configuration + * @param fs the filesystem to check + * @param rootDir the root directory to validate + * @param suffix the suffix to append to root directory + * @param configName the config name for logging + * @return updated config if valid, null otherwise + */ + private static Config validateAndSetJarCacheDir(Config config, FileSystem fs, String rootDir, + String suffix, String configName) throws IOException { + Path rootPath = new Path(rootDir); + if (fs.exists(rootPath)) { + String fullPath = new Path(rootPath, suffix).toString(); + LOGGER.info("{} exists: {}, setting JAR_CACHE_DIR to: {}", configName, rootDir, fullPath); + return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, + ConfigValueFactory.fromAnyRef(fullPath)); + } + LOGGER.warn("Configured {} does not exist: {}", configName, rootDir); + return null; } /** diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index 36c74845abd..da34de3bd8f 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -55,7 +55,11 @@ public class GobblinYarnConfigurationKeys { public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; - public static final String FALLBACK_JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; + public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.root.dir"; + + public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.fallback.root.dir"; + + public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + "jar.cache.suffix"; public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list"; diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index 28ec6c8c025..6159f2d4405 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -515,106 +515,146 @@ public String answer(InvocationOnMock invocation) { } @Test - public void testAddJarCachingConfig_JarCacheDirExists() throws Exception { + public void testAddJarCachingConfig_JarCachingDisabled() throws Exception { FileSystem mockFs = Mockito.mock(FileSystem.class); - String jarCacheDir = "/test/jar/cache/dir"; - // Mock: JAR_CACHE_DIR exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(true); + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(false)); + + Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); + + // Should skip validation and return config as-is when jar caching is disabled + Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + // Verify no filesystem checks were made + Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(org.apache.hadoop.fs.Path.class)); + } + + @Test + public void testAddJarCachingConfig_NoRootDirsConfigured() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + + // Mock: filesystem calls return false + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(jarCacheDir)) .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - // Should keep the original JAR_CACHE_DIR - Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), jarCacheDir); - Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + // Should disable jar caching when no root directories are configured + Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); } @Test - public void testAddJarCachingConfig_JarCacheDirNotExistsFallbackExists() throws Exception { + public void testAddJarCachingConfig_RootDirExists() throws Exception { FileSystem mockFs = Mockito.mock(FileSystem.class); - String jarCacheDir = "/test/jar/cache/dir"; - String fallbackDir = "/test/fallback/dir"; + String rootDir = "/user/testuser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; - // Mock: JAR_CACHE_DIR doesn't exist, but FALLBACK_JAR_CACHE_DIR exists + // Mock: Root directory exists Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { @Override public Boolean answer(InvocationOnMock invocation) { org.apache.hadoop.fs.Path path = invocation.getArgument(0); - return path.toString().equals(fallbackDir); + return path.toString().equals(rootDir); } }); Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(jarCacheDir)) - .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fallbackDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - // Should use fallback directory - Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), fallbackDir); + // Should set JAR_CACHE_DIR to root + suffix + Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), expectedFullPath); Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); } @Test - public void testAddJarCachingConfig_NeitherDirExists() throws Exception { + public void testAddJarCachingConfig_RootDirNotExistsFallbackRootExists() throws Exception { FileSystem mockFs = Mockito.mock(FileSystem.class); - String jarCacheDir = "/test/jar/cache/dir"; - String fallbackDir = "/test/fallback/dir"; + String rootDir = "/user/baduser"; + String fallbackRootDir = "/user/gooduser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/gooduser/.gobblinCache/gobblin-temporal/myproject"; - // Mock: Neither directory exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); + // Mock: Root dir doesn't exist, but fallback root exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + org.apache.hadoop.fs.Path path = invocation.getArgument(0); + // Only fallback root directory exists + return path.toString().equals(fallbackRootDir); + } + }); Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(jarCacheDir)) - .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fallbackDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - // Should disable jar caching - Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + // Should set JAR_CACHE_DIR to fallback root + suffix + Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), expectedFullPath); + Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); } @Test - public void testAddJarCachingConfig_NoDirConfiguredFallbackExists() throws Exception { + public void testAddJarCachingConfig_NeitherRootDirExists() throws Exception { FileSystem mockFs = Mockito.mock(FileSystem.class); - String fallbackDir = "/test/fallback/dir"; + String rootDir = "/user/baduser1"; + String fallbackRootDir = "/user/baduser2"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; - // Mock: Fallback directory exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(true); + // Mock: Neither root directory exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fallbackDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - // Should use fallback directory - Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), fallbackDir); - Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + // Should disable jar caching + Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); } @Test - public void testAddJarCachingConfig_NoDirConfigured() throws Exception { + public void testAddJarCachingConfig_OnlyFallbackRootConfigured() throws Exception { FileSystem mockFs = Mockito.mock(FileSystem.class); + String fallbackRootDir = "/user/testuser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; - // Mock: No directories exist - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); + // Mock: Fallback root directory exists + Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + org.apache.hadoop.fs.Path path = invocation.getArgument(0); + return path.toString().equals(fallbackRootDir); + } + }); Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - // Should disable jar caching - Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); + // Should set JAR_CACHE_DIR to fallback root + suffix + Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), expectedFullPath); + Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); } + /** * An application master for accessing protected fields in {@link GobblinApplicationMaster} * for testing. From a58515867cb83bc56d118648cdf7f6eff7724da3 Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Tue, 16 Dec 2025 12:09:50 +0530 Subject: [PATCH 3/5] refactor --- .../gobblin/yarn/GobblinYarnAppLauncher.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 21323a50be0..7572702b948 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -1002,7 +1002,7 @@ private static Config addDynamicConfig(Config config) throws IOException { /** * Configures jar caching by validating root directories and setting JAR_CACHE_DIR. - * + * *

This method validates that the configured jar cache root directory exists on the filesystem * before enabling jar caching. It follows this logic:

*
    @@ -1013,10 +1013,10 @@ private static Config addDynamicConfig(Config config) throws IOException { *
  1. If not: Try FALLBACK_JAR_CACHE_ROOT_DIR with same logic
  2. *
  3. If neither exists: Disable jar caching by setting JAR_CACHE_ENABLED to false
  4. *
- * + * *

This ensures that the base user directory exists before attempting to cache jars in nested * subdirectories, preventing runtime failures from misconfigured paths.

- * + * * @param config the application configuration * @param fs the filesystem to use for validation * @return updated config with JAR_CACHE_DIR set or JAR_CACHE_ENABLED disabled @@ -1024,44 +1024,44 @@ private static Config addDynamicConfig(Config config) throws IOException { */ private static Config addJarCachingConfig(Config config, FileSystem fs) throws IOException { // Skip validation if jar caching is not enabled - boolean jarCachingEnabled = ConfigUtils.getBoolean(config, - GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, + boolean jarCachingEnabled = ConfigUtils.getBoolean(config, + GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); - + if (!jarCachingEnabled) { LOGGER.info("Jar caching is not enabled, skipping jar cache directory validation"); return config; } - + String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ""); - + // Try primary root directory if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) { String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); - Config result = validateAndSetJarCacheDir(config, fs, rootDir, suffix, "JAR_CACHE_ROOT_DIR"); + Config result = validateAndSetJarCacheDir(config, fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); if (result != null) { return result; } } - + // Try fallback root directory if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) { String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); - Config result = validateAndSetJarCacheDir(config, fs, fallbackRootDir, suffix, "FALLBACK_JAR_CACHE_ROOT_DIR"); + Config result = validateAndSetJarCacheDir(config, fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); if (result != null) { return result; } } - + // Neither root directory exists, disable jar caching LOGGER.warn("No valid jar cache root directory found, disabling jar caching"); - return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, + return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(false)); } - + /** * Validates if the root directory exists and sets JAR_CACHE_DIR if it does. - * + * * @param config the configuration * @param fs the filesystem to check * @param rootDir the root directory to validate @@ -1069,13 +1069,13 @@ private static Config addJarCachingConfig(Config config, FileSystem fs) throws I * @param configName the config name for logging * @return updated config if valid, null otherwise */ - private static Config validateAndSetJarCacheDir(Config config, FileSystem fs, String rootDir, + private static Config validateAndSetJarCacheDir(Config config, FileSystem fs, String rootDir, String suffix, String configName) throws IOException { Path rootPath = new Path(rootDir); if (fs.exists(rootPath)) { String fullPath = new Path(rootPath, suffix).toString(); LOGGER.info("{} exists: {}, setting JAR_CACHE_DIR to: {}", configName, rootDir, fullPath); - return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, + return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(fullPath)); } LOGGER.warn("Configured {} does not exist: {}", configName, rootDir); From 03eeb93aa0071145c3bc8f359fd122b60798d3cc Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Wed, 17 Dec 2025 19:28:53 +0530 Subject: [PATCH 4/5] resolved comments --- .../gobblin/temporal/yarn/YarnService.java | 2 +- .../gobblin/yarn/GobblinYarnAppLauncher.java | 89 +------ .../yarn/GobblinYarnConfigurationKeys.java | 1 - .../gobblin/yarn/JarCachePathResolver.java | 127 ++++++++++ .../apache/gobblin/yarn/YarnHelixUtils.java | 21 +- .../yarn/GobblinYarnAppLauncherTest.java | 141 ----------- .../yarn/JarCachePathResolverTest.java | 224 ++++++++++++++++++ .../gobblin/yarn/YarnHelixUtilsTest.java | 12 +- 8 files changed, 376 insertions(+), 241 deletions(-) create mode 100644 gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java create mode 100644 gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 2818982baba..d28d94e9c9a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -394,7 +394,7 @@ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo contain Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); // Used for -SNAPSHOT versions of jars Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); - Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir; Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir); LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 7572702b948..4311e4ad77a 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -323,7 +323,6 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration try { config = addDynamicConfig(config); - config = addJarCachingConfig(config, this.fs); outputConfigToFile(config); } catch (SchemaRegistryException e) { throw new IOException(e); @@ -609,7 +608,7 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory()))); if (this.jarCacheEnabled) { - Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs); // Retain at least the current and last month's jars to handle executions running for ~30 days max boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs); if (!cleanedSuccessfully) { @@ -676,7 +675,7 @@ private Resource prepareContainerResource(GetNewApplicationResponse newApplicati private Map addAppMasterLocalResources(ApplicationId applicationId) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); - Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir; Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); @@ -731,7 +730,7 @@ private Map addAppMasterLocalResources(ApplicationId appl private void addContainerLocalResources(ApplicationId applicationId) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); - Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir; Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); Path containerJarsRootDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); LOGGER.info("Configured Container work directory to: {}", containerWorkDir); @@ -1000,88 +999,6 @@ private static Config addDynamicConfig(Config config) throws IOException { } } - /** - * Configures jar caching by validating root directories and setting JAR_CACHE_DIR. - * - *

This method validates that the configured jar cache root directory exists on the filesystem - * before enabling jar caching. It follows this logic:

- *
    - *
  1. Check if jar caching is enabled, if not return config as-is
  2. - *
  3. Read JAR_CACHE_ROOT_DIR and JAR_CACHE_SUFFIX from config
  4. - *
  5. Check if JAR_CACHE_ROOT_DIR exists on filesystem (e.g., /user/${user.to.proxy})
  6. - *
  7. If it exists: Set JAR_CACHE_DIR = JAR_CACHE_ROOT_DIR + JAR_CACHE_SUFFIX
  8. - *
  9. If not: Try FALLBACK_JAR_CACHE_ROOT_DIR with same logic
  10. - *
  11. If neither exists: Disable jar caching by setting JAR_CACHE_ENABLED to false
  12. - *
- * - *

This ensures that the base user directory exists before attempting to cache jars in nested - * subdirectories, preventing runtime failures from misconfigured paths.

- * - * @param config the application configuration - * @param fs the filesystem to use for validation - * @return updated config with JAR_CACHE_DIR set or JAR_CACHE_ENABLED disabled - * @throws IOException if filesystem operations fail - */ - private static Config addJarCachingConfig(Config config, FileSystem fs) throws IOException { - // Skip validation if jar caching is not enabled - boolean jarCachingEnabled = ConfigUtils.getBoolean(config, - GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, - GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); - - if (!jarCachingEnabled) { - LOGGER.info("Jar caching is not enabled, skipping jar cache directory validation"); - return config; - } - - String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ""); - - // Try primary root directory - if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) { - String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); - Config result = validateAndSetJarCacheDir(config, fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); - if (result != null) { - return result; - } - } - - // Try fallback root directory - if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) { - String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); - Config result = validateAndSetJarCacheDir(config, fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); - if (result != null) { - return result; - } - } - - // Neither root directory exists, disable jar caching - LOGGER.warn("No valid jar cache root directory found, disabling jar caching"); - return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, - ConfigValueFactory.fromAnyRef(false)); - } - - /** - * Validates if the root directory exists and sets JAR_CACHE_DIR if it does. - * - * @param config the configuration - * @param fs the filesystem to check - * @param rootDir the root directory to validate - * @param suffix the suffix to append to root directory - * @param configName the config name for logging - * @return updated config if valid, null otherwise - */ - private static Config validateAndSetJarCacheDir(Config config, FileSystem fs, String rootDir, - String suffix, String configName) throws IOException { - Path rootPath = new Path(rootDir); - if (fs.exists(rootPath)) { - String fullPath = new Path(rootPath, suffix).toString(); - LOGGER.info("{} exists: {}, setting JAR_CACHE_DIR to: {}", configName, rootDir, fullPath); - return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, - ConfigValueFactory.fromAnyRef(fullPath)); - } - LOGGER.warn("Configured {} does not exist: {}", configName, rootDir); - return null; - } - /** * Write the config to the file specified with the config key {@value GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it * is configured. diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index da34de3bd8f..00af259a227 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -61,7 +61,6 @@ public class GobblinYarnConfigurationKeys { public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + "jar.cache.suffix"; - public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list"; /** diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java new file mode 100644 index 00000000000..888f96df4e8 --- /dev/null +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.yarn; + +import java.io.IOException; +import java.text.SimpleDateFormat; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; + +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Resolves the jar cache directory path by validating filesystem state and applying fallback logic. + * + *

This class separates the concern of computing the resolved jar cache directory from config initialization, + * making it easier to debug and test. The resolution happens lazily when {@link #resolveJarCachePath()} is called.

+ * + *

Resolution logic:

+ *
    + *
  1. If JAR_CACHE_DIR is explicitly configured, uses it as-is (for backward compatibility)
  2. + *
  3. Otherwise, validates JAR_CACHE_ROOT_DIR exists on filesystem
  4. + *
  5. If not found, tries FALLBACK_JAR_CACHE_ROOT_DIR
  6. + *
  7. Combines validated root with JAR_CACHE_SUFFIX to form final path
  8. + *
  9. If no valid root found, throws IOException
  10. + *
+ */ +public class JarCachePathResolver { + private static final Logger LOGGER = LoggerFactory.getLogger(JarCachePathResolver.class); + + private final Config config; + private final FileSystem fs; + + public JarCachePathResolver(Config config, FileSystem fs) { + this.config = config; + this.fs = fs; + } + + /** + * Resolves the jar cache directory path, applying validation and fallback logic. + * + * @return the resolved jar cache directory path + * @throws IOException if filesystem operations fail or no valid root directory is found + */ + public Path resolveJarCachePath() throws IOException { + // If JAR_CACHE_DIR is explicitly set, use it as-is (backward compatibility) + if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) { + String explicitCacheDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR); + LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", explicitCacheDir); + return new Path(explicitCacheDir); + } + + String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ""); + + // Try primary root directory + if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) { + String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); + Path resolvedPath = validateAndComputePath(rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); + if (resolvedPath != null) { + return resolvedPath; + } + } + + // Try fallback root directory + if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) { + String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); + Path resolvedPath = validateAndComputePath(fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); + if (resolvedPath != null) { + return resolvedPath; + } + } + + // No valid root directory found - fail fast + throw new IOException("No valid jar cache root directory found. Please configure " + + GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or " + + GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR + + " with a valid directory path, or explicitly set " + + GobblinYarnConfigurationKeys.JAR_CACHE_DIR); + } + + /** + * Validates if the root directory exists and computes the full path with suffix. + * + * @param rootDir the root directory to validate + * @param suffix the suffix to append + * @param configName the config name for logging + * @return the computed path if valid, null otherwise + * @throws IOException if filesystem operations fail + */ + @VisibleForTesting + Path validateAndComputePath(String rootDir, String suffix, String configName) throws IOException { + Path rootPath = new Path(rootDir); + if (fs.exists(rootPath)) { + // Strip leading '/' from suffix to ensure proper concatenation + // Otherwise, Hadoop Path treats it as absolute path and ignores the parent + String normalizedSuffix = suffix.startsWith("/") ? suffix.substring(1) : suffix; + Path fullPath = new Path(rootPath, normalizedSuffix); + LOGGER.info("{} exists: {}, resolved JAR_CACHE_DIR to: {}", configName, rootDir, fullPath); + return fullPath; + } + LOGGER.warn("Configured {} does not exist: {}", configName, rootDir); + return null; + } + +} + diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java index d38ebe52ee1..705253be78c 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java @@ -209,15 +209,20 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati /** * Calculate the path of a jar cache on HDFS, which is retained on a monthly basis. * Should be used in conjunction with {@link #retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a periodic basis - * @param config - * @return - * @throws IOException + * @param config the configuration + * @param fs the filesystem to use for validation + * @return the monthly jar cache path + * @throws IOException if filesystem operations fail */ - public static Path calculatePerMonthJarCachePath(Config config) throws IOException { - Path jarsCacheDirMonthly = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)); - String monthSuffix = new SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)); - return new Path(jarsCacheDirMonthly, monthSuffix); - + public static Path calculatePerMonthJarCachePath(Config config, FileSystem fs) throws IOException { + // Use JarCachePathResolver to resolve the base jar cache directory + JarCachePathResolver resolver = new JarCachePathResolver(config, fs); + Path baseCacheDir = resolver.resolveJarCachePath(); + + // Append monthly suffix + String monthSuffix = new SimpleDateFormat("yyyy-MM").format( + config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)); + return new Path(baseCacheDir, monthSuffix); } /** diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index 6159f2d4405..622c6d56479 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -514,147 +514,6 @@ public String answer(InvocationOnMock invocation) { Assert.assertFalse(config.hasPath(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID)); } - @Test - public void testAddJarCachingConfig_JarCachingDisabled() throws Exception { - FileSystem mockFs = Mockito.mock(FileSystem.class); - - Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(false)); - - Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - - // Should skip validation and return config as-is when jar caching is disabled - Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); - // Verify no filesystem checks were made - Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(org.apache.hadoop.fs.Path.class)); - } - - @Test - public void testAddJarCachingConfig_NoRootDirsConfigured() throws Exception { - FileSystem mockFs = Mockito.mock(FileSystem.class); - - // Mock: filesystem calls return false - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); - - Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); - - Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - - // Should disable jar caching when no root directories are configured - Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); - } - - @Test - public void testAddJarCachingConfig_RootDirExists() throws Exception { - FileSystem mockFs = Mockito.mock(FileSystem.class); - String rootDir = "/user/testuser"; - String suffix = ".gobblinCache/gobblin-temporal/myproject"; - String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; - - // Mock: Root directory exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { - @Override - public Boolean answer(InvocationOnMock invocation) { - org.apache.hadoop.fs.Path path = invocation.getArgument(0); - return path.toString().equals(rootDir); - } - }); - - Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); - - Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - - // Should set JAR_CACHE_DIR to root + suffix - Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), expectedFullPath); - Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); - } - - @Test - public void testAddJarCachingConfig_RootDirNotExistsFallbackRootExists() throws Exception { - FileSystem mockFs = Mockito.mock(FileSystem.class); - String rootDir = "/user/baduser"; - String fallbackRootDir = "/user/gooduser"; - String suffix = ".gobblinCache/gobblin-temporal/myproject"; - String expectedFullPath = "/user/gooduser/.gobblinCache/gobblin-temporal/myproject"; - - // Mock: Root dir doesn't exist, but fallback root exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { - @Override - public Boolean answer(InvocationOnMock invocation) { - org.apache.hadoop.fs.Path path = invocation.getArgument(0); - // Only fallback root directory exists - return path.toString().equals(fallbackRootDir); - } - }); - - Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) - .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); - - Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - - // Should set JAR_CACHE_DIR to fallback root + suffix - Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), expectedFullPath); - Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); - } - - @Test - public void testAddJarCachingConfig_NeitherRootDirExists() throws Exception { - FileSystem mockFs = Mockito.mock(FileSystem.class); - String rootDir = "/user/baduser1"; - String fallbackRootDir = "/user/baduser2"; - String suffix = ".gobblinCache/gobblin-temporal/myproject"; - - // Mock: Neither root directory exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenReturn(false); - - Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) - .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); - - Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - - // Should disable jar caching - Assert.assertFalse(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); - } - - @Test - public void testAddJarCachingConfig_OnlyFallbackRootConfigured() throws Exception { - FileSystem mockFs = Mockito.mock(FileSystem.class); - String fallbackRootDir = "/user/testuser"; - String suffix = ".gobblinCache/gobblin-temporal/myproject"; - String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; - - // Mock: Fallback root directory exists - Mockito.when(mockFs.exists(Mockito.any(org.apache.hadoop.fs.Path.class))).thenAnswer(new Answer() { - @Override - public Boolean answer(InvocationOnMock invocation) { - org.apache.hadoop.fs.Path path = invocation.getArgument(0); - return path.toString().equals(fallbackRootDir); - } - }); - - Config config = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); - - Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs); - - // Should set JAR_CACHE_DIR to fallback root + suffix - Assert.assertEquals(result.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR), expectedFullPath); - Assert.assertTrue(result.getBoolean(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED)); - } - - /** * An application master for accessing protected fields in {@link GobblinApplicationMaster} * for testing. diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java new file mode 100644 index 00000000000..6715b2c49bb --- /dev/null +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.yarn; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + + +/** + * Tests for {@link JarCachePathResolver}. + */ +public class JarCachePathResolverTest { + + + @Test + public void testResolveJarCachePath_ExplicitJarCacheDir() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String explicitCacheDir = "/explicit/cache/dir"; + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(explicitCacheDir)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + Path result = resolver.resolveJarCachePath(); + + // Should use explicitly configured JAR_CACHE_DIR + Assert.assertEquals(result.toString(), explicitCacheDir); + // Verify no filesystem checks were made (explicit config is used as-is) + Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(Path.class)); + } + + @Test + public void testResolveJarCachePath_RootDirExists() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + Path result = resolver.resolveJarCachePath(); + + // Should resolve to root + suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test + public void testResolveJarCachePath_RootDirNotExistsFallbackRootExists() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/baduser"; + String fallbackRootDir = "/user/gooduser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/gooduser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Root dir doesn't exist, but fallback root exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + // Only fallback root directory exists + return path.toString().equals(fallbackRootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + Path result = resolver.resolveJarCachePath(); + + // Should resolve to fallback root + suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*No valid jar cache root directory found.*") + public void testResolveJarCachePath_NeitherRootDirExists() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/baduser1"; + String fallbackRootDir = "/user/baduser2"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + + // Mock: Neither root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenReturn(false); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + // Should throw IOException when no valid root directory found + resolver.resolveJarCachePath(); + } + + @Test + public void testResolveJarCachePath_OnlyFallbackRootConfigured() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String fallbackRootDir = "/user/testuser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Fallback root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(fallbackRootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + Path result = resolver.resolveJarCachePath(); + + // Should resolve to fallback root + suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*No valid jar cache root directory found.*") + public void testResolveJarCachePath_NoRootDirsConfigured() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + // Should throw IOException when no root directories are configured + resolver.resolveJarCachePath(); + } + + @Test + public void testResolveJarCachePath_SuffixNormalization() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + String suffixWithLeadingSlash = "/.gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffixWithLeadingSlash)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); + Path result = resolver.resolveJarCachePath(); + + // Should normalize suffix by stripping leading '/' to avoid absolute path issue + Assert.assertEquals(result.toString(), expectedFullPath); + } + +} + diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java index 1030e7b0241..df37a0cf8a7 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java @@ -27,6 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.junit.Assert; +import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -71,10 +72,12 @@ public void testUpdateToken() @Test public void testGetJarCachePath() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); Config config = ConfigFactory.empty() .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(1726074000013L)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef("/tmp")); - Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config); + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef("/tmp")) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config, mockFs); Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09")); } @@ -84,8 +87,9 @@ public void retainLatestKJarCachePaths() throws IOException { FileSystem fs = FileSystem.getLocal(new Configuration()); Config config = ConfigFactory.empty() .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(1726074000013L)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp")); - Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config); + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp")) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config, fs); fs.mkdirs(jarCachePath); fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08")); fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07")); From e83b2b4a0b6a6e1199b44ad30b64e99e715af300 Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Wed, 17 Dec 2025 19:38:56 +0530 Subject: [PATCH 5/5] refactor --- .../gobblin/yarn/JarCachePathResolver.java | 46 ++++++----- .../apache/gobblin/yarn/YarnHelixUtils.java | 3 +- .../yarn/JarCachePathResolverTest.java | 80 +++++++++++++++---- 3 files changed, 91 insertions(+), 38 deletions(-) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java index 888f96df4e8..302791ab9e4 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java @@ -18,7 +18,6 @@ package org.apache.gobblin.yarn; import java.io.IOException; -import java.text.SimpleDateFormat; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,51 +31,53 @@ /** - * Resolves the jar cache directory path by validating filesystem state and applying fallback logic. - * - *

This class separates the concern of computing the resolved jar cache directory from config initialization, - * making it easier to debug and test. The resolution happens lazily when {@link #resolveJarCachePath()} is called.

+ * Utility class for resolving the jar cache directory path by validating filesystem on path existence and applying fallback logic. * *

Resolution logic:

*
    *
  1. If JAR_CACHE_DIR is explicitly configured, uses it as-is (for backward compatibility)
  2. *
  3. Otherwise, validates JAR_CACHE_ROOT_DIR exists on filesystem
  4. *
  5. If not found, tries FALLBACK_JAR_CACHE_ROOT_DIR
  6. - *
  7. Combines validated root with JAR_CACHE_SUFFIX to form final path
  8. + *
  9. Combines validated root with JAR_CACHE_SUFFIX (or default suffix) to form final path
  10. *
  11. If no valid root found, throws IOException
  12. *
*/ public class JarCachePathResolver { private static final Logger LOGGER = LoggerFactory.getLogger(JarCachePathResolver.class); + // Note: Trailing slash will be normalized away by Hadoop Path + private static final String DEFAULT_JAR_CACHE_SUFFIX = ".gobblinCache/gobblin-temporal"; - private final Config config; - private final FileSystem fs; - - public JarCachePathResolver(Config config, FileSystem fs) { - this.config = config; - this.fs = fs; + // Private constructor to prevent instantiation + private JarCachePathResolver() { } /** * Resolves the jar cache directory path, applying validation and fallback logic. * + * @param config the configuration + * @param fs the filesystem to use for validation * @return the resolved jar cache directory path * @throws IOException if filesystem operations fail or no valid root directory is found */ - public Path resolveJarCachePath() throws IOException { - // If JAR_CACHE_DIR is explicitly set, use it as-is (backward compatibility) + public static Path resolveJarCachePath(Config config, FileSystem fs) throws IOException { + // If JAR_CACHE_DIR is explicitly set, use it as-is if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) { String explicitCacheDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR); LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", explicitCacheDir); return new Path(explicitCacheDir); } + // Get suffix from config, or use default if not configured or empty String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ""); + if (suffix == null || suffix.trim().isEmpty()) { + LOGGER.info("JAR_CACHE_SUFFIX not configured or empty, using default: {}", DEFAULT_JAR_CACHE_SUFFIX); + suffix = DEFAULT_JAR_CACHE_SUFFIX; + } // Try primary root directory if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) { String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); - Path resolvedPath = validateAndComputePath(rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); + Path resolvedPath = validateAndComputePath(fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); if (resolvedPath != null) { return resolvedPath; } @@ -85,23 +86,24 @@ public Path resolveJarCachePath() throws IOException { // Try fallback root directory if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) { String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); - Path resolvedPath = validateAndComputePath(fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); + Path resolvedPath = validateAndComputePath(fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); if (resolvedPath != null) { return resolvedPath; } } - // No valid root directory found - fail fast - throw new IOException("No valid jar cache root directory found. Please configure " - + GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or " - + GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR - + " with a valid directory path, or explicitly set " + // No valid root directory found - fail + throw new IOException("No valid jar cache root directory found. Please configure " + + GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or " + + GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR + + " with a valid directory path, or explicitly set " + GobblinYarnConfigurationKeys.JAR_CACHE_DIR); } /** * Validates if the root directory exists and computes the full path with suffix. * + * @param fs the filesystem to check * @param rootDir the root directory to validate * @param suffix the suffix to append * @param configName the config name for logging @@ -109,7 +111,7 @@ public Path resolveJarCachePath() throws IOException { * @throws IOException if filesystem operations fail */ @VisibleForTesting - Path validateAndComputePath(String rootDir, String suffix, String configName) throws IOException { + static Path validateAndComputePath(FileSystem fs, String rootDir, String suffix, String configName) throws IOException { Path rootPath = new Path(rootDir); if (fs.exists(rootPath)) { // Strip leading '/' from suffix to ensure proper concatenation diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java index 705253be78c..1a4113dbeed 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java @@ -216,8 +216,7 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati */ public static Path calculatePerMonthJarCachePath(Config config, FileSystem fs) throws IOException { // Use JarCachePathResolver to resolve the base jar cache directory - JarCachePathResolver resolver = new JarCachePathResolver(config, fs); - Path baseCacheDir = resolver.resolveJarCachePath(); + Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs); // Append monthly suffix String monthSuffix = new SimpleDateFormat("yyyy-MM").format( diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java index 6715b2c49bb..466543be1c8 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java @@ -49,8 +49,7 @@ public void testResolveJarCachePath_ExplicitJarCacheDir() throws IOException { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); - Path result = resolver.resolveJarCachePath(); + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); // Should use explicitly configured JAR_CACHE_DIR Assert.assertEquals(result.toString(), explicitCacheDir); @@ -81,8 +80,7 @@ public Boolean answer(InvocationOnMock invocation) { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); - Path result = resolver.resolveJarCachePath(); + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); // Should resolve to root + suffix Assert.assertEquals(result.toString(), expectedFullPath); @@ -114,8 +112,7 @@ public Boolean answer(InvocationOnMock invocation) { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); - Path result = resolver.resolveJarCachePath(); + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); // Should resolve to fallback root + suffix Assert.assertEquals(result.toString(), expectedFullPath); @@ -140,9 +137,8 @@ public void testResolveJarCachePath_NeitherRootDirExists() throws IOException { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); // Should throw IOException when no valid root directory found - resolver.resolveJarCachePath(); + JarCachePathResolver.resolveJarCachePath(config, mockFs); } @Test @@ -168,8 +164,7 @@ public Boolean answer(InvocationOnMock invocation) { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); - Path result = resolver.resolveJarCachePath(); + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); // Should resolve to fallback root + suffix Assert.assertEquals(result.toString(), expectedFullPath); @@ -185,9 +180,67 @@ public void testResolveJarCachePath_NoRootDirsConfigured() throws IOException { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); // Should throw IOException when no root directories are configured - resolver.resolveJarCachePath(); + JarCachePathResolver.resolveJarCachePath(config, mockFs); + } + + @Test + public void testResolveJarCachePath_DefaultSuffix() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + // Note: Hadoop Path normalizes and removes trailing slashes + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + // Config without JAR_CACHE_SUFFIX - should use default + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should use default suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test + public void testResolveJarCachePath_EmptySuffixUsesDefault() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + // Note: Hadoop Path normalizes and removes trailing slashes + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + // Config with empty JAR_CACHE_SUFFIX - should use default + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef("")) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should use default suffix when configured suffix is empty + Assert.assertEquals(result.toString(), expectedFullPath); } @Test @@ -213,8 +266,7 @@ public Boolean answer(InvocationOnMock invocation) { .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); - JarCachePathResolver resolver = new JarCachePathResolver(config, mockFs); - Path result = resolver.resolveJarCachePath(); + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); // Should normalize suffix by stripping leading '/' to avoid absolute path issue Assert.assertEquals(result.toString(), expectedFullPath);