SqlAlchemy ORM Async Mode - jbrucker/home-log GitHub Wiki
Example of SqlAlchemy ORM using Async Mode
1. Define URL for an Async Database Connection and Create an Engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
# Create an async engine using SQLite. For an in-memory database use "sqlite+aiosqlite:///:memory:" (lots of colons)
# Database URL handed to the async engine below.
DATABASE_URL = "sqlite+aiosqlite:///./test.db"  # or postgresql+asyncpg://...

# Module-level async engine built from DATABASE_URL.
engine = create_async_engine(
    DATABASE_URL,
    future=True,
    echo=True,  # Set to False in production
)
# Create an async session factory.
# Confusing naming!
# Calling sqlalchemy.ext.asyncio.async_sessionmaker(...) returns a *factory*:
# an instance of the class sqlalchemy.ext.asyncio.session.async_sessionmaker.
# It does *not* return a session object; call the factory to create an AsyncSession.
# Session factory bound to the engine; call it to obtain a new AsyncSession.
async_session_factory = async_sessionmaker(
    bind=engine,
    autoflush=False,
    expire_on_commit=False,
    class_=AsyncSession,  # Not necessary
)
type(async_session_factory)
<class 'sqlalchemy.ext.asyncio.session.async_sessionmaker'>
2. Using the Async Session Factory in Your Code
from typing import AsyncGenerator
# Misleading name! This does *NOT* return a session; it returns an async generator that yields an AsyncSession.
# typing.AsyncGenerator is a Generator that uses "async def" and "yield" to produce objects
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a single session created by ``async_session_factory``.

    The session is entered with ``async with``, so the context manager is
    exited when the generator finishes or is closed.
    """
    session_cm = async_session_factory()
    async with session_cm as db_session:
        yield db_session
async def add_users():
    """Insert one example ``User`` and return all ``User`` rows.

    A session is opened from ``async_session_factory``; the sample user is
    added, committed and refreshed, printed, and then every ``User`` is
    selected.

    Returns:
        The result of ``result.scalars().all()`` for ``select(User)``.

    Raises:
        Exception: Any error raised inside the session block is re-raised
            after the session has been rolled back.
    """
    # The context manager closes the session on exit, so no ``finally`` is needed.
    async with async_session_factory() as session:
        try:
            # Example: create a new record (placeholder e-mail address).
            user = User(username="Example", email="user@example.com")
            session.add(user)
            # Commit the transaction
            await session.commit()
            # Refresh the instance to get any database-generated values
            await session.refresh(user)
            print(user)

            # Query records
            result = await session.execute(select(User))
            users = result.scalars().all()
            return users
        except Exception:
            # Roll back on error, then let the caller see the failure.
            await session.rollback()
            raise
3. Common Async Database Operations
Create
async def create_item(item_data):
    """Create an ``Item`` from ``item_data``, commit it, and return it.

    Args:
        item_data: Mapping expanded as keyword arguments to ``Item``.

    Returns:
        The new ``Item`` after it has been committed and refreshed.
    """
    async with async_session_factory() as db:
        record = Item(**item_data)
        db.add(record)
        await db.commit()
        # Reload the row after the commit before returning it.
        await db.refresh(record)
        return record
Read
async def get_item(item_id: int):
    """Return the first ``Item`` whose ``id`` equals ``item_id``.

    Args:
        item_id: Value compared against ``Item.id``.

    Returns:
        The first scalar result of the query (``.scalars().first()``).
    """
    query = select(Item).where(Item.id == item_id)
    async with async_session_factory() as db:
        rows = await db.execute(query)
        return rows.scalars().first()
Update
async def update_item(item_id: int, update_data: dict):
    """Apply ``update_data`` to the ``Item`` with the given id.

    Args:
        item_id: Value compared against ``Item.id``.
        update_data: Attribute-name to new-value pairs set on the item.

    Returns:
        The updated, refreshed ``Item``, or ``None`` when the lookup finds
        nothing.
    """
    async with async_session_factory() as db:
        query = select(Item).where(Item.id == item_id)
        found = (await db.execute(query)).scalars().first()
        if not found:
            # Nothing matched: no commit is issued.
            return None
        for field, new_value in update_data.items():
            setattr(found, field, new_value)
        await db.commit()
        await db.refresh(found)
        return found
Delete
async def delete_item(item_id: int):
    """Delete the ``Item`` with the given id.

    Args:
        item_id: Value compared against ``Item.id``.

    Returns:
        ``True`` when an item was found, deleted and committed; ``False``
        when the lookup found nothing.
    """
    async with async_session_factory() as db:
        query = select(Item).where(Item.id == item_id)
        target = (await db.execute(query)).scalars().first()
        if not target:
            return False
        await db.delete(target)
        await db.commit()
        return True
4. Using with FastAPI (Dependency Injection)
To integrate this into FastAPI use dependency injection
from fastapi import Depends
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield one session from ``async_session_factory`` for use with ``Depends``.

    This is an async generator function (it contains ``yield``), so the return
    annotation is ``AsyncGenerator[AsyncSession, None]`` rather than
    ``AsyncSession``. The ``async with`` block is exited after the generator
    is resumed or closed.
    """
    async with async_session_factory() as session:
        yield session
# POST /items/ handler: builds an Item from the request body and stores it
# using the session supplied by the get_session dependency.
@app.post("/items/")
async def create_item(item_data: dict, session: AsyncSession = Depends(get_session)):
    created = Item(**item_data)
    session.add(created)
    await session.commit()
    # Reload the row after the commit before returning it.
    await session.refresh(created)
    return created
Important Notes
- Always use `await` with async session methods (`commit`, `execute`, `refresh`, etc.)
- Transactions: Remember to commit or rollback your transactions
- Session Lifecycle: Let the context manager (`async with`) handle session closing
- N+1 Problem: Be mindful of lazy loading in async context - use `selectinload` or similar
- Connection Pooling: Configure your engine with appropriate pool settings for production