From 1bd194b77213aff68fa63f5aa6d7bf060cca7c88 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 24 Nov 2025 19:01:30 +0000 Subject: [PATCH 1/3] fix: Database cache logic --- src/firebolt/async_db/cursor.py | 7 +- src/firebolt/db/cursor.py | 7 +- .../dbapi/async/V2/test_queries_async.py | 82 ++++++ tests/integration/dbapi/sync/V2/conftest.py | 2 +- .../integration/dbapi/sync/V2/test_queries.py | 78 ++++++ tests/unit/async_db/test_caching.py | 246 +++++++++++++++++ tests/unit/db/test_caching.py | 247 ++++++++++++++++++ tests/unit/db_conftest.py | 50 ++++ 8 files changed, 712 insertions(+), 7 deletions(-) diff --git a/src/firebolt/async_db/cursor.py b/src/firebolt/async_db/cursor.py index 694db97650f..dbf617a703d 100644 --- a/src/firebolt/async_db/cursor.py +++ b/src/firebolt/async_db/cursor.py @@ -310,9 +310,10 @@ async def use_engine(self, engine: str, cache: bool = True) -> None: self._update_set_parameters(cache_obj.engines[engine].params) else: await self.execute(f'USE ENGINE "{engine}"') - cache_obj.engines[engine] = EngineInfo( - self.engine_url, self.parameters | self._set_parameters - ) + params = self.parameters | self._set_parameters + # Ensure 'database' parameter is not cached with engine info + params = {k: v for k, v in params.items() if k != "database"} + cache_obj.engines[engine] = EngineInfo(self.engine_url, params) self.set_cache_record(cache_obj) else: await self.execute(f'USE ENGINE "{engine}"') diff --git a/src/firebolt/db/cursor.py b/src/firebolt/db/cursor.py index 80fec9939bf..f862adb8f4c 100644 --- a/src/firebolt/db/cursor.py +++ b/src/firebolt/db/cursor.py @@ -314,9 +314,10 @@ def use_engine(self, engine: str, cache: bool = True) -> None: self._update_set_parameters(cache_obj.engines[engine].params) else: self.execute(f'USE ENGINE "{engine}"') - cache_obj.engines[engine] = EngineInfo( - self.engine_url, self.parameters | self._set_parameters - ) + params = self.parameters | self._set_parameters + # Ensure 'database' parameter is not cached with engine info + params = {k: v for k, v in params.items() if k != "database"} + cache_obj.engines[engine] = EngineInfo(self.engine_url, params) self.set_cache_record(cache_obj) else: self.execute(f'USE ENGINE "{engine}"') diff --git a/tests/integration/dbapi/async/V2/test_queries_async.py b/tests/integration/dbapi/async/V2/test_queries_async.py index b4fbc8cfde2..3167b14e32c 100644 --- a/tests/integration/dbapi/async/V2/test_queries_async.py +++ b/tests/integration/dbapi/async/V2/test_queries_async.py @@ -1188,3 +1188,85 @@ async def test_connection_close_no_rollback_with_autocommit_on( await check_data_visibility_async( table_name, 1, connection_factory, True, [1, "autocommit_close_test"] ) + + +async def test_database_switching_with_same_engine_preserves_database_context( + database_name: str, + connection_factory: Callable[..., Connection], +) -> None: + """ + Async integration test for database context preservation with caching on Firebolt. + + This test verifies against a live Firebolt instance: + 1. Connect with database1 + engine1 (cache entry created) + 2. Connect with database2 + engine1 (should add database2 to cache) + 3. Cursors from second connection should have database2, not database1 + """ + first_db_name = database_name + second_db_name = f"{database_name}_second_async" + + # Create a system connection to set up test databases + async with await connection_factory() as system_connection: + system_cursor = system_connection.cursor() + + try: + # Create the second test database + await system_cursor.execute( + f'CREATE DATABASE IF NOT EXISTS "{second_db_name}"' + ) + + # First connection: database1 + engine1 + async with await connection_factory(database=first_db_name) as connection1: + cursor1 = connection1.cursor() + await cursor1.execute("SELECT current_database()") + result1 = await cursor1.fetchone() + + # Verify first connection has correct database + assert ( + result1[0] == first_db_name + ), f"First cursor should have database {first_db_name}" + assert ( + cursor1.database == first_db_name + ), f"First cursor database property should be {first_db_name}" + + # Second connection: database2 + engine1 (same engine) + async with await connection_factory(database=second_db_name) as connection2: + cursor2 = connection2.cursor() + await cursor2.execute("SELECT current_database()") + result2 = await cursor2.fetchone() + + # Verify second connection has correct database + assert result2[0] == second_db_name, ( + f"Second cursor should have database {second_db_name}, " + f"but got {result2[0]}. This indicates the database context was overwritten." + ) + assert cursor2.database == second_db_name, ( + f"Second cursor database property should be {second_db_name}, " + f"but has {cursor2.database}. This indicates the database context was overwritten." + ) + + # Third connection: back to database1 + engine1 (should use cache) + async with await connection_factory(database=first_db_name) as connection3: + cursor3 = connection3.cursor() + await cursor3.execute("SELECT current_database()") + result3 = await cursor3.fetchone() + + # Verify third connection has correct database (should be from cache) + assert result3[0] == first_db_name, ( + f"Third cursor should have database {first_db_name}, " + f"but got {result3[0]}. This indicates cached database context is incorrect." + ) + assert cursor3.database == first_db_name, ( + f"Third cursor database property should be {first_db_name}, " + f"but has {cursor3.database}. This indicates cached database context is incorrect." + ) + + finally: + # Clean up: Drop the test database + try: + await system_cursor.execute( + f'DROP DATABASE IF EXISTS "{second_db_name}"' + ) + except Exception: + # Ignore cleanup errors to avoid masking the real test failure + pass diff --git a/tests/integration/dbapi/sync/V2/conftest.py b/tests/integration/dbapi/sync/V2/conftest.py index 6b250d4a872..604dc67b3d6 100644 --- a/tests/integration/dbapi/sync/V2/conftest.py +++ b/tests/integration/dbapi/sync/V2/conftest.py @@ -41,7 +41,7 @@ def connection_factory( def factory(**kwargs: Any) -> Connection: if request.param == "core": base_kwargs = { - "database": "firebolt", + "database": kwargs.pop("database", "firebolt"), "auth": core_auth, "url": core_url, } diff --git a/tests/integration/dbapi/sync/V2/test_queries.py b/tests/integration/dbapi/sync/V2/test_queries.py index 5541ffe5c37..81fd37f657f 100644 --- a/tests/integration/dbapi/sync/V2/test_queries.py +++ b/tests/integration/dbapi/sync/V2/test_queries.py @@ -1176,3 +1176,81 @@ def test_connection_close_no_rollback_with_autocommit_on( check_data_visibility( table_name, 1, connection_factory, True, [1, "autocommit_close_test"] ) + + +def test_database_switching_with_same_engine_preserves_database_context( + database_name: str, + connection_factory: Callable[..., Connection], +) -> None: + """ + Integration test for database context preservation with caching on Firebolt. + + This test verifies against a live Firebolt instance: + 1. Connect with database1 + engine1 (cache entry created) + 2. Connect with database2 + engine1 (should add database2 to cache) + 3. Cursors from second connection should have database2, not database1 + """ + first_db_name = database_name + second_db_name = f"{database_name}_second" + + # Create a system connection to set up test databases + with connection_factory() as system_connection: + system_cursor = system_connection.cursor() + + try: + # Create the second test database + system_cursor.execute(f'CREATE DATABASE IF NOT EXISTS "{second_db_name}"') + + # First connection: database1 + engine1 + with connection_factory(database=first_db_name) as connection1: + cursor1 = connection1.cursor() + cursor1.execute("SELECT current_database()") + result1 = cursor1.fetchone() + + # Verify first connection has correct database + assert ( + result1[0] == first_db_name + ), f"First cursor should have database {first_db_name}" + assert ( + cursor1.database == first_db_name + ), f"First cursor database property should be {first_db_name}" + + # Second connection: database2 + engine1 (same engine) + with connection_factory(database=second_db_name) as connection2: + cursor2 = connection2.cursor() + cursor2.execute("SELECT current_database()") + result2 = cursor2.fetchone() + + # Verify second connection has correct database + assert result2[0] == second_db_name, ( + f"Second cursor should have database {second_db_name}, " + f"but got {result2[0]}. This indicates the database context was overwritten." + ) + assert cursor2.database == second_db_name, ( + f"Second cursor database property should be {second_db_name}, " + f"but has {cursor2.database}. This indicates the database context was overwritten." + ) + + # Third connection: back to database1 + engine1 (should use cache) + with connection_factory(database=first_db_name) as connection3: + cursor3 = connection3.cursor() + cursor3.execute("SELECT current_database()") + result3 = cursor3.fetchone() + + # Verify third connection has correct database (should be from cache) + assert result3[0] == first_db_name, ( + f"Third cursor should have database {first_db_name}, " + f"but got {result3[0]}. This indicates cached database context is incorrect." + ) + assert cursor3.database == first_db_name, ( + f"Third cursor database property should be {first_db_name}, " + f"but has {cursor3.database}. This indicates cached database context is incorrect." + ) + + finally: + # Clean up: Drop the test database + try: + system_cursor.execute(f'DROP DATABASE IF EXISTS "{second_db_name}"') + except Exception: + # Ignore cleanup errors to avoid masking the real test failure + pass diff --git a/tests/unit/async_db/test_caching.py b/tests/unit/async_db/test_caching.py index 4833166bc86..5a6525048d2 100644 --- a/tests/unit/async_db/test_caching.py +++ b/tests/unit/async_db/test_caching.py @@ -627,3 +627,249 @@ def use_engine_callback_counter(request, **kwargs): # Verify USE ENGINE was not called again (cache hit) assert use_engine_call_counter == 1, "USE ENGINE was called when cache should hit" + + +async def test_database_switching_with_same_engine_preserves_database_context( + db_name: str, + engine_name: str, + auth_url: str, + httpx_mock: HTTPXMock, + check_credentials_callback: Callable, + get_system_engine_url: str, + get_system_engine_callback: Callable, + system_engine_query_url: str, + system_engine_no_db_query_url: str, + query_url: str, + use_engine_callback: Callable, + query_callback: Callable, + api_endpoint: str, + auth: Auth, + account_name: str, + dynamic_use_database_callback: Callable, +): + """ + Test that switching databases with the same engine preserves correct database context. + + This test verifies the bug scenario where: + 1. Connect with database1 + engine1 (cache entry created) + 2. Connect with database2 + engine1 (should add database2 to cache) + 3. Cursors from second connection should have database2, not database1 + """ + second_db_name = f"{db_name}_second" + + # Mock HTTP calls + httpx_mock.add_callback(check_credentials_callback, url=auth_url, is_reusable=True) + httpx_mock.add_callback( + get_system_engine_callback, + url=get_system_engine_url, + is_reusable=True, + ) + + # Create dynamic callback for both databases + use_db_callback = dynamic_use_database_callback(db_name, second_db_name) + + # Add USE DATABASE callbacks for both databases + httpx_mock.add_callback( + use_db_callback, + url=system_engine_no_db_query_url, + match_content=f'USE DATABASE "{db_name}"'.encode("utf-8"), + is_reusable=True, + ) + httpx_mock.add_callback( + use_db_callback, + url=system_engine_no_db_query_url, + match_content=f'USE DATABASE "{second_db_name}"'.encode("utf-8"), + is_reusable=True, + ) + + # Add USE ENGINE callback + httpx_mock.add_callback( + use_engine_callback, + url=system_engine_query_url, + match_content=f'USE ENGINE "{engine_name}"'.encode("utf-8"), + is_reusable=True, + ) + httpx_mock.add_callback( + query_callback, + url=query_url, + is_reusable=True, + ) + + # Also add callback for second database query URL + second_query_url = query_url.copy_with( + params={**query_url.params, "database": second_db_name} + ) + httpx_mock.add_callback( + query_callback, + url=second_query_url, + is_reusable=True, + ) + + # First connection: database1 + engine1 + async with await connect( + database=db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection1: + cursor1 = connection1.cursor() + await cursor1.execute("SELECT 1") + + # Verify first connection has correct database + assert ( + cursor1.database == db_name + ), f"First cursor should have database {db_name}" + + # Verify cache contains first database + cache_record = cursor1.get_cache_record() + assert cache_record is not None, "Cache should have connection info" + assert ( + db_name in cache_record.databases + ), f"Cache should contain database {db_name}" + + # Second connection: database2 + engine1 (same engine) + async with await connect( + database=second_db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection2: + cursor2 = connection2.cursor() + await cursor2.execute("SELECT 1") + + # Verify second connection has correct database + assert cursor2.database == second_db_name, ( + f"Second cursor should have database {second_db_name}, " + f"but has {cursor2.database}. This indicates the database context was overwritten." + ) + + # Verify cache contains both databases + cache_record = cursor2.get_cache_record() + assert cache_record is not None, "Cache should have connection info" + assert ( + db_name in cache_record.databases + ), f"Cache should still contain database {db_name}" + assert ( + second_db_name in cache_record.databases + ), f"Cache should contain database {second_db_name}" + + # Third connection: back to database1 + engine1 (should use cache) + async with await connect( + database=db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection3: + cursor3 = connection3.cursor() + await cursor3.execute("SELECT 1") + + # Verify third connection has correct database (should be from cache) + assert cursor3.database == db_name, ( + f"Third cursor should have database {db_name}, " + f"but has {cursor3.database}. This indicates cached database context is incorrect." + ) + + +async def test_engine_cache_does_not_contain_database_parameter( + db_name: str, + engine_name: str, + auth_url: str, + httpx_mock: HTTPXMock, + check_credentials_callback: Callable, + get_system_engine_url: str, + get_system_engine_callback: Callable, + system_engine_query_url: str, + system_engine_no_db_query_url: str, + use_database_callback: Callable, + use_engine_with_params_callback: Callable, + query_callback: Callable, + api_endpoint: str, + auth: Auth, + account_name: str, + test_update_parameters: Dict[str, str], +): + """ + Test that cached engine parameters do not include database parameter. + + This test verifies the bug scenario where: + 1. Connect with both database and engine defined + 2. Engine cache should not contain 'database' parameter + 3. Only engine-specific parameters should be cached + """ + # Mock HTTP calls + httpx_mock.add_callback(check_credentials_callback, url=auth_url, is_reusable=True) + httpx_mock.add_callback( + get_system_engine_callback, + url=get_system_engine_url, + is_reusable=True, + ) + + # Add USE DATABASE callback + httpx_mock.add_callback( + use_database_callback, + url=system_engine_no_db_query_url, + match_content=f'USE DATABASE "{db_name}"'.encode("utf-8"), + is_reusable=True, + ) + + # Add USE ENGINE callback that returns parameters (including engine-specific ones) + httpx_mock.add_callback( + use_engine_with_params_callback, + url=system_engine_query_url, + match_content=f'USE ENGINE "{engine_name}"'.encode("utf-8"), + is_reusable=True, + ) + + # The query callback needs to be more flexible to handle additional parameters + # in case there's a database parameter + def flexible_query_callback(request, **kwargs): + # This will handle queries to the engine with any parameters + if engine_name in str(request.url) and "SELECT 1" in str(request.content): + return query_callback(request, **kwargs) + + httpx_mock.add_callback( + flexible_query_callback, + is_reusable=True, + ) + + # Create connection with both database and engine + async with await connect( + database=db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + cursor = connection.cursor() + await cursor.execute("SELECT 1") + + # Get the cache record + cache_record = cursor.get_cache_record() + assert cache_record is not None, "Cache should have connection info" + assert ( + engine_name in cache_record.engines + ), f"Cache should contain engine {engine_name}" + + # Get cached engine info + cached_engine = cache_record.engines[engine_name] + + # Verify that engine cache contains expected parameters from test_update_parameters + for param_name, expected_value in test_update_parameters.items(): + assert ( + param_name in cached_engine.params + ), f"Engine cache should contain parameter {param_name}" + assert cached_engine.params[param_name] == expected_value, ( + f"Engine cache parameter {param_name} should be {expected_value}, " + f"but is {cached_engine.params[param_name]}" + ) + + # Verify that engine cache does NOT contain database parameter + assert "database" not in cached_engine.params, ( + f"Engine cache should NOT contain 'database' parameter, " + f"but found database={cached_engine.params.get('database')}. " + f"Engine parameters should be separate from database context. " + f"Full engine params: {cached_engine.params}" + ) diff --git a/tests/unit/db/test_caching.py b/tests/unit/db/test_caching.py index 3b39e5792e4..33a468445d3 100644 --- a/tests/unit/db/test_caching.py +++ b/tests/unit/db/test_caching.py @@ -627,3 +627,250 @@ def use_engine_callback_counter(request, **kwargs): # Verify USE ENGINE was not called again (cache hit) assert use_engine_call_counter == 1, "USE ENGINE was called when cache should hit" + + +def test_database_switching_with_same_engine_preserves_database_context( + db_name: str, + engine_name: str, + auth_url: str, + httpx_mock: HTTPXMock, + check_credentials_callback: Callable, + get_system_engine_url: str, + get_system_engine_callback: Callable, + system_engine_query_url: str, + system_engine_no_db_query_url: str, + query_url: str, + use_engine_callback: Callable, + query_callback: Callable, + api_endpoint: str, + auth: Auth, + account_name: str, + dynamic_use_database_callback: Callable, +): + """ + Test that switching databases with the same engine preserves correct database context. + + This test verifies the bug scenario where: + 1. Connect with database1 + engine1 (cache entry created) + 2. Connect with database2 + engine1 (should add database2 to cache) + 3. Cursors from second connection should have database2, not database1 + """ + second_db_name = f"{db_name}_second" + + # Mock HTTP calls + httpx_mock.add_callback(check_credentials_callback, url=auth_url, is_reusable=True) + httpx_mock.add_callback( + get_system_engine_callback, + url=get_system_engine_url, + is_reusable=True, + ) + + # Create dynamic callback for both databases + use_db_callback = dynamic_use_database_callback(db_name, second_db_name) + + # Add USE DATABASE callbacks for both databases + httpx_mock.add_callback( + use_db_callback, + url=system_engine_no_db_query_url, + match_content=f'USE DATABASE "{db_name}"'.encode("utf-8"), + is_reusable=True, + ) + httpx_mock.add_callback( + use_db_callback, + url=system_engine_no_db_query_url, + match_content=f'USE DATABASE "{second_db_name}"'.encode("utf-8"), + is_reusable=True, + ) + + # Add USE ENGINE callback + httpx_mock.add_callback( + use_engine_callback, + url=system_engine_query_url, + match_content=f'USE ENGINE "{engine_name}"'.encode("utf-8"), + is_reusable=True, + ) + + httpx_mock.add_callback( + query_callback, + url=query_url, + is_reusable=True, + ) + + # Also add callback for second database query URL + second_query_url = query_url.copy_with( + params={**query_url.params, "database": second_db_name} + ) + httpx_mock.add_callback( + query_callback, + url=second_query_url, + is_reusable=True, + ) + + # First connection: database1 + engine1 + with connect( + database=db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection1: + cursor1 = connection1.cursor() + cursor1.execute("SELECT 1") + + # Verify first connection has correct database + assert ( + cursor1.database == db_name + ), f"First cursor should have database {db_name}" + + # Verify cache contains first database + cache_record = cursor1.get_cache_record() + assert cache_record is not None, "Cache should have connection info" + assert ( + db_name in cache_record.databases + ), f"Cache should contain database {db_name}" + + # Second connection: database2 + engine1 (same engine) + with connect( + database=second_db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection2: + cursor2 = connection2.cursor() + cursor2.execute("SELECT 1") + + # Verify second connection has correct database + assert cursor2.database == second_db_name, ( + f"Second cursor should have database {second_db_name}, " + f"but has {cursor2.database}. This indicates the database context was overwritten." + ) + + # Verify cache contains both databases + cache_record = cursor2.get_cache_record() + assert cache_record is not None, "Cache should have connection info" + assert ( + db_name in cache_record.databases + ), f"Cache should still contain database {db_name}" + assert ( + second_db_name in cache_record.databases + ), f"Cache should contain database {second_db_name}" + + # Third connection: back to database1 + engine1 (should use cache) + with connect( + database=db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection3: + cursor3 = connection3.cursor() + cursor3.execute("SELECT 1") + + # Verify third connection has correct database (should be from cache) + assert cursor3.database == db_name, ( + f"Third cursor should have database {db_name}, " + f"but has {cursor3.database}. This indicates cached database context is incorrect." + ) + + +def test_engine_cache_does_not_contain_database_parameter( + db_name: str, + engine_name: str, + auth_url: str, + httpx_mock: HTTPXMock, + check_credentials_callback: Callable, + get_system_engine_url: str, + get_system_engine_callback: Callable, + system_engine_query_url: str, + system_engine_no_db_query_url: str, + use_database_callback: Callable, + use_engine_with_params_callback: Callable, + query_callback: Callable, + api_endpoint: str, + auth: Auth, + account_name: str, + test_update_parameters: Dict[str, str], +): + """ + Test that cached engine parameters do not include database parameter. + + This test verifies the bug scenario where: + 1. Connect with both database and engine defined + 2. Engine cache should not contain 'database' parameter + 3. Only engine-specific parameters should be cached + """ + # Mock HTTP calls + httpx_mock.add_callback(check_credentials_callback, url=auth_url, is_reusable=True) + httpx_mock.add_callback( + get_system_engine_callback, + url=get_system_engine_url, + is_reusable=True, + ) + + # Add USE DATABASE callback + httpx_mock.add_callback( + use_database_callback, + url=system_engine_no_db_query_url, + match_content=f'USE DATABASE "{db_name}"'.encode("utf-8"), + is_reusable=True, + ) + + # Add USE ENGINE callback that returns parameters (including engine-specific ones) + httpx_mock.add_callback( + use_engine_with_params_callback, + url=system_engine_query_url, + match_content=f'USE ENGINE "{engine_name}"'.encode("utf-8"), + is_reusable=True, + ) + + # The query callback needs to be more flexible to handle additional parameters + # in case there's a database parameter + def flexible_query_callback(request, **kwargs): + # This will handle queries to the engine with any parameters + if engine_name in str(request.url) and "SELECT 1" in str(request.content): + return query_callback(request, **kwargs) + + httpx_mock.add_callback( + flexible_query_callback, + is_reusable=True, + ) + + # Create connection with both database and engine + with connect( + database=db_name, + engine_name=engine_name, + auth=auth, + account_name=account_name, + api_endpoint=api_endpoint, + ) as connection: + cursor = connection.cursor() + cursor.execute("SELECT 1") + + # Get the cache record + cache_record = cursor.get_cache_record() + assert cache_record is not None, "Cache should have connection info" + assert ( + engine_name in cache_record.engines + ), f"Cache should contain engine {engine_name}" + + # Get cached engine info + cached_engine = cache_record.engines[engine_name] + + # Verify that engine cache contains expected parameters from test_update_parameters + for param_name, expected_value in test_update_parameters.items(): + assert ( + param_name in cached_engine.params + ), f"Engine cache should contain parameter {param_name}" + assert cached_engine.params[param_name] == expected_value, ( + f"Engine cache parameter {param_name} should be {expected_value}, " + f"but is {cached_engine.params[param_name]}" + ) + + # Verify that engine cache does NOT contain database parameter + assert "database" not in cached_engine.params, ( + f"Engine cache should NOT contain 'database' parameter, " + f"but found database={cached_engine.params.get('database')}. " + f"Engine parameters should be separate from database context. " + f"Full engine params: {cached_engine.params}" + ) diff --git a/tests/unit/db_conftest.py b/tests/unit/db_conftest.py index 38facf1772e..d9e9ed646cf 100644 --- a/tests/unit/db_conftest.py +++ b/tests/unit/db_conftest.py @@ -450,6 +450,56 @@ def inner( return inner +@fixture +def dynamic_use_database_callback(query_statistics: Dict[str, Any]) -> Callable: + """ + Dynamic USE DATABASE callback that returns the correct database name based on request content. + + This fixture creates a callback that can handle multiple database names by parsing + the request content to determine which database is being requested, then returns + the appropriate database name in the response headers. + """ + + def create_callback(*database_names: str) -> Callable: + def inner( + request: Request = None, + **kwargs, + ) -> Response: + assert request, "empty request" + assert request.method == "POST", "invalid request method" + + # Extract database name from request content + request_content = request.content.decode("utf-8") + requested_db = None + + # Check for each database name in the request + for db_name in database_names: + if f'USE DATABASE "{db_name}"' in request_content: + requested_db = db_name + break + + # Fallback to first database if we can't determine + if requested_db is None: + requested_db = database_names[0] if database_names else "default_db" + + query_response = { + "meta": [], + "data": [], + "rows": 0, + "statistics": query_statistics, + } + + return Response( + status_code=codes.OK, + json=query_response, + headers={UPDATE_PARAMETERS_HEADER: f"database={requested_db}"}, + ) + + return inner + + return create_callback + + @fixture def use_engine_callback(engine_url: str, query_statistics: Dict[str, Any]) -> Callable: def inner( From 3670187bceb4f40a6531d79bba6f17460761ec40 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 24 Nov 2025 19:21:54 +0000 Subject: [PATCH 2/3] fix async test --- tests/integration/dbapi/async/V2/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/dbapi/async/V2/conftest.py b/tests/integration/dbapi/async/V2/conftest.py index 9eda2b646cc..3894557c9d2 100644 --- a/tests/integration/dbapi/async/V2/conftest.py +++ b/tests/integration/dbapi/async/V2/conftest.py @@ -33,7 +33,7 @@ async def connection_factory( async def factory(**kwargs: Any) -> Connection: if request.param == "core": base_kwargs = { - "database": "firebolt", + "database": kwargs.pop("database", "firebolt"), "auth": core_auth, "url": core_url, } From 9ac84ffe4795b64a040b8358ab319f8d5f56995f Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 24 Nov 2025 19:45:32 +0000 Subject: [PATCH 3/3] fix remote tests --- tests/integration/dbapi/async/V2/conftest.py | 2 +- tests/integration/dbapi/sync/V2/conftest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/dbapi/async/V2/conftest.py b/tests/integration/dbapi/async/V2/conftest.py index 3894557c9d2..65a061d4f75 100644 --- a/tests/integration/dbapi/async/V2/conftest.py +++ b/tests/integration/dbapi/async/V2/conftest.py @@ -40,7 +40,7 @@ async def factory(**kwargs: Any) -> Connection: else: base_kwargs = { "engine_name": engine_name, - "database": database_name, + "database": kwargs.pop("database", database_name), "auth": auth, "account_name": account_name, "api_endpoint": api_endpoint, diff --git a/tests/integration/dbapi/sync/V2/conftest.py b/tests/integration/dbapi/sync/V2/conftest.py index 604dc67b3d6..b8b1e3ab107 100644 --- a/tests/integration/dbapi/sync/V2/conftest.py +++ b/tests/integration/dbapi/sync/V2/conftest.py @@ -48,7 +48,7 @@ def factory(**kwargs: Any) -> Connection: else: base_kwargs = { "engine_name": engine_name, - "database": database_name, + "database": kwargs.pop("database", database_name), "auth": auth, "account_name": account_name, "api_endpoint": api_endpoint,