Skip to content

Commit bb1c72a

Browse files
docs: document SQLAlchemy async connection pooling (#758)
1 parent aaa343f commit bb1c72a

File tree

4 files changed

+175
-114
lines changed

4 files changed

+175
-114
lines changed

README.md

Lines changed: 70 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -430,48 +430,55 @@ Once a `Connector` object is returned by `create_async_connector` you can call
430430
its `connect_async` method, just as you would the `connect` method:
431431

432432
```python
433-
import asyncio
434433
import asyncpg
435-
from google.cloud.sql.connector import create_async_connector
436434

435+
import sqlalchemy
436+
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
437437

438-
async def main():
439-
# intialize Connector object using 'create_async_connector'
440-
connector = await create_async_connector()
438+
from google.cloud.sql.connector import Connector, create_async_connector
441439

442-
# create connection to Cloud SQL database
443-
conn: asyncpg.Connection = await connector.connect_async(
444-
"project:region:instance", # Cloud SQL instance connection name
445-
"asyncpg",
446-
user="my-user",
447-
password="my-password",
448-
db="my-db-name"
449-
# ... additional database driver args
450-
)
440+
async def init_connection_pool(connector: Connector) -> AsyncEngine:
441+
# initialize Connector object for connections to Cloud SQL
442+
async def getconn() -> asyncpg.Connection:
443+
conn: asyncpg.Connection = await connector.connect_async(
444+
"project:region:instance", # Cloud SQL instance connection name
445+
"asyncpg",
446+
user="my-user",
447+
password="my-password",
448+
db="my-db-name"
449+
# ... additional database driver args
450+
)
451+
return conn
451452

452-
# insert into Cloud SQL database (example)
453-
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")
453+
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
454+
# 'async_creator' argument to 'create_async_engine'
455+
pool = create_async_engine(
456+
"postgresql+asyncpg://",
457+
async_creator=getconn,
458+
)
459+
return pool
454460

455-
# query Cloud SQL database (example)
456-
results = await conn.fetch("SELECT * from ratings")
461+
async def main():
462+
# initialize Connector object for connections to Cloud SQL
463+
connector = await create_async_connector()
457464

458-
# ... do something with results
459-
for row in results:
460-
print(row)
465+
# initialize connection pool
466+
pool = await init_connection_pool(connector)
461467

462-
# close asyncpg connection
463-
await conn.close()
468+
# example query
469+
async with pool.connect() as conn:
470+
await conn.execute(sqlalchemy.text("SELECT NOW()"))
464471

465-
# close Cloud SQL Connector
472+
# close Connector
466473
await connector.close_async()
467474

468-
469-
# Test connection with `asyncio`
470-
asyncio.run(main())
475+
# dispose of connection pool
476+
await pool.dispose()
471477
```
472478

473-
For more details on interacting with an `asyncpg.Connection`, please visit
474-
the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).
479+
For more details on additional database arguments with an `asyncpg.Connection`
480+
, please visit the
481+
[official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).
475482

476483
### Async Context Manager
477484

@@ -485,44 +492,52 @@ passed in as the `loop` argument to `Connector()`.
485492
```python
486493
import asyncio
487494
import asyncpg
495+
496+
import sqlalchemy
497+
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
498+
488499
from google.cloud.sql.connector import Connector
489500

501+
async def init_connection_pool(connector: Connector) -> AsyncEngine:
502+
# initialize Connector object for connections to Cloud SQL
503+
async def getconn() -> asyncpg.Connection:
504+
conn: asyncpg.Connection = await connector.connect_async(
505+
"project:region:instance", # Cloud SQL instance connection name
506+
"asyncpg",
507+
user="my-user",
508+
password="my-password",
509+
db="my-db-name"
510+
# ... additional database driver args
511+
)
512+
return conn
513+
514+
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
515+
# 'async_creator' argument to 'create_async_engine'
516+
pool = create_async_engine(
517+
"postgresql+asyncpg://",
518+
async_creator=getconn,
519+
)
520+
return pool
521+
490522
async def main():
491-
# get current running event loop to be used with Connector
523+
# initialize Connector object for connections to Cloud SQL
492524
loop = asyncio.get_running_loop()
493-
# intialize Connector object as async context manager
494525
async with Connector(loop=loop) as connector:
526+
# initialize connection pool
527+
pool = await init_connection_pool(connector)
495528

496-
# create connection to Cloud SQL database
497-
conn: asyncpg.Connection = await connector.connect_async(
498-
"project:region:instance", # Cloud SQL instance connection name
499-
"asyncpg",
500-
user="my-user",
501-
password="my-password",
502-
db="my-db-name"
503-
# ... additional database driver args
504-
)
505-
506-
# insert into Cloud SQL database (example)
507-
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")
508-
509-
# query Cloud SQL database (example)
510-
results = await conn.fetch("SELECT * from ratings")
511-
512-
# ... do something with results
513-
for row in results:
514-
print(row)
529+
# example query
530+
async with pool.connect() as conn:
531+
await conn.execute(sqlalchemy.text("SELECT NOW()"))
515532

516-
# close asyncpg connection
517-
await conn.close()
518-
519-
# Test connection with `asyncio`
520-
asyncio.run(main())
533+
# dispose of connection pool
534+
await pool.dispose()
521535
```
522536

