Skip to content

feat: add support for asyncpg driver #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
cda8f6c
chore: add asyncpg dependency
jackwotherspoon Jan 18, 2022
124b732
chore: add connect method for asyncpg
jackwotherspoon Jan 18, 2022
4fa7fcc
chore: testing async_connect method
jackwotherspoon Jan 28, 2022
db86be5
chore: merge in main
jackwotherspoon Feb 15, 2022
2a871ca
chore: attempt new async_connect method
jackwotherspoon Feb 18, 2022
43aeb56
chore: merge main and fix conflicts
jackwotherspoon Apr 4, 2022
2489bbf
chore: add loop param to Connector __init__
jackwotherspoon Apr 4, 2022
cfa17b8
chore: merge main
jackwotherspoon Apr 6, 2022
7eafe68
chore: testing asyncpg.connect
jackwotherspoon Apr 8, 2022
3fe34f5
chore: merge main
jackwotherspoon May 9, 2022
3d632a2
chore: update asyncpg param to direct_tls
jackwotherspoon Jun 13, 2022
4348c98
chore: merge in main
jackwotherspoon Jun 16, 2022
bf59a2f
chore: add asyncpg unit test
jackwotherspoon Jun 30, 2022
77dc3a7
chore: add system test for asyncpg driver
jackwotherspoon Jul 4, 2022
bab8483
chore: Merge branch 'main' into asyncpg-support
jackwotherspoon Jul 4, 2022
2445ee7
chore: add asyncpg to readme
jackwotherspoon Jul 4, 2022
add5dfc
chore: update test header
jackwotherspoon Jul 4, 2022
00ff19a
chore: add header to asyncpg.py file
jackwotherspoon Jul 4, 2022
6a76007
chore: add iam auth test for asyncpg
jackwotherspoon Jul 5, 2022
f30a8e0
chore: fix iam auth system test
jackwotherspoon Jul 5, 2022
3c8f376
chore: add connection pool return option
jackwotherspoon Jul 5, 2022
164037f
chore: Merge branch 'main' into asyncpg-support
jackwotherspoon Jul 7, 2022
5f2daf0
chore: lint
jackwotherspoon Jul 7, 2022
90c66f7
chore: remove connection pooling
jackwotherspoon Jul 12, 2022
b575839
chore: update Connector loop logic
jackwotherspoon Jul 12, 2022
f79b4eb
chore: add custom exception for invalid loop state
jackwotherspoon Jul 13, 2022
025da5d
chore: merge in main
jackwotherspoon Jul 13, 2022
4d9ce2c
chore: add create_async_connector function
jackwotherspoon Jul 13, 2022
85075a9
chore: update system tests to use create_async_connector
jackwotherspoon Jul 13, 2022
9c68d12
chore: expose Connector.close_async
jackwotherspoon Jul 13, 2022
69bdebc
chore: update comments
jackwotherspoon Jul 13, 2022
11aeea4
chore: wrap_keys in Connector instead of Instance
jackwotherspoon Jul 13, 2022
25b2734
chore: update keys in fixture
jackwotherspoon Jul 13, 2022
cbf9d6d
chore: add test coverage
jackwotherspoon Jul 13, 2022
9a9df43
chore: add async sample to readme
jackwotherspoon Jul 13, 2022
d44c2e6
chore: update README link
jackwotherspoon Jul 13, 2022
d564fdc
chore: fix link in README
jackwotherspoon Jul 13, 2022
c12178c
chore: update readme and add async context manager to Connector
jackwotherspoon Jul 16, 2022
4805a77
chore: remove helper function from mocks
jackwotherspoon Jul 20, 2022
0156ceb
Merge branch 'main' into asyncpg-support
jackwotherspoon Jul 20, 2022
a1cf7ee
Merge branch 'main' into asyncpg-support
jackwotherspoon Jul 21, 2022
c357e9c
chore: add comment explaining fixture
jackwotherspoon Jul 25, 2022
6761278
Merge branch 'main' into asyncpg-support
jackwotherspoon Jul 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ ignore_missing_imports = True
[mypy-pg8000]
ignore_missing_imports = True

[mypy-asyncpg]
ignore_missing_imports = True

[mypy-pytds]
ignore_missing_imports = True

