Data Access Objects or Data Access Operations? - jbrucker/home-log GitHub Wiki
CRUD Operations for Entities using SqlAlchemy ORM & Pydantic Schemas
Most code for CRUD operations using SqlAlchemy ORM is written as functions, at least the code I have seen.
In Java and Kotlin frameworks, these functions would be encapsulated in a Data Access Object (DAO) class. In Java, of course, all code must be in a class so you don't have much choice. :-) DAO classes provide clarity, nice encapsulation, and code reuse (common operations are done in a base class).
Here's a comparison of approaches in Python.
Option 1: Functional Approach
DeepSeek recommends this approach for FastAPI applications.
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import EmailStr
from datetime import datetime, timezone
from . import schemas
from .models import User
async def get_user(session: AsyncSession, user_id: int) -> Optional[schemas.User]:
"""Get a user by ID and return as Pydantic schema."""
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
if user := result.scalar_one_or_none():
return schemas.User.model_validate(user)
return None
async def get_user_by_email(session: AsyncSession, email: EmailStr) -> Optional[schemas.User]:
"""Get a user by email and return as Pydantic schema."""
stmt = select(User).where(User.email == email)
result = await session.execute(stmt)
if user := result.scalar_one_or_none():
return schemas.User.model_validate(user)
return None
async def create_user(session: AsyncSession, user_data: schemas.UserCreate) -> schemas.User:
"""Create a new user from Pydantic schema."""
user = User(
email=user_data.email,
username=user_data.username
)
session.add(user)
await session.commit()
await session.refresh(user)
return schemas.User.model_validate(user)
async def update_user(
session: AsyncSession,
user_id: int,
user_data: schemas.UserCreate
) -> Optional[schemas.User]:
"""Update user data from Pydantic schema."""
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
if user := result.scalar_one_or_none():
user.email = user_data.email
user.username = user_data.username
user.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(user)
return schemas.User.model_validate(user)
return None
async def delete_user(session: AsyncSession, user_id: int) -> bool:
"""Delete a user by ID."""
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
if user := result.scalar_one_or_none():
await session.delete(user)
await session.commit()
return True
return False
Option 2: DAO Class Approach
Recommended for more complex applications.
class UserDAO:
def __init__(self, session: AsyncSession):
self.session = session
async def get(self, user_id: int) -> Optional[schemas.User]:
stmt = select(User).where(User.id == user_id)
result = await self.session.execute(stmt)
if user := result.scalar_one_or_none():
return schemas.User.model_validate(user)
return None
async def get_by_email(self, email: EmailStr) -> Optional[schemas.User]:
stmt = select(User).where(User.email == email)
result = await self.session.execute(stmt)
if user := result.scalar_one_or_none():
return schemas.User.model_validate(user)
return None
async def create(self, user_data: schemas.UserCreate) -> schemas.User:
user = User(
email=user_data.email,
username=user_data.username
)
self.session.add(user)
await self.session.commit()
await self.session.refresh(user)
return schemas.User.model_validate(user)
async def update(
self,
user_id: int,
user_data: schemas.UserCreate
) -> Optional[schemas.User]:
if user := await self.get(user_id):
user.email = user_data.email
user.username = user_data.username
user.updated_at = datetime.now(timezone.utc)
await self.session.commit()
await self.session.refresh(user)
return schemas.User.model_validate(user)
return None
async def delete(self, user_id: int) -> bool:
if user := await self.get(user_id):
await self.session.delete(user)
await self.session.commit()
return True
return False
Key Improvements
- Type Safety: All functions return proper Pydantic schema objects
- Schema Validation: Inputs are validated using
UserCreateschema - Consistent Interface: All operations return Pydantic models
- Time Handling: Explicit UTC timezone handling for timestamps
- Modern Pydantic: Uses
model_validate()instead of deprecatedfrom_orm()
Usage in FastAPI Example
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from . import schemas
from .database import get_async_session
from .crud import get_user, create_user
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=schemas.User)
async def create_new_user(
user_data: schemas.UserCreate,
db: AsyncSession = Depends(get_async_session)
):
if await get_user_by_email(db, user_data.email):
raise HTTPException(status_code=400, detail="Email already registered")
return await create_user(db, user_data)
@router.get("/{user_id}", response_model=schemas.User)
async def read_user(
user_id: int,
db: AsyncSession = Depends(get_async_session)
):
if user := await get_user(db, user_id):
return user
raise HTTPException(status_code=404, detail="User not found")
Recommendation
- For FastAPI applications, use the functional approach as it integrates seamlessly with FastAPI's dependency injection
- For complex domain logic, consider the DAO pattern to encapsulate business rules
- Always convert to Pydantic schemas at the boundary of your data layer to ensure proper validation and serialization
SqlAlchemy Becoming More Async?
SqlAlchemy 2.0 appears to be making more operations async by default. The async select method for example.
In version 1.x you need to write:
from sqlalchemy.future import select
# usage (`models.User` is an SqlAlchemy "model" class that extends `Base`):
async def get_user_by_id(session: AsyncSession, user_id: int) -> models.User | None:
stmt = select(models.User).where(models.User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
but (according to Chat bots) in version 2.0 you can write simply:
from sqlalchemy import select