Skip to content

Commit 3edd095

Browse files
authored
optimize pool (#681)
1 parent d76d56c commit 3edd095

File tree

1 file changed

+116
-19
lines changed

1 file changed

+116
-19
lines changed

src/memos/graph_dbs/polardb.py

Lines changed: 116 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -201,28 +201,53 @@ def _get_connection_old(self):
201201
return conn
202202

203203
def _get_connection(self):
204-
"""Get a connection from the pool."""
204+
"""
205+
Get a connection from the pool.
206+
207+
This function:
208+
1. Gets a connection from ThreadedConnectionPool
209+
2. Checks if connection is closed or unhealthy
210+
3. Returns healthy connection or retries (up to 5 attempts in total)
211+
4. Handles connection pool exhaustion gracefully
212+
213+
Returns:
214+
psycopg2 connection object
215+
216+
Raises:
217+
RuntimeError: If connection pool is closed or exhausted after retries
218+
"""
205219
if self._pool_closed:
206220
raise RuntimeError("Connection pool has been closed")
207221

208-
max_retries = 3
222+
max_retries = 5
223+
import psycopg2.pool
224+
209225
for attempt in range(max_retries):
210226
conn = None
211227
try:
228+
# Try to get connection from pool
229+
# This may raise PoolError if pool is exhausted
212230
conn = self.connection_pool.getconn()
213231

214232
# Check if connection is closed
215233
if conn.closed != 0:
216234
# Connection is closed, return it to pool with close flag and try again
235+
logger.warning(
236+
f"[_get_connection] Got closed connection, attempt {attempt + 1}/{max_retries}"
237+
)
217238
try:
218239
self.connection_pool.putconn(conn, close=True)
219240
except Exception as e:
220-
logger.warning(f"Failed to return closed connection to pool: {e}")
241+
logger.warning(
242+
f"[_get_connection] Failed to return closed connection to pool: {e}"
243+
)
221244
with suppress(Exception):
222245
conn.close()
223246

224247
conn = None
225248
if attempt < max_retries - 1:
249+
# Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s (no sleep after the final attempt)
250+
time.sleep(0.1 * (2**attempt))
226251
continue
227252
else:
228253
raise RuntimeError("Pool returned a closed connection after all retries")
@@ -239,19 +264,21 @@ def _get_connection(self):
239264
except Exception as health_check_error:
240265
# Connection is not usable, return it to pool with close flag and try again
241266
logger.warning(
242-
f"Connection health check failed: {health_check_error}, returning connection to pool and retrying..."
267+
f"[_get_connection] Connection health check failed (attempt {attempt + 1}/{max_retries}): {health_check_error}"
243268
)
244269
try:
245270
self.connection_pool.putconn(conn, close=True)
246271
except Exception as putconn_error:
247272
logger.warning(
248-
f"Failed to return unhealthy connection to pool: {putconn_error}"
273+
f"[_get_connection] Failed to return unhealthy connection to pool: {putconn_error}"
249274
)
250275
with suppress(Exception):
251276
conn.close()
252277

253278
conn = None
254279
if attempt < max_retries - 1:
280+
# Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s (no sleep after the final attempt)
281+
time.sleep(0.1 * (2**attempt))
255282
continue
256283
else:
257284
raise RuntimeError(
@@ -260,62 +287,132 @@ def _get_connection(self):
260287

261288
# Connection is healthy, return it
262289
return conn
290+
291+
except psycopg2.pool.PoolError as pool_error:
292+
# Pool exhausted or other pool-related error
293+
# Don't retry immediately for pool exhaustion - it's unlikely to resolve quickly
294+
error_msg = str(pool_error).lower()
295+
if "exhausted" in error_msg or "pool" in error_msg:
296+
# Log pool status for debugging
297+
try:
298+
# Try to get pool stats if available
299+
pool_info = f"Pool config: minconn={self.connection_pool.minconn}, maxconn={self.connection_pool.maxconn}"
300+
logger.error(
301+
f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}"
302+
)
303+
except Exception:
304+
logger.error(
305+
f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries})"
306+
)
307+
308+
# For pool exhaustion, wait longer before retry (connections may be returned)
309+
if attempt < max_retries - 1:
310+
# Longer backoff for pool exhaustion: 0.5s, 1.0s, 2.0s, 4.0s
311+
wait_time = 0.5 * (2**attempt)
312+
logger.info(f"[_get_connection] Waiting {wait_time}s before retry...")
313+
time.sleep(wait_time)
314+
continue
315+
else:
316+
raise RuntimeError(
317+
f"Connection pool exhausted after {max_retries} attempts. "
318+
f"This usually means connections are not being returned to the pool. "
319+
f"Check for connection leaks in your code."
320+
) from pool_error
321+
else:
322+
# Other pool errors - retry with normal backoff
323+
if attempt < max_retries - 1:
324+
time.sleep(0.1 * (2**attempt))
325+
continue
326+
else:
327+
raise RuntimeError(
328+
f"Failed to get connection from pool: {pool_error}"
329+
) from pool_error
330+
263331
except Exception as e:
332+
# Other exceptions (not pool-related)
264333
# Only try to return connection if we actually got one
265334
# If getconn() failed (e.g., pool exhausted), conn will be None
266335
if conn is not None:
267336
try:
268-
# If it's a PoolError or similar, close the connection instead of returning
269-
if "pool" in str(e).lower() or "exhausted" in str(e).lower():
270-
with suppress(Exception):
271-
conn.close()
272-
else:
273-
self.connection_pool.putconn(conn, close=True)
337+
# Hand the connection back with close=True so the pool discards it instead of reusing it
338+
self.connection_pool.putconn(conn, close=True)
274339
except Exception as putconn_error:
275-
logger.warning(f"Failed to handle connection after error: {putconn_error}")
340+
logger.warning(
341+
f"[_get_connection] Failed to return connection after error: {putconn_error}"
342+
)
276343
with suppress(Exception):
277344
conn.close()
278345

