Skip to content

Commit d42a7ce

Browse files
authored
Revert "optimize pool" (#682)
Revert "optimize pool (#681)" This reverts commit 3edd095.
1 parent e46c805 commit d42a7ce

File tree

1 file changed

+19
-116
lines changed

1 file changed

+19
-116
lines changed

src/memos/graph_dbs/polardb.py

Lines changed: 19 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -201,53 +201,28 @@ def _get_connection_old(self):
201201
return conn
202202

203203
def _get_connection(self):
204-
"""
205-
Get a connection from the pool.
206-
207-
This function:
208-
1. Gets a connection from ThreadedConnectionPool
209-
2. Checks if connection is closed or unhealthy
210-
3. Returns healthy connection or retries (max 3 times)
211-
4. Handles connection pool exhaustion gracefully
212-
213-
Returns:
214-
psycopg2 connection object
215-
216-
Raises:
217-
RuntimeError: If connection pool is closed or exhausted after retries
218-
"""
204+
"""Get a connection from the pool."""
219205
if self._pool_closed:
220206
raise RuntimeError("Connection pool has been closed")
221207

222-
max_retries = 5
223-
import psycopg2.pool
224-
208+
max_retries = 3
225209
for attempt in range(max_retries):
226210
conn = None
227211
try:
228-
# Try to get connection from pool
229-
# This may raise PoolError if pool is exhausted
230212
conn = self.connection_pool.getconn()
231213

232214
# Check if connection is closed
233215
if conn.closed != 0:
234216
# Connection is closed, return it to pool with close flag and try again
235-
logger.warning(
236-
f"[_get_connection] Got closed connection, attempt {attempt + 1}/{max_retries}"
237-
)
238217
try:
239218
self.connection_pool.putconn(conn, close=True)
240219
except Exception as e:
241-
logger.warning(
242-
f"[_get_connection] Failed to return closed connection to pool: {e}"
243-
)
220+
logger.warning(f"Failed to return closed connection to pool: {e}")
244221
with suppress(Exception):
245222
conn.close()
246223

247224
conn = None
248225
if attempt < max_retries - 1:
249-
# Exponential backoff: 0.1s, 0.2s, 0.4s
250-
time.sleep(0.1 * (2**attempt))
251226
continue
252227
else:
253228
raise RuntimeError("Pool returned a closed connection after all retries")
@@ -264,21 +239,19 @@ def _get_connection(self):
264239
except Exception as health_check_error:
265240
# Connection is not usable, return it to pool with close flag and try again
266241
logger.warning(
267-
f"[_get_connection] Connection health check failed (attempt {attempt + 1}/{max_retries}): {health_check_error}"
242+
f"Connection health check failed: {health_check_error}, returning connection to pool and retrying..."
268243
)
269244
try:
270245
self.connection_pool.putconn(conn, close=True)
271246
except Exception as putconn_error:
272247
logger.warning(
273-
f"[_get_connection] Failed to return unhealthy connection to pool: {putconn_error}"
248+
f"Failed to return unhealthy connection to pool: {putconn_error}"
274249
)
275250
with suppress(Exception):
276251
conn.close()
277252

278253
conn = None
279254
if attempt < max_retries - 1:
280-
# Exponential backoff: 0.1s, 0.2s, 0.4s
281-
time.sleep(0.1 * (2**attempt))
282255
continue
283256
else:
284257
raise RuntimeError(
@@ -287,132 +260,62 @@ def _get_connection(self):
287260

288261
# Connection is healthy, return it
289262
return conn
290-
291-
except psycopg2.pool.PoolError as pool_error:
292-
# Pool exhausted or other pool-related error
293-
# Don't retry immediately for pool exhaustion - it's unlikely to resolve quickly
294-
error_msg = str(pool_error).lower()
295-
if "exhausted" in error_msg or "pool" in error_msg:
296-
# Log pool status for debugging
297-
try:
298-
# Try to get pool stats if available
299-
pool_info = f"Pool config: minconn={self.connection_pool.minconn}, maxconn={self.connection_pool.maxconn}"
300-
logger.error(
301-
f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}"
302-
)
303-
except Exception:
304-
logger.error(
305-
f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries})"
306-
)
307-
308-
# For pool exhaustion, wait longer before retry (connections may be returned)
309-
if attempt < max_retries - 1:
310-
# Longer backoff for pool exhaustion: 0.5s, 1.0s, 2.0s
311-
wait_time = 0.5 * (2**attempt)
312-
logger.info(f"[_get_connection] Waiting {wait_time}s before retry...")
313-
time.sleep(wait_time)
314-
continue
315-
else:
316-
raise RuntimeError(
317-
f"Connection pool exhausted after {max_retries} attempts. "
318-
f"This usually means connections are not being returned to the pool. "
319-
f"Check for connection leaks in your code."
320-
) from pool_error
321-
else:
322-
# Other pool errors - retry with normal backoff
323-
if attempt < max_retries - 1:
324-
time.sleep(0.1 * (2**attempt))
325-
continue
326-
else:
327-
raise RuntimeError(
328-
f"Failed to get connection from pool: {pool_error}"
329-
) from pool_error
330-
331263
except Exception as e:
332-
# Other exceptions (not pool-related)
333264
# Only try to return connection if we actually got one
334265
# If getconn() failed (e.g., pool exhausted), conn will be None
335266
if conn is not None:
336267
try:
337-
# Return connection to pool if it's valid
338-
self.connection_pool.putconn(conn, close=True)
268+
# If it's a PoolError or similar, close the connection instead of returning
269+
if "pool" in str(e).lower() or "exhausted" in str(e).lower():
270+
with suppress(Exception):
271+
conn.close()
272+
else:
273+
self.connection_pool.putconn(conn, close=True)
339274
except Exception as putconn_error:
340-
logger.warning(
341-
f"[_get_connection] Failed to return connection after error: {putconn_error}"
342-
)
275+
logger.warning(f"Failed to handle connection after error: {putconn_error}")
343276
with suppress(Exception):
344277
conn.close()
345278

