From 717e850224f8cc578779be6cb3fe4ac03c3e0448 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 19 Mar 2026 11:20:41 +0100 Subject: [PATCH 1/5] #722 Autoclose JDBC connections on job timeout --- pramen/core/src/main/resources/reference.conf | 2 +- .../runner/task/ThreadClosableRegistry.scala | 48 +++++++++++++++++++ .../absa/pramen/core/utils/ThreadUtils.scala | 17 ++++++- .../core/utils/hive/QueryExecutorJdbc.scala | 29 ++++++++++- .../core/metastore/MetastoreSuite.scala | 2 +- 5 files changed, 93 insertions(+), 5 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index e14a5968..a9431025 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -61,7 +61,7 @@ pramen { # It is not always possible. When a table is initially created, MSCK REPAIR is always used to pick up all partitions. # Also ADD PARTTITION is only for Parquet format. # This option can be overridden per metatable. - hive.prefer.add.partition = false + hive.prefer.add.partition = true # If enabled, the job will wait for the output table to become available before running a job # If the number of seconds <=0 the waiting will be infinite diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala new file mode 100644 index 00000000..2dbc7441 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.runner.task + +import org.slf4j.LoggerFactory + +import java.util + +object ThreadClosableRegistry { + private val log = LoggerFactory.getLogger(this.getClass) + private val closables = new util.HashMap[Long, AutoCloseable]() + + def registerCloseable(closeable: AutoCloseable): Unit = synchronized { + val threadId = Thread.currentThread().getId + closables.put(threadId, closeable) + } + + def unregisterCloseable(closeable: AutoCloseable): Unit = synchronized { + val threadId = Thread.currentThread().getId + closables.remove(threadId, closeable) + } + + def cleanupThread(threadId: Long): Unit = synchronized { + Option(closables.remove(threadId)).foreach { closeable => + try { + closeable.close() + } catch { + case ex: Exception => + log.warn(s"Failed to close resource for thread $threadId", ex) + } + } + } + +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ThreadUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ThreadUtils.scala index b8da1f8a..2173efac 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ThreadUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ThreadUtils.scala @@ -16,13 +16,17 @@ package za.co.absa.pramen.core.utils +import org.slf4j.LoggerFactory import za.co.absa.pramen.core.exceptions.TimeoutException +import za.co.absa.pramen.core.runner.task.ThreadClosableRegistry import za.co.absa.pramen.core.utils.impl.ThreadWithException import java.lang.Thread.UncaughtExceptionHandler import scala.concurrent.duration.Duration object ThreadUtils { + private val log = LoggerFactory.getLogger(this.getClass) + /** * Executes an action with a timeout. If the timeout is breached the task is killed (using Thread.interrupt()) * @@ -30,8 +34,8 @@ object ThreadUtils { * * Any exception is passed to the caller. * - * @param timeout The task timeout. - * @param action An action to execute. + * @param timeout The task timeout. + * @param action An action to execute. */ @throws[TimeoutException] def runWithTimeout(timeout: Duration)(action: => Unit): Unit = { @@ -54,6 +58,15 @@ object ThreadUtils { if (thread.isAlive) { val stackTrace = thread.getStackTrace + + try { + // Execute cleanup BEFORE interrupt - e.g. close the JDBC connection/statement + ThreadClosableRegistry.cleanupThread(thread.getId) + } catch { + case ex: Throwable => + log.warn(s"Exception during timeout cleanup: ${ex.getMessage}") + } + thread.interrupt() val prettyTimeout = TimeUtils.prettyPrintElapsedTimeShort(timeout.toMillis) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala index 62906fd8..d429c6f9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala @@ -19,6 +19,7 @@ package za.co.absa.pramen.core.utils.hive import org.slf4j.LoggerFactory import za.co.absa.pramen.core.reader.JdbcUrlSelector import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.runner.task.ThreadClosableRegistry import java.sql._ import scala.util.control.NonFatal @@ -66,15 +67,39 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B executeActionOnConnection { conn => val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + val autoCloseStatement = new AutoCloseable { + override def close(): Unit = { + try { + log.warn(s"Cancelling SQL statement: $query") + statement.cancel() + } finally { + log.warn(s"Closing the SQL statement...") + statement.close() + } + } + } + + ThreadClosableRegistry.registerCloseable(autoCloseStatement) + try { statement.execute(query) } finally { + ThreadClosableRegistry.unregisterCloseable(autoCloseStatement) statement.close() } } } - override def close(): Unit = if (connection != null) connection.close() + override def close(): Unit = { + if (connection != null) { + ThreadClosableRegistry.unregisterCloseable(connection) + try { + connection.close() + } catch { + case NonFatal(ex) => log.warn("Failed to close JDBC connection", ex) + } + } + } private[core] def executeActionOnConnection(action: Connection => Boolean): Boolean = { val currentConnection = getConnection(forceReconnect = false) @@ -97,7 +122,9 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B if (connection == null || forceReconnect) { val (newConnection, url) = jdbcUrlSelector.getWorkingConnection(retries) log.info(s"Selected query executor connection: $url") + close() connection = newConnection + ThreadClosableRegistry.registerCloseable(connection) } connection } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 2c6d13a8..4a845bed 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -306,7 +306,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, recreate = false) assert(qe.queries.length == 1) - assert(qe.queries.exists(_.contains("REPAIR"))) + assert(qe.queries.exists(_.contains("ALTER TABLE"))) } "do nothing for a delta since it does not need repairing" in { From 806a9dd2b743cf0371ff8c0d163102fa9cd34812 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 19 Mar 2026 12:47:07 +0100 Subject: [PATCH 2/5] #722 Add support for multiple closeable per thread, ensure LIFO close order, and add the test suite for the thread locable registry --- .../runner/task/ThreadClosableRegistry.scala | 54 ++++- .../core/utils/hive/QueryExecutorJdbc.scala | 20 +- .../task/ThreadClosableRegistrySuite.scala | 228 ++++++++++++++++++ 3 files changed, 284 insertions(+), 18 deletions(-) create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala index 2dbc7441..58aff378 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala @@ -19,30 +19,64 @@ package za.co.absa.pramen.core.runner.task import org.slf4j.LoggerFactory import java.util +import scala.collection.JavaConverters._ object ThreadClosableRegistry { private val log = LoggerFactory.getLogger(this.getClass) - private val closables = new util.HashMap[Long, AutoCloseable]() + private val closables = new util.HashMap[Long, util.List[AutoCloseable]]() + /** + * Registers a closeable resource for the current thread. + * The resource will be automatically closed when [[cleanupThread]] is called for this thread. + * + * @param closeable The AutoCloseable resource to register + */ def registerCloseable(closeable: AutoCloseable): Unit = synchronized { val threadId = Thread.currentThread().getId - closables.put(threadId, closeable) + val list = Option(closables.get(threadId)).getOrElse { + val newList = new util.ArrayList[AutoCloseable]() + closables.put(threadId, newList) + newList + } + list.add(closeable) } + /** + * Unregisters a closeable resource from the current thread's registry. + * If this was the last resource for the thread, the thread entry is removed from the registry. + * + * @param closeable The AutoCloseable resource to unregister + */ def unregisterCloseable(closeable: AutoCloseable): Unit = synchronized { val threadId = Thread.currentThread().getId - closables.remove(threadId, closeable) + Option(closables.get(threadId)).foreach { list => + list.remove(closeable) + if (list.isEmpty) { + closables.remove(threadId) + } + } } + /** + * Closes all registered resources for the specified thread in LIFO (Last-In-First-Out) order. + * This method is typically called when a thread times out or completes execution. + * Any exceptions during closing are logged but do not prevent other resources from being closed. + * + * @param threadId The ID of the thread whose resources should be cleaned up + */ def cleanupThread(threadId: Long): Unit = synchronized { - Option(closables.remove(threadId)).foreach { closeable => - try { - closeable.close() - } catch { - case ex: Exception => - log.warn(s"Failed to close resource for thread $threadId", ex) + Option(closables.remove(threadId)).foreach { list => + // Ensure LIFO order + val iterator = list.asScala.reverseIterator + while (iterator.hasNext) { + val closeable = iterator.next() + try { + closeable.close() + } catch { + case ex: Exception => + log.warn(s"Failed to close resource for thread $threadId", ex) + } } } } - } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala index d429c6f9..b7288e9a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala @@ -67,14 +67,18 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B executeActionOnConnection { conn => val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - val autoCloseStatement = new AutoCloseable { + val autoCloseStatement: AutoCloseable = new AutoCloseable { + val statementClosed = new java.util.concurrent.atomic.AtomicBoolean(false) + override def close(): Unit = { - try { - log.warn(s"Cancelling SQL statement: $query") - statement.cancel() - } finally { - log.warn(s"Closing the SQL statement...") - statement.close() + if (statementClosed.compareAndSet(false, true)) { + try { + log.info(s"Cancelling SQL statement: $query") + statement.cancel() + } finally { + log.info(s"Closing the SQL statement...") + statement.close() + } } } } @@ -85,7 +89,7 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B statement.execute(query) } finally { ThreadClosableRegistry.unregisterCloseable(autoCloseStatement) - statement.close() + autoCloseStatement.close() } } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala new file mode 100644 index 00000000..0e89dbb9 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala @@ -0,0 +1,228 @@ + +package za.co.absa.pramen.core.tests.runner.task + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.core.runner.task.ThreadClosableRegistry + +import scala.collection.mutable + +class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { + + "ThreadClosableRegistry" should { + "register a closeable resource for the current thread" in { + var closeCalled = 0 + val closeable = new AutoCloseable { + override def close(): Unit = closeCalled += 1 + } + val threadId = Thread.currentThread().getId + + ThreadClosableRegistry.registerCloseable(closeable) + + // Cleanup to verify it was registered + ThreadClosableRegistry.cleanupThread(threadId) + closeCalled shouldBe 1 + } + + "register multiple closeable resources for the same thread" in { + var closeCalled1 = 0 + var closeCalled2 = 0 + var closeCalled3 = 0 + val closeable1 = new AutoCloseable { + override def close(): Unit = closeCalled1 += 1 + } + val closeable2 = new AutoCloseable { + override def close(): Unit = closeCalled2 += 1 + } + val closeable3 = new AutoCloseable { + override def close(): Unit = closeCalled3 += 1 + } + val threadId = Thread.currentThread().getId + + ThreadClosableRegistry.registerCloseable(closeable1) + ThreadClosableRegistry.registerCloseable(closeable2) + ThreadClosableRegistry.registerCloseable(closeable3) + + ThreadClosableRegistry.cleanupThread(threadId) + + closeCalled1 shouldBe 1 + closeCalled2 shouldBe 1 + closeCalled3 shouldBe 1 + } + + "cleanup all registered resources for a specific thread" in { + var closeCalled1 = 0 + var closeCalled2 = 0 + val closeable1 = new AutoCloseable { + override def close(): Unit = closeCalled1 += 1 + } + val closeable2 = new AutoCloseable { + override def close(): Unit = closeCalled2 += 1 + } + + val threadId = Thread.currentThread().getId + + ThreadClosableRegistry.registerCloseable(closeable1) + ThreadClosableRegistry.registerCloseable(closeable2) + + ThreadClosableRegistry.cleanupThread(threadId) + + closeCalled1 shouldBe 1 + closeCalled2 shouldBe 1 + } + + "not affect other threads when cleaning up a specific thread" in { + var closeCalled1 = 0 + var closeCalled2 = 0 + val closeableThread1 = new AutoCloseable { + override def close(): Unit = closeCalled1 += 1 + } + val closeableThread2 = new AutoCloseable { + override def close(): Unit = closeCalled2 += 1 + } + + val thread1Results = mutable.ArrayBuffer[Long]() + val thread2Results = mutable.ArrayBuffer[Long]() + + val thread1 = new Thread(() => { + thread1Results += Thread.currentThread().getId + ThreadClosableRegistry.registerCloseable(closeableThread1) + }) + + val thread2 = new Thread(() => { + thread2Results += Thread.currentThread().getId + ThreadClosableRegistry.registerCloseable(closeableThread2) + }) + + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + val thread1Id = thread1Results.head + val thread2Id = thread2Results.head + + // Cleanup only thread1 + ThreadClosableRegistry.cleanupThread(thread1Id) + + closeCalled1 shouldBe 1 + closeCalled2 shouldBe 0 + + // Cleanup thread2 + ThreadClosableRegistry.cleanupThread(thread2Id) + closeCalled2 shouldBe 1 + } + + "handle exceptions during resource cleanup gracefully" in { + var closeCalled1 = 0 + var closeCalled2 = 0 + var closeCalled3 = 0 + val closeable1 = new AutoCloseable { + override def close(): Unit = closeCalled1 += 1 + } + val closeable2 = new AutoCloseable { + override def close(): Unit = { + closeCalled2 += 1 + throw new RuntimeException("Close failed") + } + } + val closeable3 = new AutoCloseable { + override def close(): Unit = closeCalled3 += 1 + } + + val threadId = Thread.currentThread().getId + + ThreadClosableRegistry.registerCloseable(closeable1) + ThreadClosableRegistry.registerCloseable(closeable2) + ThreadClosableRegistry.registerCloseable(closeable3) + + // Should not throw exception, should attempt to close all resources + noException should be thrownBy ThreadClosableRegistry.cleanupThread(threadId) + + closeCalled1 shouldBe 1 + closeCalled2 shouldBe 1 + closeCalled3 shouldBe 1 + } + + "do nothing when cleaning up a thread with no registered resources" in { + val nonExistentThreadId = 999999L + + noException should be thrownBy ThreadClosableRegistry.cleanupThread(nonExistentThreadId) + } + + "do nothing when cleaning up an already cleaned thread" in { + var closeCalled = 0 + val closeable = new AutoCloseable { + override def close(): Unit = closeCalled += 1 + } + val threadId = Thread.currentThread().getId + + ThreadClosableRegistry.registerCloseable(closeable) + ThreadClosableRegistry.cleanupThread(threadId) + + closeCalled shouldBe 1 + + // Cleanup again - should not call close again + ThreadClosableRegistry.cleanupThread(threadId) + + closeCalled shouldBe 1 + } + + "close resources in LIFO order (last registered, first closed)" in { + val closeOrder = mutable.ArrayBuffer[Int]() + + val closeable1 = new AutoCloseable { + override def close(): Unit = closeOrder += 1 + } + val closeable2 = new AutoCloseable { + override def close(): Unit = closeOrder += 2 + } + val closeable3 = new AutoCloseable { + override def close(): Unit = closeOrder += 3 + } + + val threadId = Thread.currentThread().getId + + ThreadClosableRegistry.registerCloseable(closeable1) + ThreadClosableRegistry.registerCloseable(closeable2) + ThreadClosableRegistry.registerCloseable(closeable3) + + ThreadClosableRegistry.cleanupThread(threadId) + + closeOrder should contain theSameElementsInOrderAs Seq(3, 2, 1) + } + + "handle concurrent registrations from multiple threads" in { + val numThreads = 10 + val closeablesPerThread = 5 + val allCloseables = mutable.Map[Long, Seq[(AutoCloseable, () => Int)]]() + + val threads = (1 to numThreads).map { i => + new Thread(() => { + val threadId = Thread.currentThread().getId + val closeables = (1 to closeablesPerThread).map { _ => + var closeCalled = 0 + val closeable = new AutoCloseable { + override def close(): Unit = closeCalled += 1 + } + (closeable, () => closeCalled) + } + allCloseables.synchronized { + allCloseables(threadId) = closeables + } + closeables.foreach { case (closeable, _) => ThreadClosableRegistry.registerCloseable(closeable) } + }) + } + + threads.foreach(_.start()) + threads.foreach(_.join()) + + allCloseables.foreach { case (threadId, closeables) => + ThreadClosableRegistry.cleanupThread(threadId) + closeables.foreach { case (_, getCloseCalled) => + getCloseCalled() shouldBe 1 + } + } + } + } +} From 4f8c6b5790dadefe56a196a296cfd1ce94ddc5dd Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 20 Mar 2026 09:25:16 +0100 Subject: [PATCH 3/5] Refactor ThreadClosableRegistrySuite to improve resource registration and cleanup tests, ensuring accurate counting and LIFO order handling. --- .../task/ThreadClosableRegistrySuite.scala | 205 +++++++----------- 1 file changed, 80 insertions(+), 125 deletions(-) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala index 0e89dbb9..ac07b5a2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package za.co.absa.pramen.core.tests.runner.task @@ -9,88 +24,47 @@ import scala.collection.mutable class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { - "ThreadClosableRegistry" should { - "register a closeable resource for the current thread" in { - var closeCalled = 0 - val closeable = new AutoCloseable { - override def close(): Unit = closeCalled += 1 - } - val threadId = Thread.currentThread().getId - - ThreadClosableRegistry.registerCloseable(closeable) - - // Cleanup to verify it was registered - ThreadClosableRegistry.cleanupThread(threadId) - closeCalled shouldBe 1 - } - - "register multiple closeable resources for the same thread" in { - var closeCalled1 = 0 - var closeCalled2 = 0 - var closeCalled3 = 0 - val closeable1 = new AutoCloseable { - override def close(): Unit = closeCalled1 += 1 - } - val closeable2 = new AutoCloseable { - override def close(): Unit = closeCalled2 += 1 - } - val closeable3 = new AutoCloseable { - override def close(): Unit = closeCalled3 += 1 - } - val threadId = Thread.currentThread().getId - - ThreadClosableRegistry.registerCloseable(closeable1) - ThreadClosableRegistry.registerCloseable(closeable2) - ThreadClosableRegistry.registerCloseable(closeable3) - - ThreadClosableRegistry.cleanupThread(threadId) - - closeCalled1 shouldBe 1 - closeCalled2 shouldBe 1 - closeCalled3 shouldBe 1 + private def createCountingCloseable(): (AutoCloseable, () => Int) = { + var count = 0 + val closeable = new AutoCloseable { + override def close(): Unit = count += 1 } + (closeable, () => count) + } - "cleanup all registered resources for a specific thread" in { - var closeCalled1 = 0 - var closeCalled2 = 0 - val closeable1 = new AutoCloseable { - override def close(): Unit = closeCalled1 += 1 - } - val closeable2 = new AutoCloseable { - override def close(): Unit = closeCalled2 += 1 - } + private def currentThreadId: Long = Thread.currentThread().getId - val threadId = Thread.currentThread().getId + "ThreadClosableRegistry" should { + "register and cleanup single or multiple closeable resources" in { + val (closeable1, getCount1) = createCountingCloseable() + val (closeable2, getCount2) = createCountingCloseable() + val (closeable3, getCount3) = createCountingCloseable() ThreadClosableRegistry.registerCloseable(closeable1) ThreadClosableRegistry.registerCloseable(closeable2) + ThreadClosableRegistry.registerCloseable(closeable3) - ThreadClosableRegistry.cleanupThread(threadId) + ThreadClosableRegistry.cleanupThread(currentThreadId) - closeCalled1 shouldBe 1 - closeCalled2 shouldBe 1 + getCount1() shouldBe 1 + getCount2() shouldBe 1 + getCount3() shouldBe 1 } "not affect other threads when cleaning up a specific thread" in { - var closeCalled1 = 0 - var closeCalled2 = 0 - val closeableThread1 = new AutoCloseable { - override def close(): Unit = closeCalled1 += 1 - } - val closeableThread2 = new AutoCloseable { - override def close(): Unit = closeCalled2 += 1 - } + val (closeableThread1, getCount1) = createCountingCloseable() + val (closeableThread2, getCount2) = createCountingCloseable() - val thread1Results = mutable.ArrayBuffer[Long]() - val thread2Results = mutable.ArrayBuffer[Long]() + var thread1Id: Long = 0 + var thread2Id: Long = 0 val thread1 = new Thread(() => { - thread1Results += Thread.currentThread().getId + thread1Id = Thread.currentThread().getId ThreadClosableRegistry.registerCloseable(closeableThread1) }) val thread2 = new Thread(() => { - thread2Results += Thread.currentThread().getId + thread2Id = Thread.currentThread().getId ThreadClosableRegistry.registerCloseable(closeableThread2) }) @@ -99,49 +73,38 @@ class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { thread1.join() thread2.join() - val thread1Id = thread1Results.head - val thread2Id = thread2Results.head - // Cleanup only thread1 ThreadClosableRegistry.cleanupThread(thread1Id) - - closeCalled1 shouldBe 1 - closeCalled2 shouldBe 0 + getCount1() shouldBe 1 + getCount2() shouldBe 0 // Cleanup thread2 ThreadClosableRegistry.cleanupThread(thread2Id) - closeCalled2 shouldBe 1 + getCount2() shouldBe 1 } "handle exceptions during resource cleanup gracefully" in { - var closeCalled1 = 0 + val (closeable1, getCount1) = createCountingCloseable() + val (closeable3, getCount3) = createCountingCloseable() + var closeCalled2 = 0 - var closeCalled3 = 0 - val closeable1 = new AutoCloseable { - override def close(): Unit = closeCalled1 += 1 - } val closeable2 = new AutoCloseable { override def close(): Unit = { closeCalled2 += 1 throw new RuntimeException("Close failed") } } - val closeable3 = new AutoCloseable { - override def close(): Unit = closeCalled3 += 1 - } - - val threadId = Thread.currentThread().getId ThreadClosableRegistry.registerCloseable(closeable1) ThreadClosableRegistry.registerCloseable(closeable2) ThreadClosableRegistry.registerCloseable(closeable3) // Should not throw exception, should attempt to close all resources - noException should be thrownBy ThreadClosableRegistry.cleanupThread(threadId) + noException should be thrownBy ThreadClosableRegistry.cleanupThread(currentThreadId) - closeCalled1 shouldBe 1 + getCount1() shouldBe 1 closeCalled2 shouldBe 1 - closeCalled3 shouldBe 1 + getCount3() shouldBe 1 } "do nothing when cleaning up a thread with no registered resources" in { @@ -151,43 +114,28 @@ class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { } "do nothing when cleaning up an already cleaned thread" in { - var closeCalled = 0 - val closeable = new AutoCloseable { - override def close(): Unit = closeCalled += 1 - } - val threadId = Thread.currentThread().getId + val (closeable, getCount) = createCountingCloseable() ThreadClosableRegistry.registerCloseable(closeable) - ThreadClosableRegistry.cleanupThread(threadId) - - closeCalled shouldBe 1 + ThreadClosableRegistry.cleanupThread(currentThreadId) + getCount() shouldBe 1 // Cleanup again - should not call close again - ThreadClosableRegistry.cleanupThread(threadId) - - closeCalled shouldBe 1 + ThreadClosableRegistry.cleanupThread(currentThreadId) + getCount() shouldBe 1 } "close resources in LIFO order (last registered, first closed)" in { val closeOrder = mutable.ArrayBuffer[Int]() - val closeable1 = new AutoCloseable { - override def close(): Unit = closeOrder += 1 - } - val closeable2 = new AutoCloseable { - override def close(): Unit = closeOrder += 2 - } - val closeable3 = new AutoCloseable { - override def close(): Unit = closeOrder += 3 + val closeables = (1 to 3).map { id => + new AutoCloseable { + override def close(): Unit = closeOrder += id + } } - val threadId = Thread.currentThread().getId - - ThreadClosableRegistry.registerCloseable(closeable1) - ThreadClosableRegistry.registerCloseable(closeable2) - ThreadClosableRegistry.registerCloseable(closeable3) - - ThreadClosableRegistry.cleanupThread(threadId) + closeables.foreach(ThreadClosableRegistry.registerCloseable) + ThreadClosableRegistry.cleanupThread(currentThreadId) closeOrder should contain theSameElementsInOrderAs Seq(3, 2, 1) } @@ -195,34 +143,41 @@ class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { "handle concurrent registrations from multiple threads" in { val numThreads = 10 val closeablesPerThread = 5 - val allCloseables = mutable.Map[Long, Seq[(AutoCloseable, () => Int)]]() + val threadData = mutable.Map[Long, Seq[() => Int]]() - val threads = (1 to numThreads).map { i => + val threads = (1 to numThreads).map { _ => new Thread(() => { val threadId = Thread.currentThread().getId - val closeables = (1 to closeablesPerThread).map { _ => - var closeCalled = 0 - val closeable = new AutoCloseable { - override def close(): Unit = closeCalled += 1 - } - (closeable, () => closeCalled) + val closeablesWithCounters = (1 to closeablesPerThread).map(_ => createCountingCloseable()) + + threadData.synchronized { + threadData(threadId) = closeablesWithCounters.map(_._2) } - allCloseables.synchronized { - allCloseables(threadId) = closeables + + closeablesWithCounters.foreach { case (closeable, _) => + ThreadClosableRegistry.registerCloseable(closeable) } - closeables.foreach { case (closeable, _) => ThreadClosableRegistry.registerCloseable(closeable) } }) } threads.foreach(_.start()) threads.foreach(_.join()) - allCloseables.foreach { case (threadId, closeables) => + threadData.foreach { case (threadId, getCounters) => ThreadClosableRegistry.cleanupThread(threadId) - closeables.foreach { case (_, getCloseCalled) => - getCloseCalled() shouldBe 1 - } + getCounters.foreach(getCount => getCount() shouldBe 1) } } + + "not close an explicitly unregistered resource during cleanup" in { + val (closeable, getCount) = createCountingCloseable() + + ThreadClosableRegistry.registerCloseable(closeable) + ThreadClosableRegistry.unregisterCloseable(closeable) + + // cleanupThread should not close the unregistered resource + ThreadClosableRegistry.cleanupThread(currentThreadId) + getCount() shouldBe 0 + } } } From edc6bc4cbf8f3325da84c5eae8e1850e1d16149a Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 20 Mar 2026 09:51:00 +0100 Subject: [PATCH 4/5] Fix Scala 2.11 compilation, and ensure closing a resource in a different thread also removes it from registry. --- .../runner/task/ThreadClosableRegistry.scala | 46 +++++++++---------- .../task/ThreadClosableRegistrySuite.scala | 44 ++++++++++-------- 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala index 58aff378..b2534285 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala @@ -18,12 +18,12 @@ package za.co.absa.pramen.core.runner.task import org.slf4j.LoggerFactory -import java.util import scala.collection.JavaConverters._ +import scala.util.control.NonFatal object ThreadClosableRegistry { private val log = LoggerFactory.getLogger(this.getClass) - private val closables = new util.HashMap[Long, util.List[AutoCloseable]]() + private val closeables = new java.util.LinkedList[(Long, AutoCloseable)] /** * Registers a closeable resource for the current thread. @@ -33,26 +33,24 @@ object ThreadClosableRegistry { */ def registerCloseable(closeable: AutoCloseable): Unit = synchronized { val threadId = Thread.currentThread().getId - val list = Option(closables.get(threadId)).getOrElse { - val newList = new util.ArrayList[AutoCloseable]() - closables.put(threadId, newList) - newList + + if (closeables.indexOf(closeable) < 0) { + closeables.add((threadId, closeable)) } - list.add(closeable) } /** - * Unregisters a closeable resource from the current thread's registry. - * If this was the last resource for the thread, the thread entry is removed from the registry. + * Unregisters a closeable resource from the thread's registry. * * @param closeable The AutoCloseable resource to unregister */ def unregisterCloseable(closeable: AutoCloseable): Unit = synchronized { - val threadId = Thread.currentThread().getId - Option(closables.get(threadId)).foreach { list => - list.remove(closeable) - if (list.isEmpty) { - closables.remove(threadId) + val iterator = closeables.iterator() + while (iterator.hasNext) { + val (_, c) = iterator.next() + if (c == closeable) { + iterator.remove() + return } } } @@ -65,17 +63,15 @@ object ThreadClosableRegistry { * @param threadId The ID of the thread whose resources should be cleaned up */ def cleanupThread(threadId: Long): Unit = synchronized { - Option(closables.remove(threadId)).foreach { list => - // Ensure LIFO order - val iterator = list.asScala.reverseIterator - while (iterator.hasNext) { - val closeable = iterator.next() - try { - closeable.close() - } catch { - case ex: Exception => - log.warn(s"Failed to close resource for thread $threadId", ex) - } + val threadCloseables = closeables.asScala.filter(_._1 == threadId).map(_._2).toList + threadCloseables.reverse.foreach { closeable => + try { + closeable.close() + } catch { + case NonFatal(ex) => + log.warn(s"Error closing resource for thread $threadId.", ex) + } finally { + unregisterCloseable(closeable) } } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala index ac07b5a2..aa497bea 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala @@ -58,15 +58,19 @@ class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { var thread1Id: Long = 0 var thread2Id: Long = 0 - val thread1 = new Thread(() => { - thread1Id = Thread.currentThread().getId - ThreadClosableRegistry.registerCloseable(closeableThread1) - }) + val thread1 = new Thread { + override def run(): Unit = { + thread1Id = Thread.currentThread().getId + ThreadClosableRegistry.registerCloseable(closeableThread1) + } + } - val thread2 = new Thread(() => { - thread2Id = Thread.currentThread().getId - ThreadClosableRegistry.registerCloseable(closeableThread2) - }) + val thread2 = new Thread { + override def run(): Unit = { + thread2Id = Thread.currentThread().getId + ThreadClosableRegistry.registerCloseable(closeableThread2) + } + } thread1.start() thread2.start() @@ -146,18 +150,20 @@ class ThreadClosableRegistrySuite extends AnyWordSpec with Matchers { val threadData = mutable.Map[Long, Seq[() => Int]]() val threads = (1 to numThreads).map { _ => - new Thread(() => { - val threadId = Thread.currentThread().getId - val closeablesWithCounters = (1 to closeablesPerThread).map(_ => createCountingCloseable()) - - threadData.synchronized { - threadData(threadId) = closeablesWithCounters.map(_._2) - } - - closeablesWithCounters.foreach { case (closeable, _) => - ThreadClosableRegistry.registerCloseable(closeable) + new Thread { + override def run(): Unit = { + val threadId = Thread.currentThread().getId + val closeablesWithCounters = (1 to closeablesPerThread).map(_ => createCountingCloseable()) + + threadData.synchronized { + threadData(threadId) = closeablesWithCounters.map(_._2) + } + + closeablesWithCounters.foreach { case (closeable, _) => + ThreadClosableRegistry.registerCloseable(closeable) + } } - }) + } } threads.foreach(_.start()) From ffabd33a4af78f5fafaa9ea60aaaf56e0d07a85b Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 20 Mar 2026 10:24:07 +0100 Subject: [PATCH 5/5] Optimize closeable registration check by replacing indexOf with iterator-based equality comparison --- .../core/runner/task/ThreadClosableRegistry.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala index b2534285..b611082e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala @@ -34,13 +34,19 @@ object ThreadClosableRegistry { def registerCloseable(closeable: AutoCloseable): Unit = synchronized { val threadId = Thread.currentThread().getId - if (closeables.indexOf(closeable) < 0) { + val iterator = closeables.iterator() + var alreadyRegistered = false + while (iterator.hasNext && !alreadyRegistered) { + alreadyRegistered = iterator.next()._2 == closeable + } + if (!alreadyRegistered) { closeables.add((threadId, closeable)) } } /** - * Unregisters a closeable resource from the thread's registry. + * Unregisters a closeable resource from the registry. + * This removes the resource regardless of which thread it was registered from. * * @param closeable The AutoCloseable resource to unregister */