ORM: 用Python对象操作数据库,无需直接编写SQL。

  • Django ORM: Django自带,功能强大,易用性好。(数据库迁移工具Django Migrations: Django内置。)
  • SQLAlchemy: Python界ORM的事实标准,功能极其丰富和灵活,学习曲线稍陡。(数据库迁移工具Alembic: 通常与SQLAlchemy配合使用。)
  • Peewee: 轻量级的替代品,API更简单直观。

数据库驱动

  • PostgreSQL: psycopg2 或 asyncpg
  • MySQL: mysqlclient 或 PyMySQL
  • MongoDB: pymongo
  • Redis: redis

FastAPI + SQLAlchemy 案例

fastapi_db_example/
├── main.py ---------- FastAPI应用入口
├── database.py ------ 数据库配置和会话管理
├── models.py -------- SQLAlchemy模型定义
├── schemas.py ------- Pydantic模式定义
├── crud.py ---------- 数据库操作类
├── auth.py ---------- 认证工具(被 crud.py 和 main.py 导入)
└── dependencies.py

database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os

# Database URL: taken from the DATABASE_URL environment variable so that the
# backend can be switched without code changes; defaults to a local SQLite file.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./test.db")

# Create the engine.
# "check_same_thread" is a SQLite-only connect argument, so it is passed only
# for SQLite URLs. The URL scheme prefix is checked (rather than searching for
# the substring anywhere) so that another backend's URL that merely contains
# "sqlite" in a host, database name or password is not misdetected.
engine = create_engine(
    DATABASE_URL,
    connect_args=(
        {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
    ),
)

# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that all ORM models inherit from
Base = declarative_base()

# Dependency-injection helper that hands out a database session
def get_db():
    """Yield a fresh ``SessionLocal`` session and close it when the caller is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised.
        session.close()

models.py

使用Python类定义数据表结构

from sqlalchemy import Column,Integer,String,DateTime,Boolean,Text,ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
from typing import List,Optional

class User(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(100), nullable=False)
    full_name = Column(String(100))
    is_active = Column(Boolean, default=True)
    role = Column(String(20), default="user")
    # created_at is filled by the database on INSERT; updated_at is only set
    # on UPDATE, so it stays NULL until the row is first modified.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Relationships (mirrored by Post.author / Comment.author)
    posts = relationship("Post", back_populates="author")
    comments = relationship("Comment", back_populates="author")

class Post(Base):
    """ORM model mapped to the ``posts`` table."""
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    summary = Column(String(500))
    is_published = Column(Boolean, default=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    # created_at is filled by the database on INSERT; updated_at only on UPDATE.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Relationships
    author = relationship("User", back_populates="posts")
    comments = relationship("Comment", back_populates="post")

    # Many-to-many relationship to Tag through the "post_tags" table
    tags = relationship("Tag", secondary="post_tags", back_populates="posts")

class Comment(Base):
    """ORM model mapped to the ``comments`` table."""
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships (mirrored by User.comments / Post.comments)
    author = relationship("User", back_populates="comments")
    post = relationship("Post", back_populates="comments")

class Tag(Base):
    """ORM model mapped to the ``tags`` table."""
    __tablename__ = "tags"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    description = Column(String(200))

    # Many-to-many relationship to Post through the "post_tags" table
    posts = relationship("Post", secondary="post_tags", back_populates="tags")

# Many-to-many association table between posts and tags
class PostTag(Base):
    """Association row linking one post to one tag (composite primary key)."""
    __tablename__ = "post_tags"

    post_id = Column(Integer, ForeignKey("posts.id"), primary_key=True)
    tag_id = Column(Integer, ForeignKey("tags.id"), primary_key=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

schemas.py(Pydantic模式定义)

请求/响应数据的序列化和验证

from pydantic import BaseModel, EmailStr, validator
from typing import List, Optional
from datetime import datetime

# Fields shared by the user create and response schemas.
class UserBase(BaseModel):
    username: str
    email: EmailStr
    full_name: Optional[str] = None
    role: str = "user"

# Request body for creating a user: the shared fields plus a plain-text password.
class UserCreate(UserBase):
    password: str

    # Reject passwords shorter than 6 characters; the value is returned unchanged.
    @validator('password')
    def password_strength(cls, v):
        if len(v) < 6:
            raise ValueError('密码至少6位')
        return v

# Partial-update request body: every field is optional and defaults to None.
class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None
    full_name: Optional[str] = None
    role: Optional[str] = None

# Response schema for a user; the password is not part of it.
class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    updated_at: Optional[datetime] = None

    class Config:
        # Allow building the schema from ORM object attributes, not only dicts.
        orm_mode = True

# Fields shared by the post create and response schemas.
class PostBase(BaseModel):
    title: str
    content: str
    summary: Optional[str] = None
    is_published: bool = False

# Request body for creating a post; adds nothing to PostBase.
class PostCreate(PostBase):
    pass

# Partial-update request body for a post: every field is optional.
class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    summary: Optional[str] = None
    is_published: Optional[bool] = None

# Tag schemas are declared before PostResponse so that PostResponse can
# reference TagResponse directly. The previous string forward reference
# ("TagResponse") pointed at a class defined later in the module and was never
# resolved with update_forward_refs(), which leaves the field unprepared in
# pydantic v1 (the API style used in this file: `validator`, `orm_mode`).
class TagBase(BaseModel):
    name: str
    description: Optional[str] = None


# Response schema for a tag.
class TagResponse(TagBase):
    id: int

    class Config:
        # Allow building the schema from ORM object attributes.
        orm_mode = True


# Response schema for a post, including its author and tags.
class PostResponse(PostBase):
    id: int
    author_id: int
    created_at: datetime
    updated_at: Optional[datetime] = None
    author: UserResponse
    tags: List[TagResponse] = []

    class Config:
        # Allow building the schema from ORM object attributes.
        orm_mode = True

# Field shared by the comment create and response schemas.
class CommentBase(BaseModel):
    content: str

# Request body for creating a comment; adds nothing to CommentBase.
class CommentCreate(CommentBase):
    pass

# Response schema for a comment, including its author.
class CommentResponse(CommentBase):
    id: int
    author_id: int
    post_id: int
    created_at: datetime
    author: UserResponse

    class Config:
        # Allow building the schema from ORM object attributes.
        orm_mode = True

crud.py

# crud.py - 数据库操作类
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from typing import List, Optional
import models
import schemas
from auth import get_password_hash

class UserCRUD:
    """Static helpers that read and write ``models.User`` rows."""

    @staticmethod
    def get_user(db: Session, user_id: int) -> Optional[models.User]:
        """Return the user whose primary key is ``user_id``, or ``None``."""
        by_id = models.User.id == user_id
        return db.query(models.User).filter(by_id).first()

    @staticmethod
    def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
        """Return the user with the given e-mail address, or ``None``."""
        by_email = models.User.email == email
        return db.query(models.User).filter(by_email).first()

    @staticmethod
    def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[models.User]:
        """Return up to ``limit`` users after skipping the first ``skip`` rows."""
        page = db.query(models.User).offset(skip).limit(limit)
        return page.all()

    @staticmethod
    def create_user(db: Session, user: schemas.UserCreate) -> models.User:
        """Insert a new user, storing a hash of the password, and return the row."""
        new_user = models.User(
            username=user.username,
            email=user.email,
            hashed_password=get_password_hash(user.password),
            full_name=user.full_name,
            role=user.role,
        )
        db.add(new_user)
        db.commit()
        # Reload so database-generated values (id, created_at) are populated.
        db.refresh(new_user)
        return new_user

    @staticmethod
    def update_user(db: Session, user_id: int, user_update: schemas.UserUpdate) -> Optional[models.User]:
        """Apply the explicitly provided fields of ``user_update``.

        Returns the updated user, or ``None`` when no such user exists.
        """
        target = UserCRUD.get_user(db, user_id)
        if not target:
            return target
        # exclude_unset: only fields the client actually sent are written.
        changes = user_update.dict(exclude_unset=True)
        for name in changes:
            setattr(target, name, changes[name])
        db.commit()
        db.refresh(target)
        return target

    @staticmethod
    def delete_user(db: Session, user_id: int) -> bool:
        """Delete the user; return ``True`` if a row was deleted, else ``False``."""
        target = UserCRUD.get_user(db, user_id)
        if not target:
            return False
        db.delete(target)
        db.commit()
        return True

class PostCRUD:
    """Static helpers that read and write ``models.Post`` rows."""

    @staticmethod
    def get_post(db: Session, post_id: int) -> Optional[models.Post]:
        """Return the post whose primary key is ``post_id``, or ``None``."""
        return db.query(models.Post).filter(models.Post.id == post_id).first()

    @staticmethod
    def get_posts(
        db: Session,
        skip: int = 0,
        limit: int = 100,
        author_id: Optional[int] = None,
        published_only: bool = True
    ) -> List[models.Post]:
        """Return a page of posts.

        Args:
            db: Active database session.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            author_id: When not ``None``, only posts by this author.
            published_only: When true, only posts with ``is_published`` set.
        """
        query = db.query(models.Post)
        # Compare against None explicitly so that a falsy id such as 0 is
        # still applied as a filter instead of being silently ignored.
        if author_id is not None:
            query = query.filter(models.Post.author_id == author_id)
        if published_only:
            # "== True" is intentional: it builds a SQL expression.
            query = query.filter(models.Post.is_published == True)  # noqa: E712
        return query.offset(skip).limit(limit).all()

    @staticmethod
    def create_post(db: Session, post: schemas.PostCreate, author_id: int) -> models.Post:
        """Insert a post owned by ``author_id`` and return the refreshed row."""
        db_post = models.Post(**post.dict(), author_id=author_id)
        db.add(db_post)
        db.commit()
        db.refresh(db_post)
        return db_post

    @staticmethod
    def update_post(db: Session, post_id: int, post_update: schemas.PostUpdate) -> Optional[models.Post]:
        """Apply the explicitly provided fields of ``post_update``.

        Returns the updated post, or ``None`` when no such post exists.
        """
        db_post = PostCRUD.get_post(db, post_id)
        if db_post:
            # exclude_unset: only fields the client actually sent are written.
            update_data = post_update.dict(exclude_unset=True)
            for field, value in update_data.items():
                setattr(db_post, field, value)
            db.commit()
            db.refresh(db_post)
        return db_post

    @staticmethod
    def add_tag_to_post(db: Session, post_id: int, tag_id: int) -> bool:
        """Link a tag to a post.

        Returns ``True`` when both exist (whether or not the link already
        existed) and ``False`` when either the post or the tag is missing.
        """
        db_post = PostCRUD.get_post(db, post_id)
        db_tag = db.query(models.Tag).filter(models.Tag.id == tag_id).first()

        if db_post and db_tag:
            # Skip the insert when the association row already exists.
            existing = db.query(models.PostTag).filter(
                and_(models.PostTag.post_id == post_id, models.PostTag.tag_id == tag_id)
            ).first()
            if not existing:
                post_tag = models.PostTag(post_id=post_id, tag_id=tag_id)
                db.add(post_tag)
                db.commit()
            return True
        return False

class TagCRUD:
    """Static helpers for ``models.Tag`` rows."""

    @staticmethod
    def get_or_create_tag(db: Session, tag_name: str) -> models.Tag:
        """Return the tag named ``tag_name``, inserting it first if it is missing."""
        found = db.query(models.Tag).filter(models.Tag.name == tag_name).first()
        if found:
            return found

        created = models.Tag(name=tag_name)
        db.add(created)
        db.commit()
        # Reload so the database-generated id is populated.
        db.refresh(created)
        return created

main.py

from typing import List, Optional

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session

import crud
import models
import schemas
from auth import get_current_active_user
from database import engine, get_db

# Create the database tables for every model registered on models.Base.
# This runs at import time, before the application object is built.
models.Base.metadata.create_all(bind=engine)

app = FastAPI(title="博客系统API", version="1.0.0")

# 用户路由
# Register a new user.
# Both `email` and `username` are unique columns on the users table, so both
# are checked up front and reported as HTTP 400; previously only the e-mail
# was checked and a duplicate username surfaced as an unhandled database
# integrity error.
# NOTE(review): `schemas.UserCreate` carries a client-supplied `role`, which is
# stored as-is while `update_post` grants extra rights to role "admin" —
# confirm whether self-assigned roles are intended.
@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    if crud.UserCRUD.get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="邮箱已注册")

    username_taken = (
        db.query(models.User)
        .filter(models.User.username == user.username)
        .first()
    )
    if username_taken:
        raise HTTPException(status_code=400, detail="用户名已存在")

    return crud.UserCRUD.create_user(db=db, user=user)

# Return the user resolved by the `get_current_active_user` dependency.
# Declared before "/users/{user_id}" so that the literal path "me" is matched
# here rather than being parsed as an integer user id.
@app.get("/users/me", response_model=schemas.UserResponse)
def read_user_me(current_user: models.User = Depends(get_current_active_user)):
    return current_user

# Look up one user by id; responds with HTTP 404 when there is no such user.
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    found = crud.UserCRUD.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="用户不存在")

# 文章路由
# Create a post owned by the authenticated user; the author id comes from the
# `get_current_active_user` dependency, not from the request body.
@app.post("/posts/", response_model=schemas.PostResponse)
def create_post(
    post: schemas.PostCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_active_user)
):
    return crud.PostCRUD.create_post(db=db, post=post, author_id=current_user.id)

# List posts with offset/limit pagination, optionally filtered by author.
# `published_only` is left at PostCRUD.get_posts' default, so only published
# posts are returned. The `Optional` annotation below requires `Optional` to be
# imported from `typing` at the top of this module; without that import the
# module fails with NameError while this function is being defined.
@app.get("/posts/", response_model=List[schemas.PostResponse])
def read_posts(
    skip: int = 0,
    limit: int = 100,
    author_id: Optional[int] = None,
    db: Session = Depends(get_db)
):
    return crud.PostCRUD.get_posts(
        db, skip=skip, limit=limit, author_id=author_id
    )

# Update a post. Responds 404 when the post does not exist and 403 unless the
# caller is the post's author or has the "admin" role.
@app.put("/posts/{post_id}", response_model=schemas.PostResponse)
def update_post(
    post_id: int,
    post_update: schemas.PostUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_active_user)
):
    existing = crud.PostCRUD.get_post(db, post_id=post_id)
    if not existing:
        raise HTTPException(status_code=404, detail="文章不存在")

    is_owner = existing.author_id == current_user.id
    is_admin = current_user.role == "admin"
    if not (is_owner or is_admin):
        raise HTTPException(status_code=403, detail="无权修改此文章")

    return crud.PostCRUD.update_post(db=db, post_id=post_id, post_update=post_update)

# When executed as a script, serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Django ORM 案例

django_blog/
├── manage.py
├── requirements.txt
└── blog/
    ├── __init__.py
    ├── settings.py
    ├── urls.py
    ├── models.py
    ├── serializers.py
    ├── views.py
    ├── admin.py --------自动生成管理界面
    └── migrations/
        └── __init__.py

models.py

from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils import timezone
from django.core.validators import MinLengthValidator

class User(AbstractUser):  # Model inheritance: extends the AbstractUser base class
    # (stored value, display label) pairs accepted by the `role` field
    ROLE_CHOICES = [
        ('admin', '管理员'),
        ('author', '作者'),
        ('user', '普通用户'),
    ]

    # Override AbstractUser fields or add new ones
    email = models.EmailField(unique=True, verbose_name='邮箱')
    role = models.CharField(max_length=20, choices=ROLE_CHOICES, default='user', verbose_name='角色')
    bio = models.TextField(blank=True, null=True, verbose_name='个人简介')
    avatar = models.ImageField(upload_to='avatars/', blank=True, null=True, verbose_name='头像')
    website = models.URLField(blank=True, null=True, verbose_name='个人网站')

    # Use the e-mail address as the login identifier
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['username']

    class Meta:
        verbose_name = '用户'
        verbose_name_plural = verbose_name
        db_table = 'users'

    def __str__(self):
        return self.email

    # True when the role is 'author' or 'admin'.
    @property
    def is_author(self):
        return self.role in ['author', 'admin']

# A tag that can be attached to posts; stored in the "tags" table, ordered by name.
class Tag(models.Model):
    name = models.CharField(max_length=50, unique=True, verbose_name='标签名')
    description = models.CharField(max_length=200, blank=True, null=True, verbose_name='描述')
    color = models.CharField(max_length=7, default='#007bff', verbose_name='颜色')  # HEX colour
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    class Meta:
        verbose_name = '标签'
        verbose_name_plural = verbose_name
        db_table = 'tags'
        ordering = ['name']

    def __str__(self):
        return self.name

# A post category that may be nested: `parent` points at another Category and
# the reverse accessor is `children`. Deleting a parent cascades to its children.
class Category(models.Model):
    name = models.CharField(max_length=100, verbose_name='分类名')
    slug = models.SlugField(unique=True, verbose_name='URL标识')
    description = models.TextField(blank=True, null=True, verbose_name='描述')
    parent = models.ForeignKey(
        'self', 
        on_delete=models.CASCADE, 
        blank=True, 
        null=True, 
        related_name='children',
        verbose_name='父分类'
    )
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    class Meta:
        verbose_name = '分类'
        verbose_name_plural = verbose_name
        db_table = 'categories'
        ordering = ['name']
        # NOTE(review): `slug` is already unique on its own, so this pair
        # constraint looks redundant — confirm which one is intended.
        unique_together = ['slug', 'parent']

    def __str__(self):
        return self.name

    # True for a top-level category (no parent).
    @property
    def is_root(self):
        return self.parent is None

# A blog post; stored in the "posts" table, newest first by default.
class Post(models.Model):
    # (stored value, display label) pairs accepted by the `status` field
    STATUS_CHOICES = [
        ('draft', '草稿'),
        ('published', '已发布'),
        ('archived', '已归档'),
    ]

    title = models.CharField(max_length=200, verbose_name='标题')
    slug = models.SlugField(unique=True, verbose_name='URL标识')
    content = models.TextField(verbose_name='内容')
    summary = models.CharField(max_length=500, blank=True, null=True, verbose_name='摘要')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='draft', verbose_name='状态')
    is_featured = models.BooleanField(default=False, verbose_name='是否推荐')
    view_count = models.PositiveIntegerField(default=0, verbose_name='浏览数')
    like_count = models.PositiveIntegerField(default=0, verbose_name='点赞数')

    # Relationship fields
    author = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='posts',
        verbose_name='作者'
    )
    category = models.ForeignKey(
        Category,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        related_name='posts',
        verbose_name='分类'
    )
    tags = models.ManyToManyField(
        Tag,
        through='PostTag',
        related_name='posts',
        verbose_name='标签'
    )

    # Timestamp fields
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
    published_at = models.DateTimeField(blank=True, null=True, verbose_name='发布时间')

    class Meta:
        verbose_name = '文章'
        verbose_name_plural = verbose_name
        db_table = 'posts'
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['status', 'published_at']),
            models.Index(fields=['author', 'status']),
        ]

    def __str__(self):
        return self.title

    def save(self, *args, **kwargs):
        """Save the post, stamping ``published_at`` on first publication.

        When the caller restricts the write with ``update_fields``, the
        freshly stamped ``published_at`` is added to that set; otherwise the
        value would be set on the instance but never written to the database.
        """
        if self.status == 'published' and not self.published_at:
            self.published_at = timezone.now()
            update_fields = kwargs.get('update_fields')
            if update_fields is not None:
                kwargs['update_fields'] = set(update_fields) | {'published_at'}
        super().save(*args, **kwargs)

    # True when the post's status is 'published'.
    @property
    def is_published(self):
        return self.status == 'published'

# Through model for Post.tags: one row per (post, tag) pair, with the time the
# link was created. Each pair may appear only once.
class PostTag(models.Model):
    post = models.ForeignKey(Post, on_delete=models.CASCADE, verbose_name='文章')
    tag = models.ForeignKey(Tag, on_delete=models.CASCADE, verbose_name='标签')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='关联时间')

    class Meta:
        verbose_name = '文章标签关联'
        verbose_name_plural = verbose_name
        db_table = 'post_tags'
        unique_together = ['post', 'tag']

# A comment on a post. Replies are comments whose `parent` points at another
# comment (reverse accessor: `replies`). New comments start unapproved.
class Comment(models.Model):
    content = models.TextField(validators=[MinLengthValidator(10)], verbose_name='评论内容')
    is_approved = models.BooleanField(default=False, verbose_name='是否审核通过')

    # Relationship fields
    author = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='comments',
        verbose_name='评论者'
    )
    post = models.ForeignKey(
        Post,
        on_delete=models.CASCADE,
        related_name='comments',
        verbose_name='所属文章'
    )
    parent = models.ForeignKey(
        'self',
        on_delete=models.CASCADE,
        blank=True,
        null=True,
        related_name='replies',
        verbose_name='父评论'
    )

    created_at = models.DateTimeField(auto_now_add=True, verbose_name='评论时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')

    class Meta:
        verbose_name = '评论'
        verbose_name_plural = verbose_name
        db_table = 'comments'
        ordering = ['-created_at']

    def __str__(self):
        # Author's username followed by the first 50 characters of the text.
        return f"{self.author.username} - {self.content[:50]}"

    # True when this comment is a reply to another comment.
    @property
    def is_reply(self):
        return self.parent is not None

serializers.py

# blog/serializers.py
from rest_framework import serializers
from django.contrib.auth import authenticate
from .models import User, Post, Tag, Category, Comment

# Public representation of a user, plus the number of posts they authored.
class UserSerializer(serializers.ModelSerializer):
    post_count = serializers.SerializerMethodField()

    class Meta:
        model = User
        fields = ('id', 'username', 'email', 'role', 'bio', 'avatar',
                  'website', 'date_joined', 'post_count')
        # `role` is read-only: UserProfileView uses this serializer for
        # updates, so a writable role would let any user change their own
        # role (e.g. to 'admin') through a profile update.
        read_only_fields = ('id', 'role', 'date_joined', 'post_count')

    def get_post_count(self, obj):
        # One COUNT query per serialized user.
        return obj.posts.count()

# Registration payload: password (min. 6 chars) must be repeated in
# `password_confirm`; neither is ever returned in responses.
class UserRegisterSerializer(serializers.ModelSerializer):
    password = serializers.CharField(write_only=True, min_length=6)
    password_confirm = serializers.CharField(write_only=True)

    class Meta:
        model = User
        fields = ('username', 'email', 'password', 'password_confirm', 'role')

    def validate_role(self, value):
        # Registration is open to anonymous clients (see UserRegisterView), so
        # the administrator role must not be self-assignable.
        if value == 'admin':
            raise serializers.ValidationError("不能自行注册为管理员")
        return value

    def validate(self, data):
        if data['password'] != data['password_confirm']:
            raise serializers.ValidationError("两次密码不一致")
        return data

    def create(self, validated_data):
        # `password_confirm` is not a model field; drop it before creating.
        validated_data.pop('password_confirm')
        # create_user() hashes the password instead of storing it verbatim.
        user = User.objects.create_user(**validated_data)
        return user

# Serializes every Tag field plus the number of posts carrying the tag.
class TagSerializer(serializers.ModelSerializer):
    post_count = serializers.SerializerMethodField()

    class Meta:
        model = Tag
        fields = '__all__'

    def get_post_count(self, obj):
        # Reuse a `post_count` annotation when the queryset supplies one
        # (TagListView annotates Count('posts')); otherwise fall back to one
        # COUNT query per tag.
        annotated = getattr(obj, 'post_count', None)
        if annotated is not None:
            return annotated
        return obj.posts.count()

# Serializes a category with its post count and, recursively, its children.
class CategorySerializer(serializers.ModelSerializer):
    post_count = serializers.SerializerMethodField()
    children = serializers.SerializerMethodField()

    class Meta:
        model = Category
        fields = '__all__'

    def get_post_count(self, obj):
        # One COUNT query per serialized category.
        return obj.posts.count()

    def get_children(self, obj):
        # Serializing an empty queryset already yields an empty list, so no
        # separate exists() query is needed. The parent's context is passed
        # down so nested serializers see the same request/view.
        return CategorySerializer(
            obj.children.all(), many=True, context=self.context
        ).data

# Read-oriented post representation for list views: nested author, category
# and tags are read-only, and the post body (`content`) is not included.
class PostListSerializer(serializers.ModelSerializer):
    author = UserSerializer(read_only=True)
    category = CategorySerializer(read_only=True)
    tags = TagSerializer(many=True, read_only=True)
    comment_count = serializers.SerializerMethodField()

    class Meta:
        model = Post
        fields = ('id', 'title', 'slug', 'summary', 'status', 'is_featured',
                 'view_count', 'like_count', 'author', 'category', 'tags',
                 'created_at', 'published_at', 'comment_count')

    def get_comment_count(self, obj):
        # One COUNT query per serialized post.
        return obj.comments.count()

# Detail representation: everything in PostListSerializer plus `content`.
class PostDetailSerializer(PostListSerializer):
    class Meta(PostListSerializer.Meta):
        fields = PostListSerializer.Meta.fields + ('content',)

# Write-oriented post serializer: tags are supplied as a list of Tag primary keys.
class PostCreateSerializer(serializers.ModelSerializer):
    tags = serializers.PrimaryKeyRelatedField(
        queryset=Tag.objects.all(), many=True, required=False
    )

    class Meta:
        model = Post
        fields = ('title', 'slug', 'content', 'summary', 'status', 
                 'is_featured', 'category', 'tags')

    def create(self, validated_data):
        # The many-to-many field cannot be passed to create(); set it afterwards.
        chosen_tags = validated_data.pop('tags', [])
        new_post = Post.objects.create(**validated_data)
        new_post.tags.set(chosen_tags)
        return new_post

    def update(self, instance, validated_data):
        # None means "tags not supplied": leave the existing tags untouched.
        chosen_tags = validated_data.pop('tags', None)

        for name in validated_data:
            setattr(instance, name, validated_data[name])
        # Goes through Post.save(), so its save-time hooks run.
        instance.save()

        if chosen_tags is not None:
            instance.tags.set(chosen_tags)
        return instance

# Serializes a comment with its author and, recursively, its replies.
class CommentSerializer(serializers.ModelSerializer):
    author = UserSerializer(read_only=True)
    replies = serializers.SerializerMethodField()

    class Meta:
        model = Comment
        fields = '__all__'
        # `post` is supplied by the view from the URL (CommentListView passes
        # post_id to save()), so clients should not have to send it.
        # `is_approved` is a moderation flag and must not be settable by the
        # comment's author.
        read_only_fields = ('author', 'post', 'is_approved',
                            'created_at', 'updated_at')

    def get_replies(self, obj):
        # Serializing an empty queryset already yields an empty list, so no
        # separate exists() query is needed. The parent's context is passed
        # down so nested serializers see the same request/view.
        return CommentSerializer(
            obj.replies.all(), many=True, context=self.context
        ).data

views.py

# blog/views.py
from rest_framework import generics, status, permissions
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from django.db.models import Q, Count
from .models import Post, User, Tag, Category, Comment
from .serializers import (
    UserSerializer, UserRegisterSerializer, PostListSerializer,
    PostDetailSerializer, PostCreateSerializer, TagSerializer,
    CategorySerializer, CommentSerializer
)
from .permissions import IsAuthorOrReadOnly, IsAdminUser

# Create-only endpoint for user registration; open to unauthenticated clients.
class UserRegisterView(generics.CreateAPIView):
    queryset = User.objects.all()
    serializer_class = UserRegisterSerializer
    permission_classes = [permissions.AllowAny]

# Retrieve/update endpoint for the profile of the requesting user.
class UserProfileView(generics.RetrieveUpdateAPIView):
    serializer_class = UserSerializer
    # get_object() returns request.user, which is only a real User row for
    # authenticated requests, so authentication is required explicitly.
    permission_classes = [permissions.IsAuthenticated]

    def get_object(self):
        return self.request.user

# List posts (with filtering, search and ordering) and create new posts.
class PostListView(generics.ListCreateAPIView):
    serializer_class = PostListSerializer
    # Reading is public (get_queryset handles anonymous users); creating needs
    # an authenticated user because perform_create stores request.user.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['author', 'category', 'tags', 'status', 'is_featured']
    search_fields = ['title', 'content', 'summary']
    ordering_fields = ['created_at', 'updated_at', 'published_at', 'view_count', 'like_count']
    ordering = ['-created_at']

    def get_serializer_class(self):
        # PostListSerializer has no `content` field and only read-only
        # category/tags, so it cannot create a complete post. Use the write
        # serializer for POST, mirroring PostDetailView.
        if self.request.method == 'POST':
            return PostCreateSerializer
        return PostListSerializer

    def get_queryset(self):
        queryset = Post.objects.select_related('author', 'category').prefetch_related('tags')

        # Only staff users see unpublished posts.
        if not self.request.user.is_authenticated or not self.request.user.is_staff:
            queryset = queryset.filter(status='published')

        return queryset

    def perform_create(self, serializer):
        # The author is always the requesting user, never client-supplied.
        serializer.save(author=self.request.user)

# Retrieve, update or delete a single post.
class PostDetailView(generics.RetrieveUpdateDestroyAPIView):
    queryset = Post.objects.select_related('author', 'category').prefetch_related('tags')
    permission_classes = [IsAuthorOrReadOnly]

    def get_serializer_class(self):
        # Read with the detail serializer; write with the create/update one.
        if self.request.method == 'GET':
            return PostDetailSerializer
        return PostCreateSerializer

    def retrieve(self, request, *args, **kwargs):
        from django.db.models import F

        instance = self.get_object()

        # Count the view (published posts only). The increment is done in the
        # database with an F() expression so that concurrent requests cannot
        # overwrite each other's updates, which a Python-side
        # "read, add 1, save" sequence allows.
        if instance.is_published:
            Post.objects.filter(pk=instance.pk).update(
                view_count=F('view_count') + 1
            )
            instance.refresh_from_db(fields=['view_count'])

        serializer = self.get_serializer(instance)
        return Response(serializer.data)

# List tags, searchable by name; each tag is annotated with `post_count`.
class TagListView(generics.ListAPIView):
    queryset = Tag.objects.annotate(post_count=Count('posts'))
    serializer_class = TagSerializer
    filter_backends = [filters.SearchFilter]
    search_fields = ['name']

# List top-level categories (no parent), prefetching their direct children.
class CategoryListView(generics.ListAPIView):
    queryset = Category.objects.filter(parent__isnull=True).prefetch_related('children')
    serializer_class = CategorySerializer

# List the top-level comments of one post and create new comments on it.
# The post is identified by the `post_id` URL keyword argument.
class CommentListView(generics.ListCreateAPIView):
    serializer_class = CommentSerializer
    # Creating a comment stores request.user as the author, so writes need an
    # authenticated user; reads stay public.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    def get_queryset(self):
        post_id = self.kwargs['post_id']
        # Replies are excluded here; they are reached through each comment's
        # prefetched `replies` relation.
        return Comment.objects.filter(
            post_id=post_id,
            parent__isnull=True
        ).select_related('author').prefetch_related('replies')

    def perform_create(self, serializer):
        post_id = self.kwargs['post_id']
        serializer.save(author=self.request.user, post_id=post_id)

# Increment a post's like counter and return the new value.
# Responds 404 when the post does not exist. Authentication is required.
@api_view(['POST'])
@permission_classes([permissions.IsAuthenticated])
def like_post(request, pk):
    from django.db.models import F

    try:
        post = Post.objects.get(pk=pk)
    except Post.DoesNotExist:
        return Response({'error': '文章不存在'}, status=status.HTTP_404_NOT_FOUND)

    # Simple like implementation. The increment runs in the database via an
    # F() expression so concurrent likes are not lost, which a Python-side
    # "read, add 1, save" sequence allows.
    Post.objects.filter(pk=post.pk).update(like_count=F('like_count') + 1)
    post.refresh_from_db(fields=['like_count'])

    return Response({'like_count': post.like_count})

# 复杂的查询示例
# Aggregate blog statistics (admin only).
class PostStatsView(generics.GenericAPIView):
    permission_classes = [IsAdminUser]

    def get(self, request):
        # `Count` comes from this module's top-level imports; the previous
        # local re-import of it and the unused `Avg` import were dropped.
        from django.utils import timezone
        from datetime import timedelta

        # Cut-off for the "recent posts" figure: 30 days before now.
        thirty_days_ago = timezone.now() - timedelta(days=30)

        top_authors = (
            User.objects.annotate(post_count=Count('posts'))
            .filter(post_count__gt=0)
            .order_by('-post_count')
            .values('username', 'post_count')[:5]
        )
        popular_tags = (
            Tag.objects.annotate(post_count=Count('posts'))
            .order_by('-post_count')
            .values('name', 'post_count')[:10]
        )

        stats = {
            'total_posts': Post.objects.count(),
            'published_posts': Post.objects.filter(status='published').count(),
            'total_comments': Comment.objects.count(),
            'approved_comments': Comment.objects.filter(is_approved=True).count(),
            'recent_posts': Post.objects.filter(created_at__gte=thirty_days_ago).count(),
            # Materialised as plain lists so the payload no longer holds lazy
            # querysets when it is rendered.
            'top_authors': list(top_authors),
            'popular_tags': list(popular_tags),
        }

        return Response(stats)

admin.py

# blog/admin.py
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from .models import User, Post, Tag, Category, Comment, PostTag

# Admin configuration for the custom User model, based on Django's UserAdmin.
# NOTE(review): only `fieldsets` (the edit form) is extended here; the add-user
# form keeps UserAdmin's default `add_fieldsets`. Since this model logs in by a
# unique `email`, confirm whether `email` should be added to `add_fieldsets`.
@admin.register(User)
class CustomUserAdmin(UserAdmin):
    list_display = ('email', 'username', 'role', 'is_staff', 'date_joined')
    list_filter = ('role', 'is_staff', 'is_superuser', 'date_joined')
    search_fields = ('email', 'username')
    ordering = ('-date_joined',)

    # Append the custom profile fields to the stock edit form.
    fieldsets = UserAdmin.fieldsets + (
        ('扩展信息', {'fields': ('role', 'bio', 'avatar', 'website')}),
    )

# Admin configuration for posts.
@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    list_display = ('title', 'author', 'status', 'category', 'view_count', 'created_at')
    list_filter = ('status', 'category', 'tags', 'created_at')
    search_fields = ('title', 'content')
    # Pre-fill the slug from the title in the admin form.
    prepopulated_fields = {'slug': ('title',)}
    # Render the author as a raw id input instead of a select box.
    raw_id_fields = ('author',)
    date_hierarchy = 'created_at'

    def get_queryset(self, request):
        # Join author and category up front; both are shown in list_display.
        return super().get_queryset(request).select_related('author', 'category')

# Admin configuration for comments, with a bulk "approve" action.
@admin.register(Comment)
class CommentAdmin(admin.ModelAdmin):
    list_display = ('author', 'post', 'content_preview', 'is_approved', 'created_at')
    list_filter = ('is_approved', 'created_at')
    search_fields = ('content', 'author__username', 'post__title')
    actions = ['approve_comments']

    def content_preview(self, obj):
        # Show at most the first 50 characters, marking truncation with "...".
        text = obj.content
        if len(text) > 50:
            return text[:50] + '...'
        return text
    content_preview.short_description = '内容预览'

    def approve_comments(self, request, queryset):
        # Mark every selected comment as approved in a single UPDATE.
        queryset.update(is_approved=True)
    approve_comments.short_description = "审核通过选中的评论"

# Register the remaining models with the default ModelAdmin options.
admin.site.register(Tag)
admin.site.register(Category)
admin.site.register(PostTag)

Flask + SQLAlchemy 案例

flask_blog/
├── app.py
├── config.py
├── models.py
├── routes/
│   ├── __init__.py
│   ├── auth.py
│   ├── posts.py
│   └── users.py
├── utils/
│   ├── __init__.py
│   └── decorators.py
└── requirements.txt

config.py

显式配置管理:通过Config类管理不同环境配置

# config.py
import os
from datetime import timedelta

# Absolute path of the directory containing this module.
basedir = os.path.abspath(os.path.dirname(__file__))


class Config:
    """Base settings shared by every environment.

    Secrets and the database URL are read from environment variables; an
    unset or empty variable falls back to the development default.
    """

    SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-secret-key'
    SQLALCHEMY_DATABASE_URI = (
        os.getenv('DATABASE_URL')
        or 'sqlite:///' + os.path.join(basedir, 'app.db')
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY') or 'jwt-secret-key'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)

    # Pagination settings
    POSTS_PER_PAGE = 10
    COMMENTS_PER_PAGE = 20

class DevelopmentConfig(Config):
    """Development settings: debug mode on and SQLALCHEMY_ECHO enabled."""
    DEBUG = True
    SQLALCHEMY_ECHO = True

class ProductionConfig(Config):
    """Production settings: debug mode off.

    NOTE(review): the secret keys still inherit Config's hard-coded
    development fallbacks when the environment variables are unset.
    """
    DEBUG = False

# Maps an environment name to its configuration class; 'default' selects the
# development configuration.
config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}

models.py

from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_jwt_extended import create_access_token
from datetime import datetime
import jwt
from config import Config

# Extension objects created without an application instance.
# presumably bound to the Flask app elsewhere via init_app() — verify in app.py
db = SQLAlchemy()
bcrypt = Bcrypt()

class TimestampMixin:
    """Mixin adding ``created_at`` / ``updated_at`` columns to a model.

    Both default to ``datetime.utcnow`` (a naive UTC datetime); ``updated_at``
    is additionally refreshed on every UPDATE.
    """
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class User(db.Model, TimestampMixin):
    """Blog account stored in the ``users`` table, with password and token helpers."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True, nullable=False)
    email = db.Column(db.String(120), unique=True, index=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    full_name = db.Column(db.String(100))
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(200))
    website = db.Column(db.String(200))
    role = db.Column(db.String(20), default='user', nullable=False)
    is_active = db.Column(db.Boolean, default=True)
    last_login = db.Column(db.DateTime)

    # Relationships: both are lazy='dynamic' and cascade 'all, delete-orphan';
    # each adds an ``author`` backref on the related model.
    posts = db.relationship('Post', backref='author', lazy='dynamic', cascade='all, delete-orphan')
    comments = db.relationship('Comment', backref='author', lazy='dynamic', cascade='all, delete-orphan')

    def set_password(self, password):
        """Hash ``password`` with bcrypt and store the hash decoded as UTF-8 text."""
        raw_hash = bcrypt.generate_password_hash(password)
        self.password_hash = raw_hash.decode('utf-8')

    def check_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        stored_hash = self.password_hash
        return bcrypt.check_password_hash(stored_hash, password)

    def generate_token(self):
        """Create an access token whose identity is this user's ``id``."""
        token = create_access_token(identity=self.id)
        return token

    def to_dict(self):
        """Serialize the user to a plain dict; datetimes become ISO-8601 strings."""
        plain_fields = (
            'id', 'username', 'email', 'full_name', 'bio',
            'avatar_url', 'website', 'role', 'is_active',
        )
        payload = {field: getattr(self, field) for field in plain_fields}
        payload['created_at'] = self.created_at.isoformat()
        # updated_at and last_login may be unset, so they serialize to None.
        payload['updated_at'] = self.updated_at.isoformat() if self.updated_at else None
        payload['last_login'] = self.last_login.isoformat() if self.last_login else None
        return payload

    def __repr__(self):
        return '<User {}>'.format(self.username)

class Post(db.Model, TimestampMixin):
    """Blog post stored in the ``posts`` table."""

    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(200), unique=True, index=True, nullable=False)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500))
    status = db.Column(db.String(20), default='draft', nullable=False)  # draft, published, archived
    is_featured = db.Column(db.Boolean, default=False)
    view_count = db.Column(db.Integer, default=0)
    like_count = db.Column(db.Integer, default=0)

    # Foreign keys
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

    # Relationships; the ``posts`` backrefs on Category and Tag are lazy='dynamic'.
    category = db.relationship('Category', backref=db.backref('posts', lazy='dynamic'))
    tags = db.relationship('Tag', secondary='post_tags', backref=db.backref('posts', lazy='dynamic'))
    comments = db.relationship('Comment', backref='post', lazy='dynamic', cascade='all, delete-orphan')

    def to_dict(self):
        """Serialize the post, nesting its author, category and tags as dicts."""
        scalar_fields = (
            'id', 'title', 'slug', 'content', 'summary',
            'status', 'is_featured', 'view_count', 'like_count',
        )
        payload = {name: getattr(self, name) for name in scalar_fields}

        writer = self.author
        payload['author'] = writer.to_dict() if writer else None
        section = self.category
        payload['category'] = section.to_dict() if section else None
        payload['tags'] = [tag.to_dict() for tag in self.tags]

        payload['created_at'] = self.created_at.isoformat()
        payload['updated_at'] = self.updated_at.isoformat() if self.updated_at else None
        # comments is a dynamic relationship, so the count is obtained via .count().
        payload['comment_count'] = self.comments.count()
        return payload

    def __repr__(self):
        return '<Post {}>'.format(self.title)

