Skip to main content

Tutorial Progress

1

Database Setup and Basic Models

20 minutes

2

Relationships and Advanced Queries

20 minutes

3

Migrations and FastAPI Integration

20 minutes

Overall Progress 0%

Tutorial Info

Difficulty
Intermediate
Duration 60 minutes
Reading Time 21 min
Last Updated 2024-01-15
Tutorials Database Integration with SQLAlchemy

Database Integration with SQLAlchemy

Featured Intermediate

Learn how to use SQLAlchemy for database operations, relationships, and migrations in Python applications

What You'll Learn

  • Set up SQLAlchemy with database connections
  • Create models with relationships
  • Perform CRUD operations
  • Handle database migrations with Alembic
  • Integrate with FastAPI and Pydantic

Prerequisites

  • Python fundamentals
  • Basic SQL knowledge
  • Understanding of database concepts
  • Python 3.8+ installed

What You'll Build

A complete database-driven application with proper ORM patterns and migrations

Overview

SQLAlchemy is Python’s most popular SQL toolkit and Object-Relational Mapping (ORM) library. In this tutorial, you’ll learn how to use SQLAlchemy to build robust database-driven applications with proper relationships, migrations, and integration with FastAPI.

Step 1: Database Setup and Basic Models

Let’s start by setting up SQLAlchemy and creating our first models.

Installation

1
pip install sqlalchemy alembic fastapi pydantic psycopg2-binary uvicorn

Database Configuration

Create database.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os

# Database URL - use SQLite for development, PostgreSQL for production
DATABASE_URL = os.getenv(
    "DATABASE_URL", 
    "sqlite:///./app.db"  # SQLite for development
    # "postgresql://user:password@localhost/dbname"  # PostgreSQL for production
)

# Create engine
engine = create_engine(
    DATABASE_URL,
    # SQLite specific settings
    connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {}
)

# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create Base class
Base = declarative_base()

# Dependency to get DB session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Basic Models

Create models.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey, Table
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base

# Association table for many-to-many relationship between users and roles
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True)
)

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    full_name = Column(String(100), nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # Relationships
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")
    roles = relationship("Role", secondary=user_roles, back_populates="users")
    
    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"

class Role(Base):
    __tablename__ = "roles"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    description = Column(String(200))
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    
    # Relationships
    users = relationship("User", secondary=user_roles, back_populates="roles")
    
    def __repr__(self):
        return f"<Role(name='{self.name}')>"

class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False, index=True)
    content = Column(Text, nullable=False)
    is_published = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # Foreign key
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    
    # Relationships
    author = relationship("User", back_populates="posts")
    comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
    
    def __repr__(self):
        return f"<Post(title='{self.title}', author='{self.author.username if self.author else None}')>"

class Comment(Base):
    __tablename__ = "comments"
    
    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    
    # Foreign keys
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
    
    # Relationships
    author = relationship("User", back_populates="comments")
    post = relationship("Post", back_populates="comments")
    
    def __repr__(self):
        return f"<Comment(author='{self.author.username if self.author else None}', post_id={self.post_id})>"

Create Database Tables

Create create_tables.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from database import engine, Base
from models import User, Role, Post, Comment

def create_tables():
    """Create all tables in the database"""
    Base.metadata.create_all(bind=engine)
    print("✅ All tables created successfully!")

if __name__ == "__main__":
    create_tables()

Basic CRUD Operations

Create crud_operations.py:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from database import SessionLocal
from models import User, Role, Post, Comment
from datetime import datetime
import hashlib

def hash_password(password: str) -> str:
    """Simple password hashing (use proper hashing in production)"""
    return hashlib.sha256(password.encode()).hexdigest()

