Skip to content

#722 Autoclose JDBC connections on job timeout#723

Merged
yruslan merged 5 commits intomainfrom
feature/722-autoclose-hive-jdbc-connections
Mar 20, 2026
Merged

#722 Autoclose JDBC connections on job timeout#723
yruslan merged 5 commits intomainfrom
feature/722-autoclose-hive-jdbc-connections

Conversation

@yruslan
Copy link
Collaborator

@yruslan yruslan commented Mar 19, 2026

Closes #722

Summary by CodeRabbit

  • New Features

    • Default changed to prefer ADD PARTITION over MSCK REPAIR for Hive metastore partition updates on Parquet tables.
  • Improvements

    • Added pre-interrupt per-thread resource cleanup during operation timeouts.
    • Improved JDBC connection and statement lifecycle handling to reduce resource leaks and surface close warnings.
  • Tests

    • Added tests for per-thread registration, LIFO cleanup, idempotence, exception handling, and concurrency.

@coderabbitai
Copy link

coderabbitai bot commented Mar 19, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a thread-scoped registry for AutoCloseable resources, wires it into timeout handling to close JDBC Statements and Connections on timeout, updates QueryExecutorJdbc to register/unregister resources, flips pramen.hive.prefer.add.partition default to true, and adjusts a metastore test expectation.

Changes

Cohort / File(s) Summary
Configuration
pramen/core/src/main/resources/reference.conf
Changed default pramen.hive.prefer.add.partition from false to true.
Resource Registry
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala
Added singleton registry tracking per-thread AutoCloseable instances with registerCloseable, unregisterCloseable, and cleanupThread (LIFO close; non-fatal exceptions logged).
Timeout Handling
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ThreadUtils.scala
On timeout, attempts ThreadClosableRegistry.cleanupThread(thread.getId) before interrupting worker thread; warns and suppresses cleanup exceptions.
JDBC Resource Tracking
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
Registers Statements and Connections with ThreadClosableRegistry; wraps Statement cancel/close in an AutoCloseable, unregisters in finally, and unregisters connections before closing.
Tests
pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala
Updated metastore test to expect ALTER TABLE (reflecting ADD PARTITION). Added comprehensive ThreadClosableRegistrySuite validating registration, LIFO ordering, exception-tolerant cleanup, idempotence, isolation, concurrency, and unregister behavior.

Sequence Diagram(s)

sequenceDiagram
    participant Job as Job/Task
    participant Utils as ThreadUtils
    participant Registry as ThreadClosableRegistry
    participant JDBC as QueryExecutorJdbc
    participant Hive as Hive/JDBC

    Job->>Utils: runWithTimeout(task)
    activate Utils
    Utils->>JDBC: execute(query) / start worker thread
    activate JDBC
    JDBC->>Hive: open Connection & create Statement
    JDBC->>Registry: registerCloseable(Statement)
    JDBC->>Registry: registerCloseable(Connection)
    JDBC->>Hive: execute query
    deactivate JDBC

    Note over Utils,JDBC: timeout elapsed, worker thread still alive

    Utils->>Registry: cleanupThread(workerThreadId)
    activate Registry
    Registry->>Statement: close() (LIFO)
    Registry->>Connection: close()
    deactivate Registry

    Utils->>Job: interrupt worker thread
    Utils->>Job: throw TimeoutException
    deactivate Utils
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • jozefbakus

Poem

🐇
I hopped through threads with a careful ear,
I tracked each closeable both far and near,
On timeout I tidy, in LIFO I prune,
I cancel and close by the light of the moon,
No dangling Conns — a neat little tune.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Out of Scope Changes check ⚠️ Warning One change is out of scope: updating the Hive metastore preference setting in reference.conf is unrelated to the JDBC auto-close timeout mechanism requested in issue #722. Remove the configuration change to pramen.hive.prefer.add.partition or clarify its relationship to the timeout mechanism in the PR description.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: implementing auto-close of JDBC connections when job timeout occurs, which is the primary objective of this PR.
Linked Issues check ✅ Passed All coding requirements from issue #722 are met: a ThreadClosableRegistry tracks AutoCloseable instances by thread ID, ThreadUtils calls cleanup before interruption on timeout, and QueryExecutorJdbc registers/unregisters JDBC connections.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/722-autoclose-hive-jdbc-connections
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala (1)

309-309: Make this assertion self-contained by explicitly setting the config in test setup.

