diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 2818982baba..d28d94e9c9a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -394,7 +394,7 @@ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo contain Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); // Used for -SNAPSHOT versions of jars Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); - Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir; Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir); LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index d2f2180fff9..4311e4ad77a 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -608,7 +608,7 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory()))); if (this.jarCacheEnabled) { - Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs); // Retain at least the current and last month's jars to handle executions running for ~30 days max boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs); if (!cleanedSuccessfully) { @@ -675,7 +675,7 @@ private Resource prepareContainerResource(GetNewApplicationResponse newApplicati private Map addAppMasterLocalResources(ApplicationId applicationId) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); - Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir; Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); @@ -730,7 +730,7 @@ private Map addAppMasterLocalResources(ApplicationId appl private void addContainerLocalResources(ApplicationId applicationId) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); - Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir; Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); Path containerJarsRootDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); LOGGER.info("Configured Container work directory to: {}", containerWorkDir); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index 10bc9f97093..00af259a227 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -55,6 +55,12 @@ public class GobblinYarnConfigurationKeys { public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; + public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.root.dir"; + + public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.fallback.root.dir"; + + public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + "jar.cache.suffix"; + public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list"; /** diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java new file mode 100644 index 00000000000..302791ab9e4 --- /dev/null +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.yarn; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; + +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Utility class for resolving the jar cache directory path by validating filesystem on path existence and applying fallback logic. + * + *

Resolution logic:

+ *
    + *
  1. If JAR_CACHE_DIR is explicitly configured, uses it as-is (for backward compatibility)
  2. + *
  3. Otherwise, validates JAR_CACHE_ROOT_DIR exists on filesystem
  4. + *
  5. If not found, tries FALLBACK_JAR_CACHE_ROOT_DIR
  6. + *
  7. Combines validated root with JAR_CACHE_SUFFIX (or default suffix) to form final path
  8. + *
  9. If no valid root found, throws IOException
  10. + *