def create_sample_data():
    """Create sample data for testing"""
    db = SessionLocal()
    
    try:
        # Create roles
        admin_role = Role(name="admin", description="Administrator role")
        user_role = Role(name="user", description="Regular user role")
        
        db.add(admin_role)
        db.add(user_role)
        db.commit()
        
        # Create users
        admin_user = User(
            username="admin",
            email="[email protected]",
            full_name="Admin User",
            hashed_password=hash_password("admin123"),
            roles=[admin_role]
        )
        
        regular_user = User(
            username="johndoe",
            email="[email protected]",
            full_name="John Doe",
            hashed_password=hash_password("user123"),
            roles=[user_role]
        )
        
        db.add(admin_user)
        db.add(regular_user)
        db.commit()
        
        # Create posts
        post1 = Post(
            title="Getting Started with SQLAlchemy",
            content="SQLAlchemy is a powerful ORM for Python...",
            is_published=True,
            author=admin_user
        )
        
        post2 = Post(
            title="Advanced Database Relationships",
            content="In this post, we'll explore complex relationships...",
            is_published=True,
            author=regular_user
        )
        
        db.add(post1)
        db.add(post2)
        db.commit()
        
        # Create comments
        comment1 = Comment(
            content="Great tutorial! Very helpful.",
            author=regular_user,
            post=post1
        )
        
        comment2 = Comment(
            content="Thanks for sharing this knowledge.",
            author=admin_user,
            post=post2
        )
        
        db.add(comment1)
        db.add(comment2)
        db.commit()
        
        print("✅ Sample data created successfully!")
        
    except Exception as e:
        print(f"❌ Error creating sample data: {e}")
        db.rollback()
    finally:
        db.close()

def demonstrate_queries():
    """Demonstrate various query operations"""
    db = SessionLocal()
    
    try:
        print("=== Basic Queries ===")
        
        # Get all users
        users = db.query(User).all()
        print(f"Total users: {len(users)}")
        for user in users:
            print(f"  - {user.username} ({user.email})")
        
        # Get user by username
        admin = db.query(User).filter(User.username == "admin").first()
        print(f"\nAdmin user: {admin}")
        
        # Get published posts with authors
        published_posts = db.query(Post).filter(Post.is_published == True).all()
        print(f"\nPublished posts: {len(published_posts)}")
        for post in published_posts:
            print(f"  - '{post.title}' by {post.author.username}")
        
        print("\n=== Relationship Queries ===")
        
        # Get posts with their comments
        posts_with_comments = db.query(Post).join(Comment).all()
        for post in posts_with_comments:
            print(f"\nPost: {post.title}")
            for comment in post.comments:
                print(f"  Comment by {comment.author.username}: {comment.content[:50]}...")
        
        # Get users with their roles
        users_with_roles = db.query(User).join(User.roles).all()
        for user in users_with_roles:
            role_names = [role.name for role in user.roles]
            print(f"User {user.username} has roles: {', '.join(role_names)}")
        
        print("\n=== Advanced Queries ===")
        
        # Count posts per user
        from sqlalchemy import func
        post_counts = db.query(
            User.username,
            func.count(Post.id).label('post_count')
        ).join(Post).group_by(User.username).all()
        
        print("Posts per user:")
        for username, count in post_counts:
            print(f"  - {username}: {count} posts")
        
        # Get recent posts (last 30 days)
        from datetime import datetime, timedelta
        recent_date = datetime.now() - timedelta(days=30)
        recent_posts = db.query(Post).filter(Post.created_at >= recent_date).all()
        print(f"\nRecent posts (last 30 days): {len(recent_posts)}")
        
        # Complex query with multiple conditions
        active_users_with_posts = db.query(User).filter(
            and_(
                User.is_active == True,
                User.posts.any(Post.is_published == True)
            )
        ).all()
        print(f"Active users with published posts: {len(active_users_with_posts)}")
        
    except Exception as e:
        print(f"❌ Error in queries: {e}")
    finally:
        db.close()

if __name__ == "__main__":
    create_sample_data()
    print("\n" + "="*50)
    demonstrate_queries()

Expected Output

Run the scripts:

1
2
python create_tables.py
python crud_operations.py

You should see output like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
✅ All tables created successfully!
✅ Sample data created successfully!

==================================================
=== Basic Queries ===
Total users: 2
  - admin ([email protected])
  - johndoe ([email protected])

Admin user: <User(username='admin', email='[email protected]')>

