Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pramen/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pramen {
# Maximum number of attempts allowed for the pipeline run
runtime.max.attempts = 1

# Force re-creation of Hive tables even when no schema changes are detected
runtime.hive.force.recreate = false

# Send an email even if there are no changes and no late or not ready data
email.if.no.changes = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ case class RuntimeConfig(
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String],
attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation)
maxAttempts: Int // Maximum number of attempts allowed for the pipeline run
maxAttempts: Int, // Maximum number of attempts allowed for the pipeline run
forceReCreateHiveTables: Boolean
)

object RuntimeConfig {
Expand Down Expand Up @@ -76,6 +77,7 @@ object RuntimeConfig {
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"
val ATTEMPT = "pramen.runtime.attempt"
val MAX_ATTEMPTS = "pramen.runtime.max.attempts"
val FORCE_RECREATE_HIVE_TABLES = "pramen.runtime.hive.force.recreate"

def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
Expand Down Expand Up @@ -163,7 +165,8 @@ object RuntimeConfig {
runMode,
sparkAppDescriptionTemplate,
attempt,
maxAttempts
maxAttempts,
forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false)
)
}

Expand All @@ -188,7 +191,8 @@ object RuntimeConfig {
historicalRunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate = None,
attempt = 1,
maxAttempts = 1
maxAttempts = 1,
forceReCreateHiveTables = false
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ case class CmdLineConfig(
overrideLogLevel: Option[String] = None,
logEffectiveConfig: Option[Boolean] = None,
attempt: Option[Int] = None,
maxAttempts: Option[Int] = None
maxAttempts: Option[Int] = None,
forceReCreateHiveTables: Option[Boolean] = None
)

object CmdLineConfig {
Expand Down Expand Up @@ -137,6 +138,9 @@ object CmdLineConfig {
for (maxAttempts <- cmd.maxAttempts)
accumulatedConfig = accumulatedConfig.withValue(MAX_ATTEMPTS, ConfigValueFactory.fromAnyRef(maxAttempts))

// Override the config value only when the flag was explicitly set on the command line;
// when the option is None the existing config value is left untouched.
for (forceReCreateHiveTables <- cmd.forceReCreateHiveTables)
  accumulatedConfig = accumulatedConfig.withValue(FORCE_RECREATE_HIVE_TABLES, ConfigValueFactory.fromAnyRef(forceReCreateHiveTables))

accumulatedConfig
}

Expand Down Expand Up @@ -254,6 +258,10 @@ object CmdLineConfig {
config.copy(logEffectiveConfig = Option(value)))
.text("When true (default), Pramen logs the effective configuration.")

opt[Unit]("force-recreate-hive-tables").optional().action((_, config) =>
config.copy(forceReCreateHiveTables = Some(true)))
.text("When specified, Hive tables configured for metastore tables will be re-created, and partitions repaired.")

help("help").text("prints this usage text")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ abstract class TaskRunnerBase(conf: Config,
}

val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) {
val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || task.reason == TaskRunReason.Rerun
val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || runtimeConfig.forceReCreateHiveTables
task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate)
Comment on lines 413 to 415
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for tests that verify the recreate parameter value in createOrRefreshHiveTable calls

echo "=== Searching for tests that check recreate parameter ==="
rg -n "recreate" --type scala pramen/core/src/test/

echo ""
echo "=== Check JobSpy implementation for recreate handling ==="
rg -n -A5 "createOrRefreshHiveTable" pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala

Repository: AbsaOSS/pramen

Length of output: 3190


🏁 Script executed:

#!/bin/bash
echo "=== Search for TaskRunnerBaseSuite ==="
fd -t f "TaskRunnerBaseSuite" pramen/core/src/test/

echo ""
echo "=== Search for tests that verify recreate behavior based on schema changes or rerun reason ==="
rg -n "schemaChanges|forceReCreateHiveTables|recreate.*true|recreate.*false" --type scala pramen/core/src/test/ | grep -i "task.*runner\|runner.*test" | head -30

echo ""
echo "=== Check if TaskRunnerBaseSuite exists and its content ==="
find pramen/core/src/test -name "*TaskRunner*" -type f

Repository: AbsaOSS/pramen

Length of output: 812


🏁 Script executed:

#!/bin/bash
echo "=== Read TaskRunnerBaseSuite to examine test coverage ==="
wc -l pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala

echo ""
echo "=== Show content with line numbers ==="
cat -n pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala

Repository: AbsaOSS/pramen

Length of output: 33640


Core behavioral change looks correct, but test coverage gap should be addressed.

The change correctly implements the PR objective: Hive table recreation now occurs only when schema changes are detected OR when explicitly requested via runtimeConfig.forceReCreateHiveTables, rather than automatically on every rerun.

However, existing tests only verify that createOrRefreshHiveTable is called, not the value of the recreate parameter. Add tests that explicitly verify:

  1. recreate = false when isRerun = true, forceReCreateHiveTables = false, and no schema changes
  2. recreate = true when forceReCreateHiveTables = true

This ensures the behavioral change is properly validated and prevents regressions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`
around lines 413 - 415, Test coverage is missing for the new recreate logic in
TaskRunnerBase: add unit tests that call the code path that reaches
task.job.createOrRefreshHiveTable and assert the boolean passed for the recreate
parameter; specifically, add one test where isRerun = true,
runtimeConfig.forceReCreateHiveTables = false and both
schemaChangesBeforeTransform and schemaChangesAfterTransform are empty and
assert createOrRefreshHiveTable was called with recreate = false, and another
where runtimeConfig.forceReCreateHiveTables = true and assert recreate = true;
target the TaskRunnerBase behavior (mock task.job and verify the
createOrRefreshHiveTable(...) call and its recreate argument) so changes to
recreate logic are validated.

} else {
Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ object RuntimeConfigFactory {
historicalRunMode: RunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate: Option[String] = None,
attempt: Int = 1,
maxAttempts: Int = 1): RuntimeConfig = {
maxAttempts: Int = 1,
forceReCreateHiveTables: Boolean = false): RuntimeConfig = {
RuntimeConfig(isDryRun,
isRerun,
runTables,
Expand All @@ -62,7 +63,8 @@ object RuntimeConfigFactory {
historicalRunMode,
sparkAppDescriptionTemplate,
attempt,
maxAttempts)
maxAttempts,
forceReCreateHiveTables)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ class CmdLineConfigSuite extends AnyWordSpec {

assert(cmd.isEmpty)
}

// Verifies that the --force-recreate-hive-tables command line flag ends up in the
// resulting config as FORCE_RECREATE_HIVE_TABLES = true.
"return the modified config if force-recreate-hive-tables = true" in {
val cmd = CmdLineConfig.parseCmdLine(Array("--workflow", "dummy.config", "--force-recreate-hive-tables"))
// cmd.get: the test relies on parsing succeeding for these arguments
val config = CmdLineConfig.applyCmdLineToConfig(populatedConfig, cmd.get)

// The key must be present and must evaluate to true
assert(config.hasPath(FORCE_RECREATE_HIVE_TABLES))
assert(config.getBoolean(FORCE_RECREATE_HIVE_TABLES))
}
}

}
Loading