77- test_commit: Make a transaction and commit.
88- test_rollback: Make a transaction and rollback.
99- test_invalid_connection_string: Check if initializing with an invalid connection string raises an exception.
10- - test_connection_pooling_speed: Test connection pooling speed.
11- - test_connection_pooling_basic: Test basic connection pooling functionality.
1210- test_autocommit_default: Check if autocommit is False by default.
1311- test_autocommit_setter: Test setting autocommit mode and its effect on transactions.
1412- test_set_autocommit: Test the setautocommit method.
@@ -290,131 +288,6 @@ def test_connection_close(conn_str):
290288 # Check if the database connection can be closed
291289 temp_conn .close ()
292290
293- def test_connection_pooling_speed (conn_str ):
294- """Test that connection pooling provides performance benefits over multiple iterations."""
295- import statistics
296-
297- # Warm up to eliminate cold start effects
298- for _ in range (3 ):
299- conn = connect (conn_str )
300- conn .close ()
301-
302- # Disable pooling first
303- pooling (enabled = False )
304-
305- # Test without pooling (multiple times)
306- no_pool_times = []
307- for _ in range (10 ):
308- start = time .perf_counter ()
309- conn = connect (conn_str )
310- conn .close ()
311- end = time .perf_counter ()
312- no_pool_times .append (end - start )
313-
314- # Enable pooling
315- pooling (max_size = 5 , idle_timeout = 30 )
316-
317- # Test with pooling (multiple times)
318- pool_times = []
319- for _ in range (10 ):
320- start = time .perf_counter ()
321- conn = connect (conn_str )
322- conn .close ()
323- end = time .perf_counter ()
324- pool_times .append (end - start )
325-
326- # Use median times to reduce impact of outliers
327- median_no_pool = statistics .median (no_pool_times )
328- median_pool = statistics .median (pool_times )
329-
330- # Allow for some variance - pooling should be at least 30% faster on average
331- improvement_threshold = 0.7 # Pool should be <= 70% of no-pool time
332-
333- print (f"No pool median: { median_no_pool :.6f} s" )
334- print (f"Pool median: { median_pool :.6f} s" )
335- print (f"Improvement ratio: { median_pool / median_no_pool :.2f} " )
336-
337- # Clean up - disable pooling for other tests
338- pooling (enabled = False )
339-
340- assert median_pool <= median_no_pool * improvement_threshold , \
341- f"Expected pooling to be at least 30% faster. No-pool: { median_no_pool :.6f} s, Pool: { median_pool :.6f} s"
342-
343- def test_connection_pooling_reuse_spid (conn_str ):
344- """Test that connections are actually reused from the pool"""
345- # Enable pooling
346- pooling (max_size = 1 , idle_timeout = 30 )
347-
348- # Create and close a connection
349- conn1 = connect (conn_str )
350- cursor1 = conn1 .cursor ()
351- cursor1 .execute ("SELECT @@SPID" ) # Get SQL Server process ID
352- spid1 = cursor1 .fetchone ()[0 ]
353- conn1 .close ()
354-
355- # Get another connection - should be the same one from pool
356- conn2 = connect (conn_str )
357- cursor2 = conn2 .cursor ()
358- cursor2 .execute ("SELECT @@SPID" )
359- spid2 = cursor2 .fetchone ()[0 ]
360- conn2 .close ()
361-
362- # The SPID should be the same, indicating connection reuse
363- assert spid1 == spid2 , "Connections not reused - different SPIDs"
364-
365- # Clean up
366-
367- def test_pool_exhaustion_max_size_1 (conn_str ):
368- """Test pool exhaustion when max_size=1 and multiple concurrent connections are requested."""
369- pooling (max_size = 1 , idle_timeout = 30 )
370- conn1 = connect (conn_str )
371- results = []
372-
373- def try_connect ():
374- try :
375- conn2 = connect (conn_str )
376- results .append ("success" )
377- conn2 .close ()
378- except Exception as e :
379- results .append (str (e ))
380-
381- # Start a thread that will attempt to get a second connection while the first is open
382- t = threading .Thread (target = try_connect )
383- t .start ()
384- t .join (timeout = 2 )
385- conn1 .close ()
386-
387- # Depending on implementation, either blocks, raises, or times out
388- assert results , "Second connection attempt did not complete"
389- # If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes
390- assert results [0 ] == "success" or "pool" in results [0 ].lower () or "timeout" in results [0 ].lower (), \
391- f"Unexpected pool exhaustion result: { results [0 ]} "
392- pooling (enabled = False )
393-
394- def test_pool_idle_timeout_removes_connections (conn_str ):
395- """Test that idle_timeout removes connections from the pool after the timeout."""
396- pooling (max_size = 2 , idle_timeout = 2 )
397- conn1 = connect (conn_str )
398- spid_list = []
399- cursor1 = conn1 .cursor ()
400- cursor1 .execute ("SELECT @@SPID" )
401- spid1 = cursor1 .fetchone ()[0 ]
402- spid_list .append (spid1 )
403- conn1 .close ()
404-
405- # Wait for longer than idle_timeout
406- time .sleep (3 )
407-
408- # Get a new connection, which should not reuse the previous SPID
409- conn2 = connect (conn_str )
410- cursor2 = conn2 .cursor ()
411- cursor2 .execute ("SELECT @@SPID" )
412- spid2 = cursor2 .fetchone ()[0 ]
413- spid_list .append (spid2 )
414- conn2 .close ()
415-
416- assert spid1 != spid2 , "Idle timeout did not remove connection from pool"
417-
418291def test_connection_timeout_invalid_password (conn_str ):
419292 """Test that connecting with an invalid password raises an exception quickly (timeout)."""
420293 # Modify the connection string to use an invalid password
@@ -431,6 +304,7 @@ def test_connection_timeout_invalid_password(conn_str):
431304 # Should fail quickly (within 10 seconds)
432305 assert elapsed < 10 , f"Connection with invalid password took too long: { elapsed :.2f} s"
433306
307+
434308def test_connection_timeout_invalid_host (conn_str ):
435309 """Test that connecting to an invalid host fails with a timeout."""
436310 # Replace server/host with an invalid one
@@ -451,106 +325,6 @@ def test_connection_timeout_invalid_host(conn_str):
451325 # If it takes too long, it may indicate a misconfiguration or network issue.
452326 assert elapsed < 30 , f"Connection to invalid host took too long: { elapsed :.2f} s"
453327
454- def test_pool_removes_invalid_connections (conn_str ):
455- """Test that the pool removes connections that become invalid (simulate by closing underlying connection)."""
456- pooling (max_size = 1 , idle_timeout = 30 )
457- conn = connect (conn_str )
458- cursor = conn .cursor ()
459- cursor .execute ("SELECT 1" )
460- # Simulate invalidation by forcibly closing the connection at the driver level
461- try :
462- # Try to access a private attribute or method to forcibly close the underlying connection
463- # This is implementation-specific; if not possible, skip
464- if hasattr (conn , "_conn" ) and hasattr (conn ._conn , "close" ):
465- conn ._conn .close ()
466- else :
467- pytest .skip ("Cannot forcibly close underlying connection for this driver" )
468- except Exception :
469- pass
470- # Safely close the connection, ignoring errors due to forced invalidation
471- try :
472- conn .close ()
473- except RuntimeError as e :
474- if "not initialized" not in str (e ):
475- raise
476- # Now, get a new connection from the pool and ensure it works
477- new_conn = connect (conn_str )
478- new_cursor = new_conn .cursor ()
479- try :
480- new_cursor .execute ("SELECT 1" )
481- result = new_cursor .fetchone ()
482- assert result is not None and result [0 ] == 1 , "Pool did not remove invalid connection"
483- finally :
484- new_conn .close ()
485- pooling (enabled = False )
486-
487- def test_pool_recovery_after_failed_connection (conn_str ):
488- """Test that the pool recovers after a failed connection attempt."""
489- pooling (max_size = 1 , idle_timeout = 30 )
490- # First, try to connect with a bad password (should fail)
491- if "Pwd=" in conn_str :
492- bad_conn_str = conn_str .replace ("Pwd=" , "Pwd=wrongpassword" )
493- elif "Password=" in conn_str :
494- bad_conn_str = conn_str .replace ("Password=" , "Password=wrongpassword" )
495- else :
496- pytest .skip ("No password found in connection string to modify" )
497- with pytest .raises (Exception ):
498- connect (bad_conn_str )
499- # Now, connect with the correct string and ensure it works
500- conn = connect (conn_str )
501- cursor = conn .cursor ()
502- cursor .execute ("SELECT 1" )
503- result = cursor .fetchone ()
504- assert result is not None and result [0 ] == 1 , "Pool did not recover after failed connection"
505- conn .close ()
506- pooling (enabled = False )
507-
508- def test_pool_capacity_limit_and_overflow (conn_str ):
509- """Test that pool does not grow beyond max_size and handles overflow gracefully."""
510- pooling (max_size = 2 , idle_timeout = 30 )
511- conns = []
512- try :
513- # Open up to max_size connections
514- conns .append (connect (conn_str ))
515- conns .append (connect (conn_str ))
516- # Try to open a third connection, which should fail or block
517- overflow_result = []
518- def try_overflow ():
519- try :
520- c = connect (conn_str )
521- overflow_result .append ("success" )
522- c .close ()
523- except Exception as e :
524- overflow_result .append (str (e ))
525- t = threading .Thread (target = try_overflow )
526- t .start ()
527- t .join (timeout = 2 )
528- assert overflow_result , "Overflow connection attempt did not complete"
529- # Accept either block, error, or success if pool implementation allows overflow
530- assert overflow_result [0 ] == "success" or "pool" in overflow_result [0 ].lower () or "timeout" in overflow_result [0 ].lower (), \
531- f"Unexpected pool overflow result: { overflow_result [0 ]} "
532- finally :
533- for c in conns :
534- c .close ()
535- pooling (enabled = False )
536-
537- def test_connection_pooling_basic (conn_str ):
538- # Enable pooling with small pool size
539- pooling (max_size = 2 , idle_timeout = 5 )
540- conn1 = connect (conn_str )
541- conn2 = connect (conn_str )
542- assert conn1 is not None
543- assert conn2 is not None
544- try :
545- conn3 = connect (conn_str )
546- assert conn3 is not None , "Third connection failed — pooling is not working or limit is too strict"
547- conn3 .close ()
548- except Exception as e :
549- print (f"Expected: Could not open third connection due to max_size=2: { e } " )
550-
551- conn1 .close ()
552- conn2 .close ()
553-
554328def test_context_manager_commit (conn_str ):
555329 """Test that context manager closes connection on normal exit"""
556330 # Create a permanent table for testing across connections
0 commit comments