Skip to content

Commit 3170b1f

Browse files
feat: add support for asyncpg driver (#390)
1 parent 91c08c4 commit 3170b1f

15 files changed

+567
-37
lines changed

.mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ ignore_missing_imports = True
1212
[mypy-pg8000]
1313
ignore_missing_imports = True
1414

15+
[mypy-asyncpg]
16+
ignore_missing_imports = True
17+
1518
[mypy-pytds]
1619
ignore_missing_imports = True
1720

README.md

Lines changed: 115 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The Cloud SQL Python Connector is a package to be used alongside a database driv
2424
Currently supported drivers are:
2525
- [`pymysql`](https://github.com/PyMySQL/PyMySQL) (MySQL)
2626
- [`pg8000`](https://github.com/tlocke/pg8000) (PostgreSQL)
27+
- [`asyncpg`](https://github.com/MagicStack/asyncpg) (PostgreSQL)
2728
- [`pytds`](https://github.com/denisenkom/pytds) (SQL Server)
2829

2930

@@ -37,9 +38,16 @@ based on your database dialect.
3738
pip install "cloud-sql-python-connector[pymysql]"
3839
```
3940
### Postgres
41+
There are two different database drivers that are supported for the Postgres dialect:
42+
43+
#### pg8000
4044
```
4145
pip install "cloud-sql-python-connector[pg8000]"
4246
```
47+
#### asyncpg
48+
```
49+
pip install "cloud-sql-python-connector[asyncpg]"
50+
```
4351
### SQL Server
4452
```
4553
pip install "cloud-sql-python-connector[pytds]"
@@ -111,9 +119,9 @@ def getconn() -> pymysql.connections.Connection:
111119
conn: pymysql.connections.Connection = connector.connect(
112120
"project:region:instance",
113121
"pymysql",
114-
user="root",
115-
password="shhh",
116-
db="your-db-name"
122+
user="my-user",
123+
password="my-password",
124+
db="my-db-name"
117125
)
118126
return conn
119127

@@ -188,9 +196,9 @@ def getconn() -> pymysql.connections.Connection:
188196
conn = connector.connect(
189197
"project:region:instance",
190198
"pymysql",
191-
user="root",
192-
password="shhh",
193-
db="your-db-name"
199+
user="my-user",
200+
password="my-password",
201+
db="my-db-name"
194202
)
195203
return conn
196204

@@ -245,7 +253,7 @@ connector.connect(
245253
"project:region:instance",
246254
"pg8000",
247255
248-
db="my_database",
256+
db="my-db-name",
249257
enable_iam_auth=True,
250258
)
251259
```
@@ -258,7 +266,7 @@ Once you have followed the steps linked above, you can run the following code to
258266
connector.connect(
259267
"project:region:instance",
260268
"pytds",
261-
db="my_database",
269+
db="my-db-name",
262270
active_directory_auth=True,
263271
server_name="public.[instance].[location].[project].cloudsql.[domain]",
264272
)
@@ -268,13 +276,111 @@ Or, if using Private IP:
268276
connector.connect(
269277
"project:region:instance",
270278
"pytds",
271-
db="my_database",
279+
db="my-db-name",
272280
active_directory_auth=True,
273281
server_name="private.[instance].[location].[project].cloudsql.[domain]",
274282
ip_type=IPTypes.PRIVATE
275283
)
276284
```
277285

286+
### Async Driver Usage
287+
The Cloud SQL Connector is compatible with
288+
[asyncio](https://docs.python.org/3/library/asyncio.html) to improve the speed
289+
and efficiency of database connections through concurrency. You can use all
290+
non-asyncio drivers through the `Connector.connect_async` function, in addition
291+
to the following asyncio database drivers:
292+
- [asyncpg](https://magicstack.github.io/asyncpg) (Postgres)
293+
294+
The Cloud SQL Connector has a helper `create_async_connector` function that is
295+
recommended for asyncio database connections. It returns a `Connector`
296+
object that uses the current thread's running event loop. This is different
297+
than `Connector()` which by default initializes a new event loop in a
298+
background thread.
299+
300+
The `create_async_connector` allows all the same input arguments as the
301+
[Connector](#configuring-the-connector) object.
302+
303+
Once a `Connector` object is returned by `create_async_connector` you can call
304+
its `connect_async` method, just as you would the `connect` method:
305+
306+
```python
307+
import asyncpg
308+
from google.cloud.sql.connector import create_async_connector
309+
310+
async def main():
311+
# intialize Connector object using 'create_async_connector'
312+
connector = await create_async_connector()
313+
314+
# create connection to Cloud SQL database
315+
conn: asyncpg.Connection = await connector.connect_async(
316+
"project:region:instance", # Cloud SQL instance connection name
317+
"asyncpg",
318+
user="my-user",
319+
password="my-password",
320+
db="my-db-name"
321+
# ... additional database driver args
322+
)
323+
324+
# insert into Cloud SQL database (example)
325+
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")
326+
327+
# query Cloud SQL database (example)
328+
results = await conn.fetch("SELECT * from ratings")
329+
for row in results:
330+
# ... do something with results
331+
332+
# close asyncpg connection
333+
await conn.close
334+
335+
# close Cloud SQL Connector
336+
await connector.close_async()
337+
```
338+
339+
For more details on interacting with an `asyncpg.Connection`, please visit
340+
the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).
341+
342+
### Async Context Manager
343+
344+
An alternative to using the `create_async_connector` function is initializing
345+
a `Connector` as an async context manager, removing the need for explicit
346+
calls to `connector.close_async()` to cleanup resources.
347+
348+
**Note:** This alternative requires that the running event loop be
349+
passed in as the `loop` argument to `Connector()`.
350+
351+
```python
352+
import asyncio
353+
import asyncpg
354+
from google.cloud.sql.connector import Connector
355+
356+
async def main():
357+
# get current running event loop to be used with Connector
358+
loop = asyncio.get_running_loop()
359+
# intialize Connector object as async context manager
360+
async with Connector(loop=loop) as connector:
361+
362+
# create connection to Cloud SQL database
363+
conn: asyncpg.Connection = await connector.connect_async(
364+
"project:region:instance", # Cloud SQL instance connection name
365+
"asyncpg",
366+
user="my-user",
367+
password="my-password",
368+
db="my-db-name"
369+
# ... additional database driver args
370+
)
371+
372+
# insert into Cloud SQL database (example)
373+
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")
374+
375+
# query Cloud SQL database (example)
376+
results = await conn.fetch("SELECT * from ratings")
377+
for row in results:
378+
# ... do something with results
379+
380+
# close asyncpg connection
381+
await conn.close
382+
```
383+
278384
## Support policy
279385

280386
### Major version lifecycle

google/cloud/sql/connector/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
"""
16-
from .connector import Connector
16+
from .connector import Connector, create_async_connector
1717
from .instance import IPTypes
1818

1919

20-
__ALL__ = [Connector, IPTypes]
20+
__ALL__ = [create_async_connector, Connector, IPTypes]
2121

2222
try:
2323
import pkg_resources

google/cloud/sql/connector/asyncpg.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""
2+
Copyright 2022 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
16+
import ssl
17+
from typing import Any, TYPE_CHECKING
18+
19+
SERVER_PROXY_PORT = 3307
20+
21+
if TYPE_CHECKING:
22+
import asyncpg
23+
24+
25+
async def connect(
26+
ip_address: str, ctx: ssl.SSLContext, **kwargs: Any
27+
) -> "asyncpg.Connection":
28+
"""Helper function to create an asyncpg DB-API connection object.
29+
30+
:type ip_address: str
31+
:param ip_address: A string containing an IP address for the Cloud SQL
32+
instance.
33+
34+
:type ctx: ssl.SSLContext
35+
:param ctx: An SSLContext object created from the Cloud SQL server CA
36+
cert and ephemeral cert.
37+
38+
:type kwargs: Any
39+
:param kwargs: Keyword arguments for establishing asyncpg connection
40+
object to Cloud SQL instance.
41+
42+
:rtype: asyncpg.Connection
43+
:returns: An asyncpg.Connection object to a Cloud SQL instance.
44+
"""
45+
try:
46+
import asyncpg
47+
except ImportError:
48+
raise ImportError(
49+
'Unable to import module "asyncpg." Please install and try again.'
50+
)
51+
user = kwargs.pop("user")
52+
db = kwargs.pop("db")
53+
passwd = kwargs.pop("password", None)
54+
55+
return await asyncpg.connect(
56+
user=user,
57+
database=db,
58+
password=passwd,
59+
host=ip_address,
60+
port=SERVER_PROXY_PORT,
61+
ssl=ctx,
62+
direct_tls=True,
63+
**kwargs,
64+
)

0 commit comments

Comments
 (0)