+ */ +public class JarCachePathResolver { + private static final Logger LOGGER = LoggerFactory.getLogger(JarCachePathResolver.class); + // Note: Trailing slash will be normalized away by Hadoop Path + private static final String DEFAULT_JAR_CACHE_SUFFIX = ".gobblinCache/gobblin-temporal"; + + // Private constructor to prevent instantiation + private JarCachePathResolver() { + } + + /** + * Resolves the jar cache directory path, applying validation and fallback logic. + * + * @param config the configuration + * @param fs the filesystem to use for validation + * @return the resolved jar cache directory path + * @throws IOException if filesystem operations fail or no valid root directory is found + */ + public static Path resolveJarCachePath(Config config, FileSystem fs) throws IOException { + // If JAR_CACHE_DIR is explicitly set, use it as-is + if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) { + String explicitCacheDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR); + LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", explicitCacheDir); + return new Path(explicitCacheDir); + } + + // Get suffix from config, or use default if not configured or empty + String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ""); + if (suffix == null || suffix.trim().isEmpty()) { + LOGGER.info("JAR_CACHE_SUFFIX not configured or empty, using default: {}", DEFAULT_JAR_CACHE_SUFFIX); + suffix = DEFAULT_JAR_CACHE_SUFFIX; + } + + // Try primary root directory + if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) { + String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); + Path resolvedPath = validateAndComputePath(fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR); + if (resolvedPath != null) { + return resolvedPath; + } + } + + // Try fallback root directory + if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) { + String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); + Path resolvedPath = validateAndComputePath(fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR); + if (resolvedPath != null) { + return resolvedPath; + } + } + + // No valid root directory found - fail + throw new IOException("No valid jar cache root directory found. Please configure " + + GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or " + + GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR + + " with a valid directory path, or explicitly set " + + GobblinYarnConfigurationKeys.JAR_CACHE_DIR); + } + + /** + * Validates if the root directory exists and computes the full path with suffix. + * + * @param fs the filesystem to check + * @param rootDir the root directory to validate + * @param suffix the suffix to append + * @param configName the config name for logging + * @return the computed path if valid, null otherwise + * @throws IOException if filesystem operations fail + */ + @VisibleForTesting + static Path validateAndComputePath(FileSystem fs, String rootDir, String suffix, String configName) throws IOException { + Path rootPath = new Path(rootDir); + if (fs.exists(rootPath)) { + // Strip leading '/' from suffix to ensure proper concatenation + // Otherwise, Hadoop Path treats it as absolute path and ignores the parent + String normalizedSuffix = suffix.startsWith("/") ? suffix.substring(1) : suffix; + Path fullPath = new Path(rootPath, normalizedSuffix); + LOGGER.info("{} exists: {}, resolved JAR_CACHE_DIR to: {}", configName, rootDir, fullPath); + return fullPath; + } + LOGGER.warn("Configured {} does not exist: {}", configName, rootDir); + return null; + } + +} + diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java index d38ebe52ee1..1a4113dbeed 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java @@ -209,15 +209,19 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati /** * Calculate the path of a jar cache on HDFS, which is retained on a monthly basis. * Should be used in conjunction with {@link #retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a periodic basis - * @param config - * @return - * @throws IOException + * @param config the configuration + * @param fs the filesystem to use for validation + * @return the monthly jar cache path + * @throws IOException if filesystem operations fail */ - public static Path calculatePerMonthJarCachePath(Config config) throws IOException { - Path jarsCacheDirMonthly = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)); - String monthSuffix = new SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)); - return new Path(jarsCacheDirMonthly, monthSuffix); - + public static Path calculatePerMonthJarCachePath(Config config, FileSystem fs) throws IOException { + // Use JarCachePathResolver to resolve the base jar cache directory + Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs); + + // Append monthly suffix + String monthSuffix = new SimpleDateFormat("yyyy-MM").format( + config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)); + return new Path(baseCacheDir, monthSuffix); } /** diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java new file mode 100644 index 00000000000..466543be1c8 --- /dev/null +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.yarn; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + + +/** + * Tests for {@link JarCachePathResolver}. + */ +public class JarCachePathResolverTest { + + + @Test + public void testResolveJarCachePath_ExplicitJarCacheDir() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String explicitCacheDir = "/explicit/cache/dir"; + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(explicitCacheDir)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should use explicitly configured JAR_CACHE_DIR + Assert.assertEquals(result.toString(), explicitCacheDir); + // Verify no filesystem checks were made (explicit config is used as-is) + Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(Path.class)); + } + + @Test + public void testResolveJarCachePath_RootDirExists() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should resolve to root + suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test + public void testResolveJarCachePath_RootDirNotExistsFallbackRootExists() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/baduser"; + String fallbackRootDir = "/user/gooduser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/gooduser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Root dir doesn't exist, but fallback root exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + // Only fallback root directory exists + return path.toString().equals(fallbackRootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should resolve to fallback root + suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*No valid jar cache root directory found.*") + public void testResolveJarCachePath_NeitherRootDirExists() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/baduser1"; + String fallbackRootDir = "/user/baduser2"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + + // Mock: Neither root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenReturn(false); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + // Should throw IOException when no valid root directory found + JarCachePathResolver.resolveJarCachePath(config, mockFs); + } + + @Test + public void testResolveJarCachePath_OnlyFallbackRootConfigured() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String fallbackRootDir = "/user/testuser"; + String suffix = ".gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Fallback root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(fallbackRootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(fallbackRootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffix)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should resolve to fallback root + suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*No valid jar cache root directory found.*") + public void testResolveJarCachePath_NoRootDirsConfigured() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + // Should throw IOException when no root directories are configured + JarCachePathResolver.resolveJarCachePath(config, mockFs); + } + + @Test + public void testResolveJarCachePath_DefaultSuffix() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + // Note: Hadoop Path normalizes and removes trailing slashes + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + // Config without JAR_CACHE_SUFFIX - should use default + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should use default suffix + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test + public void testResolveJarCachePath_EmptySuffixUsesDefault() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + // Note: Hadoop Path normalizes and removes trailing slashes + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + // Config with empty JAR_CACHE_SUFFIX - should use default + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef("")) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should use default suffix when configured suffix is empty + Assert.assertEquals(result.toString(), expectedFullPath); + } + + @Test + public void testResolveJarCachePath_SuffixNormalization() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); + String rootDir = "/user/testuser"; + String suffixWithLeadingSlash = "/.gobblinCache/gobblin-temporal/myproject"; + String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal/myproject"; + + // Mock: Root directory exists + Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) { + Path path = invocation.getArgument(0); + return path.toString().equals(rootDir); + } + }); + + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(suffixWithLeadingSlash)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs); + + // Should normalize suffix by stripping leading '/' to avoid absolute path issue + Assert.assertEquals(result.toString(), expectedFullPath); + } + +} + diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java index 1030e7b0241..df37a0cf8a7 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java @@ -27,6 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.junit.Assert; +import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -71,10 +72,12 @@ public void testUpdateToken() @Test public void testGetJarCachePath() throws IOException { + FileSystem mockFs = Mockito.mock(FileSystem.class); Config config = ConfigFactory.empty() .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(1726074000013L)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef("/tmp")); - Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config); + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef("/tmp")) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config, mockFs); Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09")); } @@ -84,8 +87,9 @@ public void retainLatestKJarCachePaths() throws IOException { FileSystem fs = FileSystem.getLocal(new Configuration()); Config config = ConfigFactory.empty() .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(1726074000013L)) - .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp")); - Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config); + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp")) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true)); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config, fs); fs.mkdirs(jarCachePath); fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08")); fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07"));