Merged
2 changes: 1 addition & 1 deletion pramen/core/src/main/resources/reference.conf
@@ -61,7 +61,7 @@ pramen {
# It is not always possible. When a table is initially created, MSCK REPAIR is always used to pick up all partitions.
# Also, ADD PARTITION is supported only for the Parquet format.
# This option can be overridden per metatable.
-  hive.prefer.add.partition = false
+  hive.prefer.add.partition = true
⚠️ Potential issue | 🟠 Major

Avoid flipping this global default in a timeout-focused PR.

Line 64 changes production behavior for all deployments (MSCK REPAIR preference → ADD PARTITION preference), which is broader than the JDBC-timeout cleanup objective and may surprise existing users.

Suggested minimal change
-  hive.prefer.add.partition = true
+  hive.prefer.add.partition = false


# If enabled, the job will wait for the output table to become available before running the job.
# If the number of seconds is <= 0, the wait is infinite.
@@ -0,0 +1,84 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.runner.task

import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

object ThreadClosableRegistry {
private val log = LoggerFactory.getLogger(this.getClass)
private val closeables = new java.util.LinkedList[(Long, AutoCloseable)]

/**
* Registers a closeable resource for the current thread.
* The resource will be automatically closed when [[cleanupThread]] is called for this thread.
*
* @param closeable The AutoCloseable resource to register
*/
def registerCloseable(closeable: AutoCloseable): Unit = synchronized {
val threadId = Thread.currentThread().getId

val iterator = closeables.iterator()
var alreadyRegistered = false
while (iterator.hasNext && !alreadyRegistered) {
alreadyRegistered = iterator.next()._2 == closeable
}
if (!alreadyRegistered) {
closeables.add((threadId, closeable))
}
}

/**
* Unregisters a closeable resource from the registry.
* This removes the resource regardless of which thread it was registered from.
*
* @param closeable The AutoCloseable resource to unregister
*/
def unregisterCloseable(closeable: AutoCloseable): Unit = synchronized {
val iterator = closeables.iterator()
while (iterator.hasNext) {
val (_, c) = iterator.next()
if (c == closeable) {
iterator.remove()
return
}
}
}

/**
* Closes all registered resources for the specified thread in LIFO (Last-In-First-Out) order.
* This method is typically called when a thread times out or completes execution.
* Any exceptions during closing are logged but do not prevent other resources from being closed.
*
* @param threadId The ID of the thread whose resources should be cleaned up
*/
def cleanupThread(threadId: Long): Unit = synchronized {
val threadCloseables = closeables.asScala.filter(_._1 == threadId).map(_._2).toList
threadCloseables.reverse.foreach { closeable =>
try {
closeable.close()
} catch {
case NonFatal(ex) =>
log.warn(s"Error closing resource for thread $threadId.", ex)
} finally {
unregisterCloseable(closeable)
}
}
}
}
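A minimal, self-contained sketch of the registry's contract (a hypothetical `MiniRegistry` stand-in, not the class above) shows the per-thread tracking and LIFO close order:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in mirroring ThreadClosableRegistry's contract:
// resources are tracked per thread and closed in LIFO order on cleanup.
object MiniRegistry {
  private val entries = ListBuffer.empty[(Long, AutoCloseable)]

  def register(c: AutoCloseable): Unit = synchronized {
    entries += ((Thread.currentThread().getId, c))
  }

  def cleanupThread(threadId: Long): Unit = synchronized {
    val own = entries.filter(_._1 == threadId).map(_._2).toList
    own.reverse.foreach { c => // LIFO: last registered, first closed
      try c.close() catch { case _: Exception => () }
      entries --= entries.filter(_._2 eq c)
    }
  }
}

val order = ListBuffer.empty[String]

// A closeable that records when it was closed
def tracker(name: String): AutoCloseable = new AutoCloseable {
  override def close(): Unit = order += name
}

MiniRegistry.register(tracker("connection")) // registered first
MiniRegistry.register(tracker("statement"))  // registered second
MiniRegistry.cleanupThread(Thread.currentThread().getId)
println(order.toList) // List(statement, connection)
```

LIFO matters here because a JDBC statement must be cancelled and closed before the connection that created it.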
@@ -16,22 +16,26 @@

package za.co.absa.pramen.core.utils

import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.exceptions.TimeoutException
import za.co.absa.pramen.core.runner.task.ThreadClosableRegistry
import za.co.absa.pramen.core.utils.impl.ThreadWithException

import java.lang.Thread.UncaughtExceptionHandler
import scala.concurrent.duration.Duration

object ThreadUtils {
private val log = LoggerFactory.getLogger(this.getClass)

/**
* Executes an action with a timeout. If the timeout is breached the task is killed (using Thread.interrupt())
*
* If the task times out, an exception is thrown.
*
* Any exception is passed to the caller.
*
- * @param timeout The task timeout.
- * @param action An action to execute.
+ * @param timeout The task timeout.
+ * @param action  An action to execute.
*/
@throws[TimeoutException]
def runWithTimeout(timeout: Duration)(action: => Unit): Unit = {
@@ -54,6 +58,15 @@ object ThreadUtils {

if (thread.isAlive) {
val stackTrace = thread.getStackTrace

try {
// Execute cleanup BEFORE interrupt - e.g. close the JDBC connection/statement
ThreadClosableRegistry.cleanupThread(thread.getId)
} catch {
case ex: Throwable =>
log.warn(s"Exception during timeout cleanup: ${ex.getMessage}")
}

thread.interrupt()

val prettyTimeout = TimeUtils.prettyPrintElapsedTimeShort(timeout.toMillis)
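The cleanup-before-interrupt ordering matters because `Thread.interrupt()` cannot unblock a thread stuck in a blocking socket read, which is where a hung JDBC call typically sits. A self-contained sketch (using `ServerSocket.accept()` as a stand-in for the blocked JDBC call) illustrates this:

```scala
import java.net.{ServerSocket, SocketException}

// Stand-in for a thread stuck in a blocking JDBC socket read:
// a plain ServerSocket.accept() blocks and does not respond to interrupt().
val server = new ServerSocket(0) // ephemeral port; nobody will connect
val worker = new Thread(new Runnable {
  override def run(): Unit =
    try server.accept()
    catch { case _: SocketException => () } // raised when the socket is closed
})
worker.start()
Thread.sleep(200)

worker.interrupt() // only sets the interrupt flag; accept() stays blocked
Thread.sleep(200)
val aliveAfterInterrupt = worker.isAlive // true: interrupt did not free the thread

server.close() // closing the resource is what actually unblocks the thread
worker.join(2000)
val aliveAfterClose = worker.isAlive // false

println(s"alive after interrupt: $aliveAfterInterrupt, after close: $aliveAfterClose")
```

Closing the underlying resource first, as `cleanupThread` does for registered statements and connections, is the only reliable way to free such a worker; the subsequent `interrupt()` then handles interruptible waits.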
@@ -19,6 +19,7 @@ package za.co.absa.pramen.core.utils.hive
import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.runner.task.ThreadClosableRegistry

import java.sql._
import scala.util.control.NonFatal
@@ -66,15 +67,43 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B
executeActionOnConnection { conn =>
val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

val autoCloseStatement: AutoCloseable = new AutoCloseable {
val statementClosed = new java.util.concurrent.atomic.AtomicBoolean(false)

override def close(): Unit = {
if (statementClosed.compareAndSet(false, true)) {
try {
log.info(s"Cancelling SQL statement: $query")
statement.cancel()
} finally {
log.info(s"Closing the SQL statement...")
statement.close()
}
}
}
}

ThreadClosableRegistry.registerCloseable(autoCloseStatement)

try {
statement.execute(query)
} finally {
- statement.close()
+ ThreadClosableRegistry.unregisterCloseable(autoCloseStatement)
+ autoCloseStatement.close()
}
}
}
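The `AtomicBoolean` guard makes the wrapper's `close()` idempotent: both the timeout cleanup thread and the `finally` block may call it, and `compareAndSet` guarantees the cancel/close body runs at most once. A minimal sketch of the same pattern (a hypothetical `CloseOnce`, not part of the PR):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the idempotent-close pattern used for
// autoCloseStatement: compareAndSet flips the flag exactly once, so the
// body runs at most once no matter how many times close() is called.
final class CloseOnce(body: () => Unit) extends AutoCloseable {
  private val closed = new AtomicBoolean(false)
  override def close(): Unit =
    if (closed.compareAndSet(false, true)) body()
}

var closeCount = 0
val c = new CloseOnce(() => closeCount += 1)
c.close()
c.close() // second call is a no-op
println(closeCount) // 1
```

Without this guard, a race between the registry's `cleanupThread` and the `finally` block could call `Statement.close()` twice; with it, double-close is harmless.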

- override def close(): Unit = if (connection != null) connection.close()
+ override def close(): Unit = {
+   if (connection != null) {
+     ThreadClosableRegistry.unregisterCloseable(connection)
+     try {
+       connection.close()
+     } catch {
+       case NonFatal(ex) => log.warn("Failed to close JDBC connection", ex)
+     }
+   }
+ }
Comment on lines +97 to +106

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if close() is called from contexts that might be on different threads
# Look for usages of QueryExecutorJdbc and how/where close() is invoked

echo "=== QueryExecutorJdbc instantiation sites ==="
rg -n "new QueryExecutorJdbc" --type scala -A 3

echo -e "\n=== QueryExecutor close() calls ==="
rg -n "\.close\(\)" --type scala -C 2 | rg -A 2 -B 2 "QueryExecutor|HiveHelper"

echo -e "\n=== AutoCloseable/try-with-resources patterns ==="
ast-grep --pattern 'for ($_ <- $_) yield { $$$ }'

Repository: AbsaOSS/pramen



🏁 Script executed:

#!/bin/bash

echo "=== QueryExecutorJdbc constructor and registration ==="
sed -n '1,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala | head -50

echo -e "\n=== HiveHelper.scala context around line 67 ==="
sed -n '50,85p' pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala

echo -e "\n=== Search for try-with-resources usage ==="
rg -n "\.withResource|try \{" --type scala pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/ -A 2



🏁 Script executed:

#!/bin/bash

echo "=== Full QueryExecutorJdbc.scala ==="
wc -l pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
cat pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala

echo -e "\n=== HiveHelperSql lifecycle management ==="
rg -n "queryExecutor|close|getConnection" pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala -B 2 -A 2



Cross-thread close() won't unregister from registry.

ThreadClosableRegistry.unregisterCloseable(connection) uses Thread.currentThread().getId() to locate the registry entry (registered in getConnection() at line 119). If close() is called from a different thread than the one that called getConnection(), unregistration silently fails, leaving a stale registry entry.

This is a minor issue because the connection is still properly closed via connection.close() on line 97, and the registry's cleanup thread will eventually remove stale entries. However, consider documenting this limitation or storing the registration thread ID to enable cross-thread unregistration.



private[core] def executeActionOnConnection(action: Connection => Boolean): Boolean = {
val currentConnection = getConnection(forceReconnect = false)
Expand All @@ -97,7 +126,9 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B
if (connection == null || forceReconnect) {
val (newConnection, url) = jdbcUrlSelector.getWorkingConnection(retries)
log.info(s"Selected query executor connection: $url")
close()
connection = newConnection
ThreadClosableRegistry.registerCloseable(connection)
}
connection
}
@@ -306,7 +306,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, recreate = false)

assert(qe.queries.length == 1)
- assert(qe.queries.exists(_.contains("REPAIR")))
+ assert(qe.queries.exists(_.contains("ALTER TABLE")))
}

"do nothing for a delta since it does not need repairing" in {