diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index a9431025..f7692a75 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -112,6 +112,9 @@ pramen { # Maximum number of attempts allowed for the pipeline run runtime.max.attempts = 1 + # Force re-create Hive tables + runtime.hive.force.recreate = false + # Send an email even if there are no changes and no late or not ready data email.if.no.changes = true diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index b886ae84..5463666b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -47,7 +47,8 @@ case class RuntimeConfig( historicalRunMode: RunMode, sparkAppDescriptionTemplate: Option[String], attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation) - maxAttempts: Int // Maximum number of attempts allowed for the pipeline run + maxAttempts: Int, // Maximum number of attempts allowed for the pipeline run + forceReCreateHiveTables: Boolean // When true, Hive tables are re-created even if no schema change is detected ) object RuntimeConfig { @@ -76,6 +77,7 @@ object RuntimeConfig { val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template" val ATTEMPT = "pramen.runtime.attempt" val MAX_ATTEMPTS = "pramen.runtime.max.attempts" + val FORCE_RECREATE_HIVE_TABLES = "pramen.runtime.hive.force.recreate" def fromConfig(conf: Config): RuntimeConfig = { val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP) @@ -163,7 +165,8 @@ object RuntimeConfig { runMode, sparkAppDescriptionTemplate, attempt, - maxAttempts + maxAttempts, + forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false) ) } @@ -188,7 +191,8 @@ object RuntimeConfig { 
historicalRunMode = RunMode.CheckUpdates, sparkAppDescriptionTemplate = None, attempt = 1, - maxAttempts = 1 + maxAttempts = 1, + forceReCreateHiveTables = false ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala index 994c6a3d..c0c649bb 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala @@ -48,7 +48,8 @@ case class CmdLineConfig( overrideLogLevel: Option[String] = None, logEffectiveConfig: Option[Boolean] = None, attempt: Option[Int] = None, - maxAttempts: Option[Int] = None + maxAttempts: Option[Int] = None, + forceReCreateHiveTables: Option[Boolean] = None ) object CmdLineConfig { @@ -137,6 +138,9 @@ object CmdLineConfig { for (maxAttempts <- cmd.maxAttempts) accumulatedConfig = accumulatedConfig.withValue(MAX_ATTEMPTS, ConfigValueFactory.fromAnyRef(maxAttempts)) + for (forceReCreateHiveTables <- cmd.forceReCreateHiveTables) + accumulatedConfig = accumulatedConfig.withValue(FORCE_RECREATE_HIVE_TABLES, ConfigValueFactory.fromAnyRef(forceReCreateHiveTables)) + accumulatedConfig } @@ -254,6 +258,10 @@ object CmdLineConfig { config.copy(logEffectiveConfig = Option(value))) .text("When true (default), Pramen logs the effective configuration.") + opt[Unit]("force-recreate-hive-tables").optional().action((_, config) => + config.copy(forceReCreateHiveTables = Some(true))) + .text("When specified, Hive tables configured for metastore tables will be re-created, and partitions repaired.") + help("help").text("prints this usage text") } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 29cc77fc..062244e9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -411,7 +411,7 @@ abstract class TaskRunnerBase(conf: Config, } val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) { - val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || task.reason == TaskRunReason.Rerun + val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || runtimeConfig.forceReCreateHiveTables // Re-create only on schema changes or when explicitly forced; a rerun by itself does not trigger it task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate) } else { Seq.empty diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index 80d36158..28edb251 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -42,7 +42,8 @@ object RuntimeConfigFactory { historicalRunMode: RunMode = RunMode.CheckUpdates, sparkAppDescriptionTemplate: Option[String] = None, attempt: Int = 1, - maxAttempts: Int = 1): RuntimeConfig = { + maxAttempts: Int = 1, + forceReCreateHiveTables: Boolean = false): RuntimeConfig = { RuntimeConfig(isDryRun, isRerun, runTables, @@ -62,7 +63,8 @@ object RuntimeConfigFactory { historicalRunMode, sparkAppDescriptionTemplate, attempt, - maxAttempts) + maxAttempts, + forceReCreateHiveTables) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala index 59671732..4b1fc3c5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala @@ -366,6 +366,14 @@ class CmdLineConfigSuite extends AnyWordSpec { assert(cmd.isEmpty) } + + "return the modified config if force-recreate-hive-tables = true" in 
{ + val cmd = CmdLineConfig.parseCmdLine(Array("--workflow", "dummy.config", "--force-recreate-hive-tables")) + val config = CmdLineConfig.applyCmdLineToConfig(populatedConfig, cmd.get) + + assert(config.hasPath(FORCE_RECREATE_HIVE_TABLES)) + assert(config.getBoolean(FORCE_RECREATE_HIVE_TABLES)) // the option takes no value; its presence alone sets the flag to true + } } }