Published posts: 2
  - 'Getting Started with SQLAlchemy' by admin
  - 'Advanced Database Relationships' by johndoe

=== Relationship Queries ===

Post: Getting Started with SQLAlchemy
  Comment by johndoe: Great tutorial! Very helpful.

Post: Advanced Database Relationships
  Comment by admin: Thanks for sharing this knowledge.

User admin has roles: admin
User johndoe has roles: user

=== Advanced Queries ===
Posts per user:
  - admin: 1 posts
  - johndoe: 1 posts

Recent posts (last 30 days): 2
Active users with published posts: 2

Step 2: Relationships and Advanced Queries

Now let’s explore more complex relationships and advanced querying techniques.

Advanced Relationship Patterns

Create advanced_models.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Table, Enum as SQLEnum
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
import enum

# Self-referential many-to-many for user followers
followers = Table(
    'followers',
    Base.metadata,
    Column('follower_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('followed_id', Integer, ForeignKey('users.id'), primary_key=True)
)

class PostStatus(enum.Enum):
    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"

class Category(Base):
    __tablename__ = "categories"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), unique=True, nullable=False)
    description = Column(String(500))
    
    # Self-referential relationship for parent/child categories
    parent_id = Column(Integer, ForeignKey('categories.id'))
    parent = relationship("Category", remote_side=[id], backref="children")
    
    # Posts relationship
    posts = relationship("AdvancedPost", back_populates="category")

class Tag(Base):
    __tablename__ = "tags"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    
    # Many-to-many with posts
    posts = relationship("AdvancedPost", secondary="post_tags", back_populates="tags")

