-
Notifications
You must be signed in to change notification settings - Fork 3
#722 Autoclose JDBC connections on job timeout #723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
717e850
806a9dd
4f8c6b5
edc6bc4
ffabd33
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| /* | ||
| * Copyright 2022 ABSA Group Limited | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package za.co.absa.pramen.core.runner.task | ||
|
|
||
| import org.slf4j.LoggerFactory | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.util.control.NonFatal | ||
|
|
||
| object ThreadClosableRegistry { | ||
| private val log = LoggerFactory.getLogger(this.getClass) | ||
| private val closeables = new java.util.LinkedList[(Long, AutoCloseable)] | ||
|
|
||
| /** | ||
| * Registers a closeable resource for the current thread. | ||
| * The resource will be automatically closed when [[cleanupThread]] is called for this thread. | ||
| * | ||
| * @param closeable The AutoCloseable resource to register | ||
| */ | ||
| def registerCloseable(closeable: AutoCloseable): Unit = synchronized { | ||
| val threadId = Thread.currentThread().getId | ||
|
|
||
| val iterator = closeables.iterator() | ||
| var alreadyRegistered = false | ||
| while (iterator.hasNext && !alreadyRegistered) { | ||
| alreadyRegistered = iterator.next()._2 == closeable | ||
| } | ||
| if (!alreadyRegistered) { | ||
| closeables.add((threadId, closeable)) | ||
| } | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Unregisters a closeable resource from the registry. | ||
| * This removes the resource regardless of which thread it was registered from. | ||
| * | ||
| * @param closeable The AutoCloseable resource to unregister | ||
| */ | ||
| def unregisterCloseable(closeable: AutoCloseable): Unit = synchronized { | ||
| val iterator = closeables.iterator() | ||
| while (iterator.hasNext) { | ||
| val (_, c) = iterator.next() | ||
| if (c == closeable) { | ||
| iterator.remove() | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Closes all registered resources for the specified thread in LIFO (Last-In-First-Out) order. | ||
| * This method is typically called when a thread times out or completes execution. | ||
| * Any exceptions during closing are logged but do not prevent other resources from being closed. | ||
| * | ||
| * @param threadId The ID of the thread whose resources should be cleaned up | ||
| */ | ||
| def cleanupThread(threadId: Long): Unit = synchronized { | ||
| val threadCloseables = closeables.asScala.filter(_._1 == threadId).map(_._2).toList | ||
| threadCloseables.reverse.foreach { closeable => | ||
| try { | ||
| closeable.close() | ||
| } catch { | ||
| case NonFatal(ex) => | ||
| log.warn(s"Error closing resource for thread $threadId.", ex) | ||
| } finally { | ||
| unregisterCloseable(closeable) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package za.co.absa.pramen.core.utils.hive | |
| import org.slf4j.LoggerFactory | ||
| import za.co.absa.pramen.core.reader.JdbcUrlSelector | ||
| import za.co.absa.pramen.core.reader.model.JdbcConfig | ||
| import za.co.absa.pramen.core.runner.task.ThreadClosableRegistry | ||
|
|
||
| import java.sql._ | ||
| import scala.util.control.NonFatal | ||
|
|
@@ -66,15 +67,43 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B | |
| executeActionOnConnection { conn => | ||
| val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) | ||
|
|
||
| val autoCloseStatement: AutoCloseable = new AutoCloseable { | ||
| val statementClosed = new java.util.concurrent.atomic.AtomicBoolean(false) | ||
|
|
||
| override def close(): Unit = { | ||
| if (statementClosed.compareAndSet(false, true)) { | ||
| try { | ||
| log.info(s"Cancelling SQL statement: $query") | ||
| statement.cancel() | ||
| } finally { | ||
| log.info(s"Closing the SQL statement...") | ||
| statement.close() | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| ThreadClosableRegistry.registerCloseable(autoCloseStatement) | ||
|
|
||
| try { | ||
| statement.execute(query) | ||
| } finally { | ||
| statement.close() | ||
| ThreadClosableRegistry.unregisterCloseable(autoCloseStatement) | ||
| autoCloseStatement.close() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| override def close(): Unit = if (connection != null) connection.close() | ||
| override def close(): Unit = { | ||
| if (connection != null) { | ||
| ThreadClosableRegistry.unregisterCloseable(connection) | ||
| try { | ||
| connection.close() | ||
| } catch { | ||
| case NonFatal(ex) => log.warn("Failed to close JDBC connection", ex) | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+97
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check if close() is called from contexts that might be on different threads
# Look for usages of QueryExecutorJdbc and how/where close() is invoked
echo "=== QueryExecutorJdbc instantiation sites ==="
rg -n "new QueryExecutorJdbc" --type scala -A 3
echo -e "\n=== QueryExecutor close() calls ==="
rg -n "\.close\(\)" --type scala -C 2 | rg -A 2 -B 2 "QueryExecutor|HiveHelper"
echo -e "\n=== AutoCloseable/try-with-resources patterns ==="
ast-grep --pattern 'for ($_ <- $_) yield { $$$ }'Repository: AbsaOSS/pramen Length of output: 17609 🏁 Script executed: #!/bin/bash
echo "=== QueryExecutorJdbc constructor and registration ==="
sed -n '1,100p' pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala | head -50
echo -e "\n=== HiveHelper.scala context around line 67 ==="
sed -n '50,85p' pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala
echo -e "\n=== Search for try-with-resources usage ==="
rg -n "\.withResource|try \{" --type scala pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/ -A 2Repository: AbsaOSS/pramen Length of output: 5271 🏁 Script executed: #!/bin/bash
echo "=== Full QueryExecutorJdbc.scala ==="
wc -l pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
cat pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scala
echo -e "\n=== HiveHelperSql lifecycle management ==="
rg -n "queryExecutor|close|getConnection" pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala -B 2 -A 2Repository: AbsaOSS/pramen Length of output: 6474 Cross-thread
This is a minor issue because the connection is still properly closed via 🤖 Prompt for AI Agents |
||
|
|
||
| private[core] def executeActionOnConnection(action: Connection => Boolean): Boolean = { | ||
| val currentConnection = getConnection(forceReconnect = false) | ||
|
|
@@ -97,7 +126,9 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector, optimizedExistQuery: B | |
| if (connection == null || forceReconnect) { | ||
| val (newConnection, url) = jdbcUrlSelector.getWorkingConnection(retries) | ||
| log.info(s"Selected query executor connection: $url") | ||
| close() | ||
| connection = newConnection | ||
| ThreadClosableRegistry.registerCloseable(connection) | ||
| } | ||
| connection | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid flipping this global default in a timeout-focused PR.
Line 64 changes production behavior for all deployments (
MSCK REPAIRpreference →ADD PARTITIONpreference), which is broader than the JDBC-timeout cleanup objective and may surprise existing users.Suggested minimal change
📝 Committable suggestion
🤖 Prompt for AI Agents