SQLAlchemy Cheat Sheet
A comprehensive reference for SQLAlchemy 2.0+ ORM and Core functionality, covering models, sessions, queries, and migrations.
Table of Contents
- Quick Start
- Basic Setup
- Model Definition
- Column Types & Constraints
- Relationships
- Session Management
- CRUD Operations
- Query Patterns
- Joins & Eager Loading
- Advanced Querying
- Migrations with Alembic
- Best Practices
Quick Start
# Installation
pip install sqlalchemy[asyncio] alembic
# Basic setup
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
class Base(DeclarativeBase):
pass
# Create engine and session
engine = create_engine("sqlite:///example.db", echo=True)
SessionLocal = sessionmaker(bind=engine)
# Create tables
Base.metadata.create_all(engine)
# Use session
with SessionLocal() as session:
# Your database operations here
session.commit()
Basic Setup
Database Connection
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase
# SQLite
engine = create_engine("sqlite:///database.db", echo=True)
# PostgreSQL
engine = create_engine("postgresql://user:password@localhost/dbname")
# MySQL
engine = create_engine("mysql+pymysql://user:password@localhost/dbname")
# Connection with pool settings
engine = create_engine(
"postgresql://user:pass@localhost/db",
pool_size=20,
max_overflow=0,
pool_pre_ping=True,
echo=False
)
Declarative Base
# Modern SQLAlchemy 2.0+ approach
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
# Alternative with type annotations
from typing import Any
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
metadata: MetaData
type_annotation_map = {
str: String(50) # Default string length
}
Model Definition
Basic Model
from typing import Optional
from sqlalchemy import String, Integer, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
class User(Base):
__tablename__ = "users"
# Primary key
id: Mapped[int] = mapped_column(primary_key=True)
# Required fields
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(100))
# Optional fields
full_name: Mapped[Optional[str]] = mapped_column(String(100))
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime, server_default=func.now()
)
updated_at: Mapped[Optional[datetime]] = mapped_column(
DateTime, onupdate=func.now()
)
def __repr__(self) -> str:
return f"User(id={self.id}, username='{self.username}')"
Model with Table Configuration
class Product(Base):
__tablename__ = "products"
# Table arguments
__table_args__ = (
UniqueConstraint('name', 'category_id'),
CheckConstraint('price > 0'),
Index('idx_category_price', 'category_id', 'price'),
{'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8'}
)
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
category_id: Mapped[int] = mapped_column(ForeignKey("categories.id"))
Column Types & Constraints
Common Column Types
from sqlalchemy import (
String, Text, Integer, BigInteger, Float, Numeric,
Boolean, Date, DateTime, Time, JSON, LargeBinary
)
from decimal import Decimal
import datetime
class Example(Base):
__tablename__ = "examples"
# Integers
id: Mapped[int] = mapped_column(Integer, primary_key=True)
big_id: Mapped[int] = mapped_column(BigInteger)
# Strings
name: Mapped[str] = mapped_column(String(50))
description: Mapped[Optional[str]] = mapped_column(Text)
# Numbers
price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
rating: Mapped[float] = mapped_column(Float)
# Boolean
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
# Dates and times
birth_date: Mapped[date] = mapped_column(Date)
created_at: Mapped[datetime] = mapped_column(DateTime)
login_time: Mapped[time] = mapped_column(Time)
# JSON (PostgreSQL, MySQL 5.7+, SQLite 3.38+)
metadata: Mapped[dict] = mapped_column(JSON)
# Binary data
file_data: Mapped[bytes] = mapped_column(LargeBinary)
Constraints and Indexes
from sqlalchemy import CheckConstraint, UniqueConstraint, Index
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
# Column-level constraints
username: Mapped[str] = mapped_column(
String(50),
unique=True,
nullable=False,
index=True
)
age: Mapped[int] = mapped_column(
Integer,
CheckConstraint('age >= 0'),
default=0
)
# Table-level constraints
__table_args__ = (
UniqueConstraint('email', 'username'),
CheckConstraint('age BETWEEN 0 AND 150'),
Index('idx_user_created', 'created_at'),
Index('idx_user_multi', 'username', 'email'),
)
Relationships
One-to-Many
from typing import List
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50))
# One user has many posts
posts: Mapped[List["Post"]] = relationship(
back_populates="author",
cascade="all, delete-orphan"
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(100))
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# Many posts belong to one user
author: Mapped["User"] = relationship(back_populates="posts")
Many-to-Many
from sqlalchemy import Table, Column
# Association table
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True)
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(100))
# Many-to-many with tags
tags: Mapped[List["Tag"]] = relationship(
secondary=post_tags,
back_populates="posts"
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50))
posts: Mapped[List["Post"]] = relationship(
secondary=post_tags,
back_populates="tags"
)
Association Object Pattern
class PostTag(Base):
__tablename__ = "post_tags"
post_id: Mapped[int] = mapped_column(ForeignKey("posts.id"), primary_key=True)
tag_id: Mapped[int] = mapped_column(ForeignKey("tags.id"), primary_key=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# Relationships to parent objects
post: Mapped["Post"] = relationship(back_populates="tag_associations")
tag: Mapped["Tag"] = relationship(back_populates="post_associations")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
# Direct many-to-many (read-only)
tags: Mapped[List["Tag"]] = relationship(
secondary="post_tags",
viewonly=True
)
# Association object access
tag_associations: Mapped[List["PostTag"]] = relationship(
back_populates="post"
)
Self-Referential
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50))
parent_id: Mapped[Optional[int]] = mapped_column(ForeignKey("categories.id"))
# Self-referential relationship
parent: Mapped[Optional["Category"]] = relationship(
"Category",
remote_side=[id],
back_populates="children"
)
children: Mapped[List["Category"]] = relationship(
"Category",
back_populates="parent"
)
Session Management
Basic Session Usage
from sqlalchemy.orm import sessionmaker, Session
# Create session factory
SessionLocal = sessionmaker(bind=engine)
# Context manager (recommended)
def get_user_by_id(user_id: int):
with SessionLocal() as session:
user = session.get(User, user_id)
return user
# Manual session management
def create_user(username: str):
session = SessionLocal()
try:
user = User(username=username)
session.add(user)
session.commit()
return user
except Exception:
session.rollback()
raise
finally:
session.close()
Session Configuration
# Session with custom configuration
SessionLocal = sessionmaker(
bind=engine,
expire_on_commit=False, # Keep objects accessible after commit
autoflush=True, # Auto-flush before queries
autocommit=False # Manual transaction control
)
# Scoped sessions (thread-local)
from sqlalchemy.orm import scoped_session
SessionLocal = scoped_session(sessionmaker(bind=engine))
# Usage
session = SessionLocal()
# ... use session
SessionLocal.remove() # Clean up
Async Sessions
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# Async engine
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=True
)
# Async session factory
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
expire_on_commit=False
)
# Async usage
async def create_user_async(username: str):
async with AsyncSessionLocal() as session:
async with session.begin():
user = User(username=username)
session.add(user)
# Auto-commit on exit
return user
CRUD Operations
Create
# Single object
def create_user(session: Session, username: str, email: str):
user = User(username=username, email=email)
session.add(user)
session.commit()
session.refresh(user) # Get auto-generated fields
return user
# Multiple objects
def create_users_bulk(session: Session, user_data: List[dict]):
users = [User(**data) for data in user_data]
session.add_all(users)
session.commit()
return users
# Bulk insert (more efficient)
def bulk_insert_users(session: Session, user_data: List[dict]):
session.execute(
insert(User).values(user_data)
)
session.commit()
Read
# Get by primary key
def get_user(session: Session, user_id: int):
return session.get(User, user_id)
# Get first matching record
def get_user_by_username(session: Session, username: str):
stmt = select(User).where(User.username == username)
return session.scalar(stmt)
# Modern approach with select()
from sqlalchemy import select
def get_user_by_email(session: Session, email: str):
stmt = select(User).where(User.email == email)
return session.scalar(stmt)
# Get all with conditions
def get_active_users(session: Session):
stmt = select(User).where(User.is_active == True)
return session.scalars(stmt).all()
# Paginated results
def get_users_paginated(session: Session, page: int, per_page: int):
stmt = (
select(User)
.offset((page - 1) * per_page)
.limit(per_page)
)
return session.scalars(stmt).all()
Update
# Update single object
def update_user(session: Session, user_id: int, **kwargs):
user = session.get(User, user_id)
if user:
for key, value in kwargs.items():
setattr(user, key, value)
session.commit()
return user
return None
# Bulk update
from sqlalchemy import update
def deactivate_old_users(session: Session, cutoff_date: date):
stmt = (
update(User)
.where(User.last_login < cutoff_date)
.values(is_active=False)
)
result = session.execute(stmt)
session.commit()
return result.rowcount
Delete
# Delete single object
def delete_user(session: Session, user_id: int):
user = session.get(User, user_id)
if user:
session.delete(user)
session.commit()
return True
return False
# Bulk delete
from sqlalchemy import delete
def delete_inactive_users(session: Session):
stmt = delete(User).where(User.is_active == False)
result = session.execute(stmt)
session.commit()
return result.rowcount
Query Patterns
Basic Filtering
from sqlalchemy import select, and_, or_, not_
# Simple conditions
stmt = select(User).where(User.age > 18)
stmt = select(User).where(User.username.like('%john%'))
stmt = select(User).where(User.email.in_(['a@b.com', 'c@d.com']))
# Multiple conditions
stmt = select(User).where(
and_(
User.age >= 18,
User.is_active == True,
User.email.like('%@gmail.com')
)
)
# OR conditions
stmt = select(User).where(
or_(
User.username == 'admin',
User.role == 'superuser'
)
)
# NOT conditions
stmt = select(User).where(not_(User.is_deleted))
Ordering and Limiting
# Order by
stmt = select(User).order_by(User.created_at.desc())
stmt = select(User).order_by(User.last_name, User.first_name)
# Limit and offset
stmt = select(User).limit(10).offset(20)
# Distinct
stmt = select(User.city).distinct()
# First, one, or scalar
user = session.scalars(stmt).first() # First or None
user = session.scalars(stmt).one() # Exactly one or error
users = session.scalars(stmt).all() # All results
Aggregations
from sqlalchemy import func, desc
# Count
stmt = select(func.count(User.id))
total_users = session.scalar(stmt)
# Group by with aggregates
stmt = (
select(User.city, func.count(User.id).label('user_count'))
.group_by(User.city)
.order_by(desc('user_count'))
)
# Having clause
stmt = (
select(User.city, func.count(User.id).label('user_count'))
.group_by(User.city)
.having(func.count(User.id) > 5)
)
Subqueries and CTEs
# Subquery
subq = select(func.avg(User.age)).scalar_subquery()
stmt = select(User).where(User.age > subq)
# Correlated subquery
subq = (
select(func.count(Post.id))
.where(Post.user_id == User.id)
.scalar_subquery()
)
stmt = select(User).where(subq > 5)
# Common Table Expression (CTE)
cte = select(User.city, func.count().label('cnt')).group_by(User.city).cte()
stmt = select(cte).where(cte.c.cnt > 10)
Joins & Eager Loading
Explicit Joins
# Inner join
stmt = (
select(User, Post)
.join(Post, User.id == Post.user_id)
.where(Post.published == True)
)
# Left outer join
stmt = (
select(User)
.outerjoin(Post)
.where(Post.id.is_(None)) # Users with no posts
)
# Multiple joins
stmt = (
select(User, Post, Category)
.join(Post)
.join(Category, Post.category_id == Category.id)
)
Eager Loading
from sqlalchemy.orm import selectinload, joinedload, contains_eager
# Select IN loading (separate query)
stmt = select(User).options(selectinload(User.posts))
# Joined loading (single query)
stmt = select(User).options(joinedload(User.posts))
# Nested relationships
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.comments)
)
# With explicit join
stmt = (
select(User)
.join(User.posts)
.options(contains_eager(User.posts))
.where(Post.published == True)
)
Lazy Loading Control
# Relationship configuration
class User(Base):
posts: Mapped[List["Post"]] = relationship(
lazy='selectin', # Always use selectin loading
# Other options: 'joined', 'subquery', 'raise', 'noload'
)
# Query-level control
from sqlalchemy.orm import lazyload, noload, raiseload
stmt = select(User).options(
lazyload(User.posts), # Force lazy loading
noload(User.profile), # Don't load at all
raiseload(User.settings) # Raise error if accessed
)
Advanced Querying
Window Functions
from sqlalchemy import func
# Row number
stmt = select(
User.username,
func.row_number().over(
order_by=User.created_at
).label('row_num')
)
# Ranking
stmt = select(
User.username,
User.score,
func.rank().over(
order_by=User.score.desc()
).label('rank')
)
# Partition by
stmt = select(
User.username,
User.department,
func.avg(User.salary).over(
partition_by=User.department
).label('dept_avg_salary')
)
Raw SQL and Text
from sqlalchemy import text
# Raw SQL query
stmt = text("SELECT * FROM users WHERE age > :age")
result = session.execute(stmt, {'age': 18})
# Mixed SQL and ORM
stmt = select(User).where(text("age > :age")).params(age=18)
# Complex expressions
stmt = select(User).where(
text("extract(year from created_at) = :year")
).params(year=2023)
Dynamic Queries
def build_user_query(session: Session, **filters):
stmt = select(User)
if name := filters.get('name'):
stmt = stmt.where(User.name.like(f'%{name}%'))
if min_age := filters.get('min_age'):
stmt = stmt.where(User.age >= min_age)
if cities := filters.get('cities'):
stmt = stmt.where(User.city.in_(cities))
if order_by := filters.get('order_by'):
if order_by == 'name':
stmt = stmt.order_by(User.name)
elif order_by == 'age':
stmt = stmt.order_by(User.age.desc())
return session.scalars(stmt).all()
Migrations with Alembic
Setup
# Initialize Alembic
alembic init alembic
# Configure alembic.ini
sqlalchemy.url = sqlite:///database.db
# alembic/env.py configuration
from myapp.models import Base
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"}
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
Migration Commands
# Generate migration
alembic revision --autogenerate -m "Add user table"
# Apply migrations
alembic upgrade head
alembic upgrade +1 # Upgrade one revision
alembic upgrade ae10 # Upgrade to specific revision
# Downgrade
alembic downgrade -1 # Downgrade one revision
alembic downgrade base # Downgrade to beginning
# Show current revision
alembic current
# Show migration history
alembic history
# Check if any new operations would be detected
alembic check
Manual Migration Script
"""Add user table
Revision ID: abc123
Revises: def456
Create Date: 2024-01-01 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), primary_key=True),
sa.Column('username', sa.String(50), nullable=False),
sa.Column('email', sa.String(100), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now())
)
op.create_index('ix_users_username', 'users', ['username'])
op.create_unique_constraint('uq_users_email', 'users', ['email'])
def downgrade():
op.drop_constraint('uq_users_email', 'users', type_='unique')
op.drop_index('ix_users_username', 'users')
op.drop_table('users')
Data Migrations
from alembic import op
from sqlalchemy import text
def upgrade():
# Schema change
op.add_column('users', sa.Column('full_name', sa.String(100)))
# Data migration
connection = op.get_bind()
connection.execute(
text("UPDATE users SET full_name = first_name || ' ' || last_name")
)
def downgrade():
op.drop_column('users', 'full_name')
Best Practices
Session Management
# ✅ Good: Use context managers
def get_user_posts(user_id: int):
with SessionLocal() as session:
stmt = (
select(User)
.options(selectinload(User.posts))
.where(User.id == user_id)
)
return session.scalar(stmt)
# ✅ Good: Dependency injection pattern
def create_user(session: Session, username: str) -> User:
user = User(username=username)
session.add(user)
session.commit()
session.refresh(user)
return user
# ❌ Bad: Long-lived sessions
global_session = SessionLocal() # Don't do this
Query Optimization
# ✅ Good: Use selectinload for one-to-many
stmt = select(User).options(selectinload(User.posts))
# ✅ Good: Use joinedload for many-to-one
stmt = select(Post).options(joinedload(Post.author))
# ✅ Good: Load only needed columns
stmt = select(User.id, User.username).where(User.is_active == True)
# ✅ Good: Use bulk operations for large datasets
session.execute(
update(User).where(User.last_login < cutoff).values(is_active=False)
)
# ❌ Bad: N+1 queries
users = session.scalars(select(User)).all()
for user in users:
print(user.posts) # Triggers separate query for each user
Model Design
# ✅ Good: Use type hints and proper defaults
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
def __repr__(self) -> str:
return f"User(id={self.id}, username='{self.username}')"
# ✅ Good: Use appropriate relationship loading
class User(Base):
posts: Mapped[List["Post"]] = relationship(
back_populates="author",
lazy="selectin", # Efficient for one-to-many
cascade="all, delete-orphan"
)
# ✅ Good: Index frequently queried columns
class Post(Base):
__tablename__ = "posts"
__table_args__ = (
Index('idx_post_user_created', 'user_id', 'created_at'),
Index('idx_post_status', 'status'),
)
Error Handling
from sqlalchemy.exc import IntegrityError, NoResultFound
def create_user_safe(session: Session, username: str, email: str):
try:
user = User(username=username, email=email)
session.add(user)
session.commit()
return user, None
except IntegrityError as e:
session.rollback()
if 'username' in str(e):
return None, "Username already exists"
elif 'email' in str(e):
return None, "Email already exists"
return None, "Database constraint error"
except Exception as e:
session.rollback()
return None, f"Unexpected error: {str(e)}"
def get_user_or_404(session: Session, user_id: int):
user = session.get(User, user_id)
if not user:
raise NoResultFound(f"User {user_id} not found")
return user
Testing Patterns
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def db_session():
# Use in-memory SQLite for tests
engine = create_engine("sqlite:///:memory:", echo=False)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
try:
yield session
finally:
session.close()
def test_create_user(db_session):
user = User(username="testuser", email="test@example.com")
db_session.add(user)
db_session.commit()
assert user.id is not None
assert user.username == "testuser"
# Test retrieval
retrieved = db_session.get(User, user.id)
assert retrieved.username == "testuser"
Common Gotchas
- Expired objects after commit: Use
expire_on_commit=Falseorsession.refresh(obj) - N+1 queries: Use eager loading (
selectinload,joinedload) - Session scope: Always use context managers or proper cleanup
- Timezone-aware datetimes: Use
timezone=Truefor DateTime columns - Bulk operations: Use
bulk_insert_mappings()for large datasets - Connection pooling: Configure pool settings for production
- Migration conflicts: Always review auto-generated migrations
- Relationship loading: Choose appropriate
lazyloading strategies
This cheat sheet covers the essential SQLAlchemy patterns for modern Python applications. For async usage, replace Session with AsyncSession and add async/await keywords as needed.