class Category(db.Model, TimestampMixin):
    """Post category stored in ``categories``; may reference a parent category."""

    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    slug = db.Column(db.String(100), unique=True, index=True, nullable=False)
    description = db.Column(db.Text)
    parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

    # Self-referential relationship; ``children`` is the lazy='dynamic' backref.
    parent = db.relationship('Category', remote_side=[id], backref=db.backref('children', lazy='dynamic'))

    def to_dict(self):
        """Serialize the category together with the number of posts filed under it."""
        plain_fields = ('id', 'name', 'slug', 'description', 'parent_id')
        payload = {field: getattr(self, field) for field in plain_fields}
        payload['created_at'] = self.created_at.isoformat()
        payload['post_count'] = self.posts.count()
        return payload

    def __repr__(self):
        return '<Category {}>'.format(self.name)

class Tag(db.Model):
    """Free-form label attached to posts through the ``post_tags`` table."""

    __tablename__ = 'tags'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.String(200))
    color = db.Column(db.String(7), default='#007bff')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Serialize the tag together with the number of posts carrying it.

        ``posts`` is the backref declared on ``Post.tags`` with
        ``lazy='dynamic'``, i.e. a query object rather than a list. It has no
        ``__len__``, so the count is taken with ``.count()`` (the same way
        ``Category.to_dict`` does) instead of ``len()``.
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'color': self.color,
            'created_at': self.created_at.isoformat(),
            'post_count': self.posts.count()
        }

    def __repr__(self):
        return f'<Tag {self.name}>'