346279
if attempt >= max_retries - 1:
347280
raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
348281
else:
349-
# Exponential backoff: 0.1s, 0.2s, 0.4s
350-
time.sleep(0.1 * (2**attempt))
282+
time.sleep(0.1)
351283
continue
352284

353-
# Should never reach here, but just in case
354-
raise RuntimeError("Failed to get connection after all retries")
355-
356285
def _return_connection(self, connection):
357-
"""
358-
Return a connection to the pool.
359-
360-
This function safely returns a connection to the pool, handling:
361-
- Closed connections (close them instead of returning)
362-
- Pool closed state (close connection directly)
363-
- None connections (no-op)
364-
- putconn() failures (close connection as fallback)
365-
366-
Args:
367-
connection: psycopg2 connection object or None
368-
"""
286+
"""Return a connection to the pool."""
369287
if self._pool_closed:
370288
# Pool is closed, just close the connection if it exists
371289
if connection:
372290
try:
373291
connection.close()
374-
logger.debug("[_return_connection] Closed connection (pool is closed)")
375292
except Exception as e:
376-
logger.warning(
377-
f"[_return_connection] Failed to close connection after pool closed: {e}"
378-
)
293+
logger.warning(f"Failed to close connection after pool closed: {e}")
379294
return
380295

381296
if not connection:
382-
# No connection to return - this is normal if _get_connection() failed
297+
# No connection to return
383298
return
384299

385300
try:
386301
# Check if connection is closed
387302
if hasattr(connection, "closed") and connection.closed != 0:
388303
# Connection is closed, just close it explicitly and don't return to pool
389-
logger.debug(
390-
"[_return_connection] Connection is closed, closing it instead of returning to pool"
391-
)
392304
try:
393305
connection.close()
394306
except Exception as e:
395-
logger.warning(f"[_return_connection] Failed to close closed connection: {e}")
307+
logger.warning(f"Failed to close closed connection: {e}")
396308
return
397309

398310
# Connection is valid, return to pool
399311
self.connection_pool.putconn(connection)
400-
logger.debug("[_return_connection] Successfully returned connection to pool")
401312
except Exception as e:
402313
# If putconn fails, try to close the connection
403-
# This prevents connection leaks if putconn() fails
404-
logger.error(
405-
f"[_return_connection] Failed to return connection to pool: {e}", exc_info=True
406-
)
314+
logger.warning(f"Failed to return connection to pool: {e}")
407315
try:
408316
connection.close()
409-
logger.debug(
410-
"[_return_connection] Closed connection as fallback after putconn failure"
411-
)
412317
except Exception as close_error:
413-
logger.warning(
414-
f"[_return_connection] Failed to close connection after putconn error: {close_error}"
415-
)
318+
logger.warning(f"Failed to close connection after putconn error: {close_error}")
416319

417320
def _return_connection_old(self, connection):
418321
"""Return a connection to the pool."""

0 commit comments

Comments
 (0)