Python数据持久层:数据库与ORM
ORM: 用Python对象操作数据库,无需直接编写SQL。数据库驱动:database.pymodels.py使用Python类定义数据表结构Pydantic模式定义请求/响应数据的序列化和验证crud.pymain.pyDjango ORM 案例models.pyserializers.pyviews.pyadmin.pyFlask + SQLAlchemy 案例config.py显式配置管理
ORM: 用Python对象操作数据库,无需直接编写SQL。
- Django ORM: Django自带,功能强大,易用性好。(数据库迁移工具Django Migrations: Django内置。)
- SQLAlchemy: Python界ORM的事实标准,功能极其丰富和灵活,学习曲线稍陡。(数据库迁移工具Alembic: 通常与SQLAlchemy配合使用。)
- Peewee: 轻量级的替代品,API更简单直观。
数据库驱动:
- PostgreSQL: psycopg2 或 asyncpg
- MySQL: mysqlclient 或 PyMySQL
- MongoDB: pymongo
- Redis: redis
FastAPI + SQLAlchemy 案例
fastapi_db_example/
├── main.py -----------FastAPI应用入口
├── database.py --------数据库配置和会话管理
├── models.py -----------SQLAlchemy模型定义
├── schemas.py ------------Pydantic模式定义
├── crud.py -----------数据库操作类
└── dependencies.py
database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
#数据库URL-支持多种数据库
DATABASE_URL = os.getenv("DATABASE_URL","sqlite:///./test.db")
#创建引擎
engine = create_engine(
DATABASE_URL,
connect_args = {"check_same_thread": False} if "sqlite" in DATABASE_URL else {}
)
#会话工厂
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)
#声明基类
Base = declarative_base()
#依赖注入获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
models.py
使用Python类定义数据表结构
from sqlalchemy import Column,Integer,String,DateTime,Boolean,Text,ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
from typing import List,Optional
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(100), nullable=False)
full_name = Column(String(100))
is_active = Column(Boolean, default=True)
role = Column(String(20), default="user")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# 关系定义
posts = relationship("Post", back_populates="author")
comments = relationship("Comment", back_populates="author")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
content = Column(Text, nullable=False)
summary = Column(String(500))
is_published = Column(Boolean, default=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# 关系
author = relationship("User", back_populates="posts")
comments = relationship("Comment", back_populates="post")
# 标签多对多关系
tags = relationship("Tag", secondary="post_tags", back_populates="posts")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# 关系
author = relationship("User", back_populates="comments")
post = relationship("Post", back_populates="comments")
class Tag(Base):
__tablename__ = "tags"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), unique=True, nullable=False)
description = Column(String(200))
posts = relationship("Post", secondary="post_tags", back_populates="tags")
# 多对多关联表
class PostTag(Base):
__tablename__ = "post_tags"
post_id = Column(Integer, ForeignKey("posts.id"), primary_key=True)
tag_id = Column(Integer, ForeignKey("tags.id"), primary_key=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
Pydantic模式定义
请求/响应数据的序列化和验证
from pydantic import BaseModel, EmailStr, validator
from typing import List, Optional
from datetime import datetime
class UserBase(BaseModel):
username: str
email: EmailStr
full_name: Optional[str] = None
role: str = "user"
class UserCreate(UserBase):
password: str
@validator('password')
def password_strength(cls, v):
if len(v) < 6:
raise ValueError('密码至少6位')
return v
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
full_name: Optional[str] = None
role: Optional[str] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True
class PostBase(BaseModel):
title: str
content: str
summary: Optional[str] = None
is_published: bool = False
class PostCreate(PostBase):
pass
class PostUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
summary: Optional[str] = None
is_published: Optional[bool] = None
class PostResponse(PostBase):
id: int
author_id: int
created_at: datetime
updated_at: Optional[datetime] = None
author: UserResponse
tags: List["TagResponse"] = []
class Config:
orm_mode = True
class TagBase(BaseModel):
name: str
description: Optional[str] = None
class TagResponse(TagBase):
id: int
class Config:
orm_mode = True
class CommentBase(BaseModel):
content: str
class CommentCreate(CommentBase):
pass
class CommentResponse(CommentBase):
id: int
author_id: int
post_id: int
created_at: datetime
author: UserResponse
class Config:
orm_mode = True
crud.py
# crud.py - 数据库操作类
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from typing import List, Optional
import models
import schemas
from auth import get_password_hash
class UserCRUD:
@staticmethod
def get_user(db: Session, user_id: int) -> Optional[models.User]:
return db.query(models.User).filter(models.User.id == user_id).first()
@staticmethod
def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
return db.query(models.User).filter(models.User.email == email).first()
@staticmethod
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[models.User]:
return db.query(models.User).offset(skip).limit(limit).all()
@staticmethod
def create_user(db: Session, user: schemas.UserCreate) -> models.User:
hashed_password = get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
hashed_password=hashed_password,
full_name=user.full_name,
role=user.role
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@staticmethod
def update_user(db: Session, user_id: int, user_update: schemas.UserUpdate) -> Optional[models.User]:
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user:
update_data = user_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_user, field, value)
db.commit()
db.refresh(db_user)
return db_user
@staticmethod
def delete_user(db: Session, user_id: int) -> bool:
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user:
db.delete(db_user)
db.commit()
return True
return False
class PostCRUD:
@staticmethod
def get_post(db: Session, post_id: int) -> Optional[models.Post]:
return db.query(models.Post).filter(models.Post.id == post_id).first()
@staticmethod
def get_posts(
db: Session,
skip: int = 0,
limit: int = 100,
author_id: Optional[int] = None,
published_only: bool = True
) -> List[models.Post]:
query = db.query(models.Post)
if author_id:
query = query.filter(models.Post.author_id == author_id)
if published_only:
query = query.filter(models.Post.is_published == True)
return query.offset(skip).limit(limit).all()
@staticmethod
def create_post(db: Session, post: schemas.PostCreate, author_id: int) -> models.Post:
db_post = models.Post(**post.dict(), author_id=author_id)
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post
@staticmethod
def update_post(db: Session, post_id: int, post_update: schemas.PostUpdate) -> Optional[models.Post]:
db_post = db.query(models.Post).filter(models.Post.id == post_id).first()
if db_post:
update_data = post_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_post, field, value)
db.commit()
db.refresh(db_post)
return db_post
@staticmethod
def add_tag_to_post(db: Session, post_id: int, tag_id: int) -> bool:
db_post = db.query(models.Post).filter(models.Post.id == post_id).first()
db_tag = db.query(models.Tag).filter(models.Tag.id == tag_id).first()
if db_post and db_tag:
# 检查是否已存在关联
existing = db.query(models.PostTag).filter(
and_(models.PostTag.post_id == post_id, models.PostTag.tag_id == tag_id)
).first()
if not existing:
post_tag = models.PostTag(post_id=post_id, tag_id=tag_id)
db.add(post_tag)
db.commit()
return True
return False
class TagCRUD:
@staticmethod
def get_or_create_tag(db: Session, tag_name: str) -> models.Tag:
db_tag = db.query(models.Tag).filter(models.Tag.name == tag_name).first()
if not db_tag:
db_tag = models.Tag(name=tag_name)
db.add(db_tag)
db.commit()
db.refresh(db_tag)
return db_tag
main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
import models
import schemas
import crud
from database import engine, get_db
from auth import get_current_active_user
# 创建数据库表
models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="博客系统API", version="1.0.0")
# 用户路由
@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.UserCRUD.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="邮箱已注册")
return crud.UserCRUD.create_user(db=db, user=user)
@app.get("/users/me", response_model=schemas.UserResponse)
def read_user_me(current_user: models.User = Depends(get_current_active_user)):
return current_user
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.UserCRUD.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="用户不存在")
return db_user
# 文章路由
@app.post("/posts/", response_model=schemas.PostResponse)
def create_post(
post: schemas.PostCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_active_user)
):
return crud.PostCRUD.create_post(db=db, post=post, author_id=current_user.id)
@app.get("/posts/", response_model=List[schemas.PostResponse])
def read_posts(
skip: int = 0,
limit: int = 100,
author_id: Optional[int] = None,
db: Session = Depends(get_db)
):
posts = crud.PostCRUD.get_posts(
db, skip=skip, limit=limit, author_id=author_id
)
return posts
@app.put("/posts/{post_id}", response_model=schemas.PostResponse)
def update_post(
post_id: int,
post_update: schemas.PostUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_active_user)
):
db_post = crud.PostCRUD.get_post(db, post_id=post_id)
if not db_post:
raise HTTPException(status_code=404, detail="文章不存在")
if db_post.author_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权修改此文章")
return crud.PostCRUD.update_post(db=db, post_id=post_id, post_update=post_update)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Django ORM 案例
django_blog/
├── manage.py
├── requirements.txt
└── blog/
├── __init__.py
├── settings.py
├── urls.py
├── models.py
├── serializers.py
├── views.py
├── admin.py --------自动生成管理界面
└── migrations/
└── __init__.py
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils import timezone
from django.core.validators import MinLengthValidator
class User(AbstractUser):# 模型继承:支持AbstractUser等基类扩展
ROLE_CHOICES = [
('admin', '管理员'),
('author', '作者'),
('user', '普通用户'),
]
# 覆盖AbstractUser的字段或添加新字段
email = models.EmailField(unique=True, verbose_name='邮箱')
role = models.CharField(max_length=20, choices=ROLE_CHOICES, default='user', verbose_name='角色')
bio = models.TextField(blank=True, null=True, verbose_name='个人简介')
avatar = models.ImageField(upload_to='avatars/', blank=True, null=True, verbose_name='头像')
website = models.URLField(blank=True, null=True, verbose_name='个人网站')
# 修改USERNAME_FIELD为email
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
class Meta:
verbose_name = '用户'
verbose_name_plural = verbose_name
db_table = 'users'
def __str__(self):
return self.email
@property
def is_author(self):
return self.role in ['author', 'admin']
class Tag(models.Model):
name = models.CharField(max_length=50, unique=True, verbose_name='标签名')
description = models.CharField(max_length=200, blank=True, null=True, verbose_name='描述')
color = models.CharField(max_length=7, default='#007bff', verbose_name='颜色') # HEX颜色
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
class Meta:
verbose_name = '标签'
verbose_name_plural = verbose_name
db_table = 'tags'
ordering = ['name']
def __str__(self):
return self.name
class Category(models.Model):
name = models.CharField(max_length=100, verbose_name='分类名')
slug = models.SlugField(unique=True, verbose_name='URL标识')
description = models.TextField(blank=True, null=True, verbose_name='描述')
parent = models.ForeignKey(
'self',
on_delete=models.CASCADE,
blank=True,
null=True,
related_name='children',
verbose_name='父分类'
)
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
class Meta:
verbose_name = '分类'
verbose_name_plural = verbose_name
db_table = 'categories'
ordering = ['name']
unique_together = ['slug', 'parent']
def __str__(self):
return self.name
@property
def is_root(self):
return self.parent is None
class Post(models.Model):
STATUS_CHOICES = [
('draft', '草稿'),
('published', '已发布'),
('archived', '已归档'),
]
title = models.CharField(max_length=200, verbose_name='标题')
slug = models.SlugField(unique=True, verbose_name='URL标识')
content = models.TextField(verbose_name='内容')
summary = models.CharField(max_length=500, blank=True, null=True, verbose_name='摘要')
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='draft', verbose_name='状态')
is_featured = models.BooleanField(default=False, verbose_name='是否推荐')
view_count = models.PositiveIntegerField(default=0, verbose_name='浏览数')
like_count = models.PositiveIntegerField(default=0, verbose_name='点赞数')
# 关系字段
author = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='posts',
verbose_name='作者'
)
category = models.ForeignKey(
Category,
on_delete=models.SET_NULL,
blank=True,
null=True,
related_name='posts',
verbose_name='分类'
)
tags = models.ManyToManyField(
Tag,
through='PostTag',
related_name='posts',
verbose_name='标签'
)
# 时间字段
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
published_at = models.DateTimeField(blank=True, null=True, verbose_name='发布时间')
class Meta:
verbose_name = '文章'
verbose_name_plural = verbose_name
db_table = 'posts'
ordering = ['-created_at']
indexes = [
models.Index(fields=['status', 'published_at']),
models.Index(fields=['author', 'status']),
]
def __str__(self):
return self.title
def save(self, *args, **kwargs):
# 自动设置发布时间
if self.status == 'published' and not self.published_at:
self.published_at = timezone.now()
super().save(*args, **kwargs)
@property
def is_published(self):
return self.status == 'published'
class PostTag(models.Model):
post = models.ForeignKey(Post, on_delete=models.CASCADE, verbose_name='文章')
tag = models.ForeignKey(Tag, on_delete=models.CASCADE, verbose_name='标签')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='关联时间')
class Meta:
verbose_name = '文章标签关联'
verbose_name_plural = verbose_name
db_table = 'post_tags'
unique_together = ['post', 'tag']
class Comment(models.Model):
content = models.TextField(validators=[MinLengthValidator(10)], verbose_name='评论内容')
is_approved = models.BooleanField(default=False, verbose_name='是否审核通过')
# 关系字段
author = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='comments',
verbose_name='评论者'
)
post = models.ForeignKey(
Post,
on_delete=models.CASCADE,
related_name='comments',
verbose_name='所属文章'
)
parent = models.ForeignKey(
'self',
on_delete=models.CASCADE,
blank=True,
null=True,
related_name='replies',
verbose_name='父评论'
)
created_at = models.DateTimeField(auto_now_add=True, verbose_name='评论时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
class Meta:
verbose_name = '评论'
verbose_name_plural = verbose_name
db_table = 'comments'
ordering = ['-created_at']
def __str__(self):
return f"{self.author.username} - {self.content[:50]}"
@property
def is_reply(self):
return self.parent is not None
serializers.py
# blog/serializers.py
from rest_framework import serializers
from django.contrib.auth import authenticate
from .models import User, Post, Tag, Category, Comment
class UserSerializer(serializers.ModelSerializer):
post_count = serializers.SerializerMethodField()
class Meta:
model = User
fields = ('id', 'username', 'email', 'role', 'bio', 'avatar',
'website', 'date_joined', 'post_count')
read_only_fields = ('id', 'date_joined', 'post_count')
def get_post_count(self, obj):
return obj.posts.count()
class UserRegisterSerializer(serializers.ModelSerializer):
password = serializers.CharField(write_only=True, min_length=6)
password_confirm = serializers.CharField(write_only=True)
class Meta:
model = User
fields = ('username', 'email', 'password', 'password_confirm', 'role')
def validate(self, data):
if data['password'] != data['password_confirm']:
raise serializers.ValidationError("两次密码不一致")
return data
def create(self, validated_data):
validated_data.pop('password_confirm')
user = User.objects.create_user(**validated_data)
return user
class TagSerializer(serializers.ModelSerializer):
post_count = serializers.SerializerMethodField()
class Meta:
model = Tag
fields = '__all__'
def get_post_count(self, obj):
return obj.posts.count()
class CategorySerializer(serializers.ModelSerializer):
post_count = serializers.SerializerMethodField()
children = serializers.SerializerMethodField()
class Meta:
model = Category
fields = '__all__'
def get_post_count(self, obj):
return obj.posts.count()
def get_children(self, obj):
if obj.children.exists():
return CategorySerializer(obj.children.all(), many=True).data
return []
class PostListSerializer(serializers.ModelSerializer):
author = UserSerializer(read_only=True)
category = CategorySerializer(read_only=True)
tags = TagSerializer(many=True, read_only=True)
comment_count = serializers.SerializerMethodField()
class Meta:
model = Post
fields = ('id', 'title', 'slug', 'summary', 'status', 'is_featured',
'view_count', 'like_count', 'author', 'category', 'tags',
'created_at', 'published_at', 'comment_count')
def get_comment_count(self, obj):
return obj.comments.count()
class PostDetailSerializer(PostListSerializer):
class Meta(PostListSerializer.Meta):
fields = PostListSerializer.Meta.fields + ('content',)
class PostCreateSerializer(serializers.ModelSerializer):
tags = serializers.PrimaryKeyRelatedField(
queryset=Tag.objects.all(), many=True, required=False
)
class Meta:
model = Post
fields = ('title', 'slug', 'content', 'summary', 'status',
'is_featured', 'category', 'tags')
def create(self, validated_data):
tags = validated_data.pop('tags', [])
post = Post.objects.create(**validated_data)
post.tags.set(tags)
return post
def update(self, instance, validated_data):
tags = validated_data.pop('tags', None)
for attr, value in validated_data.items():
setattr(instance, attr, value)
instance.save() #信号系统:save、delete等操作的钩子函数
if tags is not None:
instance.tags.set(tags)
return instance
class CommentSerializer(serializers.ModelSerializer):
author = UserSerializer(read_only=True)
replies = serializers.SerializerMethodField()
class Meta:
model = Comment
fields = '__all__'
read_only_fields = ('author', 'created_at', 'updated_at')
def get_replies(self, obj):
if obj.replies.exists():
return CommentSerializer(obj.replies.all(), many=True).data
return []
views.py
# blog/views.py
from rest_framework import generics, status, permissions
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from django.db.models import Q, Count
from .models import Post, User, Tag, Category, Comment
from .serializers import (
UserSerializer, UserRegisterSerializer, PostListSerializer,
PostDetailSerializer, PostCreateSerializer, TagSerializer,
CategorySerializer, CommentSerializer
)
from .permissions import IsAuthorOrReadOnly, IsAdminUser
class UserRegisterView(generics.CreateAPIView):
queryset = User.objects.all()
serializer_class = UserRegisterSerializer
permission_classes = [permissions.AllowAny]
class UserProfileView(generics.RetrieveUpdateAPIView):
serializer_class = UserSerializer
def get_object(self):
return self.request.user
class PostListView(generics.ListCreateAPIView):
serializer_class = PostListSerializer
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['author', 'category', 'tags', 'status', 'is_featured']
search_fields = ['title', 'content', 'summary']
ordering_fields = ['created_at', 'updated_at', 'published_at', 'view_count', 'like_count']
ordering = ['-created_at']
def get_queryset(self):
queryset = Post.objects.select_related('author', 'category').prefetch_related('tags')
# 非管理员只能看到已发布文章
if not self.request.user.is_authenticated or not self.request.user.is_staff:
queryset = queryset.filter(status='published')
return queryset
def perform_create(self, serializer):
serializer.save(author=self.request.user)
class PostDetailView(generics.RetrieveUpdateDestroyAPIView):
queryset = Post.objects.select_related('author', 'category').prefetch_related('tags')
permission_classes = [IsAuthorOrReadOnly]
def get_serializer_class(self):
if self.request.method == 'GET':
return PostDetailSerializer
return PostCreateSerializer
def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
# 增加浏览数(只有已发布文章才计数)
if instance.is_published:
instance.view_count += 1
instance.save(update_fields=['view_count'])
serializer = self.get_serializer(instance)
return Response(serializer.data)
class TagListView(generics.ListAPIView):
queryset = Tag.objects.annotate(post_count=Count('posts'))
serializer_class = TagSerializer
filter_backends = [filters.SearchFilter]
search_fields = ['name']
class CategoryListView(generics.ListAPIView):
queryset = Category.objects.filter(parent__isnull=True).prefetch_related('children')
serializer_class = CategorySerializer
class CommentListView(generics.ListCreateAPIView):
serializer_class = CommentSerializer
def get_queryset(self):
post_id = self.kwargs['post_id']
return Comment.objects.filter(
post_id=post_id,
parent__isnull=True
).select_related('author').prefetch_related('replies')
def perform_create(self, serializer):
post_id = self.kwargs['post_id']
serializer.save(author=self.request.user, post_id=post_id)
@api_view(['POST'])
@permission_classes([permissions.IsAuthenticated])
def like_post(request, pk):
try:
post = Post.objects.get(pk=pk)
except Post.DoesNotExist:
return Response({'error': '文章不存在'}, status=status.HTTP_404_NOT_FOUND)
# 简单的点赞实现
post.like_count += 1
post.save(update_fields=['like_count'])
return Response({'like_count': post.like_count})
# 复杂的查询示例
class PostStatsView(generics.GenericAPIView):
permission_classes = [IsAdminUser]
def get(self, request):
from django.db.models import Count, Avg
from django.utils import timezone
from datetime import timedelta
# 最近30天的统计
thirty_days_ago = timezone.now() - timedelta(days=30)
stats = {
'total_posts': Post.objects.count(),
'published_posts': Post.objects.filter(status='published').count(),
'total_comments': Comment.objects.count(),
'approved_comments': Comment.objects.filter(is_approved=True).count(),
'recent_posts': Post.objects.filter(created_at__gte=thirty_days_ago).count(),
'top_authors': User.objects.annotate(
post_count=Count('posts')
).filter(post_count__gt=0).order_by('-post_count')[:5].values('username', 'post_count'),
'popular_tags': Tag.objects.annotate(
post_count=Count('posts')
).order_by('-post_count')[:10].values('name', 'post_count')
}
return Response(stats)
admin.py
# blog/admin.py
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from .models import User, Post, Tag, Category, Comment, PostTag
@admin.register(User)
class CustomUserAdmin(UserAdmin):
list_display = ('email', 'username', 'role', 'is_staff', 'date_joined')
list_filter = ('role', 'is_staff', 'is_superuser', 'date_joined')
search_fields = ('email', 'username')
ordering = ('-date_joined',)
fieldsets = UserAdmin.fieldsets + (
('扩展信息', {'fields': ('role', 'bio', 'avatar', 'website')}),
)
@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
list_display = ('title', 'author', 'status', 'category', 'view_count', 'created_at')
list_filter = ('status', 'category', 'tags', 'created_at')
search_fields = ('title', 'content')
prepopulated_fields = {'slug': ('title',)}
raw_id_fields = ('author',)
date_hierarchy = 'created_at'
def get_queryset(self, request):
return super().get_queryset(request).select_related('author', 'category')
@admin.register(Comment)
class CommentAdmin(admin.ModelAdmin):
list_display = ('author', 'post', 'content_preview', 'is_approved', 'created_at')
list_filter = ('is_approved', 'created_at')
search_fields = ('content', 'author__username', 'post__title')
actions = ['approve_comments']
def content_preview(self, obj):
return obj.content[:50] + '...' if len(obj.content) > 50 else obj.content
content_preview.short_description = '内容预览'
def approve_comments(self, request, queryset):
queryset.update(is_approved=True)
approve_comments.short_description = "审核通过选中的评论"
admin.site.register(Tag)
admin.site.register(Category)
admin.site.register(PostTag)
Flask + SQLAlchemy 案例
flask_blog/
├── app.py
├── config.py
├── models.py
├── routes/
│ ├── __init__.py
│ ├── auth.py
│ ├── posts.py
│ └── users.py
├── utils/
│ ├── __init__.py
│ └── decorators.py
└── requirements.txt
config.py
显式配置管理:通过Config类管理不同环境配置
# config.py
import os
from datetime import timedelta
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key'
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
# 分页配置
POSTS_PER_PAGE = 10
COMMENTS_PER_PAGE = 20
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_ECHO = True
class ProductionConfig(Config):
DEBUG = False
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
models.py
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_jwt_extended import create_access_token
from datetime import datetime
import jwt
from config import Config
db = SQLAlchemy()
bcrypt = Bcrypt()
class TimestampMixin:
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class User(db.Model, TimestampMixin):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True, nullable=False)
email = db.Column(db.String(120), unique=True, index=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
full_name = db.Column(db.String(100))
bio = db.Column(db.Text)
avatar_url = db.Column(db.String(200))
website = db.Column(db.String(200))
role = db.Column(db.String(20), default='user', nullable=False)
is_active = db.Column(db.Boolean, default=True)
last_login = db.Column(db.DateTime)
# 关系
posts = db.relationship('Post', backref='author', lazy='dynamic', cascade='all, delete-orphan')
comments = db.relationship('Comment', backref='author', lazy='dynamic', cascade='all, delete-orphan')
def set_password(self, password):
self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
def check_password(self, password):
return bcrypt.check_password_hash(self.password_hash, password)
def generate_token(self):
return create_access_token(identity=self.id)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email,
'full_name': self.full_name,
'bio': self.bio,
'avatar_url': self.avatar_url,
'website': self.website,
'role': self.role,
'is_active': self.is_active,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'last_login': self.last_login.isoformat() if self.last_login else None
}
def __repr__(self):
return f'<User {self.username}>'
class Post(db.Model, TimestampMixin):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
slug = db.Column(db.String(200), unique=True, index=True, nullable=False)
content = db.Column(db.Text, nullable=False)
summary = db.Column(db.String(500))
status = db.Column(db.String(20), default='draft', nullable=False) # draft, published, archived
is_featured = db.Column(db.Boolean, default=False)
view_count = db.Column(db.Integer, default=0)
like_count = db.Column(db.Integer, default=0)
# 外键
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
# 关系
category = db.relationship('Category', backref=db.backref('posts', lazy='dynamic'))
tags = db.relationship('Tag', secondary='post_tags', backref=db.backref('posts', lazy='dynamic'))
comments = db.relationship('Comment', backref='post', lazy='dynamic', cascade='all, delete-orphan')
def to_dict(self):
return {
'id': self.id,
'title': self.title,
'slug': self.slug,
'content': self.content,
'summary': self.summary,
'status': self.status,
'is_featured': self.is_featured,
'view_count': self.view_count,
'like_count': self.like_count,
'author': self.author.to_dict() if self.author else None,
'category': self.category.to_dict() if self.category else None,
'tags': [tag.to_dict() for tag in self.tags],
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'comment_count': self.comments.count()
}
def __repr__(self):
return f'<Post {self.title}>'
class Category(db.Model, TimestampMixin):
__tablename__ = 'categories'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
slug = db.Column(db.String(100), unique=True, index=True, nullable=False)
description = db.Column(db.Text)
parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
# 自引用关系
parent = db.relationship('Category', remote_side=[id], backref=db.backref('children', lazy='dynamic'))
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'slug': self.slug,
'description': self.description,
'parent_id': self.parent_id,
'created_at': self.created_at.isoformat(),
'post_count': self.posts.count()
}
def __repr__(self):
return f'<Category {self.name}>'
class Tag(db.Model):
__tablename__ = 'tags'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False)
description = db.Column(db.String(200))
color = db.Column(db.String(7), default='#007bff')
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'description': self.description,
'color': self.color,
'created_at': self.created_at.isoformat(),
'post_count': len(self.posts)
}
def __repr__(self):
return f'<Tag {self.name}>'
class PostTag(db.Model):
__tablename__ = 'post_tags'
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), primary_key=True)
tag_id = db.Column(db.Integer, db.ForeignKey('tags.id'), primary_key=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
class Comment(db.Model, TimestampMixin):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
is_approved = db.Column(db.Boolean, default=False)
# 外键
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)
parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'))
# 自引用关系(回复)
parent = db.relationship('Comment', remote_side=[id], backref=db.backref('replies', lazy='dynamic'))
def to_dict(self):
return {
'id': self.id,
'content': self.content,
'is_approved': self.is_approved,
'author': self.author.to_dict() if self.author else None,
'post_id': self.post_id,
'parent_id': self.parent_id,
'created_at': self.created_at.isoformat(),
'replies': [reply.to_dict() for reply in self.replies] if self.replies else []
}
def __repr__(self):
return f'<Comment {self.id} by {self.author.username}>'
decorators.py
# utils/decorators.py
from functools import wraps
from flask import request, jsonify, g
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
from models import User, db
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
try:
verify_jwt_in_request()
user_id = get_jwt_identity()
g.current_user = User.query.get(user_id)
if not g.current_user or not g.current_user.is_active:
return jsonify({'error': '无效的令牌或用户已被禁用'}), 401
except Exception as e:
return jsonify({'error': '令牌验证失败'}), 401
return f(*args, **kwargs)
return decorated
def admin_required(f):
@wraps(f)
@token_required
def decorated(*args, **kwargs):
if g.current_user.role != 'admin':
return jsonify({'error': '需要管理员权限'}), 403
return f(*args, **kwargs)
return decorated
def author_required(f):
@wraps(f)
@token_required
def decorated(*args, **kwargs):
if g.current_user.role not in ['admin', 'author']:
return jsonify({'error': '需要作者权限'}), 403
return f(*args, **kwargs)
return decorated
def validate_json(schema):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not request.is_json:
return jsonify({'error': '请求必须是JSON格式'}), 400
data = request.get_json()
errors = schema.validate(data) if hasattr(schema, 'validate') else {}
if errors:
return jsonify({'errors': errors}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
posts.py
# routes/posts.py
from flask import Blueprint, request, jsonify, g
from sqlalchemy import or_, and_, desc, asc
from models import db, Post, Tag, Category, Comment, User
from utils.decorators import token_required, author_required, admin_required
posts_bp = Blueprint('posts', __name__)
@posts_bp.route('/posts', methods=['GET'])
def get_posts():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
status_filter = request.args.get('status', 'published')
category_slug = request.args.get('category')
tag_name = request.args.get('tag')
search = request.args.get('search')
sort_by = request.args.get('sort_by', 'created_at')
order = request.args.get('order', 'desc')
# 构建查询
query = Post.query
# 状态过滤
if status_filter:
query = query.filter(Post.status == status_filter)
# 分类过滤
if category_slug:
category = Category.query.filter_by(slug=category_slug).first()
if category:
query = query.filter(Post.category_id == category.id)
# 标签过滤
if tag_name:
tag = Tag.query.filter_by(name=tag_name).first()
if tag:
query = query.filter(Post.tags.contains(tag))
# 搜索
if search:
query = query.filter(
or_(
Post.title.ilike(f'%{search}%'),
Post.content.ilike(f'%{search}%'),
Post.summary.ilike(f'%{search}%')
)
)
# 排序
sort_column = getattr(Post, sort_by, Post.created_at)
if order == 'asc':
query = query.order_by(asc(sort_column))
else:
query = query.order_by(desc(sort_column))
# 分页
pagination = query.paginate(
page=page, per_page=per_page, error_out=False
)
return jsonify({
'posts': [post.to_dict() for post in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
})
@posts_bp.route('/posts', methods=['POST'])
@author_required
def create_post():
data = request.get_json()
# 检查slug是否唯一
existing_post = Post.query.filter_by(slug=data.get('slug')).first()
if existing_post:
return jsonify({'error': 'URL标识已存在'}), 400
post = Post(
title=data['title'],
slug=data['slug'],
content=data['content'],
summary=data.get('summary'),
status=data.get('status', 'draft'),
is_featured=data.get('is_featured', False),
author_id=g.current_user.id,
category_id=data.get('category_id')
)
# 处理标签
if 'tags' in data:
for tag_name in data['tags']:
tag = Tag.query.filter_by(name=tag_name).first()
if not tag:
tag = Tag(name=tag_name)
db.session.add(tag)
post.tags.append(tag)
db.session.add(post)
db.session.commit()
return jsonify(post.to_dict()), 201
@posts_bp.route('/posts/<int:post_id>', methods=['GET'])
def get_post(post_id):
post = Post.query.get_or_404(post_id)
# 增加浏览数
if post.status == 'published':
post.view_count += 1
db.session.commit()
return jsonify(post.to_dict())
@posts_bp.route('/posts/<int:post_id>', methods=['PUT'])
@token_required
def update_post(post_id):
post = Post.query.get_or_404(post_id)
# 权限检查
if post.author_id != g.current_user.id and g.current_user.role != 'admin':
return jsonify({'error': '无权修改此文章'}), 403
data = request.get_json()
# 更新字段
if 'title' in data:
post.title = data['title']
if 'slug' in data:
# 检查slug是否被其他文章使用
existing = Post.query.filter(Post.slug == data['slug'], Post.id != post_id).first()
if existing:
return jsonify({'error': 'URL标识已被使用'}), 400
post.slug = data['slug']
if 'content' in data:
post.content = data['content']
if 'summary' in data:
post.summary = data['summary']
if 'status' in data:
post.status = data['status']
if 'is_featured' in data:
post.is_featured = data['is_featured']
if 'category_id' in data:
post.category_id = data['category_id']
# 更新标签
if 'tags' in data:
post.tags.clear()
for tag_name in data['tags']:
tag = Tag.query.filter_by(name=tag_name).first()
if not tag:
tag = Tag(name=tag_name)
db.session.add(tag)
post.tags.append(tag)
db.session.commit()
return jsonify(post.to_dict())
@posts_bp.route('/posts/<int:post_id>/like', methods=['POST'])
@token_required
def like_post(post_id):
post = Post.query.get_or_404(post_id)
post.like_count += 1
db.session.commit()
return jsonify({'like_count': post.like_count})
@posts_bp.route('/posts/<int:post_id>/comments', methods=['GET'])
def get_post_comments(post_id):
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
comments = Comment.query.filter_by(
post_id=post_id,
parent_id=None,
is_approved=True
).order_by(Comment.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
return jsonify({
'comments': [comment.to_dict() for comment in comments.items],
'total': comments.total,
'pages': comments.pages,
'current_page': page
})
app.py
# app.py
from flask import Flask, jsonify
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from config import config
from models import db, bcrypt
from routes.posts import posts_bp
from routes.auth import auth_bp
from routes.users import users_bp
def create_app(config_name='default'):
app = Flask(__name__)
app.config.from_object(config[config_name])
# 初始化扩展
db.init_app(app)
bcrypt.init_app(app)
jwt = JWTManager(app)
migrate = Migrate(app, db)
# 注册蓝图
app.register_blueprint(posts_bp, url_prefix='/api')
app.register_blueprint(auth_bp, url_prefix='/api/auth')
app.register_blueprint(users_bp, url_prefix='/api')
# 错误处理
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': '资源未找到'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': '服务器内部错误'}), 500
@app.errorhandler(403)
def forbidden(error):
return jsonify({'error': '权限不足'}), 403
# JWT配置
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
return jsonify({'error': '令牌已过期'}), 401
@jwt.invalid_token_loader
def invalid_token_callback(error):
return jsonify({'error': '无效的令牌'}), 401
@jwt.unauthorized_loader
def missing_token_callback(error):
return jsonify({'error': '缺少访问令牌'}), 401
# CLI命令
@app.cli.command('init-db')
def init_db():
"""初始化数据库"""
db.create_all()
print('数据库初始化完成')
@app.cli.command('create-admin')
def create_admin():
"""创建管理员用户"""
from models import User
admin = User(
username='admin',
email='admin@example.com',
role='admin'
)
admin.set_password('admin123')
db.session.add(admin)
db.session.commit()
print('管理员用户创建完成')
return app
if __name__ == '__main__':
app = create_app()
app.run(debug=True)
总结对比
特性 FastAPI + SQLAlchemy Django ORM Flask + SQLAlchemy
配置方式 依赖注入自动管理 settings.py集中配置 Config类显式配置
会话管理 依赖注入自动处理 自动管理 手动commit/rollback
模型定义 SQLAlchemy声明式 Django Model类 SQLAlchemy声明式
数据验证 Pydantic自动验证 Serializer手动验证 手动或装饰器验证
查询语法 SQLAlchemy查询 QuerySet API SQLAlchemy查询
迁移工具 Alembic 内置migration Flask-Migrate
关系管理 relationship() ForeignKey/ManyToMany relationship()
序列化 Pydantic自动 Serializer类 手动to_dict()
学习曲线 中等 平缓 较陡
更多推荐



所有评论(0)