class PostTag(db.Model):
    """Row of the ``post_tags`` table linking one post to one tag."""
    __tablename__ = 'post_tags'
    
    # The (post_id, tag_id) pair forms the composite primary key.
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), primary_key=True)
    tag_id = db.Column(db.Integer, db.ForeignKey('tags.id'), primary_key=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

class Comment(db.Model, TimestampMixin):
    """Comment on a post; replies point at their parent comment via ``parent_id``."""

    __tablename__ = 'comments'

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=False)

    # Foreign keys
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'))

    # Self-referential relationship (replies); ``replies`` is lazy='dynamic'.
    parent = db.relationship('Comment', remote_side=[id], backref=db.backref('replies', lazy='dynamic'))

    def to_dict(self):
        """Serialize the comment and, recursively, all of its replies.

        ``replies`` is a dynamic relationship (a query object), which is always
        truthy, so it is iterated directly; with no replies the list is empty.
        NOTE(review): replies are not filtered by ``is_approved`` here.
        """
        return {
            'id': self.id,
            'content': self.content,
            'is_approved': self.is_approved,
            'author': self.author.to_dict() if self.author else None,
            'post_id': self.post_id,
            'parent_id': self.parent_id,
            'created_at': self.created_at.isoformat(),
            'replies': [reply.to_dict() for reply in self.replies]
        }

    def __repr__(self):
        # repr() must not raise: a comment that has no author attached yet
        # would otherwise fail on ``None.username``.
        author_name = self.author.username if self.author else None
        return f'<Comment {self.id} by {author_name}>'