523537
## Support policy
524538

525539
### Major version lifecycle
540+
526541
This project uses [semantic versioning](https://semver.org/), and uses the
527542
following lifecycle regarding support for a major version:
528543

requirements-test.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pytest==7.3.1
22
mock==5.0.2
33
pytest-cov==4.1.0
44
pytest-asyncio==0.21.0
5-
SQLAlchemy==2.0.15
5+
SQLAlchemy==2.0.16
66
sqlalchemy-pytds==0.3.5
77
flake8==5.0.4
88
flake8-annotations==2.9.1

tests/system/test_asyncpg_connection.py

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,53 +13,76 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
"""
16+
import asyncio
1617
import os
1718
from typing import AsyncGenerator
1819
import uuid
1920

2021
import asyncpg
2122
import pytest
23+
import sqlalchemy
24+
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
2225

23-
from google.cloud.sql.connector import create_async_connector
26+
from google.cloud.sql.connector import Connector
2427

2528
table_name = f"books_{uuid.uuid4().hex}"
2629

2730

28-
@pytest.fixture(name="conn")
29-
async def setup() -> AsyncGenerator:
30-
# initialize Cloud SQL Python Connector object
31-
connector = await create_async_connector()
32-
conn: asyncpg.Connection = await connector.connect_async(
33-
os.environ["POSTGRES_CONNECTION_NAME"],
34-
"asyncpg",
35-
user=os.environ["POSTGRES_USER"],
36-
password=os.environ["POSTGRES_PASS"],
37-
db=os.environ["POSTGRES_DB"],
38-
)
39-
await conn.execute(
40-
f"CREATE TABLE IF NOT EXISTS {table_name}"
41-
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
31+
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
32+
# 'async_creator' argument to 'create_async_engine'
33+
async def init_connection_pool() -> AsyncEngine:
34+
async def getconn() -> asyncpg.Connection:
35+
loop = asyncio.get_running_loop()
36+
# initialize Connector object for connections to Cloud SQL
37+
async with Connector(loop=loop) as connector:
38+
conn: asyncpg.Connection = await connector.connect_async(
39+
os.environ["POSTGRES_CONNECTION_NAME"],
40+
"asyncpg",
41+
user=os.environ["POSTGRES_USER"],
42+
password=os.environ["POSTGRES_PASS"],
43+
db=os.environ["POSTGRES_DB"],
44+
)
45+
return conn
46+
47+
# create SQLAlchemy connection pool
48+
pool = create_async_engine(
49+
"postgresql+asyncpg://",
50+
async_creator=getconn,
51+
execution_options={"isolation_level": "AUTOCOMMIT"},
4252
)
53+
return pool
4354

44-
yield conn
4555

46-
await conn.execute(f"DROP TABLE IF EXISTS {table_name}")
47-
# close asyncpg connection
48-
await conn.close()
49-
# cleanup Connector object
50-
await connector.close_async()
56+
@pytest.fixture(name="pool")
57+
async def setup() -> AsyncGenerator:
58+
pool = await init_connection_pool()
59+
async with pool.connect() as conn:
60+
await conn.execute(
61+
sqlalchemy.text(
62+
f"CREATE TABLE IF NOT EXISTS {table_name}"
63+
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
64+
)
65+
)
66+
67+
yield pool
68+
69+
async with pool.connect() as conn:
70+
await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}"))
71+
# dispose of asyncpg connection pool
72+
await pool.dispose()
5173

5274

5375
@pytest.mark.asyncio
54-
async def test_connection_with_asyncpg(conn: asyncpg.Connection) -> None:
55-
await conn.execute(
56-
f"INSERT INTO {table_name} (id, title) VALUES ('book1', 'Book One')"
57-
)
58-
await conn.execute(
59-
f"INSERT INTO {table_name} (id, title) VALUES ('book2', 'Book Two')"
76+
async def test_connection_with_asyncpg(pool: AsyncEngine) -> None:
77+
insert_stmt = sqlalchemy.text(
78+
f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)",
6079
)
80+
async with pool.connect() as conn:
81+
await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
82+
await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"})
6183

62-
rows = await conn.fetch(f"SELECT title FROM {table_name} ORDER BY ID")
63-
titles = [row[0] for row in rows]
84+
select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;")
85+
rows = (await conn.execute(select_stmt)).fetchall()
86+
titles = [row[0] for row in rows]
6487

6588
assert titles == ["Book One", "Book Two"]

tests/system/test_asyncpg_iam_auth.py

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,53 +13,76 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
"""
16+
import asyncio
1617
import os
1718
from typing import AsyncGenerator
1819
import uuid
1920

2021
import asyncpg
2122
import pytest
23+
import sqlalchemy
24+
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
2225

23-
from google.cloud.sql.connector import create_async_connector
26+
from google.cloud.sql.connector import Connector
2427

2528
table_name = f"books_{uuid.uuid4().hex}"
2629

2730

28-
@pytest.fixture(name="conn")
29-
async def setup() -> AsyncGenerator:
30-
# initialize Cloud SQL Python Connector object
31-
connector = await create_async_connector()
32-
conn: asyncpg.Connection = await connector.connect_async(
33-
os.environ["POSTGRES_IAM_CONNECTION_NAME"],
34-
"asyncpg",
35-
user=os.environ["POSTGRES_IAM_USER"],
36-
db=os.environ["POSTGRES_DB"],
37-
enable_iam_auth=True,
38-
)
39-
await conn.execute(
40-
f"CREATE TABLE IF NOT EXISTS {table_name}"
41-
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
31+
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
32+
# 'async_creator' argument to 'create_async_engine'
33+
async def init_connection_pool() -> AsyncEngine:
34+
async def getconn() -> asyncpg.Connection:
35+
loop = asyncio.get_running_loop()
36+
# initialize Connector object for connections to Cloud SQL
37+
async with Connector(loop=loop) as connector:
38+
conn: asyncpg.Connection = await connector.connect_async(
39+
os.environ["POSTGRES_IAM_CONNECTION_NAME"],
40+
"asyncpg",
41+
user=os.environ["POSTGRES_IAM_USER"],
42+
db=os.environ["POSTGRES_DB"],
43+
enable_iam_auth=True,
44+
)
45+
return conn
46+
47+
# create SQLAlchemy connection pool
48+
pool = create_async_engine(
49+
"postgresql+asyncpg://",
50+
async_creator=getconn,
51+
execution_options={"isolation_level": "AUTOCOMMIT"},
4252
)
53+
return pool
4354

44-
yield conn
4555

46-
await conn.execute(f"DROP TABLE IF EXISTS {table_name}")
47-
# close asyncpg connection
48-
await conn.close()
49-
# cleanup Connector object
50-
await connector.close_async()
56+
@pytest.fixture(name="pool")
57+
async def setup() -> AsyncGenerator:
58+
pool = await init_connection_pool()
59+
async with pool.connect() as conn:
60+
await conn.execute(
61+
sqlalchemy.text(
62+
f"CREATE TABLE IF NOT EXISTS {table_name}"
63+
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
64+
)
65+
)
66+
67+
yield pool
68+
69+
async with pool.connect() as conn:
70+
await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}"))
71+
# dispose of asyncpg connection pool
72+
await pool.dispose()
5173

5274

5375
@pytest.mark.asyncio
54-
async def test_connection_with_asyncpg_iam_auth(conn: asyncpg.Connection) -> None:
55-
await conn.execute(
56-
f"INSERT INTO {table_name} (id, title) VALUES ('book1', 'Book One')"
57-
)
58-
await conn.execute(
59-
f"INSERT INTO {table_name} (id, title) VALUES ('book2', 'Book Two')"
76+
async def test_connection_with_asyncpg_iam_auth(pool: AsyncEngine) -> None:
77+
insert_stmt = sqlalchemy.text(
78+
f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)",
6079
)
80+
async with pool.connect() as conn:
81+
await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
82+
await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"})
6183

62-
rows = await conn.fetch(f"SELECT title FROM {table_name} ORDER BY ID")
63-
titles = [row[0] for row in rows]
84+
select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;")
85+
rows = (await conn.execute(select_stmt)).fetchall()
86+
titles = [row[0] for row in rows]
6487

6588
assert titles == ["Book One", "Book Two"]

0 commit comments

Comments
 (0)