Line 309 now expects ALTER TABLE, but the suite currently gets that behavior indirectly from reference.conf fallback. Please set pramen.hive.prefer.add.partition explicitly in this test path to avoid brittle coupling to global defaults.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala`
at line 309, The test relies on a global default for
pramen.hive.prefer.add.partition to produce an "ALTER TABLE" query; make the
test self-contained by explicitly setting pramen.hive.prefer.add.partition to
the value that yields ALTER TABLE (e.g. true) in this test's config setup within
MetastoreSuite (where qe.queries is asserted). Add a config override (via
ConfigFactory.parseString(...) or the test's existing config builder) for the
key "pramen.hive.prefer.add.partition" scoped to this test so the assertion
assert(qe.queries.exists(_.contains("ALTER TABLE"))) no longer depends on
reference.conf fallbacks.
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala (1)

23-23: Nit: Consider consistent spelling "Closeable".

Java uses "Closeable" (with 'e') in AutoCloseable. Consider renaming to ThreadCloseableRegistry for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala`
at line 23, Rename the object ThreadClosableRegistry to ThreadCloseableRegistry
to match Java/Scala spelling (Closeable) and update all references/usages
accordingly; change the object name (and file/class name if present) and adjust
any imports, companion objects, tests, and calls that reference
ThreadClosableRegistry to the new ThreadCloseableRegistry identifier to keep API
consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pramen/core/src/main/resources/reference.conf`:
- Line 64: This change flips the global config key hive.prefer.add.partition
which alters production behavior; revert the change to restore the previous
default (undo the edit to hive.prefer.add.partition so it returns to its prior
value) or remove it from this timeout-focused PR and open a separate PR for
behavioral config changes; reference the config key hive.prefer.add.partition in
your update and ensure only JDBC-timeout related edits remain in this branch.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala`:
- Around line 25-30: ThreadClosableRegistry currently maps thread IDs to a
single AutoCloseable (closables) so later registrations overwrite earlier ones;
change closables to map each threadId to a collection of AutoCloseable (e.g.,
Map[Long, mutable.Buffer[AutoCloseable]] or util.HashMap[Long,
java.util.List[AutoCloseable]]), update registerCloseable to append the given
closeable into the thread's collection instead of replacing it, and update any
closing logic (e.g., closeAll/cleanup methods) to iterate and close all entries
for that thread and remove the thread's collection; keep identifiers
ThreadClosableRegistry, closables, and registerCloseable to locate edits.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala`:
- Around line 70-89: The autoCloseStatement may close the JDBC statement twice
because autoCloseStatement.close() calls statement.close() and the outer finally
also calls statement.close(); replace the boolean close logic by adding an
AtomicBoolean closed guard (e.g., val closed = new AtomicBoolean(false)) inside
the AutoCloseable implementation used for autoCloseStatement so close() cancels
and closes the statement only when closed.compareAndSet(false, true) succeeds,
and remove or skip the duplicate statement.close() in the outer finally (or wrap
it with the same guard); also change the two log.warn calls inside
AutoCloseable.close() to log.info to reflect expected timeout handling.
- Around line 93-102: The close() method currently calls
ThreadClosableRegistry.unregisterCloseable(connection) which uses
Thread.currentThread().getId() (the registration in getConnection()), so if
close() runs on a different thread the unregister silently fails; either
document this limitation in QueryExecutorJdbc.close() or modify the registration
to store the originating thread id with the connection (e.g., attach a metadata
wrapper or map entry when registering in getConnection()) and update
unregisterCloseable to accept that id or to remove entries by connection
regardless of calling thread so cross-thread close correctly unregisters the
connection.

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala`:
- Line 23: Rename the object ThreadClosableRegistry to ThreadCloseableRegistry
to match Java/Scala spelling (Closeable) and update all references/usages
accordingly; change the object name (and file/class name if present) and adjust
any imports, companion objects, tests, and calls that reference
ThreadClosableRegistry to the new ThreadCloseableRegistry identifier to keep API
consistent.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala`:
- Line 309: The test relies on a global default for
pramen.hive.prefer.add.partition to produce an "ALTER TABLE" query; make the
test self-contained by explicitly setting pramen.hive.prefer.add.partition to
the value that yields ALTER TABLE (e.g. true) in this test's config setup within
MetastoreSuite (where qe.queries is asserted). Add a config override (via
ConfigFactory.parseString(...) or the test's existing config builder) for the
key "pramen.hive.prefer.add.partition" scoped to this test so the assertion
assert(qe.queries.exists(_.contains("ALTER TABLE"))) no longer depends on
reference.conf fallbacks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 360893a2-c54a-480f-b5a4-0ace4f8d6602

📥 Commits

Reviewing files that changed from the base of the PR and between dedd5f2 and 717e850.

📒 Files selected for processing (5)
  • pramen/core/src/main/resources/reference.conf
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ThreadUtils.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala

# Also ADD PARTTITION is only for Parquet format.
# This option can be overridden per metatable.
hive.prefer.add.partition = false
hive.prefer.add.partition = true
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid flipping this global default in a timeout-focused PR.

Line 64 changes production behavior for all deployments (MSCK REPAIR preference → ADD PARTITION preference), which is broader than the JDBC-timeout cleanup objective and may surprise existing users.

Suggested minimal change
-  hive.prefer.add.partition = true
+  hive.prefer.add.partition = false
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
hive.prefer.add.partition = true
hive.prefer.add.partition = false
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/resources/reference.conf` at line 64, This change flips
the global config key hive.prefer.add.partition which alters production
behavior; revert the change to restore the previous default (undo the edit to
hive.prefer.add.partition so it returns to its prior value) or remove it from
this timeout-focused PR and open a separate PR for behavioral config changes;
reference the config key hive.prefer.add.partition in your update and ensure
only JDBC-timeout related edits remain in this branch.