decorators.py

# utils/decorators.py
from functools import wraps
from flask import request, jsonify, g
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
from models import User, db

def token_required(f):
    """Require a valid JWT belonging to an existing, active user.

    On success the user is stored in ``g.current_user`` and the wrapped view is
    called. Any failure while verifying the token yields a 401 JSON response;
    an unknown or inactive user also yields 401.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Only the token verification is guarded: database errors during the
        # user lookup below must surface as server errors instead of being
        # reported to the client as a bad token.
        try:
            verify_jwt_in_request()
            user_id = get_jwt_identity()
        except Exception:
            return jsonify({'error': '令牌验证失败'}), 401

        g.current_user = User.query.get(user_id)
        if not g.current_user or not g.current_user.is_active:
            return jsonify({'error': '无效的令牌或用户已被禁用'}), 401
        return f(*args, **kwargs)
    return decorated

def admin_required(f):
    """Allow the wrapped view only for authenticated users whose role is 'admin'.

    Authentication is delegated to ``token_required``; other roles get a 403.
    """
    @wraps(f)
    @token_required
    def decorated(*args, **kwargs):
        is_admin = g.current_user.role == 'admin'
        if is_admin:
            return f(*args, **kwargs)
        return jsonify({'error': '需要管理员权限'}), 403
    return decorated

def author_required(f):
    """Allow the wrapped view only for authenticated 'admin' or 'author' users.

    Authentication is delegated to ``token_required``; other roles get a 403.
    """
    @wraps(f)
    @token_required
    def decorated(*args, **kwargs):
        may_write = g.current_user.role in ['admin', 'author']
        if may_write:
            return f(*args, **kwargs)
        return jsonify({'error': '需要作者权限'}), 403
    return decorated

def validate_json(schema):
    """Build a decorator that rejects non-JSON requests and validates the body.

    When ``schema`` has a ``validate`` attribute it is called with the parsed
    body and a truthy result is returned to the client as a 400 response;
    without that attribute the body is not validated.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not request.is_json:
                return jsonify({'error': '请求必须是JSON格式'}), 400

            payload = request.get_json()
            if hasattr(schema, 'validate'):
                problems = schema.validate(payload)
            else:
                problems = {}

            if not problems:
                return f(*args, **kwargs)
            return jsonify({'errors': problems}), 400
        return decorated_function
    return decorator

