-
Notifications
You must be signed in to change notification settings - Fork 77
feat: add support for asyncpg driver #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit
Hold shift + click to select a range
cda8f6c
chore: add asyncpg dependency
jackwotherspoon 124b732
chore: add connect method for asyncpg
jackwotherspoon 4fa7fcc
chore: testing async_connect method
jackwotherspoon db86be5
chore: merge in main
jackwotherspoon 2a871ca
chore: attempt new async_connect method
jackwotherspoon 43aeb56
chore: merge main and fix conflicts
jackwotherspoon 2489bbf
chore: add loop param to Connector __init__
jackwotherspoon cfa17b8
chore: merge main
jackwotherspoon 7eafe68
chore: testing asyncpg.connect
jackwotherspoon 3fe34f5
chore: merge main
jackwotherspoon 3d632a2
chore: update asyncpg param to direct_tls
jackwotherspoon 4348c98
chore: merge in main
jackwotherspoon bf59a2f
chore: add asyncpg unit test
jackwotherspoon 77dc3a7
chore: add system test for asyncpg driver
jackwotherspoon bab8483
chore: Merge branch 'main' into asyncpg-support
jackwotherspoon 2445ee7
chore: add asyncpg to readme
jackwotherspoon add5dfc
chore: update test header
jackwotherspoon 00ff19a
chore: add header to asyncpg.py file
jackwotherspoon 6a76007
chore: add iam auth test for asyncpg
jackwotherspoon f30a8e0
chore: fix iam auth system test
jackwotherspoon 3c8f376
chore: add connection pool return option
jackwotherspoon 164037f
chore: Merge branch 'main' into asyncpg-support
jackwotherspoon 5f2daf0
chore: lint
jackwotherspoon 90c66f7
chore: remove connection pooling
jackwotherspoon b575839
chore: update Connector loop logic
jackwotherspoon f79b4eb
chore: add custom exception for invalid loop state
jackwotherspoon 025da5d
chore: merge in main
jackwotherspoon 4d9ce2c
chore: add create_async_connector function
jackwotherspoon 85075a9
chore: update system tests to use create_async_connector
jackwotherspoon 9c68d12
chore: expose Connector.close_async
jackwotherspoon 69bdebc
chore: update comments
jackwotherspoon 11aeea4
chore: wrap_keys in Connector instead of Instance
jackwotherspoon 25b2734
chore: update keys in fixture
jackwotherspoon cbf9d6d
chore: add test coverage
jackwotherspoon 9a9df43
chore: add async sample to readme
jackwotherspoon d44c2e6
chore: update README link
jackwotherspoon d564fdc
chore: fix link in README
jackwotherspoon c12178c
chore: update readme and add async context manager to Connector
jackwotherspoon 4805a77
chore: remove helper function from mocks
jackwotherspoon 0156ceb
Merge branch 'main' into asyncpg-support
jackwotherspoon a1cf7ee
Merge branch 'main' into asyncpg-support
jackwotherspoon c357e9c
chore: add comment explaining fixture
jackwotherspoon 6761278
Merge branch 'main' into asyncpg-support
jackwotherspoon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ The Cloud SQL Python Connector is a package to be used alongside a database driv | |
Currently supported drivers are: | ||
- [`pymysql`](https://github.com/PyMySQL/PyMySQL) (MySQL) | ||
- [`pg8000`](https://github.com/tlocke/pg8000) (PostgreSQL) | ||
- [`asyncpg`](https://github.com/MagicStack/asyncpg) (PostgreSQL) | ||
- [`pytds`](https://github.com/denisenkom/pytds) (SQL Server) | ||
|
||
|
||
|
@@ -37,9 +38,16 @@ based on your database dialect. | |
pip install "cloud-sql-python-connector[pymysql]" | ||
``` | ||
### Postgres | ||
There are two different database drivers that are supported for the Postgres dialect: | ||
|
||
#### pg8000 | ||
``` | ||
pip install "cloud-sql-python-connector[pg8000]" | ||
``` | ||
#### asyncpg | ||
``` | ||
pip install "cloud-sql-python-connector[asyncpg]" | ||
``` | ||
### SQL Server | ||
``` | ||
pip install "cloud-sql-python-connector[pytds]" | ||
|
@@ -111,9 +119,9 @@ def getconn() -> pymysql.connections.Connection: | |
conn: pymysql.connections.Connection = connector.connect( | ||
"project:region:instance", | ||
"pymysql", | ||
user="root", | ||
password="shhh", | ||
db="your-db-name" | ||
user="my-user", | ||
password="my-password", | ||
db="my-db-name" | ||
) | ||
return conn | ||
|
||
|
@@ -188,9 +196,9 @@ def getconn() -> pymysql.connections.Connection: | |
conn = connector.connect( | ||
"project:region:instance", | ||
"pymysql", | ||
user="root", | ||
password="shhh", | ||
db="your-db-name" | ||
user="my-user", | ||
password="my-password", | ||
db="my-db-name" | ||
) | ||
return conn | ||
|
||
|
@@ -245,7 +253,7 @@ connector.connect( | |
"project:region:instance", | ||
"pg8000", | ||
user="[email protected]", | ||
db="my_database", | ||
db="my-db-name", | ||
enable_iam_auth=True, | ||
) | ||
``` | ||
|
@@ -258,7 +266,7 @@ Once you have followed the steps linked above, you can run the following code to | |
connector.connect( | ||
"project:region:instance", | ||
"pytds", | ||
db="my_database", | ||
db="my-db-name", | ||
active_directory_auth=True, | ||
server_name="public.[instance].[location].[project].cloudsql.[domain]", | ||
) | ||
|
@@ -268,13 +276,111 @@ Or, if using Private IP: | |
connector.connect( | ||
"project:region:instance", | ||
"pytds", | ||
db="my_database", | ||
db="my-db-name", | ||
active_directory_auth=True, | ||
server_name="private.[instance].[location].[project].cloudsql.[domain]", | ||
ip_type=IPTypes.PRIVATE | ||
) | ||
``` | ||
|
||
### Async Driver Usage | ||
The Cloud SQL Connector is compatible with | ||
[asyncio](https://docs.python.org/3/library/asyncio.html) to improve the speed | ||
and efficiency of database connections through concurrency. You can use all | ||
non-asyncio drivers through the `Connector.connect_async` function, in addition | ||
to the following asyncio database drivers: | ||
- [asyncpg](https://magicstack.github.io/asyncpg) (Postgres) | ||
|
||
The Cloud SQL Connector has a helper `create_async_connector` function that is | ||
recommended for asyncio database connections. It returns a `Connector` | ||
object that uses the current thread's running event loop. This is different | ||
than `Connector()` which by default initializes a new event loop in a | ||
background thread. | ||
|
||
The `create_async_connector` allows all the same input arguments as the | ||
[Connector](#configuring-the-connector) object. | ||
|
||
Once a `Connector` object is returned by `create_async_connector` you can call | ||
its `connect_async` method, just as you would the `connect` method: | ||
|
||
```python | ||
import asyncpg | ||
from google.cloud.sql.connector import create_async_connector | ||
|
||
async def main(): | ||
# intialize Connector object using 'create_async_connector' | ||
connector = await create_async_connector() | ||
|
||
# create connection to Cloud SQL database | ||
conn: asyncpg.Connection = await connector.connect_async( | ||
"project:region:instance", # Cloud SQL instance connection name | ||
"asyncpg", | ||
user="my-user", | ||
password="my-password", | ||
db="my-db-name" | ||
# ... additional database driver args | ||
) | ||
|
||
# insert into Cloud SQL database (example) | ||
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)") | ||
|
||
# query Cloud SQL database (example) | ||
results = await conn.fetch("SELECT * from ratings") | ||
for row in results: | ||
# ... do something with results | ||
|
||
# close asyncpg connection | ||
await conn.close | ||
|
||
# close Cloud SQL Connector | ||
await connector.close_async() | ||
``` | ||
|
||
For more details on interacting with an `asyncpg.Connection`, please visit | ||
the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html). | ||
|
||
### Async Context Manager | ||
|
||
An alternative to using the `create_async_connector` function is initializing | ||
a `Connector` as an async context manager, removing the need for explicit | ||
calls to `connector.close_async()` to cleanup resources. | ||
|
||
**Note:** This alternative requires that the running event loop be | ||
passed in as the `loop` argument to `Connector()`. | ||
|
||
```python | ||
import asyncio | ||
import asyncpg | ||
from google.cloud.sql.connector import Connector | ||
|
||
async def main(): | ||
# get current running event loop to be used with Connector | ||
loop = asyncio.get_running_loop() | ||
# intialize Connector object as async context manager | ||
async with Connector(loop=loop) as connector: | ||
|
||
# create connection to Cloud SQL database | ||
conn: asyncpg.Connection = await connector.connect_async( | ||
"project:region:instance", # Cloud SQL instance connection name | ||
"asyncpg", | ||
user="my-user", | ||
password="my-password", | ||
db="my-db-name" | ||
# ... additional database driver args | ||
) | ||
|
||
# insert into Cloud SQL database (example) | ||
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)") | ||
|
||
# query Cloud SQL database (example) | ||
results = await conn.fetch("SELECT * from ratings") | ||
for row in results: | ||
# ... do something with results | ||
|
||
# close asyncpg connection | ||
await conn.close | ||
``` | ||
|
||
## Support policy | ||
|
||
### Major version lifecycle | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
""" | ||
Copyright 2022 Google LLC | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
https://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
""" | ||
import ssl | ||
from typing import Any, TYPE_CHECKING | ||
|
||
SERVER_PROXY_PORT = 3307 | ||
|
||
if TYPE_CHECKING: | ||
import asyncpg | ||
|
||
|
||
async def connect( | ||
ip_address: str, ctx: ssl.SSLContext, **kwargs: Any | ||
) -> "asyncpg.Connection": | ||
"""Helper function to create an asyncpg DB-API connection object. | ||
|
||
:type ip_address: str | ||
:param ip_address: A string containing an IP address for the Cloud SQL | ||
instance. | ||
|
||
:type ctx: ssl.SSLContext | ||
:param ctx: An SSLContext object created from the Cloud SQL server CA | ||
cert and ephemeral cert. | ||
|
||
:type kwargs: Any | ||
:param kwargs: Keyword arguments for establishing asyncpg connection | ||
object to Cloud SQL instance. | ||
|
||
:rtype: asyncpg.Connection | ||
:returns: An asyncpg.Connection object to a Cloud SQL instance. | ||
""" | ||
try: | ||
import asyncpg | ||
except ImportError: | ||
raise ImportError( | ||
'Unable to import module "asyncpg." Please install and try again.' | ||
) | ||
user = kwargs.pop("user") | ||
db = kwargs.pop("db") | ||
passwd = kwargs.pop("password", None) | ||
|
||
return await asyncpg.connect( | ||
user=user, | ||
database=db, | ||
password=passwd, | ||
host=ip_address, | ||
port=SERVER_PROXY_PORT, | ||
ssl=ctx, | ||
direct_tls=True, | ||
**kwargs, | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this connection should probably use an async with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the connector or connection itself?
async with create_async_connector()
orasync with connector.connect_async(...)
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The connection itself.
Do we support async enters and exit with the Connector?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not currently, we don't have an
__aenter__
or__aexit__
implemented.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the ability to create an async context manager with
Connector
and added example to README. I can't useasync with
for the connection because the asyncpg.Connection class does not have an__aenter__
or__aexit__
. We could potentially create aConnection
wrapper class of our own to implement an async context manager for the database connection. (Example). However, if in a future PR we are going to support connection pools (either through SQLAlchemy or nativeasyncpg.Pool
then they will have context manager built in so not sure it is worth it to create the wrapper class.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, this is surprising to me that they don't support those two functions. I think it's fine to leave for now (maybe worth asking why they haven't?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like there is an open issue for it actually, if I have some spare time I will try and put up another PR to asyncpg and add the two functions.