# Association table for posts and tags
post_tags = Table(
    'post_tags',
    Base.metadata,
    Column('post_id', Integer, ForeignKey('advanced_posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)

class AdvancedPost(Base):
    __tablename__ = "advanced_posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(String, nullable=False)
    status = Column(SQLEnum(PostStatus), default=PostStatus.DRAFT)
    view_count = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # Foreign keys
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"))
    
    # Relationships
    author = relationship("User")
    category = relationship("Category", back_populates="posts")
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")

# Add to User model (extend existing User)
# In a real application, you would modify the original User model
class ExtendedUser(Base):
    __tablename__ = "extended_users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    
    # Self-referential many-to-many for followers
    followed = relationship(
        "ExtendedUser",
        secondary=followers,
        primaryjoin=id == followers.c.follower_id,
        secondaryjoin=id == followers.c.followed_id,
        backref="followers"
    )

Advanced Query Examples

Create advanced_queries.py:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from sqlalchemy.orm import Session, joinedload, selectinload, subqueryload
from sqlalchemy import func, desc, asc, and_, or_, exists, case
from database import SessionLocal, engine, Base
from models import User, Post, Comment
from advanced_models import Category, Tag, AdvancedPost, PostStatus
import random

def create_advanced_sample_data():
    """Create more complex sample data"""
    # Create tables first
    Base.metadata.create_all(bind=engine)
    
    db = SessionLocal()
    
    try:
        # Create categories with hierarchy
        tech_category = Category(name="Technology", description="Tech-related posts")
        python_category = Category(name="Python", description="Python programming", parent=tech_category)
        web_category = Category(name="Web Development", description="Web development topics", parent=tech_category)
        
        db.add_all([tech_category, python_category, web_category])
        db.commit()
        
        # Create tags
        tags = [
            Tag(name="python"),
            Tag(name="sqlalchemy"),
            Tag(name="database"),
            Tag(name="orm"),
            Tag(name="tutorial"),
            Tag(name="beginner"),
            Tag(name="advanced")
        ]
        
        db.add_all(tags)
        db.commit()
        
        # Get existing user
        user = db.query(User).first()
        if not user:
            print("No users found. Please run crud_operations.py first.")
            return
        
        # Create advanced posts with relationships
        posts_data = [
            {
                "title": "SQLAlchemy Relationships Explained",
                "content": "Deep dive into SQLAlchemy relationships...",
                "status": PostStatus.PUBLISHED,
                "category": python_category,
                "tags": [tags[0], tags[1], tags[4]]  # python, sqlalchemy, tutorial
            },
            {
                "title": "Database Design Best Practices",
                "content": "Learn how to design efficient databases...",
                "status": PostStatus.PUBLISHED,
                "category": tech_category,
                "tags": [tags[2], tags[6]]  # database, advanced
            },
            {
                "title": "ORM vs Raw SQL",
                "content": "Comparing ORM and raw SQL approaches...",
                "status": PostStatus.DRAFT,
                "category": python_category,
                "tags": [tags[3], tags[2]]  # orm, database
            }
        ]
        
        for post_data in posts_data:
            post = AdvancedPost(
                title=post_data["title"],
                content=post_data["content"],
                status=post_data["status"],
                author=user,
                category=post_data["category"],
                tags=post_data["tags"],
                view_count=random.randint(10, 1000)
            )
            db.add(post)
        
        db.commit()
        print("✅ Advanced sample data created!")
        
    except Exception as e:
        print(f"❌ Error creating advanced data: {e}")
        db.rollback()
    finally:
        db.close()

def demonstrate_advanced_queries():
    """Demonstrate advanced querying techniques"""
    db = SessionLocal()
    
    try:
        print("=== Eager Loading Examples ===")
        
        # Lazy loading (default) - causes N+1 problem
        posts = db.query(AdvancedPost).all()
        print("Posts with lazy loading:")
        for post in posts:
            print(f"  - {post.title} by {post.author.username} in {post.category.name if post.category else 'No category'}")
        
        # Eager loading with joinedload (LEFT JOIN)
        posts_with_author = db.query(AdvancedPost).options(
            joinedload(AdvancedPost.author),
            joinedload(AdvancedPost.category),
            joinedload(AdvancedPost.tags)
        ).all()
        
        print("\nPosts with eager loading:")
        for post in posts_with_author:
            tag_names = [tag.name for tag in post.tags]
            print(f"  - {post.title} (tags: {', '.join(tag_names)})")
        
        print("\n=== Aggregation Queries ===")
        
        # Count posts by status
        status_counts = db.query(
            AdvancedPost.status,
            func.count(AdvancedPost.id).label('count')
        ).group_by(AdvancedPost.status).all()
        
        print("Posts by status:")
        for status, count in status_counts:
            print(f"  - {status.value}: {count}")
        
        # Average view count by category
        avg_views = db.query(
            Category.name,
            func.avg(AdvancedPost.view_count).label('avg_views')
        ).join(AdvancedPost).group_by(Category.name).all()
        
        print("\nAverage views by category:")
        for category, avg in avg_views:
            print(f"  - {category}: {avg:.1f}")
        
        print("\n=== Subqueries and CTEs ===")
        
        # Subquery: Find users with above-average post count
        avg_post_count = db.query(func.avg(
            db.query(func.count(AdvancedPost.id))
            .filter(AdvancedPost.author_id == User.id)
            .scalar_subquery()
        )).scalar()
        
        prolific_users = db.query(User).filter(
            db.query(func.count(AdvancedPost.id))
            .filter(AdvancedPost.author_id == User.id)
            .scalar_subquery() > avg_post_count
        ).all()
        
        print(f"Users with above-average post count (avg: {avg_post_count:.1f}):")
        for user in prolific_users:
            post_count = db.query(func.count(AdvancedPost.id)).filter(
                AdvancedPost.author_id == user.id
            ).scalar()
            print(f"  - {user.username}: {post_count} posts")
        
        print("\n=== Window Functions ===")
        
        # Rank posts by view count within each category
        from sqlalchemy import text
        ranked_posts = db.execute(text("""
            SELECT 
                title,
                view_count,
                c.name as category_name,
                RANK() OVER (PARTITION BY category_id ORDER BY view_count DESC) as rank
            FROM advanced_posts ap
            LEFT JOIN categories c ON ap.category_id = c.id
            ORDER BY c.name, rank
        """)).fetchall()
        
        print("Posts ranked by views within category:")
        current_category = None
        for title, views, category, rank in ranked_posts:
            if category != current_category:
                print(f"\n  {category or 'No Category'}:")
                current_category = category
            print(f"    {rank}. {title} ({views} views)")
        
        print("\n=== Complex Filtering ===")
        
        # Posts with specific tags and high view count
        popular_python_posts = db.query(AdvancedPost).join(
            AdvancedPost.tags
        ).filter(
            and_(
                Tag.name.in_(['python', 'sqlalchemy']),
                AdvancedPost.view_count > 100,
                AdvancedPost.status == PostStatus.PUBLISHED
            )
        ).distinct().all()
        
        print("Popular Python/SQLAlchemy posts:")
        for post in popular_python_posts:
            print(f"  - {post.title} ({post.view_count} views)")
        
        # Posts in categories with subcategories
        posts_in_parent_categories = db.query(AdvancedPost).join(
            Category
        ).filter(
            exists().where(Category.parent_id == AdvancedPost.category_id)
        ).all()
        
        print(f"\nPosts in parent categories: {len(posts_in_parent_categories)}")
        
    except Exception as e:
        print(f"❌ Error in advanced queries: {e}")
    finally:
        db.close()

if __name__ == "__main__":
    create_advanced_sample_data()
    print("\n" + "="*60)
    demonstrate_advanced_queries()

Step 3: Migrations and FastAPI Integration

Now let’s set up database migrations with Alembic and integrate everything with FastAPI.

Alembic Setup

Initialize Alembic:

1
alembic init alembic

Edit alembic.ini to set the database URL:

1
2
# In alembic.ini, find the line:
sqlalchemy.url = sqlite:///./app.db

Edit alembic/env.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os
import sys

# Add your project directory to Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))

# Import your models
from database import Base
from models import User, Role, Post, Comment
from advanced_models import Category, Tag, AdvancedPost, ExtendedUser

# this is the Alembic Config object
config = context.config

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Set target metadata
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Create your first migration:

1
2
alembic revision --autogenerate -m "Initial migration"
alembic upgrade head

FastAPI Integration

Create schemas.py for Pydantic models:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from pydantic import BaseModel, Field, validator
from typing import List, Optional
from datetime import datetime
from enum import Enum

class PostStatus(str, Enum):
    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"

# User schemas
class UserBase(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    full_name: str = Field(..., min_length=1, max_length=100)

class UserCreate(UserBase):
    password: str = Field(..., min_length=6)

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    
    class Config:
        from_attributes = True

# Post schemas
class PostBase(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1)

class PostCreate(PostBase):
    is_published: bool = False

class PostUpdate(BaseModel):
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    content: Optional[str] = Field(None, min_length=1)
    is_published: Optional[bool] = None

class PostResponse(PostBase):
    id: int
    is_published: bool
    created_at: datetime
    updated_at: Optional[datetime]
    author: UserResponse
    
    class Config:
        from_attributes = True

# Comment schemas
class CommentBase(BaseModel):
    content: str = Field(..., min_length=1, max_length=1000)

class CommentCreate(CommentBase):
    post_id: int

class CommentResponse(CommentBase):
    id: int
    created_at: datetime
    author: UserResponse
    post_id: int
    
    class Config:
        from_attributes = True

Create crud.py for database operations:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from models import User, Post, Comment, Role
import schemas
import hashlib

def hash_password(password: str) -> str:
    """Hash password (use proper hashing in production)"""
    return hashlib.sha256(password.encode()).hexdigest()

# User CRUD
def get_user(db: Session, user_id: int):
    return db.query(User).filter(User.id == user_id).first()

def get_user_by_username(db: Session, username: str):
    return db.query(User).filter(User.username == username).first()

def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(User).offset(skip).limit(limit).all()

def create_user(db: Session, user: schemas.UserCreate):
    hashed_password = hash_password(user.password)
    db_user = User(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        hashed_password=hashed_password
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

# Post CRUD
def get_posts(db: Session, skip: int = 0, limit: int = 100, published_only: bool = False):
    query = db.query(Post)
    if published_only:
        query = query.filter(Post.is_published == True)
    return query.offset(skip).limit(limit).all()

def get_post(db: Session, post_id: int):
    return db.query(Post).filter(Post.id == post_id).first()

def create_post(db: Session, post: schemas.PostCreate, author_id: int):
    db_post = Post(**post.model_dump(), author_id=author_id)
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return db_post

def update_post(db: Session, post_id: int, post_update: schemas.PostUpdate):
    db_post = db.query(Post).filter(Post.id == post_id).first()
    if db_post:
        update_data = post_update.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_post, field, value)
        db.commit()
        db.refresh(db_post)
    return db_post

def delete_post(db: Session, post_id: int):
    db_post = db.query(Post).filter(Post.id == post_id).first()
    if db_post:
        db.delete(db_post)
        db.commit()
    return db_post

# Comment CRUD
def get_comments_for_post(db: Session, post_id: int):
    return db.query(Comment).filter(Comment.post_id == post_id).all()

def create_comment(db: Session, comment: schemas.CommentCreate, author_id: int):
    db_comment = Comment(**comment.model_dump(), author_id=author_id)
    db.add(db_comment)
    db.commit()
    db.refresh(db_comment)
    return db_comment

Create the main FastAPI application main.py:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
import crud
import schemas
from database import get_db, engine, Base

# Create tables
Base.metadata.create_all(bind=engine)

app = FastAPI(
    title="Blog API with SQLAlchemy",
    description="A blog API demonstrating SQLAlchemy integration with FastAPI",
    version="1.0.0"
)

# User endpoints
@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_username(db, username=user.username)
    if db_user:
        raise HTTPException(
            status_code=400,
            detail="Username already registered"
        )
    return crud.create_user(db=db, user=user)

@app.get("/users/", response_model=List[schemas.UserResponse])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users

@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

# Post endpoints
@app.post("/posts/", response_model=schemas.PostResponse)
def create_post(post: schemas.PostCreate, author_id: int, db: Session = Depends(get_db)):
    # In a real app, you'd get author_id from authentication
    db_user = crud.get_user(db, user_id=author_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="Author not found")
    return crud.create_post(db=db, post=post, author_id=author_id)

@app.get("/posts/", response_model=List[schemas.PostResponse])
def read_posts(
    skip: int = 0, 
    limit: int = 100, 
    published_only: bool = False,
    db: Session = Depends(get_db)
):
    posts = crud.get_posts(db, skip=skip, limit=limit, published_only=published_only)
    return posts

@app.get("/posts/{post_id}", response_model=schemas.PostResponse)
def read_post(post_id: int, db: Session = Depends(get_db)):
    db_post = crud.get_post(db, post_id=post_id)
    if db_post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    return db_post

@app.put("/posts/{post_id}", response_model=schemas.PostResponse)
def update_post(
    post_id: int, 
    post_update: schemas.PostUpdate, 
    db: Session = Depends(get_db)
):
    db_post = crud.update_post(db, post_id=post_id, post_update=post_update)
    if db_post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    return db_post

@app.delete("/posts/{post_id}")
def delete_post(post_id: int, db: Session = Depends(get_db)):
    db_post = crud.delete_post(db, post_id=post_id)
    if db_post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    return {"message": "Post deleted successfully"}

# Comment endpoints
@app.post("/comments/", response_model=schemas.CommentResponse)
def create_comment(
    comment: schemas.CommentCreate, 
    author_id: int, 
    db: Session = Depends(get_db)
):
    # Verify post exists
    db_post = crud.get_post(db, post_id=comment.post_id)
    if not db_post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    # Verify author exists
    db_user = crud.get_user(db, user_id=author_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="Author not found")
    
    return crud.create_comment(db=db, comment=comment, author_id=author_id)

@app.get("/posts/{post_id}/comments/", response_model=List[schemas.CommentResponse])
def read_comments_for_post(post_id: int, db: Session = Depends(get_db)):
    # Verify post exists
    db_post = crud.get_post(db, post_id=post_id)
    if not db_post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    return crud.get_comments_for_post(db, post_id=post_id)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Test the Complete Application

Create test_complete_app.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import requests
import json

BASE_URL = "http://localhost:8000"

def test_complete_workflow():
    """Test the complete application workflow"""
    
    print("=== Testing Complete SQLAlchemy + FastAPI Application ===\n")
    
    # 1. Create a user
    print("1. Creating a user...")
    user_data = {
        "username": "testuser",
        "email": "[email protected]",
        "full_name": "Test User",
        "password": "testpass123"
    }
    
    response = requests.post(f"{BASE_URL}/users/", json=user_data)
    if response.status_code == 200:
        user = response.json()
        print(f"✅ User created: {user['username']} (ID: {user['id']})")
        user_id = user['id']
    else:
        print(f"❌ Failed to create user: {response.text}")
        return
    
    # 2. Create a post
    print("\n2. Creating a post...")
    post_data = {
        "title": "My First SQLAlchemy Post",
        "content": "This is a post created through the FastAPI + SQLAlchemy integration!",
        "is_published": True
    }
    
    response = requests.post(f"{BASE_URL}/posts/?author_id={user_id}", json=post_data)
    if response.status_code == 200:
        post = response.json()
        print(f"✅ Post created: '{post['title']}' (ID: {post['id']})")
        post_id = post['id']
    else:
        print(f"❌ Failed to create post: {response.text}")
        return
    
    # 3. Add a comment
    print("\n3. Adding a comment...")
    comment_data = {
        "content": "Great post! SQLAlchemy is really powerful.",
        "post_id": post_id
    }
    
    response = requests.post(f"{BASE_URL}/comments/?author_id={user_id}", json=comment_data)
    if response.status_code == 200:
        comment = response.json()
        print(f"✅ Comment added: '{comment['content'][:50]}...'")
    else:
        print(f"❌ Failed to add comment: {response.text}")
    
    # 4. List all posts
    print("\n4. Listing all posts...")
    response = requests.get(f"{BASE_URL}/posts/")
    if response.status_code == 200:
        posts = response.json()
        print(f"✅ Found {len(posts)} posts:")
        for post in posts:
            print(f"  - '{post['title']}' by {post['author']['username']}")
    
    # 5. Get comments for the post
    print(f"\n5. Getting comments for post {post_id}...")
    response = requests.get(f"{BASE_URL}/posts/{post_id}/comments/")
    if response.status_code == 200:
        comments = response.json()
        print(f"✅ Found {len(comments)} comments:")
        for comment in comments:
            print(f"  - {comment['author']['username']}: {comment['content'][:50]}...")
    
    # 6. Update the post
    print(f"\n6. Updating post {post_id}...")
    update_data = {
        "title": "My Updated SQLAlchemy Post",
        "content": "This post has been updated to show SQLAlchemy's update capabilities!"
    }
    
    response = requests.put(f"{BASE_URL}/posts/{post_id}", json=update_data)
    if response.status_code == 200:
        updated_post = response.json()
        print(f"✅ Post updated: '{updated_post['title']}'")
    
    print("\n🎉 Complete workflow test successful!")

if __name__ == "__main__":
    test_complete_workflow()

Run the complete application:

1
2
3
4
5
# Terminal 1: Start the FastAPI server
python main.py

# Terminal 2: Run the test
python test_complete_app.py

Congratulations!

You’ve successfully built a complete database-driven application using SQLAlchemy with:

  • Database Models: Complex relationships and proper schema design
  • CRUD Operations: Full create, read, update, delete functionality
  • Advanced Queries: Joins, aggregations, and complex filtering
  • Database Migrations: Version control for your database schema
  • FastAPI Integration: Type-safe API with automatic documentation
  • Pydantic Integration: Request/response validation and serialization

This foundation gives you everything you need to build production-ready database applications with the Pragmatic AI Stack.

Next Steps

  • Implement authentication and authorization
  • Add database connection pooling and optimization
  • Set up database monitoring and logging
  • Explore SQLAlchemy’s advanced features like hybrid properties
  • Implement caching strategies for better performance