Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ async def use_engine(self, engine: str, cache: bool = True) -> None:
self._update_set_parameters(cache_obj.engines[engine].params)
else:
await self.execute(f'USE ENGINE "{engine}"')
cache_obj.engines[engine] = EngineInfo(
self.engine_url, self.parameters | self._set_parameters
)
params = self.parameters | self._set_parameters
# Ensure 'database' parameter is not cached with engine info
params = {k: v for k, v in params.items() if k != "database"}
cache_obj.engines[engine] = EngineInfo(self.engine_url, params)
self.set_cache_record(cache_obj)
else:
await self.execute(f'USE ENGINE "{engine}"')
Expand Down
7 changes: 4 additions & 3 deletions src/firebolt/db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,10 @@ def use_engine(self, engine: str, cache: bool = True) -> None:
self._update_set_parameters(cache_obj.engines[engine].params)
else:
self.execute(f'USE ENGINE "{engine}"')
cache_obj.engines[engine] = EngineInfo(
self.engine_url, self.parameters | self._set_parameters
)
params = self.parameters | self._set_parameters
# Ensure 'database' parameter is not cached with engine info
params = {k: v for k, v in params.items() if k != "database"}
cache_obj.engines[engine] = EngineInfo(self.engine_url, params)
self.set_cache_record(cache_obj)
else:
self.execute(f'USE ENGINE "{engine}"')
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/dbapi/async/V2/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ async def connection_factory(
async def factory(**kwargs: Any) -> Connection:
if request.param == "core":
base_kwargs = {
"database": "firebolt",
"database": kwargs.pop("database", "firebolt"),
"auth": core_auth,
"url": core_url,
}
else:
base_kwargs = {
"engine_name": engine_name,
"database": database_name,
"database": kwargs.pop("database", database_name),
"auth": auth,
"account_name": account_name,
"api_endpoint": api_endpoint,
Expand Down
82 changes: 82 additions & 0 deletions tests/integration/dbapi/async/V2/test_queries_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,3 +1188,85 @@ async def test_connection_close_no_rollback_with_autocommit_on(
await check_data_visibility_async(
table_name, 1, connection_factory, True, [1, "autocommit_close_test"]
)


async def test_database_switching_with_same_engine_preserves_database_context(
database_name: str,
connection_factory: Callable[..., Connection],
) -> None:
"""
Async integration test for database context preservation with caching on Firebolt.

This test verifies against a live Firebolt instance:
1. Connect with database1 + engine1 (cache entry created)
2. Connect with database2 + engine1 (should add database2 to cache)
3. Cursors from second connection should have database2, not database1
"""
first_db_name = database_name
second_db_name = f"{database_name}_second_async"

# Create a system connection to set up test databases
async with await connection_factory() as system_connection:
system_cursor = system_connection.cursor()

try:
# Create the second test database
await system_cursor.execute(
f'CREATE DATABASE IF NOT EXISTS "{second_db_name}"'
)

# First connection: database1 + engine1
async with await connection_factory(database=first_db_name) as connection1:
cursor1 = connection1.cursor()
await cursor1.execute("SELECT current_database()")
result1 = await cursor1.fetchone()

# Verify first connection has correct database
assert (
result1[0] == first_db_name
), f"First cursor should have database {first_db_name}"
assert (
cursor1.database == first_db_name
), f"First cursor database property should be {first_db_name}"

# Second connection: database2 + engine1 (same engine)
async with await connection_factory(database=second_db_name) as connection2:
cursor2 = connection2.cursor()
await cursor2.execute("SELECT current_database()")
result2 = await cursor2.fetchone()

# Verify second connection has correct database
assert result2[0] == second_db_name, (
f"Second cursor should have database {second_db_name}, "
f"but got {result2[0]}. This indicates the database context was overwritten."
)
assert cursor2.database == second_db_name, (
f"Second cursor database property should be {second_db_name}, "
f"but has {cursor2.database}. This indicates the database context was overwritten."
)

# Third connection: back to database1 + engine1 (should use cache)
async with await connection_factory(database=first_db_name) as connection3:
cursor3 = connection3.cursor()
await cursor3.execute("SELECT current_database()")
result3 = await cursor3.fetchone()

# Verify third connection has correct database (should be from cache)
assert result3[0] == first_db_name, (
f"Third cursor should have database {first_db_name}, "
f"but got {result3[0]}. This indicates cached database context is incorrect."
)
assert cursor3.database == first_db_name, (
f"Third cursor database property should be {first_db_name}, "
f"but has {cursor3.database}. This indicates cached database context is incorrect."
)

finally:
# Clean up: Drop the test database
try:
await system_cursor.execute(
f'DROP DATABASE IF EXISTS "{second_db_name}"'
)
except Exception:
# Ignore cleanup errors to avoid masking the real test failure
pass
4 changes: 2 additions & 2 deletions tests/integration/dbapi/sync/V2/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ def connection_factory(
def factory(**kwargs: Any) -> Connection:
if request.param == "core":
base_kwargs = {
"database": "firebolt",
"database": kwargs.pop("database", "firebolt"),
"auth": core_auth,
"url": core_url,
}
else:
base_kwargs = {
"engine_name": engine_name,
"database": database_name,
"database": kwargs.pop("database", database_name),
"auth": auth,
"account_name": account_name,
"api_endpoint": api_endpoint,
Expand Down
78 changes: 78 additions & 0 deletions tests/integration/dbapi/sync/V2/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,3 +1176,81 @@ def test_connection_close_no_rollback_with_autocommit_on(
check_data_visibility(
table_name, 1, connection_factory, True, [1, "autocommit_close_test"]
)


def test_database_switching_with_same_engine_preserves_database_context(
database_name: str,
connection_factory: Callable[..., Connection],
) -> None:
"""
Integration test for database context preservation with caching on Firebolt.

This test verifies against a live Firebolt instance:
1. Connect with database1 + engine1 (cache entry created)
2. Connect with database2 + engine1 (should add database2 to cache)
3. Cursors from second connection should have database2, not database1
"""
first_db_name = database_name
second_db_name = f"{database_name}_second"

# Create a system connection to set up test databases
with connection_factory() as system_connection:
system_cursor = system_connection.cursor()

try:
# Create the second test database
system_cursor.execute(f'CREATE DATABASE IF NOT EXISTS "{second_db_name}"')

# First connection: database1 + engine1
with connection_factory(database=first_db_name) as connection1:
cursor1 = connection1.cursor()
cursor1.execute("SELECT current_database()")
result1 = cursor1.fetchone()

# Verify first connection has correct database
assert (
result1[0] == first_db_name
), f"First cursor should have database {first_db_name}"
assert (
cursor1.database == first_db_name
), f"First cursor database property should be {first_db_name}"

# Second connection: database2 + engine1 (same engine)
with connection_factory(database=second_db_name) as connection2:
cursor2 = connection2.cursor()
cursor2.execute("SELECT current_database()")
result2 = cursor2.fetchone()

# Verify second connection has correct database
assert result2[0] == second_db_name, (
f"Second cursor should have database {second_db_name}, "
f"but got {result2[0]}. This indicates the database context was overwritten."
)
assert cursor2.database == second_db_name, (
f"Second cursor database property should be {second_db_name}, "
f"but has {cursor2.database}. This indicates the database context was overwritten."
)

# Third connection: back to database1 + engine1 (should use cache)
with connection_factory(database=first_db_name) as connection3:
cursor3 = connection3.cursor()
cursor3.execute("SELECT current_database()")
result3 = cursor3.fetchone()

# Verify third connection has correct database (should be from cache)
assert result3[0] == first_db_name, (
f"Third cursor should have database {first_db_name}, "
f"but got {result3[0]}. This indicates cached database context is incorrect."
)
assert cursor3.database == first_db_name, (
f"Third cursor database property should be {first_db_name}, "
f"but has {cursor3.database}. This indicates cached database context is incorrect."
)

finally:
# Clean up: Drop the test database
try:
system_cursor.execute(f'DROP DATABASE IF EXISTS "{second_db_name}"')
except Exception:
# Ignore cleanup errors to avoid masking the real test failure
pass
Loading