FastAPI 完全指南:现代 Python Web 开发的终极选择
FastAPI是一个现代、高性能的Python Web框架,基于Starlette和Pydantic构建,具有以下核心优势:1)极致的性能表现,媲美Node.js和Go;2)自动生成交互式API文档;3)基于Python类型提示的智能开发体验;4)原生支持异步/await语法。该框架提供了完整的开发解决方案,包括路由处理、数据验证、依赖注入、数据库集成、安全认证等核心功能,并通过中间件和后台任务支
目录
引言:为什么选择 FastAPI?
FastAPI 是由 Sebastián Ramírez 于 2018 年创建的现代、高性能 Web 框架。它基于 Starlette(ASGI 工具集)和 Pydantic(数据验证库),为 Python 开发者带来了革命性的开发体验。
核心优势
| 特性 | 说明 |
|---|---|
| 极致性能 | 与 Node.js 和 Go 相当,是 Python 最快的框架之一 |
| 自动文档 | 内置 Swagger UI 和 ReDoc,零配置生成交互式 API 文档 |
| 类型安全 | 基于 Python 3.6+ 类型提示,减少 40% 的人为错误 |
| 智能编辑器支持 | 完整的自动补全和类型检查 |
| 异步原生 | 原生支持 async/await,轻松处理高并发 |
环境搭建与基础配置
安装 FastAPI
# 基础安装 pip install fastapi # 生产环境需要 ASGI 服务器 pip install "uvicorn[standard]" # 完整安装(包含所有常用依赖) pip install fastapi[all]
最小可行应用(MVP)
创建一个 main.py:
from fastapi import FastAPI
app = FastAPI(
title="My Awesome API",
description="这是一个展示 FastAPI 强大功能的示例 API",
version="1.0.0",
docs_url="/docs", # Swagger UI 路径
redoc_url="/redoc" # ReDoc 路径
)
@app.get("/")
async def root():
return {"message": "Hello FastAPI!", "framework": "FastAPI"}
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str | None = None):
"""
获取特定项目信息
- **item_id**: 项目的唯一标识符
- **q**: 可选的查询参数
"""
return {"item_id": item_id, "q": q}
启动应用
# 开发模式(热重载) uvicorn main:app --reload --host 0.0.0.0 --port 8000 # 生产模式(多 worker) uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000
启动后访问:
-
API 文档:
http://localhost:8000/docs -
替代文档:
http://localhost:8000/redoc
核心概念深度解析
1. 路径操作(Path Operations)
FastAPI 使用装饰器定义路由,支持所有 HTTP 方法:
from fastapi import FastAPI, HTTPException, status
from enum import Enum
app = FastAPI()
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
# GET 请求
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
if model_name == ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}
if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}
return {"model_name": model_name, "message": "Have some residuals"}
# POST 请求 - 创建资源
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: dict):
return {"item": item, "message": "Item created successfully"}
# PUT 请求 - 完整更新
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: dict):
return {"item_id": item_id, **item}
# PATCH 请求 - 部分更新
@app.patch("/items/{item_id}")
async def partial_update(item_id: int, item: dict):
return {"item_id": item_id, "updated_fields": list(item.keys())}
# DELETE 请求
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
return {"message": f"Item {item_id} deleted"}
2. 请求参数详解
FastAPI 支持多种参数类型,自动进行验证和转换:
from fastapi import FastAPI, Query, Path, Body, File, UploadFile
from typing import Annotated
app = FastAPI()
# 查询参数(Query Parameters)
@app.get("/items/")
async def read_items(
# 必填参数
q: str,
# 可选参数(带默认值)
skip: int = 0,
limit: int = 10,
# 复杂验证
price: Annotated[
float | None,
Query(
gt=0, # 大于 0
le=1000, # 小于等于 1000
description="价格范围必须在 0-1000 之间",
alias="item-price" # 别名:?item-price=100
)
] = None,
# 列表参数:?tags=foo&tags=bar
tags: Annotated[list[str] | None, Query()] = None,
# 布尔参数:?is_active=true
is_active: bool = True
):
results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}], "skip": skip, "limit": limit}
if q:
results.update({"q": q})
return results
# 路径参数(Path Parameters)- 带验证
@app.get("/items/{item_id}")
async def read_item(
item_id: Annotated[
int,
Path(
title="项目 ID",
description="项目的唯一标识符",
ge=1, # 大于等于 1
le=1000
)
]
):
return {"item_id": item_id}
# 请求体(Request Body)
@app.post("/items/")
async def create_item(
# 单个请求体
item: Annotated[dict, Body()],
# 嵌套请求体
user: Annotated[dict, Body(embed=True)], # {"user": {...}}
# 额外数据
importance: Annotated[int, Body()] = 0
):
return {
"item": item,
"user": user,
"importance": importance
}
# 文件上传
@app.post("/uploadfile/")
async def create_upload_file(
file: UploadFile = File(..., description="上传的文件"),
# 多文件上传
files: list[UploadFile] = File(default=[], description="多个文件")
):
content = await file.read()
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(content),
"files_count": len(files)
}
数据验证与序列化
Pydantic 模型:FastAPI 的核心
from pydantic import BaseModel, Field, EmailStr, validator, root_validator
from typing import Optional, List
from datetime import datetime
from enum import Enum
# 基础模型
class ItemBase(BaseModel):
title: str = Field(..., min_length=3, max_length=50, description="项目标题")
description: Optional[str] = Field(
None,
max_length=300,
title="项目描述",
example="这是一个示例描述"
)
price: float = Field(..., gt=0, description="必须大于 0")
tax: Optional[float] = None
# 创建时模型(无 ID)
class ItemCreate(ItemBase):
pass
# 响应模型(包含 ID 和时间戳)
class Item(ItemBase):
id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True # 支持 ORM 对象转换
schema_extra = {
"example": {
"title": "示例项目",
"description": "详细描述",
"price": 35.4,
"tax": 3.2,
"id": 1,
"created_at": "2024-01-01T00:00:00"
}
}
# 复杂验证示例
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=20, regex="^[a-zA-Z0-9_]+$")
email: EmailStr
password: str = Field(..., min_length=8)
password_confirm: str
age: Optional[int] = Field(None, ge=18, le=120)
@validator('username')
def username_must_be_unique(cls, v):
# 模拟数据库检查
forbidden = ["admin", "root", "superuser"]
if v.lower() in forbidden:
raise ValueError(f'用户名 "{v}" 不可用')
return v
@root_validator
def passwords_match(cls, values):
pw1 = values.get('password')
pw2 = values.get('password_confirm')
if pw1 != pw2:
raise ValueError('两次输入的密码不匹配')
return values
# 使用模型
from fastapi import FastAPI
app = FastAPI()
@app.post("/items/", response_model=Item, status_code=201)
async def create_item(item: ItemCreate):
# 模拟数据库操作
db_item = Item(
id=1,
created_at=datetime.now(),
**item.dict()
)
return db_item
@app.post("/users/", response_model=dict)
async def create_user(user: UserCreate):
return {
"username": user.username,
"email": user.email,
"message": "用户创建成功"
}
嵌套模型与复杂数据结构
from pydantic import BaseModel, HttpUrl
from typing import Set, List, Dict
class Image(BaseModel):
url: HttpUrl
name: str
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
tags: Set[str] = set()
images: Optional[List[Image]] = None
# 深度嵌套
metadata: Optional[Dict[str, str]] = None
# 使用示例
item_data = {
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2,
"tags": ["rock", "metal", "rock"], # 自动去重为 Set
"images": [
{"url": "http://example.com/baz.jpg", "name": "The Foo live"},
{"url": "http://example.com/dave.jpg", "name": "The Baz"}
],
"metadata": {"key1": "value1", "key2": "value2"}
}
依赖注入系统
FastAPI 的依赖注入(Dependency Injection)是其最强大的特性之一,支持嵌套依赖、子依赖、路径操作装饰器依赖等。
from fastapi import Depends, HTTPException, status, Header
from typing import Annotated
from functools import lru_cache
import jwt
from datetime import datetime, timedelta
# 基础依赖
async def common_parameters(
q: str | None = None,
skip: int = 0,
limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: Annotated[dict, Depends(common_parameters)]):
return commons
# 类作为依赖(更复杂的场景)
class CommonQueryParams:
def __init__(
self,
q: str | None = None,
skip: int = 0,
limit: int = 100
):
self.q = q
self.skip = skip
self.limit = limit
@app.get("/users/")
async def read_users(commons: Annotated[CommonQueryParams, Depends()]):
return commons
# 数据库依赖(实际应用模式)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: Annotated[Session, Depends(get_db)]):
# 使用 db 进行查询
return {"user_id": user_id}
# 认证依赖
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
async def get_current_user(token: Annotated[str, Header(alias="X-Token")]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
# 模拟获取用户
user = {"username": username, "id": 1}
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[dict, Depends(get_current_user)]
):
if current_user.get("disabled"):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.get("/users/me")
async def read_users_me(
current_user: Annotated[dict, Depends(get_current_active_user)]
):
return current_user
# 路径操作装饰器依赖(适用于整个路由)
async def verify_token(x_token: Annotated[str, Header()]):
if x_token != "fake-super-secret-token":
raise HTTPException(status_code=400, detail="X-Token header invalid")
async def verify_key(x_key: Annotated[str, Header()]):
if x_key != "fake-super-secret-key":
raise HTTPException(status_code=400, detail="X-Key header invalid")
return x_key
@app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)])
async def read_items():
return [{"item": "Foo"}, {"item": "Bar"}]
# 带 yield 的依赖(用于资源管理)
async def get_db_with_transaction():
db = SessionLocal()
try:
yield db
db.commit() # 成功时提交
except Exception:
db.rollback() # 失败时回滚
raise
finally:
db.close()
数据库集成
SQLAlchemy 2.0 + FastAPI 最佳实践
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, Session, joinedload
from sqlalchemy.sql import func
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from contextlib import asynccontextmanager
# 数据库配置
DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_size=10, max_overflow=20)
Base = declarative_base()
# 模型定义
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String)
is_active = Column(Integer, default=1)
created_at = Column(DateTime(timezone=True), server_default=func.now())
items = relationship("Item", back_populates="owner", cascade="all, delete-orphan")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String)
price = Column(Float)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
# Pydantic 模型
class UserBase(BaseModel):
email: str
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
is_active: bool
created_at: Optional[str] = None
class Config:
from_attributes = True
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
price: float
class ItemCreate(ItemBase):
pass
class ItemResponse(ItemBase):
id: int
owner_id: int
class Config:
from_attributes = True
class UserWithItems(UserResponse):
items: List[ItemResponse] = []
# 数据库会话管理
def get_db():
db = Session(bind=engine)
try:
yield db
finally:
db.close()
# 生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时创建表(生产环境使用 Alembic 迁移)
Base.metadata.create_all(bind=engine)
yield
# 关闭时清理
engine.dispose()
app = FastAPI(lifespan=lifespan)
# CRUD 操作
@app.post("/users/", response_model=UserResponse, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
# 检查邮箱是否已存在
db_user = db.query(User).filter(User.email == user.email).first()
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
# 创建用户(实际应用需要哈希密码)
fake_hashed_password = user.password + "notreallyhashed"
db_user = User(
email=user.email,
hashed_password=fake_hashed_password,
full_name=user.full_name
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/", response_model=List[UserResponse])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = db.query(User).offset(skip).limit(limit).all()
return users
@app.get("/users/{user_id}", response_model=UserWithItems)
def read_user(user_id: int, db: Session = Depends(get_db)):
# 使用 joinedload 避免 N+1 查询
user = db.query(User).options(joinedload(User.items)).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users/{user_id}/items/", response_model=ItemResponse)
def create_item_for_user(
user_id: int,
item: ItemCreate,
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
db_item = Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.delete(user)
db.commit()
return {"message": "User deleted successfully"}
异步数据库(推荐用于高并发)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.future import select
# 使用异步驱动
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/async", response_model=List[UserResponse])
async def read_users_async(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_async_db)
):
result = await db.execute(select(User).offset(skip).limit(limit))
users = result.scalars().all()
return users
认证与安全
JWT 认证完整实现
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
# 配置
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 方案
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 模型
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
# 模拟数据库
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": pwd_context.hash("secret"),
"disabled": False,
}
}
# 工具函数
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 依赖
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# 路由
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
CORS 配置与安全头
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
# CORS 配置
app.add_middleware(
CORSMiddleware,
allow_origins=["https://example.com", "https://www.example.com"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
expose_headers=["X-Custom-Header"],
max_age=600, # 预检请求缓存时间
)
# 受信任主机
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "*.example.com"]
)
# Gzip 压缩
app.add_middleware(GZipMiddleware, minimum_size=1000)
# 安全头中间件
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
return response
中间件与后台任务
自定义中间件
import time
import logging
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
# 日志配置
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# 记录请求信息
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
logger.info(f"Response time: {process_time:.4f}s - Status: {response.status_code}")
return response
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):
super().__init__(app)
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = {}
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
current_time = time.time()
# 清理过期记录
self.requests = {
ip: [t for t in times if current_time - t < self.window_seconds]
for ip, times in self.requests.items()
}
# 检查限制
if len(self.requests.get(client_ip, [])) >= self.max_requests:
from fastapi.responses import JSONResponse
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"}
)
# 记录请求
if client_ip not in self.requests:
self.requests[client_ip] = []
self.requests[client_ip].append(current_time)
return await call_next(request)
app.add_middleware(TimingMiddleware)
app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)
后台任务
from fastapi import BackgroundTasks, Depends
from typing import Annotated
import asyncio
import smtplib
from email.mime.text import MIMEText
def send_email(email_to: str, subject: str, body: str):
"""模拟发送邮件(实际应用中配置 SMTP)"""
print(f"Sending email to {email_to}")
# smtp_server = smtplib.SMTP("smtp.gmail.com", 587)
# ...
async def process_large_file(file_path: str):
"""异步处理大文件"""
await asyncio.sleep(10) # 模拟长时间处理
print(f"File {file_path} processed")
@app.post("/send-notification/{email}")
async def send_notification(
email: str,
background_tasks: BackgroundTasks,
message: str = "Hello from FastAPI"
):
background_tasks.add_task(send_email, email, "Notification", message)
return {"message": "Notification sent in the background"}
@app.post("/upload/")
async def upload_file(
background_tasks: BackgroundTasks,
file: UploadFile = File(...)
):
file_path = f"/tmp/{file.filename}"
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# 添加后台任务
background_tasks.add_task(process_large_file, file_path)
return {
"filename": file.filename,
"status": "File uploaded, processing in background"
}
# 使用 Celery 进行更复杂的后台任务(生产环境推荐)
"""
from celery import Celery
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery_app.task
def heavy_computation(data: dict):
import time
time.sleep(30)
return {"result": "completed", "data": data}
@app.post("/heavy-task/")
async def trigger_heavy_task(data: dict):
task = heavy_computation.delay(data)
return {"task_id": task.id, "status": "processing"}
"""
测试与部署
测试 FastAPI 应用
# test_main.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from main import app, get_db, Base
# 内存数据库用于测试
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
@pytest.fixture(scope="function")
def setup_database():
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
def test_create_user(setup_database):
response = client.post(
"/users/",
json={"email": "test@example.com", "password": "testpass123", "full_name": "Test User"}
)
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "id" in data
def test_read_user(setup_database):
# 先创建用户
client.post(
"/users/",
json={"email": "test2@example.com", "password": "testpass123"}
)
response = client.get("/users/1")
assert response.status_code == 200
assert response.json()["email"] == "test2@example.com"
def test_read_nonexistent_user(setup_database):
response = client.get("/users/999")
assert response.status_code == 404
def test_invalid_user_data(setup_database):
response = client.post(
"/users/",
json={"email": "invalid-email", "password": "short"}
)
assert response.status_code == 422 # 验证错误
# 异步测试
import httpx
import pytest_asyncio
@pytest_asyncio.fixture
async def async_client():
async with httpx.AsyncClient(app=app, base_url="http://test") as ac:
yield ac
@pytest.mark.asyncio
async def test_async_endpoint(async_client):
response = await async_client.get("/")
assert response.status_code == 200
Docker 部署
# Dockerfile FROM python:3.11-slim WORKDIR /app # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制应用代码 COPY . . # 非 root 用户运行 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser # 暴露端口 EXPOSE 8000 # 启动命令(生产环境使用 gunicorn + uvicorn) CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db
- SECRET_KEY=${SECRET_KEY}
depends_on:
- db
- redis
volumes:
- ./:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
db:
image: postgres:15
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=password
- POSTGRES_DB=fastapi_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
postgres_data:
Kubernetes 部署配置
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-app
spec:
replicas: 3
selector:
matchLabels:
app: fastapi
template:
metadata:
labels:
app: fastapi
spec:
containers:
- name: fastapi
image: your-registry/fastapi-app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: fastapi-service
spec:
selector:
app: fastapi
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
性能优化最佳实践
1. 异步数据库连接池
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker import aioredis # 优化连接池配置 async_engine = create_async_engine( "postgresql+asyncpg://user:pass@localhost/db", pool_size=20, # 基础连接数 max_overflow=10, # 最大溢出连接 pool_pre_ping=True, # 连接前 ping 检查 pool_recycle=3600, # 连接回收时间 echo=False # 生产环境关闭 SQL 日志 ) # Redis 连接池 redis_pool = aioredis.ConnectionPool.from_url( "redis://localhost", max_connections=100, decode_responses=True )
2. 缓存策略
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
@app.on_event("startup")
async def startup():
redis = aioredis.from_url("redis://localhost", encoding="utf8", decode_responses=True)
FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
@app.get("/heavy-computation")
@cache(expire=3600) # 缓存 1 小时
async def heavy_computation():
# 模拟耗时操作
await asyncio.sleep(2)
return {"result": "expensive data"}
# 手动缓存控制
from fastapi_cache.coder import JsonCoder
@app.get("/users/{user_id}")
async def get_user(user_id: int):
cache_key = f"user:{user_id}"
# 尝试从缓存获取
cached = await FastAPICache.get_backend().get(cache_key)
if cached:
return json.loads(cached)
# 查询数据库
user = await fetch_user_from_db(user_id)
# 写入缓存
await FastAPICache.get_backend().set(
cache_key,
json.dumps(user),
expire=300
)
return user
3. 响应模型优化
from fastapi import Response
from fastapi.responses import ORJSONResponse, StreamingResponse
import orjson
# 使用更快的 JSON 库
@app.get("/items/", response_class=ORJSONResponse)
async def get_items():
return [{"id": i, "data": "value"} for i in range(1000)]
# 流式响应(大文件)
@app.get("/large-file/")
async def get_large_file():
def file_generator():
with open("large_file.zip", "rb") as f:
while chunk := f.read(8192):
yield chunk
return StreamingResponse(
file_generator(),
media_type="application/zip",
headers={"Content-Disposition": "attachment; filename=large_file.zip"}
)
# 分页优化
from fastapi import Query
from sqlalchemy import func
@app.get("/items/paginated/")
async def get_items_paginated(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_async_db)
):
offset = (page - 1) * size
# 使用 count(*) OVER() 优化总数查询
result = await db.execute(
select(
Item,
func.count().over().label("total")
)
.offset(offset)
.limit(size)
)
rows = result.all()
total = rows[0].total if rows else 0
return {
"items": [row.Item for row in rows],
"total": total,
"page": page,
"size": size,
"pages": (total + size - 1) // size
}
4. 性能监控
from prometheus_client import Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
# Prometheus 指标
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint']
)
class PrometheusMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
app.add_middleware(PrometheusMiddleware)
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")
总结
FastAPI 代表了 Python Web 开发的现代化方向,它通过以下特性成为构建高性能 API 的首选框架:
-
开发效率:类型提示带来的智能补全和自动文档生成
-
运行时性能:异步原生设计,媲美 Node.js 和 Go
-
工程规范:内置数据验证、依赖注入、认证授权等生产级功能
-
生态兼容:与 SQLAlchemy、Pydantic、Celery 等主流库无缝集成
学习路径建议
-
入门:掌握基础路由、请求参数、Pydantic 模型
-
进阶:深入依赖注入系统、数据库集成、异步编程
-
生产:学习 Docker 部署、Kubernetes 编排、监控告警
-
架构:微服务拆分、事件驱动架构、CQRS 模式
FastAPI 不仅是一个框架,更是一种现代 Python 工程实践的体现。无论你是构建小型微服务还是大型分布式系统,FastAPI 都能提供坚实的技术支撑。
本文代码示例基于 FastAPI 0.104+ 和 Python 3.11+,建议在实际项目中使用最新稳定版本。
更多推荐


所有评论(0)