posts.py

# routes/posts.py
from flask import Blueprint, request, jsonify, g
from sqlalchemy import or_, and_, desc, asc
from models import db, Post, Tag, Category, Comment, User
from utils.decorators import token_required, author_required, admin_required

# Blueprint that the post-related routes in this module attach to.
posts_bp = Blueprint('posts', __name__)

@posts_bp.route('/posts', methods=['GET'])
def get_posts():
    """List posts with optional filtering, searching, sorting and pagination.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10),
    ``status`` (default 'published'), ``category`` (category slug), ``tag``
    (tag name), ``search`` (substring matched against title, content and
    summary), ``sort_by`` (a column of the posts table, default
    'created_at') and ``order`` ('asc', anything else means descending).

    Returns a JSON object with the serialized posts and paging metadata.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    status_filter = request.args.get('status', 'published')
    category_slug = request.args.get('category')
    tag_name = request.args.get('tag')
    search = request.args.get('search')
    sort_by = request.args.get('sort_by', 'created_at')
    order = request.args.get('order', 'desc')

    query = Post.query

    # Status filter.
    # NOTE(review): callers can request any status here, including 'draft'.
    if status_filter:
        query = query.filter(Post.status == status_filter)

    # Category filter: an unknown slug leaves the query unfiltered.
    if category_slug:
        category = Category.query.filter_by(slug=category_slug).first()
        if category:
            query = query.filter(Post.category_id == category.id)

    # Tag filter: an unknown tag name leaves the query unfiltered.
    if tag_name:
        tag = Tag.query.filter_by(name=tag_name).first()
        if tag:
            query = query.filter(Post.tags.contains(tag))

    # Case-insensitive substring search.
    if search:
        pattern = f'%{search}%'
        query = query.filter(
            or_(
                Post.title.ilike(pattern),
                Post.content.ilike(pattern),
                Post.summary.ilike(pattern)
            )
        )

    # Sorting: ``sort_by`` comes from the client, so only real columns of the
    # posts table are accepted. Anything else (methods, relationships, other
    # class attributes) falls back to created_at instead of reaching order_by.
    sortable_columns = Post.__table__.columns.keys()
    if sort_by in sortable_columns:
        sort_column = getattr(Post, sort_by)
    else:
        sort_column = Post.created_at
    direction = asc if order == 'asc' else desc
    query = query.order_by(direction(sort_column))

    # Pagination
    pagination = query.paginate(
        page=page, per_page=per_page, error_out=False
    )

    return jsonify({
        'posts': [post.to_dict() for post in pagination.items],
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': page,
        'has_next': pagination.has_next,
        'has_prev': pagination.has_prev
    })

@posts_bp.route('/posts', methods=['POST'])
@author_required
def create_post():
    """Create a post owned by the current user.

    Expects a JSON object with ``title``, ``slug`` and ``content`` (required)
    and optionally ``summary``, ``status``, ``is_featured``, ``category_id``
    and ``tags`` (a list of tag names; unknown names are created).

    Returns the serialized post with 201, or a 400 JSON error when the body is
    not a JSON object, a required field is missing, or the slug is taken.
    """
    # silent=True: a missing or malformed body becomes None instead of an
    # exception, so it can be answered with the same error the JSON validator uses.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求必须是JSON格式'}), 400

    # Report missing required fields as 400 rather than failing with KeyError.
    missing = [field for field in ('title', 'slug', 'content') if data.get(field) is None]
    if missing:
        return jsonify({'error': f"缺少必填字段: {', '.join(missing)}"}), 400

    # The slug must be unique.
    existing_post = Post.query.filter_by(slug=data['slug']).first()
    if existing_post:
        return jsonify({'error': 'URL标识已存在'}), 400

    post = Post(
        title=data['title'],
        slug=data['slug'],
        content=data['content'],
        summary=data.get('summary'),
        status=data.get('status', 'draft'),
        is_featured=data.get('is_featured', False),
        author_id=g.current_user.id,
        category_id=data.get('category_id')
    )

    # Tags: dict.fromkeys drops repeated names while keeping their order, so
    # the same tag is never linked twice (post_tags has a composite primary key).
    for tag_name in dict.fromkeys(data.get('tags') or []):
        tag = Tag.query.filter_by(name=tag_name).first()
        if not tag:
            tag = Tag(name=tag_name)
            db.session.add(tag)
        post.tags.append(tag)

    db.session.add(post)
    db.session.commit()

    return jsonify(post.to_dict()), 201

@posts_bp.route('/posts/<int:post_id>', methods=['GET'])
def get_post(post_id):
    """Return one post as JSON (404 when it does not exist).

    Viewing a published post increments its ``view_count``.
    """
    post = Post.query.get_or_404(post_id)

    if post.status == 'published':
        # Assign a SQL expression so the UPDATE is emitted as
        # "view_count = view_count + 1". The increment then happens in the
        # database and concurrent requests cannot overwrite each other's count.
        post.view_count = Post.view_count + 1
        db.session.commit()

    return jsonify(post.to_dict())

@posts_bp.route('/posts/<int:post_id>', methods=['PUT'])
@token_required
def update_post(post_id):
    """Partially update a post; only keys present in the JSON body are applied.

    Allowed for the post's author and for admins (403 otherwise). Returns the
    serialized post, or a 400 JSON error when the body is not a JSON object or
    the requested slug belongs to another post.
    """
    post = Post.query.get_or_404(post_id)

    # Permission check
    if post.author_id != g.current_user.id and g.current_user.role != 'admin':
        return jsonify({'error': '无权修改此文章'}), 403

    # silent=True: a missing or malformed body becomes None instead of an
    # exception, and is rejected here before any ``in data`` test can fail.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求必须是JSON格式'}), 400

    # Validate the slug before touching the post so a rejected request leaves
    # the loaded object unmodified.
    if 'slug' in data:
        existing = Post.query.filter(Post.slug == data['slug'], Post.id != post_id).first()
        if existing:
            return jsonify({'error': 'URL标识已被使用'}), 400
        post.slug = data['slug']

    # Plain fields are copied over as-is when present.
    for field in ('title', 'content', 'summary', 'status', 'is_featured', 'category_id'):
        if field in data:
            setattr(post, field, data[field])

    # Tags: replace the whole set. dict.fromkeys drops repeated names while
    # keeping order, so the same tag is never linked twice.
    if 'tags' in data:
        post.tags.clear()
        for tag_name in dict.fromkeys(data['tags'] or []):
            tag = Tag.query.filter_by(name=tag_name).first()
            if not tag:
                tag = Tag(name=tag_name)
                db.session.add(tag)
            post.tags.append(tag)

    db.session.commit()
    return jsonify(post.to_dict())

@posts_bp.route('/posts/<int:post_id>/like', methods=['POST'])
@token_required
def like_post(post_id):
    """Increment a post's ``like_count`` and return the new value as JSON."""
    post = Post.query.get_or_404(post_id)
    # Assign a SQL expression so the UPDATE is emitted as
    # "like_count = like_count + 1"; simultaneous likes are then all counted
    # instead of overwriting one another.
    post.like_count = Post.like_count + 1
    db.session.commit()
    # Reading the attribute after the commit loads the stored value.
    return jsonify({'like_count': post.like_count})