Expand Down
124 changes: 115 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The Cloud SQL Python Connector is a package to be used alongside a database driv
Currently supported drivers are:
- [`pymysql`](https://github.com/PyMySQL/PyMySQL) (MySQL)
- [`pg8000`](https://github.com/tlocke/pg8000) (PostgreSQL)
- [`asyncpg`](https://github.com/MagicStack/asyncpg) (PostgreSQL)
- [`pytds`](https://github.com/denisenkom/pytds) (SQL Server)


Expand All @@ -37,9 +38,16 @@ based on your database dialect.
pip install "cloud-sql-python-connector[pymysql]"
```
### Postgres
There are two different database drivers that are supported for the Postgres dialect:

#### pg8000
```
pip install "cloud-sql-python-connector[pg8000]"
```
#### asyncpg
```
pip install "cloud-sql-python-connector[asyncpg]"
```
### SQL Server
```
pip install "cloud-sql-python-connector[pytds]"
Expand Down Expand Up @@ -111,9 +119,9 @@ def getconn() -> pymysql.connections.Connection:
conn: pymysql.connections.Connection = connector.connect(
"project:region:instance",
"pymysql",
user="root",
password="shhh",
db="your-db-name"
user="my-user",
password="my-password",
db="my-db-name"
)
return conn

Expand Down Expand Up @@ -188,9 +196,9 @@ def getconn() -> pymysql.connections.Connection:
conn = connector.connect(
"project:region:instance",
"pymysql",
user="root",
password="shhh",
db="your-db-name"
user="my-user",
password="my-password",
db="my-db-name"
)
return conn

Expand Down Expand Up @@ -245,7 +253,7 @@ connector.connect(
"project:region:instance",
"pg8000",
user="[email protected]",
db="my_database",
db="my-db-name",
enable_iam_auth=True,
)
```
Expand All @@ -258,7 +266,7 @@ Once you have followed the steps linked above, you can run the following code to
connector.connect(
"project:region:instance",
"pytds",
db="my_database",
db="my-db-name",
active_directory_auth=True,
server_name="public.[instance].[location].[project].cloudsql.[domain]",
)
Expand All @@ -268,13 +276,111 @@ Or, if using Private IP:
connector.connect(
"project:region:instance",
"pytds",
db="my_database",
db="my-db-name",
active_directory_auth=True,
server_name="private.[instance].[location].[project].cloudsql.[domain]",
ip_type=IPTypes.PRIVATE
)
```

### Async Driver Usage
The Cloud SQL Connector is compatible with
[asyncio](https://docs.python.org/3/library/asyncio.html) to improve the speed
and efficiency of database connections through concurrency. You can use all
non-asyncio drivers through the `Connector.connect_async` function, in addition
to the following asyncio database drivers:
- [asyncpg](https://magicstack.github.io/asyncpg) (Postgres)

The Cloud SQL Connector has a helper `create_async_connector` function that is
recommended for asyncio database connections. It returns a `Connector`
object that uses the current thread's running event loop. This is different
than `Connector()` which by default initializes a new event loop in a
background thread.

The `create_async_connector` allows all the same input arguments as the
[Connector](#configuring-the-connector) object.

Once a `Connector` object is returned by `create_async_connector` you can call
its `connect_async` method, just as you would the `connect` method:

```python
import asyncpg
from google.cloud.sql.connector import create_async_connector

async def main():
# intialize Connector object using 'create_async_connector'
connector = await create_async_connector()

# create connection to Cloud SQL database
conn: asyncpg.Connection = await connector.connect_async(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this connection should probably use an async with

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the connector or connection itself? async with create_async_connector() or async with connector.connect_async(...) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection itself.

Do we support async enters and exit with the Connector?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not currently, we don't have an __aenter__ or __aexit__ implemented.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the ability to create an async context manager with Connector and added example to README. I can't use async with for the connection because the asyncpg.Connection class does not have an __aenter__ or __aexit__. We could potentially create a Connection wrapper class of our own to implement an async context manager for the database connection. (Example). However, if in a future PR we are going to support connection pools (either through SQLAlchemy or native asyncpg.Pool then they will have context manager built in so not sure it is worth it to create the wrapper class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this is surprising to me that they don't support those two functions. I think it's fine to leave for now (maybe worth asking why they haven't?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there is an open issue for it actually, if I have some spare time I will try and put up another PR to asyncpg and add the two functions.

"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)

# insert into Cloud SQL database (example)
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")

# query Cloud SQL database (example)
results = await conn.fetch("SELECT * from ratings")
for row in results:
# ... do something with results

# close asyncpg connection
await conn.close

# close Cloud SQL Connector
await connector.close_async()
```

For more details on interacting with an `asyncpg.Connection`, please visit
the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).

### Async Context Manager

An alternative to using the `create_async_connector` function is initializing
a `Connector` as an async context manager, removing the need for explicit
calls to `connector.close_async()` to cleanup resources.

**Note:** This alternative requires that the running event loop be
passed in as the `loop` argument to `Connector()`.

```python
import asyncio
import asyncpg
from google.cloud.sql.connector import Connector

async def main():
# get current running event loop to be used with Connector
loop = asyncio.get_running_loop()
# intialize Connector object as async context manager
async with Connector(loop=loop) as connector:

# create connection to Cloud SQL database
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)

# insert into Cloud SQL database (example)
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")

# query Cloud SQL database (example)
results = await conn.fetch("SELECT * from ratings")
for row in results:
# ... do something with results

# close asyncpg connection
await conn.close
```

## Support policy

### Major version lifecycle
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/sql/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from .connector import Connector
from .connector import Connector, create_async_connector
from .instance import IPTypes


__ALL__ = [Connector, IPTypes]
__ALL__ = [create_async_connector, Connector, IPTypes]

try:
import pkg_resources
Expand Down
64 changes: 64 additions & 0 deletions google/cloud/sql/connector/asyncpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Copyright 2022 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import ssl
from typing import Any, TYPE_CHECKING

SERVER_PROXY_PORT = 3307

if TYPE_CHECKING:
import asyncpg


async def connect(
ip_address: str, ctx: ssl.SSLContext, **kwargs: Any
) -> "asyncpg.Connection":
"""Helper function to create an asyncpg DB-API connection object.

:type ip_address: str
:param ip_address: A string containing an IP address for the Cloud SQL
instance.

:type ctx: ssl.SSLContext
:param ctx: An SSLContext object created from the Cloud SQL server CA
cert and ephemeral cert.

:type kwargs: Any
:param kwargs: Keyword arguments for establishing asyncpg connection
object to Cloud SQL instance.

:rtype: asyncpg.Connection
:returns: An asyncpg.Connection object to a Cloud SQL instance.
"""
try:
import asyncpg
except ImportError:
raise ImportError(
'Unable to import module "asyncpg." Please install and try again.'
)
user = kwargs.pop("user")
db = kwargs.pop("db")
passwd = kwargs.pop("password", None)

return await asyncpg.connect(
user=user,
database=db,
password=passwd,
host=ip_address,
port=SERVER_PROXY_PORT,
ssl=ctx,
direct_tls=True,
**kwargs,
)
Loading