Flask-SQLAlchemy高级用法:关系建模与复杂查询
本文深入探讨Flask-SQLAlchemy的高级用法,包括关系建模与复杂查询的实现。首先介绍了环境配置和基础模型类创建,然后详细讲解了一对一、一对多、多对多等关系建模方式,并提供了代码示例。文章还涵盖了复杂查询技巧,如连接查询、子查询和聚合查询,以及性能优化策略。通过示例代码和Mermaid图表,展示了如何构建高效、灵活的数据库操作方案,帮助开发者提升Web应用的数据库处理能力。
目录
『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网
Flask-SQLAlchemy高级用法:关系建模与复杂查询
1. 引言
在现代Web应用开发中,数据库操作是核心环节之一。Flask-SQLAlchemy作为Flask的官方ORM扩展,不仅提供了简洁的数据库操作接口,还支持复杂的模型关系映射和高级查询功能。本博客将深入探讨Flask-SQLAlchemy的高级用法,重点讲解关系建模与复杂查询的实现。
2. 环境准备与基础配置
2.1 安装与配置
# requirements.txt
Flask==2.3.3
Flask-SQLAlchemy==3.0.5
Flask-Migrate==4.0.4
psycopg2-binary==2.9.6 # PostgreSQL驱动
pymysql==1.0.3 # MySQL驱动
sqlite3 # Python内置
2.2 基础配置
# app.py - 基础配置示例
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
import os
from datetime import datetime
# 创建Flask应用
app = Flask(__name__)
# 配置数据库
basedir = os.path.abspath(os.path.dirname(__file__))
# 根据环境选择数据库
if os.environ.get('FLASK_ENV') == 'production':
# PostgreSQL配置
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL',
'postgresql://user:password@localhost/mydatabase')
elif os.environ.get('FLASK_ENV') == 'testing':
# SQLite内存数据库
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
else:
# 开发环境使用SQLite
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'app.db')
# 其他配置
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = os.environ.get('SQLALCHEMY_ECHO', 'False').lower() == 'true'
# 初始化扩展
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# 创建基础模型类
class BaseModel(db.Model):
"""所有模型的基类"""
__abstract__ = True
id = db.Column(db.Integer, primary_key=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def save(self):
"""保存对象"""
db.session.add(self)
db.session.commit()
return self
def update(self, **kwargs):
"""更新对象"""
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
self.updated_at = datetime.utcnow()
db.session.commit()
return self
def delete(self):
"""删除对象"""
db.session.delete(self)
db.session.commit()
@classmethod
def get_by_id(cls, id):
"""通过ID获取对象"""
return cls.query.get(id)
@classmethod
def get_all(cls):
"""获取所有对象"""
return cls.query.all()
@classmethod
def create(cls, **kwargs):
"""创建新对象"""
instance = cls(**kwargs)
return instance.save()
3. 关系建模深入
3.1 一对一关系(One-to-One)
一对一关系用于关联两个唯一相关的实体。
# models/one_to_one.py
from app import db, BaseModel
from sqlalchemy.orm import validates
class UserProfile(BaseModel):
"""用户个人资料 - 一对一关系示例"""
__tablename__ = 'user_profiles'
# 基本信息
full_name = db.Column(db.String(100), nullable=False)
date_of_birth = db.Column(db.Date)
bio = db.Column(db.Text)
# 联系信息
phone = db.Column(db.String(20))
address = db.Column(db.Text)
# 社交媒体
website = db.Column(db.String(200))
github = db.Column(db.String(100))
linkedin = db.Column(db.String(100))
# 外键
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True, nullable=False)
# 关系定义
user = db.relationship('User', back_populates='profile', uselist=False)
@validates('phone')
def validate_phone(self, key, phone):
"""验证电话号码"""
if phone and not phone.replace('+', '').replace(' ', '').isdigit():
raise ValueError('Invalid phone number')
return phone
def to_dict(self):
"""转换为字典"""
return {
'id': self.id,
'full_name': self.full_name,
'date_of_birth': self.date_of_birth.isoformat() if self.date_of_birth else None,
'bio': self.bio,
'phone': self.phone,
'address': self.address,
'website': self.website,
'github': self.github,
'linkedin': self.linkedin,
'user_id': self.user_id
}
class User(BaseModel):
"""用户模型 - 一对一关系示例"""
__tablename__ = 'users'
# 账户信息
username = db.Column(db.String(50), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(128), nullable=False)
# 状态
is_active = db.Column(db.Boolean, default=True)
is_verified = db.Column(db.Boolean, default=False)
last_login = db.Column(db.DateTime)
# 一对一关系
profile = db.relationship('UserProfile', back_populates='user',
uselist=False, cascade='all, delete-orphan')
# 验证器
@validates('email')
def validate_email(self, key, email):
"""验证邮箱格式"""
import re
if not re.match(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$', email):
raise ValueError('Invalid email address')
return email
@validates('username')
def validate_username(self, key, username):
"""验证用户名"""
if len(username) < 3:
raise ValueError('Username must be at least 3 characters')
if not username.isalnum():
raise ValueError('Username can only contain letters and numbers')
return username
def to_dict(self, include_profile=True):
"""转换为字典"""
data = {
'id': self.id,
'username': self.username,
'email': self.email,
'is_active': self.is_active,
'is_verified': self.is_verified,
'last_login': self.last_login.isoformat() if self.last_login else None,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat()
}
if include_profile and self.profile:
data['profile'] = self.profile.to_dict()
return data
@classmethod
def create_with_profile(cls, **kwargs):
"""创建用户同时创建个人资料"""
profile_data = kwargs.pop('profile', {})
user = cls(**kwargs)
db.session.add(user)
db.session.flush() # 获取用户ID
if profile_data:
profile_data['user_id'] = user.id
profile = UserProfile(**profile_data)
user.profile = profile
db.session.commit()
return user
3.2 一对多关系(One-to-Many)
一对多关系是最常见的关系类型。
# models/one_to_many.py
from app import db, BaseModel
from sqlalchemy.orm import validates
from sqlalchemy import event
class Category(BaseModel):
"""分类模型 - 一对多关系示例(父)"""
__tablename__ = 'categories'
# 分类信息
name = db.Column(db.String(100), nullable=False, index=True)
slug = db.Column(db.String(100), unique=True, nullable=False, index=True)
description = db.Column(db.Text)
# 层级关系
parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)
# 关系定义
parent = db.relationship('Category', remote_side='Category.id',
backref=db.backref('children', lazy='dynamic'),
uselist=False)
# 一对多关系:一个分类有多个产品
products = db.relationship('Product', back_populates='category',
lazy='dynamic', cascade='all, delete-orphan')
@validates('slug')
def validate_slug(self, key, slug):
"""验证slug格式"""
import re
if not re.match(r'^[a-z0-9]+(?:-[a-z0-9]+)*$', slug):
raise ValueError('Slug can only contain lowercase letters, numbers, and hyphens')
return slug
def get_all_products(self):
"""获取分类及其所有子分类的产品"""
from sqlalchemy.orm import aliased
# 使用递归CTE获取所有子分类
from sqlalchemy import text
# 简单实现:递归获取
all_categories = [self]
stack = [self]
while stack:
current = stack.pop()
for child in current.children:
all_categories.append(child)
stack.append(child)
category_ids = [cat.id for cat in all_categories]
return Product.query.filter(Product.category_id.in_(category_ids)).all()
def to_dict(self, include_children=True, include_products=False):
"""转换为字典"""
data = {
'id': self.id,
'name': self.name,
'slug': self.slug,
'description': self.description,
'parent_id': self.parent_id,
'product_count': self.products.count() if hasattr(self.products, 'count') else len(self.products)
}
if include_children and self.children:
data['children'] = [child.to_dict(include_children=False) for child in self.children]
if include_products:
data['products'] = [product.to_dict(include_category=False) for product in self.products.limit(10)]
return data
class Product(BaseModel):
"""产品模型 - 一对多关系示例(子)"""
__tablename__ = 'products'
# 产品信息
name = db.Column(db.String(200), nullable=False, index=True)
sku = db.Column(db.String(50), unique=True, nullable=False, index=True)
description = db.Column(db.Text)
# 价格信息
price = db.Column(db.Numeric(10, 2), nullable=False)
cost_price = db.Column(db.Numeric(10, 2))
discount_price = db.Column(db.Numeric(10, 2))
# 库存信息
stock_quantity = db.Column(db.Integer, default=0)
reorder_level = db.Column(db.Integer, default=10)
is_available = db.Column(db.Boolean, default=True)
# 属性
weight = db.Column(db.Float) # 重量(kg)
dimensions = db.Column(db.String(50)) # 尺寸(长x宽x高)
color = db.Column(db.String(50))
material = db.Column(db.String(100))
# 外键
category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)
# 关系定义
category = db.relationship('Category', back_populates='products')
# 一对多关系:一个产品有多个评价
reviews = db.relationship('Review', back_populates='product',
lazy='dynamic', cascade='all, delete-orphan')
# 多对多关系:产品标签(稍后定义)
tags = db.relationship('Tag', secondary='product_tags',
back_populates='products', lazy='dynamic')
# 验证器
@validates('price')
def validate_price(self, key, price):
"""验证价格"""
if price <= 0:
raise ValueError('Price must be positive')
return price
@validates('stock_quantity')
def validate_stock(self, key, quantity):
"""验证库存数量"""
if quantity < 0:
raise ValueError('Stock quantity cannot be negative')
return quantity
# 属性
@property
def is_low_stock(self):
"""是否低库存"""
return self.stock_quantity <= self.reorder_level
@property
def average_rating(self):
"""平均评分"""
if self.reviews.count() == 0:
return 0
total = sum(review.rating for review in self.reviews)
return total / self.reviews.count()
@property
def discount_percentage(self):
"""折扣百分比"""
if self.discount_price and self.price > 0:
return round((1 - self.discount_price / self.price) * 100, 2)
return 0
def to_dict(self, include_category=True, include_reviews=False, include_tags=False):
"""转换为字典"""
data = {
'id': self.id,
'name': self.name,
'sku': self.sku,
'description': self.description,
'price': float(self.price),
'cost_price': float(self.cost_price) if self.cost_price else None,
'discount_price': float(self.discount_price) if self.discount_price else None,
'stock_quantity': self.stock_quantity,
'reorder_level': self.reorder_level,
'is_available': self.is_available,
'is_low_stock': self.is_low_stock,
'weight': self.weight,
'dimensions': self.dimensions,
'color': self.color,
'material': self.material,
'category_id': self.category_id,
'average_rating': self.average_rating,
'discount_percentage': self.discount_percentage,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat()
}
if include_category and self.category:
data['category'] = self.category.to_dict(include_children=False, include_products=False)
if include_reviews:
data['reviews'] = [review.to_dict(include_product=False) for review in self.reviews.limit(5)]
data['review_count'] = self.reviews.count()
if include_tags:
data['tags'] = [tag.to_dict() for tag in self.tags]
return data
# 事件监听器
@event.listens_for(Product.stock_quantity, 'set')
def check_stock_availability(target, value, oldvalue, initiator):
"""监听库存变化,自动更新可用状态"""
if value <= 0:
target.is_available = False
elif not target.is_available and value > 0:
target.is_available = True
3.3 多对多关系(Many-to-Many)
多对多关系需要使用关联表。
# models/many_to_many.py
from app import db, BaseModel
from sqlalchemy.orm import validates
from datetime import datetime
# 关联表 - 产品标签
product_tags = db.Table('product_tags',
db.Column('product_id', db.Integer, db.ForeignKey('products.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True),
db.Column('created_at', db.DateTime, default=datetime.utcnow)
)
class Tag(BaseModel):
"""标签模型 - 多对多关系示例"""
__tablename__ = 'tags'
# 标签信息
name = db.Column(db.String(50), unique=True, nullable=False, index=True)
slug = db.Column(db.String(50), unique=True, nullable=False, index=True)
description = db.Column(db.Text)
# 关系定义
products = db.relationship('Product', secondary=product_tags,
back_populates='tags', lazy='dynamic')
@validates('name')
def validate_name(self, key, name):
"""验证标签名"""
if len(name) < 2:
raise ValueError('Tag name must be at least 2 characters')
return name.strip()
def to_dict(self, include_products=False):
"""转换为字典"""
data = {
'id': self.id,
'name': self.name,
'slug': self.slug,
'description': self.description,
'product_count': self.products.count() if hasattr(self.products, 'count') else len(self.products)
}
if include_products:
data['products'] = [product.to_dict(include_category=False, include_tags=False)
for product in self.products.limit(10)]
return data
# 关联表 - 用户角色(带额外字段的关联表)
class UserRole(db.Model):
"""用户角色关联表 - 带额外字段的多对多关系"""
__tablename__ = 'user_roles'
# 复合主键
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True)
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'), primary_key=True)
# 额外字段
assigned_at = db.Column(db.DateTime, default=datetime.utcnow)
assigned_by = db.Column(db.Integer, db.ForeignKey('users.id'))
expires_at = db.Column(db.DateTime, nullable=True)
is_active = db.Column(db.Boolean, default=True)
# 关系定义
user = db.relationship('User', foreign_keys=[user_id], backref=db.backref('role_associations', lazy='dynamic'))
role = db.relationship('Role', backref=db.backref('user_associations', lazy='dynamic'))
assigner = db.relationship('User', foreign_keys=[assigned_by], uselist=False)
def to_dict(self):
"""转换为字典"""
return {
'user_id': self.user_id,
'role_id': self.role_id,
'assigned_at': self.assigned_at.isoformat(),
'assigned_by': self.assigned_by,
'expires_at': self.expires_at.isoformat() if self.expires_at else None,
'is_active': self.is_active
}
class Role(BaseModel):
"""角色模型"""
__tablename__ = 'roles'
# 角色信息
name = db.Column(db.String(50), unique=True, nullable=False)
description = db.Column(db.Text)
permissions = db.Column(db.Text) # JSON格式的权限列表
# 关系定义(通过关联表)
users = db.relationship('User', secondary='user_roles',
backref=db.backref('roles', lazy='dynamic'),
lazy='dynamic')
def to_dict(self, include_users=False):
"""转换为字典"""
data = {
'id': self.id,
'name': self.name,
'description': self.description,
'permissions': self.permissions,
'user_count': self.users.count() if hasattr(self.users, 'count') else len(self.users)
}
if include_users:
data['users'] = [user.to_dict(include_profile=False) for user in self.users.limit(10)]
return data
# 更新User模型,添加角色关系
class User(BaseModel):
# ... 之前的代码 ...
# 添加角色关系
role_associations = db.relationship('UserRole', back_populates='user',
cascade='all, delete-orphan')
@property
def active_roles(self):
"""获取用户当前有效的角色"""
now = datetime.utcnow()
return [assoc.role for assoc in self.role_associations
if assoc.is_active and (assoc.expires_at is None or assoc.expires_at > now)]
def has_role(self, role_name):
"""检查用户是否有特定角色"""
return any(role.name == role_name for role in self.active_roles)
def add_role(self, role, assigned_by=None, expires_at=None):
"""为用户添加角色"""
from models.many_to_many import UserRole
# 检查是否已有该角色
existing = UserRole.query.filter_by(
user_id=self.id,
role_id=role.id,
is_active=True
).first()
if existing:
return existing
# 创建新的角色关联
user_role = UserRole(
user_id=self.id,
role_id=role.id,
assigned_by=assigned_by,
expires_at=expires_at,
is_active=True
)
db.session.add(user_role)
db.session.commit()
return user_role
3.4 自引用关系(Self-Referential)
自引用关系是指模型与自身建立关系。
# models/self_referential.py
from app import db, BaseModel
class Employee(BaseModel):
"""员工模型 - 自引用关系示例(组织架构)"""
__tablename__ = 'employees'
# 员工信息
employee_id = db.Column(db.String(20), unique=True, nullable=False, index=True)
first_name = db.Column(db.String(50), nullable=False)
last_name = db.Column(db.String(50), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
phone = db.Column(db.String(20))
position = db.Column(db.String(100))
department = db.Column(db.String(100))
hire_date = db.Column(db.Date, nullable=False)
salary = db.Column(db.Numeric(12, 2))
# 自引用外键:经理ID
manager_id = db.Column(db.Integer, db.ForeignKey('employees.id'), nullable=True)
# 自引用关系
manager = db.relationship('Employee', remote_side='Employee.id',
backref=db.backref('subordinates', lazy='dynamic'),
uselist=False)
# 属性
@property
def full_name(self):
"""全名"""
return f"{self.first_name} {self.last_name}"
@property
def reporting_chain(self):
"""获取汇报链(从当前员工到最高领导)"""
chain = []
current = self
while current and current.manager:
chain.append(current.manager)
current = current.manager
return chain
def get_all_subordinates(self, include_self=False):
"""获取所有下属(递归)"""
from sqlalchemy.orm import aliased
# 使用递归CTE(Common Table Expression)
from sqlalchemy import text
# 简单实现:递归获取
all_subordinates = []
def get_subordinates(employee, depth=0):
if depth > 10: # 防止无限递归
return
for subordinate in employee.subordinates:
all_subordinates.append((subordinate, depth + 1))
get_subordinates(subordinate, depth + 1)
if include_self:
all_subordinates.append((self, 0))
get_subordinates(self)
return all_subordinates
def to_dict(self, include_manager=False, include_subordinates=False, max_depth=1):
"""转换为字典"""
data = {
'id': self.id,
'employee_id': self.employee_id,
'first_name': self.first_name,
'last_name': self.last_name,
'full_name': self.full_name,
'email': self.email,
'phone': self.phone,
'position': self.position,
'department': self.department,
'hire_date': self.hire_date.isoformat() if self.hire_date else None,
'salary': float(self.salary) if self.salary else None,
'manager_id': self.manager_id,
'subordinate_count': self.subordinates.count() if hasattr(self.subordinates, 'count') else len(self.subordinates)
}
if include_manager and self.manager:
data['manager'] = self.manager.to_dict()
if include_subordinates and max_depth > 0:
subordinates_data = []
for subordinate in self.subordinates:
subordinate_dict = subordinate.to_dict(
include_manager=False,
include_subordinates=(max_depth > 1),
max_depth=max_depth - 1
)
subordinates_data.append(subordinate_dict)
data['subordinates'] = subordinates_data
return data
class Comment(BaseModel):
"""评论模型 - 自引用关系示例(嵌套评论)"""
__tablename__ = 'comments'
# 评论内容
content = db.Column(db.Text, nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)
# 自引用外键:父评论ID
parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), nullable=True)
# 状态
is_approved = db.Column(db.Boolean, default=True)
is_spam = db.Column(db.Boolean, default=False)
# 关系定义
author = db.relationship('User', backref=db.backref('comments', lazy='dynamic'))
post = db.relationship('Post', backref=db.backref('comments', lazy='dynamic'))
# 自引用关系
parent = db.relationship('Comment', remote_side='Comment.id',
backref=db.backref('replies', lazy='dynamic'),
uselist=False)
def get_thread(self, max_depth=3):
"""获取评论线程"""
thread = []
def collect_comments(comment, depth):
if depth > max_depth:
return
thread.append({
'comment': comment.to_dict(include_replies=False),
'depth': depth
})
for reply in comment.replies.order_by(Comment.created_at.asc()):
collect_comments(reply, depth + 1)
collect_comments(self, 0)
return thread
def to_dict(self, include_author=True, include_replies=False, max_depth=1):
"""转换为字典"""
data = {
'id': self.id,
'content': self.content,
'author_id': self.author_id,
'post_id': self.post_id,
'parent_id': self.parent_id,
'is_approved': self.is_approved,
'is_spam': self.is_spam,
'reply_count': self.replies.count() if hasattr(self.replies, 'count') else len(self.replies),
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat()
}
if include_author and self.author:
data['author'] = self.author.to_dict(include_profile=False)
if include_replies and max_depth > 0:
replies_data = []
for reply in self.replies.order_by(Comment.created_at.asc()):
reply_dict = reply.to_dict(
include_author=True,
include_replies=(max_depth > 1),
max_depth=max_depth - 1
)
replies_data.append(reply_dict)
data['replies'] = replies_data
return data
class Post(BaseModel):
"""文章模型"""
__tablename__ = 'posts'
# 文章信息
title = db.Column(db.String(200), nullable=False)
slug = db.Column(db.String(200), unique=True, nullable=False)
content = db.Column(db.Text, nullable=False)
excerpt = db.Column(db.Text)
# 状态
is_published = db.Column(db.Boolean, default=False)
published_at = db.Column(db.DateTime)
# 外键
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)
# 关系定义
author = db.relationship('User', backref=db.backref('posts', lazy='dynamic'))
category = db.relationship('Category', backref=db.backref('posts', lazy='dynamic'))
4. 复杂查询技术
4.1 基础查询方法
# queries/basic_queries.py
from app import db
from models.one_to_one import User, UserProfile
from models.one_to_many import Category, Product, Review
from models.many_to_many import Tag
from models.self_referential import Employee, Comment, Post
from sqlalchemy import or_, and_, not_, func, extract, case
from datetime import datetime, timedelta
class BasicQueries:
"""基础查询示例"""
@staticmethod
def get_users_with_profiles():
"""获取所有用户及其个人资料"""
# 方法1:使用join
users = db.session.query(User, UserProfile).\
join(UserProfile, User.id == UserProfile.user_id).\
all()
# 方法2:使用relationship
users = User.query.options(db.joinedload(User.profile)).all()
return users
@staticmethod
def get_products_by_category(category_id, min_price=None, max_price=None,
available_only=True):
"""获取分类下的产品"""
query = Product.query.filter_by(category_id=category_id)
# 价格过滤
if min_price is not None:
query = query.filter(Product.price >= min_price)
if max_price is not None:
query = query.filter(Product.price <= max_price)
# 可用性过滤
if available_only:
query = query.filter_by(is_available=True)
# 排序
query = query.order_by(Product.price.asc(), Product.name.asc())
return query.all()
@staticmethod
def search_products(keyword, category_id=None, tag_ids=None,
min_rating=None, max_price=None):
"""搜索产品"""
query = Product.query
# 关键词搜索
if keyword:
keyword_pattern = f'%{keyword}%'
query = query.filter(
or_(
Product.name.ilike(keyword_pattern),
Product.description.ilike(keyword_pattern),
Product.sku.ilike(keyword_pattern)
)
)
# 分类过滤
if category_id:
query = query.filter_by(category_id=category_id)
# 标签过滤
if tag_ids:
for tag_id in tag_ids:
query = query.filter(Product.tags.any(Tag.id == tag_id))
# 最低评分过滤
if min_rating is not None:
# 创建子查询计算平均评分
from sqlalchemy.sql import label
# 方法:使用子查询或应用程序逻辑
# 这里简化处理,实际可能需要更复杂的查询
query = query.join(Product.reviews).\
group_by(Product.id).\
having(func.avg(Review.rating) >= min_rating)
# 最高价格过滤
if max_price is not None:
query = query.filter(Product.price <= max_price)
return query.distinct().all()
@staticmethod
def get_products_with_reviews(limit=10):
"""获取有评价的产品"""
# 使用子查询获取有评价的产品ID
subquery = db.session.query(Review.product_id).\
distinct().subquery()
products = Product.query.\
join(subquery, Product.id == subquery.c.product_id).\
options(db.joinedload(Product.reviews)).\
limit(limit).all()
return products
@staticmethod
def get_employees_by_department(department, include_manager=True):
"""获取部门员工"""
query = Employee.query.filter_by(department=department)
if include_manager:
query = query.options(db.joinedload(Employee.manager))
return query.order_by(Employee.position, Employee.last_name).all()
@staticmethod
def get_comment_thread(post_id, max_depth=3):
"""获取文章评论线程"""
# 获取顶级评论(没有父评论)
top_level_comments = Comment.query.\
filter_by(post_id=post_id, parent_id=None, is_approved=True).\
options(db.joinedload(Comment.author)).\
order_by(Comment.created_at.asc()).all()
# 构建线程
threads = []
for comment in top_level_comments:
thread = comment.get_thread(max_depth=max_depth)
threads.append(thread)
return threads
4.2 高级查询技术
# queries/advanced_queries.py
from app import db
from models.one_to_one import User, UserProfile
from models.one_to_many import Category, Product, Review
from models.many_to_many import Tag, Role, UserRole
from models.self_referential import Employee, Comment, Post
from sqlalchemy import text, desc, asc, literal_column, select, union_all
from sqlalchemy.orm import aliased, contains_eager, subqueryload
from datetime import datetime, timedelta
import json
class AdvancedQueries:
"""高级查询示例"""
@staticmethod
def complex_join_query():
"""复杂连接查询:获取产品及其分类、标签和评价"""
# 使用多个join和contains_eager进行预加载
query = Product.query.\
join(Category).\
outerjoin(Product.tags).\
outerjoin(Product.reviews).\
options(
contains_eager(Product.category),
contains_eager(Product.tags),
contains_eager(Product.reviews)
)
return query.all()
@staticmethod
def subquery_example():
"""子查询示例:获取每个分类的产品数量"""
# 创建子查询
subquery = db.session.query(
Product.category_id,
func.count(Product.id).label('product_count'),
func.avg(Product.price).label('avg_price'),
func.max(Product.price).label('max_price'),
func.min(Product.price).label('min_price')
).\
group_by(Product.category_id).\
subquery()
# 主查询
categories = db.session.query(
Category,
subquery.c.product_count,
subquery.c.avg_price,
subquery.c.max_price,
subquery.c.min_price
).\
outerjoin(subquery, Category.id == subquery.c.category_id).\
order_by(desc(subquery.c.product_count)).all()
return categories
@staticmethod
def window_function_example():
"""窗口函数示例:计算每个分类中产品的价格排名"""
# SQLAlchemy支持窗口函数
from sqlalchemy import over
# 创建窗口
window = over(
partition_by=Product.category_id,
order_by=Product.price.desc()
)
query = db.session.query(
Product,
func.row_number().over(window).label('price_rank'),
func.rank().over(window).label('price_rank_dense'),
func.percent_rank().over(window).label('price_percentile')
).\
order_by(Product.category_id, Product.price.desc())
return query.all()
@staticmethod
def hierarchical_query():
"""层次查询:获取组织架构"""
# 使用递归CTE(Common Table Expression)
# 注意:SQLAlchemy对递归CTE的支持因数据库而异
EmployeeAlias = aliased(Employee)
# 递归查询所有员工及其经理链
cte = db.session.query(
EmployeeAlias.id,
EmployeeAlias.first_name,
EmployeeAlias.last_name,
EmployeeAlias.position,
EmployeeAlias.manager_id,
literal_column("0").label("level")
).\
filter(EmployeeAlias.manager_id.is_(None)).\
cte(name="employee_hierarchy", recursive=True)
# 递归部分
cte = cte.union_all(
db.session.query(
Employee.id,
Employee.first_name,
Employee.last_name,
Employee.position,
Employee.manager_id,
(cte.c.level + 1).label("level")
).\
join(cte, Employee.manager_id == cte.c.id)
)
# 执行查询
result = db.session.query(cte).order_by(cte.c.level, cte.c.last_name).all()
return result
@staticmethod
def conditional_aggregation():
"""条件聚合:统计不同类型的产品"""
# 使用case表达式进行条件聚合
query = db.session.query(
Category.name,
func.count(Product.id).label('total_products'),
func.sum(
case(
(Product.is_available == True, 1),
else_=0
)
).label('available_products'),
func.sum(
case(
(Product.stock_quantity <= Product.reorder_level, 1),
else_=0
)
).label('low_stock_products'),
func.avg(
case(
(Product.is_available == True, Product.price),
else_=None
)
).label('avg_price')
).\
join(Product, Category.id == Product.category_id).\
group_by(Category.id, Category.name).\
order_by(desc('total_products'))
return query.all()
@staticmethod
def json_query_example():
"""JSON查询示例:查询角色权限"""
# 假设permissions字段是JSON格式
# PostgreSQL的JSON查询
# 查找具有特定权限的角色
query = Role.query
# 使用JSON操作符(PostgreSQL特定)
# 注意:这需要数据库支持JSON操作
# 替代方案:在应用层处理
roles_with_permission = []
for role in query.all():
try:
permissions = json.loads(role.permissions) if role.permissions else []
if 'admin' in permissions:
roles_with_permission.append(role)
except json.JSONDecodeError:
continue
return roles_with_permission
@staticmethod
def temporal_query_example():
"""时间查询示例:最近的活动"""
# 获取最近7天创建的用户
seven_days_ago = datetime.utcnow() - timedelta(days=7)
recent_users = User.query.\
filter(User.created_at >= seven_days_ago).\
order_by(desc(User.created_at)).all()
# 获取今天生日的用户(假设有birth_date字段)
today = datetime.utcnow().date()
# 如果UserProfile有date_of_birth字段
users_with_birthday = User.query.\
join(UserProfile).\
filter(
extract('month', UserProfile.date_of_birth) == today.month,
extract('day', UserProfile.date_of_birth) == today.day
).all()
return {
'recent_users': recent_users,
'users_with_birthday': users_with_birthday
}
@staticmethod
def pagination_example(page=1, per_page=20):
"""分页查询示例"""
# 计算偏移量
offset = (page - 1) * per_page
# 查询产品
products_query = Product.query.\
filter_by(is_available=True).\
order_by(Product.created_at.desc())
# 获取总数
total = products_query.count()
# 获取分页数据
products = products_query.\
offset(offset).\
limit(per_page).\
all()
# 计算总页数
total_pages = (total + per_page - 1) // per_page
return {
'products': products,
'pagination': {
'page': page,
'per_page': per_page,
'total': total,
'total_pages': total_pages,
'has_next': page < total_pages,
'has_prev': page > 1
}
}
@staticmethod
def bulk_operations():
"""批量操作示例"""
# 批量更新:将所有缺货产品的可用状态设为False
updated_count = Product.query.\
filter(Product.stock_quantity == 0, Product.is_available == True).\
update({'is_available': False}, synchronize_session=False)
db.session.commit()
# 批量插入:添加多个标签
tags_data = [
{'name': 'New Arrival', 'slug': 'new-arrival'},
{'name': 'Best Seller', 'slug': 'best-seller'},
{'name': 'Limited Edition', 'slug': 'limited-edition'}
]
tags = [Tag(**data) for data in tags_data]
db.session.bulk_save_objects(tags)
db.session.commit()
return {
'updated_products': updated_count,
'inserted_tags': len(tags)
}
4.3 查询性能优化
# queries/performance_optimization.py
from app import db
from models.one_to_many import Product, Review
from models.many_to_many import Tag
from sqlalchemy.orm import joinedload, subqueryload, selectinload, lazyload
from sqlalchemy.sql import text
import time
class PerformanceOptimization:
"""查询性能优化示例"""
@staticmethod
def n_plus_one_problem():
"""N+1查询问题示例"""
# 有问题的查询:获取所有产品及其评价
print("=== N+1查询问题示例 ===")
# 方法1:N+1查询(有问题)
start_time = time.time()
products = Product.query.all()
for product in products:
reviews = product.reviews.all() # 每次都会执行查询
print(f"Product: {product.name}, Reviews: {len(reviews)}")
method1_time = time.time() - start_time
print(f"方法1耗时: {method1_time:.4f}秒")
# 方法2:使用预加载(优化)
start_time = time.time()
products = Product.query.options(joinedload(Product.reviews)).all()
for product in products:
reviews = product.reviews
print(f"Product: {product.name}, Reviews: {len(reviews)}")
method2_time = time.time() - start_time
print(f"方法2耗时: {method2_time:.4f}秒")
# 方法3:使用子查询加载
start_time = time.time()
products = Product.query.options(subqueryload(Product.reviews)).all()
for product in products:
reviews = product.reviews
print(f"Product: {product.name}, Reviews: {len(reviews)}")
method3_time = time.time() - start_time
print(f"方法3耗时: {method3_time:.4f}秒")
return {
'method1': method1_time,
'method2': method2_time,
'method3': method3_time
}
@staticmethod
def eager_loading_strategies():
"""预加载策略比较"""
# 1. joinedload(连接加载)
# 使用LEFT OUTER JOIN一次性加载所有数据
products_joined = Product.query.\
options(joinedload(Product.category)).\
options(joinedload(Product.tags)).\
options(joinedload(Product.reviews)).\
limit(10).all()
# 2. subqueryload(子查询加载)
# 对每个关系执行单独的查询
products_subquery = Product.query.\
options(subqueryload(Product.category)).\
options(subqueryload(Product.tags)).\
options(subqueryload(Product.reviews)).\
limit(10).all()
# 3. selectinload(SELECT IN加载)
# 对每个关系使用IN子句
products_selectin = Product.query.\
options(selectinload(Product.category)).\
options(selectinload(Product.tags)).\
options(selectinload(Product.reviews)).\
limit(10).all()
# 性能测试
strategies = {
'joinedload': joinedload,
'subqueryload': subqueryload,
'selectinload': selectinload
}
results = {}
for name, strategy in strategies.items():
start_time = time.time()
# 执行查询
query = Product.query.\
options(strategy(Product.category)).\
options(strategy(Product.tags)).\
options(strategy(Product.reviews)).\
limit(100)
products = query.all()
# 访问关系触发加载
for product in products:
_ = product.category
_ = list(product.tags)
_ = list(product.reviews)
elapsed = time.time() - start_time
results[name] = {
'time': elapsed,
'product_count': len(products)
}
return results
@staticmethod
def query_optimization_tips():
"""查询优化技巧"""
# 技巧1:只选择需要的列
print("技巧1:只选择需要的列")
# 不好的做法:选择所有列
bad_query = Product.query.all()
# 好的做法:只选择需要的列
good_query = db.session.query(
Product.id,
Product.name,
Product.price,
Product.stock_quantity
).all()
# 技巧2:使用索引
print("\n技巧2:使用索引")
# 确保经常查询的列上有索引
# 例如:username, email, sku等
# 技巧3:避免在循环中查询
print("\n技巧3:避免在循环中查询")
# 不好的做法
category_ids = [1, 2, 3, 4, 5]
category_names = []
for category_id in category_ids:
category = Category.query.get(category_id)
if category:
category_names.append(category.name)
# 好的做法:批量查询
categories = Category.query.filter(Category.id.in_(category_ids)).all()
category_names = [cat.name for cat in categories]
# 技巧4:使用批量操作
print("\n技巧4:使用批量操作")
# 批量更新
Product.query.filter(Product.stock_quantity == 0).\
update({'is_available': False}, synchronize_session=False)
# 批量插入
tags = [
Tag(name=f'Tag {i}', slug=f'tag-{i}')
for i in range(10)
]
db.session.bulk_save_objects(tags)
# 技巧5:使用原生SQL处理复杂操作
print("\n技巧5:使用原生SQL处理复杂操作")
# 复杂统计查询
sql = text("""
SELECT
c.name as category_name,
COUNT(p.id) as product_count,
AVG(p.price) as avg_price,
SUM(CASE WHEN p.is_available THEN 1 ELSE 0 END) as available_count
FROM categories c
LEFT JOIN products p ON c.id = p.category_id
GROUP BY c.id, c.name
ORDER BY product_count DESC
""")
result = db.session.execute(sql)
return {
'category_names': category_names,
'tags_created': len(tags),
'statistics': [dict(row) for row in result]
}
@staticmethod
def database_indexing():
"""数据库索引优化"""
# 创建索引的示例(通常在模型定义中)
indexes_example = """
# 在模型定义中添加索引
class Product(db.Model):
__tablename__ = 'products'
# 单列索引
name = db.Column(db.String(200), nullable=False, index=True)
sku = db.Column(db.String(50), unique=True, nullable=False, index=True)
# 复合索引
__table_args__ = (
db.Index('idx_category_price', 'category_id', 'price'),
db.Index('idx_availability_stock', 'is_available', 'stock_quantity'),
)
# 在查询中使用索引提示(某些数据库支持)
# MySQL示例:
# query = Product.query.from_statement(text(
# "SELECT * FROM products USE INDEX (idx_category_price) WHERE category_id = :cat_id"
# )).params(cat_id=1)
"""
return indexes_example
5. 完整示例:电子商务系统
# ecommerce_system.py
from app import db, BaseModel
from sqlalchemy import CheckConstraint, Enum
import enum
from datetime import datetime, timedelta
class OrderStatus(enum.Enum):
"""订单状态枚举"""
PENDING = 'pending' # 待处理
PROCESSING = 'processing' # 处理中
SHIPPED = 'shipped' # 已发货
DELIVERED = 'delivered' # 已送达
CANCELLED = 'cancelled' # 已取消
REFUNDED = 'refunded' # 已退款
class PaymentStatus(enum.Enum):
"""支付状态枚举"""
PENDING = 'pending' # 待支付
PAID = 'paid' # 已支付
FAILED = 'failed' # 支付失败
REFUNDED = 'refunded' # 已退款
class PaymentMethod(enum.Enum):
"""支付方式枚举"""
CREDIT_CARD = 'credit_card'
DEBIT_CARD = 'debit_card'
PAYPAL = 'paypal'
BANK_TRANSFER = 'bank_transfer'
CASH_ON_DELIVERY = 'cash_on_delivery'
class Order(BaseModel):
"""订单模型"""
__tablename__ = 'orders'
# 订单信息
order_number = db.Column(db.String(50), unique=True, nullable=False, index=True)
customer_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
# 金额信息
subtotal = db.Column(db.Numeric(12, 2), nullable=False)
tax_amount = db.Column(db.Numeric(12, 2), default=0)
shipping_cost = db.Column(db.Numeric(12, 2), default=0)
discount_amount = db.Column(db.Numeric(12, 2), default=0)
total_amount = db.Column(db.Numeric(12, 2), nullable=False)
# 状态
status = db.Column(db.Enum(OrderStatus), default=OrderStatus.PENDING, nullable=False)
payment_status = db.Column(db.Enum(PaymentStatus), default=PaymentStatus.PENDING, nullable=False)
# 配送信息
shipping_address = db.Column(db.Text, nullable=False)
billing_address = db.Column(db.Text, nullable=False)
shipping_method = db.Column(db.String(100))
tracking_number = db.Column(db.String(100))
# 时间信息
placed_at = db.Column(db.DateTime, default=datetime.utcnow)
processed_at = db.Column(db.DateTime)
shipped_at = db.Column(db.DateTime)
delivered_at = db.Column(db.DateTime)
cancelled_at = db.Column(db.DateTime)
# 备注
notes = db.Column(db.Text)
# 关系定义
customer = db.relationship('User', backref=db.backref('orders', lazy='dynamic'))
items = db.relationship('OrderItem', back_populates='order',
lazy='dynamic', cascade='all, delete-orphan')
payments = db.relationship('Payment', back_populates='order',
lazy='dynamic', cascade='all, delete-orphan')
# 约束
__table_args__ = (
CheckConstraint('subtotal >= 0', name='check_subtotal_positive'),
CheckConstraint('total_amount >= 0', name='check_total_amount_positive'),
CheckConstraint('tax_amount >= 0', name='check_tax_amount_positive'),
CheckConstraint('shipping_cost >= 0', name='check_shipping_cost_positive'),
CheckConstraint('discount_amount >= 0', name='check_discount_amount_positive'),
)
# 属性
@property
def item_count(self):
"""订单商品数量"""
return sum(item.quantity for item in self.items)
@property
def is_fulfilled(self):
"""订单是否已完成"""
return self.status == OrderStatus.DELIVERED
@property
def can_cancel(self):
"""订单是否可以取消"""
return self.status in [OrderStatus.PENDING, OrderStatus.PROCESSING]
@property
def estimated_delivery_date(self):
"""预计送达日期"""
if self.shipped_at:
# 假设3-5天送达
return self.shipped_at.date() + timedelta(days=5)
elif self.placed_at:
# 如果还没发货,预计7-10天
return self.placed_at.date() + timedelta(days=10)
return None
# 方法
def calculate_totals(self):
"""重新计算订单金额"""
self.subtotal = sum(item.total_price for item in self.items)
self.total_amount = self.subtotal + self.tax_amount + self.shipping_cost - self.discount_amount
return self.total_amount
def add_item(self, product, quantity, price=None):
"""添加商品到订单"""
from models.one_to_many import Product as Prod
if isinstance(product, Prod):
actual_price = price or product.price
item = OrderItem(
order=self,
product_id=product.id,
product_name=product.name,
product_sku=product.sku,
quantity=quantity,
unit_price=actual_price,
total_price=actual_price * quantity
)
self.items.append(item)
return item
return None
def update_status(self, new_status, commit=True):
"""更新订单状态"""
old_status = self.status
self.status = new_status
# 更新时间戳
now = datetime.utcnow()
if new_status == OrderStatus.PROCESSING:
self.processed_at = now
elif new_status == OrderStatus.SHIPPED:
self.shipped_at = now
elif new_status == OrderStatus.DELIVERED:
self.delivered_at = now
elif new_status == OrderStatus.CANCELLED:
self.cancelled_at = now
if commit:
db.session.commit()
return self
def to_dict(self, include_items=True, include_customer=True, include_payments=False):
"""转换为字典"""
data = {
'id': self.id,
'order_number': self.order_number,
'customer_id': self.customer_id,
'subtotal': float(self.subtotal),
'tax_amount': float(self.tax_amount),
'shipping_cost': float(self.shipping_cost),
'discount_amount': float(self.discount_amount),
'total_amount': float(self.total_amount),
'status': self.status.value,
'payment_status': self.payment_status.value,
'shipping_address': self.shipping_address,
'billing_address': self.billing_address,
'shipping_method': self.shipping_method,
'tracking_number': self.tracking_number,
'item_count': self.item_count,
'is_fulfilled': self.is_fulfilled,
'can_cancel': self.can_cancel,
'estimated_delivery_date': self.estimated_delivery_date.isoformat() if self.estimated_delivery_date else None,
'placed_at': self.placed_at.isoformat() if self.placed_at else None,
'processed_at': self.processed_at.isoformat() if self.processed_at else None,
'shipped_at': self.shipped_at.isoformat() if self.shipped_at else None,
'delivered_at': self.delivered_at.isoformat() if self.delivered_at else None,
'cancelled_at': self.cancelled_at.isoformat() if self.cancelled_at else None,
'notes': self.notes,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat()
}
if include_items and self.items:
data['items'] = [item.to_dict(include_order=False) for item in self.items]
if include_customer and self.customer:
data['customer'] = self.customer.to_dict(include_profile=True)
if include_payments and self.payments:
data['payments'] = [payment.to_dict(include_order=False) for payment in self.payments]
return data
class OrderItem(BaseModel):
"""订单项模型"""
__tablename__ = 'order_items'
# 基本信息
order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=True) # 可能商品已删除
# 商品快照(保存下单时的信息)
product_name = db.Column(db.String(200), nullable=False)
product_sku = db.Column(db.String(50), nullable=False)
product_image = db.Column(db.String(500))
# 价格信息
unit_price = db.Column(db.Numeric(12, 2), nullable=False)
quantity = db.Column(db.Integer, nullable=False)
total_price = db.Column(db.Numeric(12, 2), nullable=False)
# 折扣信息
discount_percentage = db.Column(db.Numeric(5, 2), default=0)
discount_amount = db.Column(db.Numeric(12, 2), default=0)
# 关系定义
order = db.relationship('Order', back_populates='items')
product = db.relationship('Product', backref=db.backref('order_items', lazy='dynamic'))
# 约束
__table_args__ = (
CheckConstraint('unit_price >= 0', name='check_unit_price_positive'),
CheckConstraint('quantity > 0', name='check_quantity_positive'),
CheckConstraint('total_price >= 0', name='check_total_price_positive'),
)
@property
def final_price(self):
"""最终价格(考虑折扣)"""
return self.total_price - self.discount_amount
def to_dict(self, include_order=False, include_product=False):
"""转换为字典"""
data = {
'id': self.id,
'order_id': self.order_id,
'product_id': self.product_id,
'product_name': self.product_name,
'product_sku': self.product_sku,
'product_image': self.product_image,
'unit_price': float(self.unit_price),
'quantity': self.quantity,
'total_price': float(self.total_price),
'discount_percentage': float(self.discount_percentage),
'discount_amount': float(self.discount_amount),
'final_price': float(self.final_price),
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat()
}
if include_order and self.order:
data['order'] = self.order.to_dict(include_items=False, include_customer=False)
if include_product and self.product:
data['product'] = self.product.to_dict(include_category=False, include_tags=False)
return data
class Payment(BaseModel):
"""支付模型"""
__tablename__ = 'payments'
# 支付信息
payment_number = db.Column(db.String(50), unique=True, nullable=False, index=True)
order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
amount = db.Column(db.Numeric(12, 2), nullable=False)
# 支付方式
method = db.Column(db.Enum(PaymentMethod), nullable=False)
status = db.Column(db.Enum(PaymentStatus), default=PaymentStatus.PENDING, nullable=False)
# 支付详情(JSON格式,存储支付网关返回的信息)
payment_details = db.Column(db.Text) # JSON格式
# 时间信息
initiated_at = db.Column(db.DateTime, default=datetime.utcnow)
completed_at = db.Column(db.DateTime)
failed_at = db.Column(db.DateTime)
# 关系定义
order = db.relationship('Order', back_populates='payments')
# 约束
__table_args__ = (
CheckConstraint('amount > 0', name='check_amount_positive'),
)
@property
def is_successful(self):
"""支付是否成功"""
return self.status == PaymentStatus.PAID
@property
def is_failed(self):
"""支付是否失败"""
return self.status == PaymentStatus.FAILED
def update_status(self, new_status, details=None, commit=True):
"""更新支付状态"""
self.status = new_status
now = datetime.utcnow()
if new_status == PaymentStatus.PAID:
self.completed_at = now
# 同时更新订单支付状态
if self.order:
self.order.payment_status = PaymentStatus.PAID
elif new_status == PaymentStatus.FAILED:
self.failed_at = now
if details:
import json
self.payment_details = json.dumps(details)
if commit:
db.session.commit()
return self
def to_dict(self, include_order=False):
"""转换为字典"""
import json
data = {
'id': self.id,
'payment_number': self.payment_number,
'order_id': self.order_id,
'amount': float(self.amount),
'method': self.method.value,
'status': self.status.value,
'is_successful': self.is_successful,
'is_failed': self.is_failed,
'payment_details': json.loads(self.payment_details) if self.payment_details else None,
'initiated_at': self.initiated_at.isoformat() if self.initiated_at else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
'failed_at': self.failed_at.isoformat() if self.failed_at else None,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat()
}
if include_order and self.order:
data['order'] = self.order.to_dict(include_items=False, include_customer=False)
return data
class ECommerceQueries:
"""电子商务系统查询示例"""
@staticmethod
def get_customer_orders(customer_id, status=None, limit=20, offset=0):
"""获取客户订单"""
query = Order.query.filter_by(customer_id=customer_id)
if status:
if isinstance(status, str):
status = OrderStatus(status)
query = query.filter_by(status=status)
# 总数
total = query.count()
# 分页数据
orders = query.order_by(Order.placed_at.desc()).\
offset(offset).\
limit(limit).\
options(
db.joinedload(Order.items),
db.joinedload(Order.payments)
).all()
return {
'orders': orders,
'total': total,
'limit': limit,
'offset': offset
}
@staticmethod
def get_sales_report(start_date, end_date, group_by='day'):
"""获取销售报告"""
from sqlalchemy import extract, func, text
# 基础查询
query = db.session.query(
func.date_trunc(group_by, Order.placed_at).label('period'),
func.count(Order.id).label('order_count'),
func.sum(Order.total_amount).label('total_revenue'),
func.avg(Order.total_amount).label('avg_order_value'),
func.sum(func.coalesce(Order.discount_amount, 0)).label('total_discount'),
func.count(func.distinct(Order.customer_id)).label('unique_customers')
).\
filter(
Order.placed_at >= start_date,
Order.placed_at <= end_date,
Order.status != OrderStatus.CANCELLED
).\
group_by(text('period')).\
order_by(text('period'))
result = query.all()
# 转换为字典列表
report = []
for row in result:
report.append({
'period': row.period.isoformat() if row.period else None,
'order_count': row.order_count or 0,
'total_revenue': float(row.total_revenue or 0),
'avg_order_value': float(row.avg_order_value or 0),
'total_discount': float(row.total_discount or 0),
'unique_customers': row.unique_customers or 0
})
# 汇总统计
summary = db.session.query(
func.count(Order.id).label('total_orders'),
func.sum(Order.total_amount).label('total_revenue'),
func.avg(Order.total_amount).label('avg_order_value'),
func.sum(OrderItem.quantity).label('total_items_sold')
).\
join(OrderItem).\
filter(
Order.placed_at >= start_date,
Order.placed_at <= end_date,
Order.status != OrderStatus.CANCELLED
).first()
return {
'report': report,
'summary': {
'total_orders': summary.total_orders or 0,
'total_revenue': float(summary.total_revenue or 0),
'avg_order_value': float(summary.avg_order_value or 0),
'total_items_sold': summary.total_items_sold or 0
}
}
@staticmethod
def get_product_sales_analysis(product_id=None, start_date=None, end_date=None):
"""产品销售分析"""
from sqlalchemy import func
query = db.session.query(
Product.id.label('product_id'),
Product.name.label('product_name'),
Product.sku.label('product_sku'),
func.sum(OrderItem.quantity).label('total_quantity_sold'),
func.sum(OrderItem.total_price).label('total_revenue'),
func.avg(OrderItem.unit_price).label('avg_sale_price'),
func.count(func.distinct(Order.id)).label('order_count'),
func.count(func.distinct(Order.customer_id)).label('customer_count')
).\
join(OrderItem, Product.id == OrderItem.product_id).\
join(Order, OrderItem.order_id == Order.id).\
filter(Order.status != OrderStatus.CANCELLED)
# 过滤条件
if product_id:
query = query.filter(Product.id == product_id)
if start_date:
query = query.filter(Order.placed_at >= start_date)
if end_date:
query = query.filter(Order.placed_at <= end_date)
# 分组和排序
result = query.\
group_by(Product.id, Product.name, Product.sku).\
order_by(func.sum(OrderItem.total_price).desc()).\
limit(50).all()
# 转换为字典列表
analysis = []
for row in result:
analysis.append({
'product_id': row.product_id,
'product_name': row.product_name,
'product_sku': row.product_sku,
'total_quantity_sold': row.total_quantity_sold or 0,
'total_revenue': float(row.total_revenue or 0),
'avg_sale_price': float(row.avg_sale_price or 0),
'order_count': row.order_count or 0,
'customer_count': row.customer_count or 0
})
return analysis
@staticmethod
def get_customer_lifetime_value():
"""客户生命周期价值分析"""
from sqlalchemy import func
# 计算每个客户的总消费
customer_value = db.session.query(
Order.customer_id,
User.username,
User.email,
func.count(Order.id).label('order_count'),
func.sum(Order.total_amount).label('total_spent'),
func.min(Order.placed_at).label('first_order_date'),
func.max(Order.placed_at).label('last_order_date'),
func.avg(Order.total_amount).label('avg_order_value')
).\
join(User, Order.customer_id == User.id).\
filter(Order.status != OrderStatus.CANCELLED).\
group_by(Order.customer_id, User.username, User.email).\
order_by(func.sum(Order.total_amount).desc()).\
limit(100).all()
# 计算RFM分析(最近购买时间、购买频率、消费金额)
rfm_analysis = []
now = datetime.utcnow()
for row in customer_value:
# 计算最近购买天数
recency = (now - row.last_order_date).days if row.last_order_date else None
# 计算购买频率(平均购买间隔天数)
if row.order_count > 1 and row.first_order_date and row.last_order_date:
total_days = (row.last_order_date - row.first_order_date).days
frequency = total_days / (row.order_count - 1) if row.order_count > 1 else None
else:
frequency = None
rfm_analysis.append({
'customer_id': row.customer_id,
'username': row.username,
'email': row.email,
'order_count': row.order_count,
'total_spent': float(row.total_spent or 0),
'avg_order_value': float(row.avg_order_value or 0),
'first_order_date': row.first_order_date.isoformat() if row.first_order_date else None,
'last_order_date': row.last_order_date.isoformat() if row.last_order_date else None,
'recency_days': recency,
'frequency_days': frequency,
'customer_value_segment': cls._segment_customer_value(float(row.total_spent or 0))
})
return rfm_analysis
@staticmethod
def _segment_customer_value(total_spent):
"""客户价值分段"""
if total_spent >= 10000:
return 'VIP'
elif total_spent >= 5000:
return 'Premium'
elif total_spent >= 1000:
return 'Regular'
elif total_spent >= 100:
return 'Occasional'
else:
return 'New'
@staticmethod
def get_inventory_analysis():
"""库存分析"""
from sqlalchemy import func, case
analysis = db.session.query(
Product.id,
Product.name,
Product.sku,
Product.stock_quantity,
Product.reorder_level,
func.coalesce(
func.sum(OrderItem.quantity).filter(
Order.status != OrderStatus.CANCELLED,
Order.placed_at >= datetime.utcnow() - timedelta(days=30)
), 0
).label('sales_last_30_days'),
func.coalesce(
func.sum(OrderItem.quantity).filter(
Order.status != OrderStatus.CANCELLED,
Order.placed_at >= datetime.utcnow() - timedelta(days=7)
), 0
).label('sales_last_7_days'),
case(
[
(Product.stock_quantity == 0, 'Out of Stock'),
(Product.stock_quantity <= Product.reorder_level, 'Low Stock'),
(Product.stock_quantity > Product.reorder_level, 'In Stock')
],
else_='Unknown'
).label('stock_status')
).\
outerjoin(OrderItem, Product.id == OrderItem.product_id).\
outerjoin(Order, OrderItem.order_id == Order.id).\
group_by(Product.id, Product.name, Product.sku, Product.stock_quantity, Product.reorder_level).\
order_by(
case(
[
(Product.stock_quantity == 0, 1),
(Product.stock_quantity <= Product.reorder_level, 2),
(Product.stock_quantity > Product.reorder_level, 3)
],
else_=4
),
Product.stock_quantity.asc()
).all()
# 计算建议补货数量
result = []
for row in analysis:
# 基于最近7天的销量预测补货量
daily_sales = row.sales_last_7_days / 7 if row.sales_last_7_days > 0 else 0
# 建议补货量:满足30天销量的库存
suggested_reorder = max(0, int(daily_sales * 30) - row.stock_quantity)
result.append({
'product_id': row.id,
'product_name': row.name,
'sku': row.sku,
'stock_quantity': row.stock_quantity,
'reorder_level': row.reorder_level,
'sales_last_30_days': row.sales_last_30_days,
'sales_last_7_days': row.sales_last_7_days,
'stock_status': row.stock_status,
'daily_sales_avg': round(daily_sales, 2),
'suggested_reorder': suggested_reorder,
'urgency': 'High' if row.stock_quantity == 0 else
'Medium' if row.stock_quantity <= row.reorder_level else 'Low'
})
return result
6. 实践建议与最佳实践
6.1 性能优化建议
# best_practices/performance_tips.py
"""
Flask-SQLAlchemy性能优化建议
"""
class PerformanceTips:
"""性能优化建议"""
TIPS = [
{
'title': '使用正确的加载策略',
'description': '根据数据量和关系复杂度选择合适的加载策略',
'recommendations': [
'小数据量:使用joinedload',
'大数据量:使用selectinload',
'避免N+1查询'
]
},
{
'title': '创建适当的索引',
'description': '为经常查询的字段创建索引',
'recommendations': [
'为外键字段创建索引',
'为经常用于过滤的字段创建索引',
'考虑创建复合索引'
]
},
{
'title': '优化查询语句',
'description': '编写高效的查询语句',
'recommendations': [
'只选择需要的列',
'使用批量操作',
'避免在循环中查询数据库',
'使用分页处理大数据集'
]
},
{
'title': '数据库连接管理',
'description': '合理管理数据库连接',
'recommendations': [
'使用连接池',
'及时关闭不再使用的会话',
'避免长时间持有事务'
]
},
{
'title': '缓存策略',
'description': '使用缓存减少数据库查询',
'recommendations': [
'缓存频繁查询但很少变化的数据',
'使用Redis等内存数据库做缓存',
'实现缓存失效策略'
]
}
]
@staticmethod
def get_query_optimization_checklist():
"""查询优化检查清单"""
return [
"是否使用了正确的加载策略?",
"是否只选择了需要的列?",
"是否有N+1查询问题?",
"是否创建了适当的索引?",
"是否使用了批量操作?",
"是否有不必要的重复查询?",
"是否使用了分页处理大数据集?",
"是否考虑了查询的复杂度?"
]
6.2 常见问题与解决方案
# best_practices/troubleshooting.py
"""
Flask-SQLAlchemy常见问题与解决方案
"""
class Troubleshooting:
"""常见问题与解决方案"""
COMMON_ISSUES = [
{
'issue': 'N+1查询问题',
'symptoms': ['页面加载缓慢', '数据库查询次数过多'],
'solution': '使用预加载策略(joinedload, selectinload, subqueryload)',
'example': '''
# 错误示例
products = Product.query.all()
for product in products:
reviews = product.reviews.all() # N+1查询
# 正确示例
products = Product.query.options(db.joinedload(Product.reviews)).all()
'''
},
{
'issue': '会话管理问题',
'symptoms': ['数据库连接泄露', '事务未提交', '对象状态异常'],
'solution': '正确管理数据库会话',
'example': '''
# 错误示例
db.session.add(user)
# 忘记提交
# 正确示例
try:
db.session.add(user)
db.session.commit()
except Exception as e:
db.session.rollback()
raise e
'''
},
{
'issue': '循环引用问题',
'symptoms': ['序列化时无限递归', '内存溢出'],
'solution': '使用lazy加载或在序列化时排除某些字段',
'example': '''
# 在to_dict方法中控制序列化深度
def to_dict(self, include_related=False):
data = {'id': self.id, 'name': self.name}
if include_related:
data['related'] = [item.to_dict(include_related=False) for item in self.related_items]
return data
'''
},
{
'issue': '性能瓶颈',
'symptoms': ['查询执行时间过长', '内存占用过高'],
'solution': '优化查询,使用索引,实施缓存',
'example': '''
# 使用索引
class Product(db.Model):
name = db.Column(db.String(200), index=True)
__table_args__ = (
db.Index('idx_price_stock', 'price', 'stock_quantity'),
)
'''
}
]
@staticmethod
def debug_query_performance(query):
"""调试查询性能"""
import time
# 记录开始时间
start_time = time.time()
# 执行查询
result = query.all()
# 计算耗时
elapsed = time.time() - start_time
# 获取查询语句
query_str = str(query)
# 统计结果数量
result_count = len(result)
return {
'execution_time': elapsed,
'result_count': result_count,
'query': query_str,
'performance_status': 'Good' if elapsed < 1.0 else
'Acceptable' if elapsed < 5.0 else 'Poor'
}
7. 总结
通过本博客的学习,我们深入探讨了Flask-SQLAlchemy的高级用法,包括:
-
关系建模:
- 一对一、一对多、多对多和自引用关系的实现
- 复杂模型关系的设计模式
- 关联表的创建与管理
-
复杂查询:
- 基础查询方法与高级查询技术
- 连接查询、子查询、聚合查询的实现
- 窗口函数和递归查询的使用
-
性能优化:
- N+1查询问题的识别与解决
- 预加载策略的选择与使用
- 查询性能优化技巧
-
实践应用:
- 完整的电子商务系统示例
- 实际业务场景的查询实现
- 性能监控与调试方法
关键要点
进一步学习建议
-
深入学习SQLAlchemy:
- 阅读SQLAlchemy官方文档
- 学习SQLAlchemy Core API
- 掌握数据库迁移工具
-
数据库优化:
- 学习数据库索引原理
- 掌握查询执行计划分析
- 了解数据库分区和分片
-
架构设计:
- 学习微服务架构
- 掌握数据库读写分离
- 了解数据缓存策略
-
监控与维护:
- 学习数据库性能监控
- 掌握慢查询日志分析
- 了解数据库备份与恢复
通过不断实践和学习,您将能够构建高效、可扩展的Flask应用,充分利用Flask-SQLAlchemy的强大功能。
更多推荐



所有评论(0)