279346
if attempt >= max_retries - 1:
280347
raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
281348
else:
282-
time.sleep(0.1)
349+
# Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s (no sleep after the final attempt)
350+
time.sleep(0.1 * (2**attempt))
283351
continue
284352

353+
# Should never reach here, but just in case
354+
raise RuntimeError("Failed to get connection after all retries")
355+
285356
def _return_connection(self, connection):
286-
"""Return a connection to the pool."""
357+
"""
358+
Return a connection to the pool.
359+
360+
This function safely returns a connection to the pool, handling:
361+
- Closed connections (close them instead of returning)
362+
- Pool closed state (close connection directly)
363+
- None connections (no-op)
364+
- putconn() failures (close connection as fallback)
365+
366+
Args:
367+
connection: psycopg2 connection object or None
368+
"""
287369
if self._pool_closed:
288370
# Pool is closed, just close the connection if it exists
289371
if connection:
290372
try:
291373
connection.close()
374+
logger.debug("[_return_connection] Closed connection (pool is closed)")
292375
except Exception as e:
293-
logger.warning(f"Failed to close connection after pool closed: {e}")
376+
logger.warning(
377+
f"[_return_connection] Failed to close connection after pool closed: {e}"
378+
)
294379
return
295380

296381
if not connection:
297-
# No connection to return
382+
# No connection to return - this is normal if _get_connection() failed
298383
return
299384

300385
try:
301386
# Check if connection is closed
302387
if hasattr(connection, "closed") and connection.closed != 0:
303388
# Connection is closed, just close it explicitly and don't return to pool
389+
logger.debug(
390+
"[_return_connection] Connection is closed, closing it instead of returning to pool"
391+
)
304392
try:
305393
connection.close()
306394
except Exception as e:
307-
logger.warning(f"Failed to close closed connection: {e}")
395+
logger.warning(f"[_return_connection] Failed to close closed connection: {e}")
308396
return
309397

310398
# Connection is valid, return to pool
311399
self.connection_pool.putconn(connection)
400+
logger.debug("[_return_connection] Successfully returned connection to pool")
312401
except Exception as e:
313402
# If putconn fails, try to close the connection
314-
logger.warning(f"Failed to return connection to pool: {e}")
403+
# This prevents connection leaks if putconn() fails
404+
logger.error(
405+
f"[_return_connection] Failed to return connection to pool: {e}", exc_info=True
406+
)
315407
try:
316408
connection.close()
409+
logger.debug(
410+
"[_return_connection] Closed connection as fallback after putconn failure"
411+
)
317412
except Exception as close_error:
318-
logger.warning(f"Failed to close connection after putconn error: {close_error}")
413+
logger.warning(
414+
f"[_return_connection] Failed to close connection after putconn error: {close_error}"
415+
)
319416

320417
def _return_connection_old(self, connection):
321418
"""Return a connection to the pool."""

0 commit comments

Comments
 (0)