Comment on lines +93 to +102
override def close(): Unit = {
if (connection != null) {
ThreadClosableRegistry.unregisterCloseable(connection)
try {
connection.close()
} catch {
case NonFatal(ex) => log.warn("Failed to close JDBC connection", ex)
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if close() is called from contexts that might be on different threads
# Look for usages of QueryExecutorJdbc and how/where close() is invoked

echo "=== QueryExecutorJdbc instantiation sites ==="
rg -n "new QueryExecutorJdbc" --type scala -A 3

echo -e "\n=== QueryExecutor close() calls ==="
rg -n "\.close\(\)" --type scala -C 2 | rg -A 2 -B 2 "QueryExecutor|HiveHelper"

echo -e "\n=== AutoCloseable/try-with-resources patterns ==="
ast-grep --pattern 'for ($_ <- $_) yield { $$$ }'

Repository: AbsaOSS/pramen

Length of output: 17609


🏁 Script executed:

#!/bin/bash

echo "=== QueryExecutorJdbc constructor and registration ==="
sed -n '1,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala | head -50

echo -e "\n=== HiveHelper.scala context around line 67 ==="
sed -n '50,85p' pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala

echo -e "\n=== Search for try-with-resources usage ==="
rg -n "\.withResource|try \{" --type scala pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/ -A 2

Repository: AbsaOSS/pramen

Length of output: 5271


🏁 Script executed:

#!/bin/bash

echo "=== Full QueryExecutorJdbc.scala ==="
wc -l pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
cat pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala

echo -e "\n=== HiveHelperSql lifecycle management ==="
rg -n "queryExecutor|close|getConnection" pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala -B 2 -A 2

Repository: AbsaOSS/pramen

Length of output: 6474


Cross-thread close() won't unregister from registry.

ThreadClosableRegistry.unregisterCloseable(connection) uses Thread.currentThread().getId() to locate the registry entry (registered in getConnection() at line 119). If close() is called from a different thread than the one that called getConnection(), unregistration silently fails, leaving a stale registry entry.

This is a minor issue because the connection is still properly closed via connection.close() on line 97, and the registry's cleanup thread will eventually remove stale entries. However, consider documenting this limitation or storing the registration thread ID to enable cross-thread unregistration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala`
around lines 93 - 102, The close() method currently calls
ThreadClosableRegistry.unregisterCloseable(connection) which uses
Thread.currentThread().getId() (the registration in getConnection()), so if
close() runs on a different thread the unregister silently fails; either
document this limitation in QueryExecutorJdbc.close() or modify the registration
to store the originating thread id with the connection (e.g., attach a metadata
wrapper or map entry when registering in getConnection()) and update
unregisterCloseable to accept that id or to remove entries by connection
regardless of calling thread so cross-thread close correctly unregisters the
connection.

@github-actions
Copy link

github-actions bot commented Mar 19, 2026

Unit Test Coverage

Overall Project 84.4% -0.07% 🍏
Files changed 76.01%

Module Coverage
pramen:core Jacoco Report 86.35% -0.08%
Files
Module File Coverage
pramen:core Jacoco Report QueryExecutorJdbc.scala 88.65% -5.58%
ThreadClosableRegistry.scala 87.5% -12.5% 🍏
ThreadUtils.scala 84.35% -15.65%

… order, and add the test suite for the thread locable registry
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala (1)

153-169: Missing test for unregister-then-cleanup pattern.

This test verifies idempotent cleanup (calling cleanupThread twice), but there's no test for the pattern used by QueryExecutorJdbc.execute(): register → unregister → cleanupThread.

The expected behavior is that an explicitly unregistered resource should NOT be closed by a subsequent cleanupThread() call.

🧪 Suggested additional test case
"not close an explicitly unregistered resource during cleanup" in {
  var closeCalled = 0
  val closeable = new AutoCloseable {
    override def close(): Unit = closeCalled += 1
  }
  val threadId = Thread.currentThread().getId

  ThreadClosableRegistry.registerCloseable(closeable)
  ThreadClosableRegistry.unregisterCloseable(closeable)

  // cleanupThread should not close the unregistered resource
  ThreadClosableRegistry.cleanupThread(threadId)
  closeCalled shouldBe 0
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala`
around lines 153 - 169, Add a new test in ThreadClosableRegistrySuite that
registers an AutoCloseable with ThreadClosableRegistry.registerCloseable, then
calls ThreadClosableRegistry.unregisterCloseable for that instance, and then
invokes ThreadClosableRegistry.cleanupThread(threadId) asserting that the close
method was NOT called (closeCalled remains 0); this mirrors the
register→unregister→cleanup pattern used by QueryExecutorJdbc.execute and
ensures unregister prevents cleanupThread from closing the resource.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala`:
- Around line 153-169: Add a new test in ThreadClosableRegistrySuite that
registers an AutoCloseable with ThreadClosableRegistry.registerCloseable, then
calls ThreadClosableRegistry.unregisterCloseable for that instance, and then
invokes ThreadClosableRegistry.cleanupThread(threadId) asserting that the close
method was NOT called (closeCalled remains 0); this mirrors the
register→unregister→cleanup pattern used by QueryExecutorJdbc.execute and
ensures unregister prevents cleanupThread from closing the resource.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fa682717-8305-4529-9e50-c01530226e48

📥 Commits

Reviewing files that changed from the base of the PR and between 717e850 and 806a9dd.

📒 Files selected for processing (3)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala

yruslan added 2 commits March 20, 2026 09:25
… and cleanup tests, ensuring accurate counting and LIFO order handling.
@yruslan yruslan force-pushed the feature/722-autoclose-hive-jdbc-connections branch from 052b458 to edc6bc4 Compare March 20, 2026 08:53
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala (1)

42-57: Minor doc clarification needed.

The Scaladoc states "from the current thread's registry," but the implementation removes the closeable regardless of which thread registered it. Based on the commit message, this cross-thread behavior is intentional. Consider updating the doc to reflect that unregisterCloseable removes the resource globally, not just for the calling thread.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala`
around lines 42 - 57, Update the Scaladoc for unregisterCloseable to reflect its
actual global behavior: state that def unregisterCloseable(closeable:
AutoCloseable) removes the given closeable from the shared closeables registry
for all threads (not just the current thread), and that it performs a
synchronized global removal by iterating over the closeables map and removing
the first matching entry; reference the method name unregisterCloseable and the
closeables collection in the comment so callers understand the cross-thread
semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala`:
- Around line 34-40: The duplicate-check in registerCloseable is wrong because
closeables is a LinkedList[(Long, AutoCloseable)] so
closeables.indexOf(closeable) will never match; change the check to search the
list for an entry whose second element equals (or is the same instance as) the
provided closeable (and optionally also matches the current threadId). In other
words, inside registerCloseable (and using threadId from
Thread.currentThread().getId) replace the indexOf-based test with
closeables.exists { case (tid, c) => tid == threadId && (c eq closeable) } or
similar equality logic so duplicates are correctly detected before calling
closeables.add((threadId, closeable)).

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala`:
- Around line 42-57: Update the Scaladoc for unregisterCloseable to reflect its
actual global behavior: state that def unregisterCloseable(closeable:
AutoCloseable) removes the given closeable from the shared closeables registry
for all threads (not just the current thread), and that it performs a
synchronized global removal by iterating over the closeables map and removing
the first matching entry; reference the method name unregisterCloseable and the
closeables collection in the comment so callers understand the cross-thread
semantics.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7dfe08ae-cea7-46e0-b9bf-b6f0cfcf8e74

📥 Commits

Reviewing files that changed from the base of the PR and between 4f8c6b5 and 052b458.

📒 Files selected for processing (2)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/ThreadClosableRegistry.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala
✅ Files skipped from review due to trivial changes (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/ThreadClosableRegistrySuite.scala

@yruslan yruslan merged commit 253f431 into main Mar 20, 2026
7 checks passed
@yruslan yruslan deleted the feature/722-autoclose-hive-jdbc-connections branch March 20, 2026 09:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Have a mechanism in Pramen to force close JDBC connections to Hive when the the job timeout is breached

1 participant