Skip to content

Commit 0cbcb30

Browse files
committed
Added database ORM support (Async SQLAlchemy)
* Updated .env.example file with sample config * Added database setup, session and dependency functions * Added utils file for common utility functions * Added User management resource to demonstrate database queries
1 parent 274aa08 commit 0cbcb30

17 files changed

+377
-4
lines changed

.env.example

+3
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
SECRET_KEY = "SecretSecretSecret!"
22
UVICORN_PORT = 8000
3+
4+
DATABASE_URL = "sqlite+aiosqlite:///dev/app.db"
5+
ECHO_SQL = false

sample_fastapi/app/app_factory.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from fastapi import FastAPI
66
from tqdm.contrib.logging import logging_redirect_tqdm
77

8-
from . import middleware, resources
8+
from . import database, middleware, resources
99

1010
LOG = logging.getLogger(__name__)
1111

@@ -18,16 +18,22 @@ async def app_lifespan_handler(app: FastAPI):
1818
"""
1919

2020
async with AsyncExitStack() as stack:
21-
LOG.info('Adding lifespan handler for "resources"...')
21+
LOG.info('Executing lifespan handler for "resources"...')
2222
await stack.enter_async_context(resources.lifespan(app))
23-
LOG.info('Adding lifespan handler for "tqdm_logging"...')
23+
LOG.info('Executing lifespan handler for "database"...')
24+
await stack.enter_async_context(database.lifespan(app))
25+
LOG.info('Executing lifespan handler for "tqdm_logging"...')
2426
stack.enter_context(logging_redirect_tqdm())
2527
yield
2628

2729

2830
def init_app():
31+
db_settings = database.get_db_settings()
32+
2933
app = FastAPI(lifespan=app_lifespan_handler)
34+
3035
resources.init_app(app)
3136
middleware.init_middleware(app)
37+
database.init_app_db(app, db_settings)
3238

3339
return app
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from .base import BaseDBModel
2+
from .config import DBSettings
3+
from .depends import get_app_db_session_manager, get_db_session, get_db_settings
4+
from .engine import init_app_db, lifespan
5+
from .session import AsyncSession
6+
7+
__all__ = [
8+
"DBSettings",
9+
"get_db_settings",
10+
"get_app_db_session_manager",
11+
"get_db_session",
12+
"lifespan",
13+
"init_app_db",
14+
"BaseDBModel",
15+
"AsyncSession",
16+
]

sample_fastapi/app/database/base.py

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from sqlalchemy.orm import DeclarativeBase
2+
3+
4+
class BaseDBModel(DeclarativeBase):
5+
pass

sample_fastapi/app/database/config.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import logging
2+
3+
from pydantic import Field
4+
5+
from ..config import BaseAppSettings
6+
7+
LOG = logging.getLogger(__name__)
8+
9+
APP_DB_SESSION_MANAGER_KEY = "_dbsm" # Lol
10+
11+
12+
def _default_db_dsn():
13+
LOG.warning("Using SQLite In-memory database as `database_url` config was not specified. Content WILL be lost after shutting down the app!")
14+
return "sqlite+aiosqlite:///:memory:"
15+
16+
17+
class DBSettings(BaseAppSettings):
18+
database_url: str = Field(default_factory=_default_db_dsn)
19+
echo_sql: bool = Field(default=True)
+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from functools import lru_cache
2+
from typing import Annotated
3+
4+
from fastapi import Depends, Request
5+
6+
from .config import APP_DB_SESSION_MANAGER_KEY, DBSettings
7+
from .session import AsyncDatabaseSessionManager
8+
9+
10+
@lru_cache()
11+
def get_db_settings():
12+
return DBSettings()
13+
14+
15+
def get_app_db_session_manager(req: Request) -> AsyncDatabaseSessionManager:
16+
sessionmanager: AsyncDatabaseSessionManager | None = req.app.extra.get(APP_DB_SESSION_MANAGER_KEY)
17+
if sessionmanager is None:
18+
raise RuntimeError(
19+
"Couldn't get the app's DB session manager - DB Session Manager with key `%s` was not initialized"
20+
% APP_DB_SESSION_MANAGER_KEY
21+
)
22+
return sessionmanager
23+
24+
25+
async def get_db_session(sessionmanager: Annotated[AsyncDatabaseSessionManager, Depends(get_app_db_session_manager)]):
26+
"""Dependency to get a session object. Use this in request handlers to get a database session."""
27+
28+
async with sessionmanager.session() as session:
29+
yield session

sample_fastapi/app/database/engine.py

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
from contextlib import asynccontextmanager
3+
from typing import Annotated
4+
5+
from fastapi import FastAPI
6+
7+
from .. import utils
8+
from .config import APP_DB_SESSION_MANAGER_KEY, DBSettings
9+
from .session import AsyncDatabaseSessionManager
10+
11+
LOG = logging.getLogger(__name__)
12+
13+
14+
async def create_schema_models(sessionmanager: AsyncDatabaseSessionManager):
15+
"""Create schema tables from models under `BaseDBModel`."""
16+
async with sessionmanager._engine.connect() as connection:
17+
from .base import BaseDBModel
18+
19+
LOG.info("Creating schema (if not already exists)...")
20+
LOG.info("DB models registered: [%s]", ",".join(map(utils.str_quote, BaseDBModel.metadata.tables.keys())))
21+
22+
await connection.run_sync(BaseDBModel.metadata.create_all)
23+
await connection.commit()
24+
25+
26+
@asynccontextmanager
27+
async def lifespan(app: FastAPI):
28+
"""Session manager, manage database connections for this app"""
29+
sessionmanager: AsyncDatabaseSessionManager = app.extra[APP_DB_SESSION_MANAGER_KEY]
30+
async with sessionmanager:
31+
await create_schema_models(sessionmanager)
32+
yield
33+
34+
35+
def init_app_db(app: FastAPI, db_settings: Annotated[DBSettings, DBSettings()]):
36+
"""Initialize database session manager and bind it with the FastAPI application"""
37+
app.extra[APP_DB_SESSION_MANAGER_KEY] = AsyncDatabaseSessionManager(
38+
db_settings.database_url,
39+
{
40+
"echo": db_settings.echo_sql,
41+
},
42+
)
+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import contextlib
2+
import logging
3+
from typing import Any, AsyncIterator
4+
5+
from sqlalchemy.ext.asyncio import (
6+
AsyncConnection,
7+
AsyncEngine,
8+
AsyncSession,
9+
async_sessionmaker,
10+
create_async_engine,
11+
)
12+
13+
__all__ = [
14+
"AsyncConnection",
15+
"AsyncSession",
16+
"AsyncDatabaseSessionManager",
17+
]
18+
19+
20+
LOG = logging.getLogger(__name__)
21+
22+
23+
# Heavily inspired by https://praciano.com.br/fastapi-and-async-sqlalchemy-20-with-pytest-done-right.html
24+
25+
26+
class AsyncDatabaseSessionManager:
27+
def __init__(self, host: str, engine_kwargs: dict[str, Any] = {}, session_kwargs: dict[str, Any] = {}):
28+
session_kwargs.setdefault("autocommit", False)
29+
session_kwargs.setdefault("expire_on_commit", False)
30+
31+
self._engine: AsyncEngine = create_async_engine(host, **engine_kwargs)
32+
self._sessionmaker: async_sessionmaker[AsyncSession] = async_sessionmaker(bind=self._engine, **session_kwargs)
33+
34+
async def close(self):
35+
await self._engine.dispose()
36+
37+
async def __aenter__(self):
38+
return self
39+
40+
async def __aexit__(self, exc_type, exc_val, exc_tb):
41+
await self.close()
42+
43+
@contextlib.asynccontextmanager
44+
async def connect(self) -> AsyncIterator[AsyncConnection]:
45+
async with self._engine.begin() as connection:
46+
try:
47+
yield connection
48+
except Exception:
49+
LOG.exception("DB connection was rolled back due to an exception:")
50+
await connection.rollback()
51+
raise
52+
53+
@contextlib.asynccontextmanager
54+
async def session(self) -> AsyncIterator[AsyncSession]:
55+
async with self._sessionmaker() as sess:
56+
yield sess
57+
# try:
58+
# except Exception:
59+
# LOG.exception("DB transaction was rolled back due to an exception:")
60+
# # TODO: Not sure if this is needed. Test if it does auto-rollback
61+
# await sess.rollback()
62+
# raise

sample_fastapi/app/resources/routes.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
def init_app(app: FastAPI):
55
"""Initialize routes, middleware, sub-apps, etc. to the given Application"""
6-
from . import calculator, hello
6+
from . import calculator, hello, user
77
hello.init_app(app)
88
calculator.init_app(app)
9+
user.init_app(app)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .routes import init_app, init_routes
2+
3+
__all__ = ["init_app", "init_routes"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import datetime
2+
import uuid
3+
4+
from sqlalchemy.sql.schema import Column, ForeignKey
5+
from sqlalchemy.sql.sqltypes import DateTime, Integer, String, Uuid
6+
7+
from ...database.base import BaseDBModel
8+
9+
10+
class User(BaseDBModel):
11+
__tablename__ = "users"
12+
13+
id: Column[uuid.UUID] = Column(Uuid, primary_key=True, default=uuid.uuid4)
14+
username = Column(String, unique=True)
15+
16+
first_name = Column(String, nullable=False)
17+
middle_name = Column(String, nullable=True)
18+
last_name = Column(String, nullable=True)
19+
20+
created_ts = Column(DateTime, default=datetime.datetime.now)
21+
updated_ts = Column(DateTime, nullable=True)
22+
23+
24+
# TODO:
25+
# class UserPassword(BaseDBModel):
26+
# __tablename__ = "user_passwords"
27+
# id = Column(Integer, primary_key=True, autoincrement=True)
28+
# user_id = Column(Uuid, ForeignKey(User.id))
29+
# password_hash = Column(String, nullable=False)
30+
# created_ts = Column(DateTime, default=datetime.datetime.now)
31+
# expiry_ts = Column(DateTime, nullable=True)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from sqlalchemy import bindparam, insert, select
2+
3+
from .db_models import User
4+
5+
QUERY_ALL_USERS = select(User)
6+
7+
QUERY_USER_BY_ID = QUERY_ALL_USERS.where(User.id == bindparam("id"))
8+
QUERY_USER_BY_USERNAME = QUERY_ALL_USERS.where(User.username == bindparam("username"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from uuid import UUID
2+
3+
from pydantic import BaseModel, ConfigDict, Field
4+
5+
6+
class UserCreatedResponse(BaseModel):
7+
model_config = ConfigDict(from_attributes=True)
8+
9+
user_id: UUID = Field(validation_alias="id")
10+
username: str
11+
12+
13+
class UserInfoResponse(BaseModel):
14+
model_config = ConfigDict(from_attributes=True)
15+
16+
user_id: UUID = Field(validation_alias="id")
17+
username: str
18+
first_name: str
19+
middle_name: str | None
20+
last_name: str | None
21+
22+
23+
class UserCreateError(ValueError):
24+
pass
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from fastapi import APIRouter, FastAPI
2+
3+
from .views import user_create, user_data, user_list
4+
5+
6+
def init_routes():
7+
router = APIRouter(prefix="/user", tags=["user"])
8+
9+
router.add_api_route("/register", user_create, methods={"POST"})
10+
router.add_api_route("/list", user_list, methods={"GET"})
11+
router.add_api_route("/{user_id}", user_data, methods={"GET"})
12+
13+
return router
14+
15+
16+
def init_app(app: FastAPI):
17+
app.include_router(init_routes())
18+
19+
# Bind models
20+
from . import db_models # noqa
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from typing import Annotated
2+
from uuid import UUID
3+
4+
import sqlalchemy
5+
from fastapi import Depends
6+
7+
from ...database import AsyncSession, get_db_session
8+
from . import db_models, db_queries
9+
from .models import UserCreatedResponse, UserCreateError, UserInfoResponse
10+
11+
12+
class UserService:
13+
"""User management service"""
14+
15+
def __init__(self, db: Annotated[AsyncSession, Depends(get_db_session)]):
16+
self._db = db
17+
18+
async def create_user(
19+
self,
20+
username: str,
21+
first_name: str,
22+
middle_name: str | None = None,
23+
last_name: str | None = None,
24+
) -> UserCreatedResponse:
25+
"""Create a new user (if `username` doesn't exist) with the
26+
given details and save it in the database.
27+
28+
:param str username: Username of the new user
29+
:param str first_name: First name
30+
:param str or None middle_name: Middle name
31+
:param str or None last_name: Last name
32+
:return UserCreatedResponse: Contains the ID of the newly created user
33+
34+
:raises UserCreateError: If an error occurres while creating the user (Eg. already existing username)"""
35+
try:
36+
new_user = db_models.User(
37+
username=username,
38+
first_name=first_name,
39+
middle_name=middle_name,
40+
last_name=last_name,
41+
)
42+
43+
self._db.add(new_user)
44+
await self._db.commit()
45+
46+
return UserCreatedResponse.model_validate(new_user)
47+
except sqlalchemy.exc.IntegrityError as ex:
48+
raise UserCreateError("Username `%s` already exists." % username) from ex
49+
50+
async def user_query_all(self) -> list[UserInfoResponse]:
51+
"""Returns a list of users that have registered.
52+
53+
:return list[UserInfoResponse]: A list of `UserInfoResponse` items containing various information about each user
54+
"""
55+
results = await self._db.scalars(db_queries.QUERY_ALL_USERS)
56+
user_list = map(UserInfoResponse.model_validate, results.all())
57+
return list(user_list)
58+
59+
async def user_query_id(self, user_id: UUID) -> UserInfoResponse | None:
60+
result = await self._db.scalar(db_queries.QUERY_USER_BY_ID, {"id": user_id})
61+
if result:
62+
return UserInfoResponse.model_validate(result)
63+
64+
async def user_query_username(self, username: str) -> UserInfoResponse | None:
65+
result = await self._db.scalar(db_queries.QUERY_USER_BY_USERNAME, {"username": username})
66+
if result:
67+
return UserInfoResponse.model_validate(result)

0 commit comments

Comments
 (0)