@posts_bp.route('/posts/<int:post_id>/comments', methods=['GET'])
def get_post_comments(post_id):
    """Return one page of a post's approved top-level comments, newest first.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 20).
    """
    page = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 20, type=int)

    # Top-level means parent_id is NULL; replies are excluded from this query.
    approved_roots = Comment.query.filter_by(
        post_id=post_id,
        parent_id=None,
        is_approved=True
    )
    newest_first = approved_roots.order_by(Comment.created_at.desc())
    result_page = newest_first.paginate(page=page, per_page=page_size, error_out=False)

    serialized = [entry.to_dict() for entry in result_page.items]
    return jsonify({
        'comments': serialized,
        'total': result_page.total,
        'pages': result_page.pages,
        'current_page': page
    })

app.py

# app.py
from flask import Flask, jsonify
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from config import config
from models import db, bcrypt
from routes.posts import posts_bp
from routes.auth import auth_bp
from routes.users import users_bp

def create_app(config_name='default'):
    """Application factory: build and configure the Flask app.

    Args:
        config_name: key into the ``config`` mapping selecting the settings class.

    Returns:
        The Flask application with extensions, blueprints, error handlers,
        JWT callbacks and CLI commands registered.

    Raises:
        KeyError: if ``config_name`` is not a key of ``config``.
    """
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Initialize extensions
    db.init_app(app)
    bcrypt.init_app(app)
    jwt = JWTManager(app)
    Migrate(app, db)

    # Register blueprints
    app.register_blueprint(posts_bp, url_prefix='/api')
    app.register_blueprint(auth_bp, url_prefix='/api/auth')
    app.register_blueprint(users_bp, url_prefix='/api')

    # Error handlers: answer with JSON
    @app.errorhandler(404)
    def not_found(error):
        return jsonify({'error': '资源未找到'}), 404

    @app.errorhandler(500)
    def internal_error(error):
        return jsonify({'error': '服务器内部错误'}), 500

    @app.errorhandler(403)
    def forbidden(error):
        return jsonify({'error': '权限不足'}), 403

    # JWT callbacks
    @jwt.expired_token_loader
    def expired_token_callback(jwt_header, jwt_payload):
        return jsonify({'error': '令牌已过期'}), 401

    @jwt.invalid_token_loader
    def invalid_token_callback(error):
        return jsonify({'error': '无效的令牌'}), 401

    @jwt.unauthorized_loader
    def missing_token_callback(error):
        return jsonify({'error': '缺少访问令牌'}), 401

    # CLI commands
    @app.cli.command('init-db')
    def init_db():
        """初始化数据库"""
        db.create_all()
        print('数据库初始化完成')

    @app.cli.command('create-admin')
    def create_admin():
        """创建管理员用户"""
        from models import User

        # username and email are unique columns: running the command a second
        # time would otherwise abort with an integrity error on commit.
        already_there = User.query.filter(
            (User.username == 'admin') | (User.email == 'admin@example.com')
        ).first()
        if already_there:
            print('管理员用户已存在')
            return

        admin = User(
            username='admin',
            email='admin@example.com',
            role='admin'
        )
        # NOTE(review): hard-coded demo password; change it right after the
        # first login or source it from configuration.
        admin.set_password('admin123')
        db.session.add(admin)
        db.session.commit()
        print('管理员用户创建完成')

    return app

# Script entry point: build the app with the 'default' configuration and
# start the built-in server with debug mode forced on.
if __name__ == '__main__':
    app = create_app()
    app.run(debug=True)

总结对比

| 特性 | FastAPI + SQLAlchemy | Django ORM | Flask + SQLAlchemy |
| --- | --- | --- | --- |
| 配置方式 | 依赖注入自动管理 | settings.py集中配置 | Config类显式配置 |
| 会话管理 | 依赖注入自动处理 | 自动管理 | 手动commit/rollback |
| 模型定义 | SQLAlchemy声明式 | Django Model类 | SQLAlchemy声明式 |
| 数据验证 | Pydantic自动验证 | Serializer手动验证 | 手动或装饰器验证 |
| 查询语法 | SQLAlchemy查询 | QuerySet API | SQLAlchemy查询 |
| 迁移工具 | Alembic | 内置migration | Flask-Migrate |
| 关系管理 | relationship() | ForeignKey/ManyToMany | relationship() |
| 序列化 | Pydantic自动 | Serializer类 | 手动to_dict() |
| 学习曲线 | 中等 | 平缓 | 较陡 |

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