目录

  1. 引言:为什么选择 FastAPI?

  2. 环境搭建与基础配置

  3. 核心概念深度解析

  4. 路由与请求处理

  5. 数据验证与序列化

  6. 依赖注入系统

  7. 数据库集成

  8. 认证与安全

  9. 中间件与后台任务

  10. 测试与部署

  11. 性能优化最佳实践


引言:为什么选择 FastAPI?

FastAPI 是由 Sebastián Ramírez 于 2018 年创建的现代、高性能 Web 框架。它基于 Starlette(ASGI 工具集)和 Pydantic(数据验证库),为 Python 开发者带来了革命性的开发体验。

核心优势

特性 说明
极致性能 与 Node.js 和 Go 相当,是 Python 最快的框架之一
自动文档 内置 Swagger UI 和 ReDoc,零配置生成交互式 API 文档
类型安全 基于 Python 3.6+ 类型提示,减少 40% 的人为错误
智能编辑器支持 完整的自动补全和类型检查
异步原生 原生支持 async/await,轻松处理高并发

环境搭建与基础配置

安装 FastAPI

  # 基础安装
  pip install fastapi
  ​
  # 生产环境需要 ASGI 服务器
  pip install "uvicorn[standard]"
  ​
  # 完整安装(包含所有常用依赖)
  pip install fastapi[all]

最小可行应用(MVP)

创建一个 main.py

  from fastapi import FastAPI
  ​
  app = FastAPI(
      title="My Awesome API",
      description="这是一个展示 FastAPI 强大功能的示例 API",
      version="1.0.0",
      docs_url="/docs",      # Swagger UI 路径
      redoc_url="/redoc"     # ReDoc 路径
  )
  ​
  @app.get("/")
  async def root():
      return {"message": "Hello FastAPI!", "framework": "FastAPI"}
  ​
  @app.get("/items/{item_id}")
  async def read_item(item_id: int, q: str | None = None):
      """
      获取特定项目信息
      
      - **item_id**: 项目的唯一标识符
      - **q**: 可选的查询参数
      """
      return {"item_id": item_id, "q": q}

启动应用

  # 开发模式(热重载)
  uvicorn main:app --reload --host 0.0.0.0 --port 8000
  ​
  # 生产模式(多 worker)
  uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000

启动后访问:

  • API 文档:http://localhost:8000/docs

  • 替代文档:http://localhost:8000/redoc


核心概念深度解析

1. 路径操作(Path Operations)

FastAPI 使用装饰器定义路由,支持所有 HTTP 方法:

  from fastapi import FastAPI, HTTPException, status
  from enum import Enum
  ​
  app = FastAPI()
  ​
  class ModelName(str, Enum):
      alexnet = "alexnet"
      resnet = "resnet"
      lenet = "lenet"
  ​
  # GET 请求
  @app.get("/models/{model_name}")
  async def get_model(model_name: ModelName):
      if model_name == ModelName.alexnet:
          return {"model_name": model_name, "message": "Deep Learning FTW!"}
      if model_name.value == "lenet":
          return {"model_name": model_name, "message": "LeCNN all the images"}
      return {"model_name": model_name, "message": "Have some residuals"}
  ​
  # POST 请求 - 创建资源
  @app.post("/items/", status_code=status.HTTP_201_CREATED)
  async def create_item(item: dict):
      return {"item": item, "message": "Item created successfully"}
  ​
  # PUT 请求 - 完整更新
  @app.put("/items/{item_id}")
  async def update_item(item_id: int, item: dict):
      return {"item_id": item_id, **item}
  ​
  # PATCH 请求 - 部分更新
  @app.patch("/items/{item_id}")
  async def partial_update(item_id: int, item: dict):
      return {"item_id": item_id, "updated_fields": list(item.keys())}
  ​
  # DELETE 请求
  @app.delete("/items/{item_id}")
  async def delete_item(item_id: int):
      return {"message": f"Item {item_id} deleted"}

2. 请求参数详解

FastAPI 支持多种参数类型,自动进行验证和转换:

  from fastapi import FastAPI, Query, Path, Body, File, UploadFile
  from typing import Annotated
  ​
  app = FastAPI()
  ​
  # 查询参数(Query Parameters)
  @app.get("/items/")
  async def read_items(
      # 必填参数
      q: str,
      # 可选参数(带默认值)
      skip: int = 0,
      limit: int = 10,
      # 复杂验证
      price: Annotated[
          float | None,
          Query(
              gt=0,           # 大于 0
              le=1000,        # 小于等于 1000
              description="价格范围必须在 0-1000 之间",
              alias="item-price"  # 别名:?item-price=100
          )
      ] = None,
      # 列表参数:?tags=foo&tags=bar
      tags: Annotated[list[str] | None, Query()] = None,
      # 布尔参数:?is_active=true
      is_active: bool = True
  ):
      results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}], "skip": skip, "limit": limit}
      if q:
          results.update({"q": q})
      return results
  ​
  # 路径参数(Path Parameters)- 带验证
  @app.get("/items/{item_id}")
  async def read_item(
      item_id: Annotated[
          int,
          Path(
              title="项目 ID",
              description="项目的唯一标识符",
              ge=1,  # 大于等于 1
              le=1000
          )
      ]
  ):
      return {"item_id": item_id}
  ​
  # 请求体(Request Body)
  @app.post("/items/")
  async def create_item(
      # 单个请求体
      item: Annotated[dict, Body()],
      # 嵌套请求体
      user: Annotated[dict, Body(embed=True)],  # {"user": {...}}
      # 额外数据
      importance: Annotated[int, Body()] = 0
  ):
      return {
          "item": item,
          "user": user,
          "importance": importance
      }
  ​
  # 文件上传
  @app.post("/uploadfile/")
  async def create_upload_file(
      file: UploadFile = File(..., description="上传的文件"),
      # 多文件上传
      files: list[UploadFile] = File(default=[], description="多个文件")
  ):
      content = await file.read()
      return {
          "filename": file.filename,
          "content_type": file.content_type,
          "size": len(content),
          "files_count": len(files)
      }

数据验证与序列化

Pydantic 模型:FastAPI 的核心

  from pydantic import BaseModel, Field, EmailStr, validator, root_validator
  from typing import Optional, List
  from datetime import datetime
  from enum import Enum
  ​
  # 基础模型
  class ItemBase(BaseModel):
      title: str = Field(..., min_length=3, max_length=50, description="项目标题")
      description: Optional[str] = Field(
          None, 
          max_length=300,
          title="项目描述",
          example="这是一个示例描述"
      )
      price: float = Field(..., gt=0, description="必须大于 0")
      tax: Optional[float] = None
  ​
  # 创建时模型(无 ID)
  class ItemCreate(ItemBase):
      pass
  ​
  # 响应模型(包含 ID 和时间戳)
  class Item(ItemBase):
      id: int
      created_at: datetime
      updated_at: Optional[datetime] = None
      
      class Config:
          orm_mode = True  # 支持 ORM 对象转换
          schema_extra = {
              "example": {
                  "title": "示例项目",
                  "description": "详细描述",
                  "price": 35.4,
                  "tax": 3.2,
                  "id": 1,
                  "created_at": "2024-01-01T00:00:00"
              }
          }
  ​
  # 复杂验证示例
  class UserCreate(BaseModel):
      username: str = Field(..., min_length=3, max_length=20, regex="^[a-zA-Z0-9_]+$")
      email: EmailStr
      password: str = Field(..., min_length=8)
      password_confirm: str
      age: Optional[int] = Field(None, ge=18, le=120)
      
      @validator('username')
      def username_must_be_unique(cls, v):
          # 模拟数据库检查
          forbidden = ["admin", "root", "superuser"]
          if v.lower() in forbidden:
              raise ValueError(f'用户名 "{v}" 不可用')
          return v
      
      @root_validator
      def passwords_match(cls, values):
          pw1 = values.get('password')
          pw2 = values.get('password_confirm')
          if pw1 != pw2:
              raise ValueError('两次输入的密码不匹配')
          return values
  ​
  # 使用模型
  from fastapi import FastAPI
  ​
  app = FastAPI()
  ​
  @app.post("/items/", response_model=Item, status_code=201)
  async def create_item(item: ItemCreate):
      # 模拟数据库操作
      db_item = Item(
          id=1,
          created_at=datetime.now(),
          **item.dict()
      )
      return db_item
  ​
  @app.post("/users/", response_model=dict)
  async def create_user(user: UserCreate):
      return {
          "username": user.username,
          "email": user.email,
          "message": "用户创建成功"
      }

嵌套模型与复杂数据结构

  from pydantic import BaseModel, HttpUrl
  from typing import Set, List, Dict
  ​
  class Image(BaseModel):
      url: HttpUrl
      name: str
  ​
  class Item(BaseModel):
      name: str
      description: Optional[str] = None
      price: float
      tax: Optional[float] = None
      tags: Set[str] = set()
      images: Optional[List[Image]] = None
      
      # 深度嵌套
      metadata: Optional[Dict[str, str]] = None
  ​
  # 使用示例
  item_data = {
      "name": "Foo",
      "description": "The pretender",
      "price": 42.0,
      "tax": 3.2,
      "tags": ["rock", "metal", "rock"],  # 自动去重为 Set
      "images": [
          {"url": "http://example.com/baz.jpg", "name": "The Foo live"},
          {"url": "http://example.com/dave.jpg", "name": "The Baz"}
      ],
      "metadata": {"key1": "value1", "key2": "value2"}
  }

依赖注入系统

FastAPI 的依赖注入(Dependency Injection)是其最强大的特性之一,支持嵌套依赖、子依赖、路径操作装饰器依赖等。

  from fastapi import Depends, HTTPException, status, Header
  from typing import Annotated
  from functools import lru_cache
  import jwt
  from datetime import datetime, timedelta
  ​
  # 基础依赖
  async def common_parameters(
      q: str | None = None, 
      skip: int = 0, 
      limit: int = 100
  ):
      return {"q": q, "skip": skip, "limit": limit}
  ​
  @app.get("/items/")
  async def read_items(commons: Annotated[dict, Depends(common_parameters)]):
      return commons
  ​
  # 类作为依赖(更复杂的场景)
  class CommonQueryParams:
      def __init__(
          self,
          q: str | None = None,
          skip: int = 0,
          limit: int = 100
      ):
          self.q = q
          self.skip = skip
          self.limit = limit
  ​
  @app.get("/users/")
  async def read_users(commons: Annotated[CommonQueryParams, Depends()]):
      return commons
  ​
  # 数据库依赖(实际应用模式)
  from sqlalchemy import create_engine
  from sqlalchemy.orm import sessionmaker, Session
  ​
  SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
  engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
  SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  ​
  def get_db():
      db = SessionLocal()
      try:
          yield db
      finally:
          db.close()
  ​
  @app.get("/users/{user_id}")
  async def read_user(user_id: int, db: Annotated[Session, Depends(get_db)]):
      # 使用 db 进行查询
      return {"user_id": user_id}
  ​
  # 认证依赖
  SECRET_KEY = "your-secret-key"
  ALGORITHM = "HS256"
  ​
  async def get_current_user(token: Annotated[str, Header(alias="X-Token")]):
      credentials_exception = HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail="Could not validate credentials",
          headers={"WWW-Authenticate": "Bearer"},
      )
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
          username: str = payload.get("sub")
          if username is None:
              raise credentials_exception
      except jwt.PyJWTError:
          raise credentials_exception
      
      # 模拟获取用户
      user = {"username": username, "id": 1}
      if user is None:
          raise credentials_exception
      return user
  ​
  async def get_current_active_user(
      current_user: Annotated[dict, Depends(get_current_user)]
  ):
      if current_user.get("disabled"):
          raise HTTPException(status_code=400, detail="Inactive user")
      return current_user
  ​
  @app.get("/users/me")
  async def read_users_me(
      current_user: Annotated[dict, Depends(get_current_active_user)]
  ):
      return current_user
  ​
  # 路径操作装饰器依赖(适用于整个路由)
  async def verify_token(x_token: Annotated[str, Header()]):
      if x_token != "fake-super-secret-token":
          raise HTTPException(status_code=400, detail="X-Token header invalid")
  ​
  async def verify_key(x_key: Annotated[str, Header()]):
      if x_key != "fake-super-secret-key":
          raise HTTPException(status_code=400, detail="X-Key header invalid")
      return x_key
  ​
  @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)])
  async def read_items():
      return [{"item": "Foo"}, {"item": "Bar"}]
  ​
  # 带 yield 的依赖(用于资源管理)
  async def get_db_with_transaction():
      db = SessionLocal()
      try:
          yield db
          db.commit()  # 成功时提交
      except Exception:
          db.rollback()  # 失败时回滚
          raise
      finally:
          db.close()

数据库集成

SQLAlchemy 2.0 + FastAPI 最佳实践

  from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
  from sqlalchemy.orm import declarative_base, relationship, Session, joinedload
  from sqlalchemy.sql import func
  from fastapi import FastAPI, Depends, HTTPException
  from pydantic import BaseModel
  from typing import List, Optional
  from contextlib import asynccontextmanager
  ​
  # 数据库配置
  DATABASE_URL = "postgresql://user:password@localhost/dbname"
  engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_size=10, max_overflow=20)
  Base = declarative_base()
  ​
  # 模型定义
  class User(Base):
      __tablename__ = "users"
      
      id = Column(Integer, primary_key=True, index=True)
      email = Column(String, unique=True, index=True, nullable=False)
      hashed_password = Column(String, nullable=False)
      full_name = Column(String)
      is_active = Column(Integer, default=1)
      created_at = Column(DateTime(timezone=True), server_default=func.now())
      
      items = relationship("Item", back_populates="owner", cascade="all, delete-orphan")
  ​
  class Item(Base):
      __tablename__ = "items"
      
      id = Column(Integer, primary_key=True, index=True)
      title = Column(String, index=True)
      description = Column(String)
      price = Column(Float)
      owner_id = Column(Integer, ForeignKey("users.id"))
      
      owner = relationship("User", back_populates="items")
  ​
  # Pydantic 模型
  class UserBase(BaseModel):
      email: str
      full_name: Optional[str] = None
  ​
  class UserCreate(UserBase):
      password: str
  ​
  class UserResponse(UserBase):
      id: int
      is_active: bool
      created_at: Optional[str] = None
      
      class Config:
          from_attributes = True
  ​
  class ItemBase(BaseModel):
      title: str
      description: Optional[str] = None
      price: float
  ​
  class ItemCreate(ItemBase):
      pass
  ​
  class ItemResponse(ItemBase):
      id: int
      owner_id: int
      
      class Config:
          from_attributes = True
  ​
  class UserWithItems(UserResponse):
      items: List[ItemResponse] = []
  ​
  # 数据库会话管理
  def get_db():
      db = Session(bind=engine)
      try:
          yield db
      finally:
          db.close()
  ​
  # 生命周期管理
  @asynccontextmanager
  async def lifespan(app: FastAPI):
      # 启动时创建表(生产环境使用 Alembic 迁移)
      Base.metadata.create_all(bind=engine)
      yield
      # 关闭时清理
      engine.dispose()
  ​
  app = FastAPI(lifespan=lifespan)
  ​
  # CRUD 操作
  @app.post("/users/", response_model=UserResponse, status_code=201)
  def create_user(user: UserCreate, db: Session = Depends(get_db)):
      # 检查邮箱是否已存在
      db_user = db.query(User).filter(User.email == user.email).first()
      if db_user:
          raise HTTPException(status_code=400, detail="Email already registered")
      
      # 创建用户(实际应用需要哈希密码)
      fake_hashed_password = user.password + "notreallyhashed"
      db_user = User(
          email=user.email,
          hashed_password=fake_hashed_password,
          full_name=user.full_name
      )
      db.add(db_user)
      db.commit()
      db.refresh(db_user)
      return db_user
  ​
  @app.get("/users/", response_model=List[UserResponse])
  def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
      users = db.query(User).offset(skip).limit(limit).all()
      return users
  ​
  @app.get("/users/{user_id}", response_model=UserWithItems)
  def read_user(user_id: int, db: Session = Depends(get_db)):
      # 使用 joinedload 避免 N+1 查询
      user = db.query(User).options(joinedload(User.items)).filter(User.id == user_id).first()
      if user is None:
          raise HTTPException(status_code=404, detail="User not found")
      return user
  ​
  @app.post("/users/{user_id}/items/", response_model=ItemResponse)
  def create_item_for_user(
      user_id: int, 
      item: ItemCreate, 
      db: Session = Depends(get_db)
  ):
      db_user = db.query(User).filter(User.id == user_id).first()
      if not db_user:
          raise HTTPException(status_code=404, detail="User not found")
      
      db_item = Item(**item.dict(), owner_id=user_id)
      db.add(db_item)
      db.commit()
      db.refresh(db_item)
      return db_item
  ​
  @app.delete("/users/{user_id}")
  def delete_user(user_id: int, db: Session = Depends(get_db)):
      user = db.query(User).filter(User.id == user_id).first()
      if not user:
          raise HTTPException(status_code=404, detail="User not found")
      
      db.delete(user)
      db.commit()
      return {"message": "User deleted successfully"}

异步数据库(推荐用于高并发)

  from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
  from sqlalchemy.future import select
  ​
  # 使用异步驱动
  ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
  async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
  AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
  ​
  async def get_async_db():
      async with AsyncSessionLocal() as session:
          yield session
  ​
  @app.get("/users/async", response_model=List[UserResponse])
  async def read_users_async(
      skip: int = 0, 
      limit: int = 100, 
      db: AsyncSession = Depends(get_async_db)
  ):
      result = await db.execute(select(User).offset(skip).limit(limit))
      users = result.scalars().all()
      return users

认证与安全

JWT 认证完整实现

  from datetime import datetime, timedelta
  from typing import Optional
  from jose import JWTError, jwt
  from passlib.context import CryptContext
  from fastapi import Depends, HTTPException, status
  from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  from pydantic import BaseModel
  ​
  # 配置
  SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
  ALGORITHM = "HS256"
  ACCESS_TOKEN_EXPIRE_MINUTES = 30
  ​
  # 密码加密上下文
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  ​
  # OAuth2 方案
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  ​
  # 模型
  class Token(BaseModel):
      access_token: str
      token_type: str
  ​
  class TokenData(BaseModel):
      username: Optional[str] = None
  ​
  class User(BaseModel):
      username: str
      email: Optional[str] = None
      full_name: Optional[str] = None
      disabled: Optional[bool] = None
  ​
  class UserInDB(User):
      hashed_password: str
  ​
  # 模拟数据库
  fake_users_db = {
      "johndoe": {
          "username": "johndoe",
          "full_name": "John Doe",
          "email": "johndoe@example.com",
          "hashed_password": pwd_context.hash("secret"),
          "disabled": False,
      }
  }
  ​
  # 工具函数
  def verify_password(plain_password, hashed_password):
      return pwd_context.verify(plain_password, hashed_password)
  ​
  def get_password_hash(password):
      return pwd_context.hash(password)
  ​
  def get_user(db, username: str):
      if username in db:
          user_dict = db[username]
          return UserInDB(**user_dict)
  ​
  def authenticate_user(fake_db, username: str, password: str):
      user = get_user(fake_db, username)
      if not user:
          return False
      if not verify_password(password, user.hashed_password):
          return False
      return user
  ​
  def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
      to_encode = data.copy()
      if expires_delta:
          expire = datetime.utcnow() + expires_delta
      else:
          expire = datetime.utcnow() + timedelta(minutes=15)
      to_encode.update({"exp": expire})
      encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
      return encoded_jwt
  ​
  # 依赖
  async def get_current_user(token: str = Depends(oauth2_scheme)):
      credentials_exception = HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail="Could not validate credentials",
          headers={"WWW-Authenticate": "Bearer"},
      )
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
          username: str = payload.get("sub")
          if username is None:
              raise credentials_exception
          token_data = TokenData(username=username)
      except JWTError:
          raise credentials_exception
      user = get_user(fake_users_db, username=token_data.username)
      if user is None:
          raise credentials_exception
      return user
  ​
  async def get_current_active_user(current_user: User = Depends(get_current_user)):
      if current_user.disabled:
          raise HTTPException(status_code=400, detail="Inactive user")
      return current_user
  ​
  # 路由
  @app.post("/token", response_model=Token)
  async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
      user = authenticate_user(fake_users_db, form_data.username, form_data.password)
      if not user:
          raise HTTPException(
              status_code=status.HTTP_401_UNAUTHORIZED,
              detail="Incorrect username or password",
              headers={"WWW-Authenticate": "Bearer"},
          )
      access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
      access_token = create_access_token(
          data={"sub": user.username}, expires_delta=access_token_expires
      )
      return {"access_token": access_token, "token_type": "bearer"}
  ​
  @app.get("/users/me/", response_model=User)
  async def read_users_me(current_user: User = Depends(get_current_active_user)):
      return current_user
  ​
  @app.get("/users/me/items/")
  async def read_own_items(current_user: User = Depends(get_current_active_user)):
      return [{"item_id": "Foo", "owner": current_user.username}]

CORS 配置与安全头

  from fastapi.middleware.cors import CORSMiddleware
  from fastapi.middleware.trustedhost import TrustedHostMiddleware
  from fastapi.middleware.gzip import GZipMiddleware
  ​
  app = FastAPI()
  ​
  # CORS 配置
  app.add_middleware(
      CORSMiddleware,
      allow_origins=["https://example.com", "https://www.example.com"],
      allow_credentials=True,
      allow_methods=["GET", "POST", "PUT", "DELETE"],
      allow_headers=["*"],
      expose_headers=["X-Custom-Header"],
      max_age=600,  # 预检请求缓存时间
  )
  ​
  # 受信任主机
  app.add_middleware(
      TrustedHostMiddleware, 
      allowed_hosts=["example.com", "*.example.com"]
  )
  ​
  # Gzip 压缩
  app.add_middleware(GZipMiddleware, minimum_size=1000)
  ​
  # 安全头中间件
  @app.middleware("http")
  async def add_security_headers(request, call_next):
      response = await call_next(request)
      response.headers["X-Content-Type-Options"] = "nosniff"
      response.headers["X-Frame-Options"] = "DENY"
      response.headers["X-XSS-Protection"] = "1; mode=block"
      response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
      return response

中间件与后台任务

自定义中间件

  import time
  import logging
  from fastapi import Request
  from starlette.middleware.base import BaseHTTPMiddleware
  ​
  # 日志配置
  logging.basicConfig(level=logging.INFO)
  logger = logging.getLogger(__name__)
  ​
  class TimingMiddleware(BaseHTTPMiddleware):
      async def dispatch(self, request: Request, call_next):
          start_time = time.time()
          
          # 记录请求信息
          logger.info(f"Request: {request.method} {request.url}")
          
          response = await call_next(request)
          
          process_time = time.time() - start_time
          response.headers["X-Process-Time"] = str(process_time)
          
          logger.info(f"Response time: {process_time:.4f}s - Status: {response.status_code}")
          return response
  ​
  class RateLimitMiddleware(BaseHTTPMiddleware):
      def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):
          super().__init__(app)
          self.max_requests = max_requests
          self.window_seconds = window_seconds
          self.requests = {}
      
      async def dispatch(self, request: Request, call_next):
          client_ip = request.client.host
          current_time = time.time()
          
          # 清理过期记录
          self.requests = {
              ip: [t for t in times if current_time - t < self.window_seconds]
              for ip, times in self.requests.items()
          }
          
          # 检查限制
          if len(self.requests.get(client_ip, [])) >= self.max_requests:
              from fastapi.responses import JSONResponse
              return JSONResponse(
                  status_code=429,
                  content={"detail": "Rate limit exceeded"}
              )
          
          # 记录请求
          if client_ip not in self.requests:
              self.requests[client_ip] = []
          self.requests[client_ip].append(current_time)
          
          return await call_next(request)
  ​
  app.add_middleware(TimingMiddleware)
  app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)

后台任务

  from fastapi import BackgroundTasks, Depends
  from typing import Annotated
  import asyncio
  import smtplib
  from email.mime.text import MIMEText
  ​
  def send_email(email_to: str, subject: str, body: str):
      """模拟发送邮件(实际应用中配置 SMTP)"""
      print(f"Sending email to {email_to}")
      # smtp_server = smtplib.SMTP("smtp.gmail.com", 587)
      # ...
  ​
  async def process_large_file(file_path: str):
      """异步处理大文件"""
      await asyncio.sleep(10)  # 模拟长时间处理
      print(f"File {file_path} processed")
  ​
  @app.post("/send-notification/{email}")
  async def send_notification(
      email: str,
      background_tasks: BackgroundTasks,
      message: str = "Hello from FastAPI"
  ):
      background_tasks.add_task(send_email, email, "Notification", message)
      return {"message": "Notification sent in the background"}
  ​
  @app.post("/upload/")
  async def upload_file(
      background_tasks: BackgroundTasks,
      file: UploadFile = File(...)
  ):
      file_path = f"/tmp/{file.filename}"
      with open(file_path, "wb") as f:
          content = await file.read()
          f.write(content)
      
      # 添加后台任务
      background_tasks.add_task(process_large_file, file_path)
      
      return {
          "filename": file.filename,
          "status": "File uploaded, processing in background"
      }
  ​
  # 使用 Celery 进行更复杂的后台任务(生产环境推荐)
  """
  from celery import Celery
  ​
  celery_app = Celery(
      "tasks",
      broker="redis://localhost:6379/0",
      backend="redis://localhost:6379/0"
  )
  ​
  @celery_app.task
  def heavy_computation(data: dict):
      import time
      time.sleep(30)
      return {"result": "completed", "data": data}
  ​
  @app.post("/heavy-task/")
  async def trigger_heavy_task(data: dict):
      task = heavy_computation.delay(data)
      return {"task_id": task.id, "status": "processing"}
  """

测试与部署

测试 FastAPI 应用

  # test_main.py
  import pytest
  from fastapi.testclient import TestClient
  from sqlalchemy import create_engine
  from sqlalchemy.orm import sessionmaker
  from sqlalchemy.pool import StaticPool
  ​
  from main import app, get_db, Base
  ​
  # 内存数据库用于测试
  SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
  ​
  engine = create_engine(
      SQLALCHEMY_DATABASE_URL,
      connect_args={"check_same_thread": False},
      poolclass=StaticPool,
  )
  TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  ​
  def override_get_db():
      try:
          db = TestingSessionLocal()
          yield db
      finally:
          db.close()
  ​
  app.dependency_overrides[get_db] = override_get_db
  client = TestClient(app)
  ​
  @pytest.fixture(scope="function")
  def setup_database():
      Base.metadata.create_all(bind=engine)
      yield
      Base.metadata.drop_all(bind=engine)
  ​
  def test_create_user(setup_database):
      response = client.post(
          "/users/",
          json={"email": "test@example.com", "password": "testpass123", "full_name": "Test User"}
      )
      assert response.status_code == 201
      data = response.json()
      assert data["email"] == "test@example.com"
      assert "id" in data
  ​
  def test_read_user(setup_database):
      # 先创建用户
      client.post(
          "/users/",
          json={"email": "test2@example.com", "password": "testpass123"}
      )
      
      response = client.get("/users/1")
      assert response.status_code == 200
      assert response.json()["email"] == "test2@example.com"
  ​
  def test_read_nonexistent_user(setup_database):
      response = client.get("/users/999")
      assert response.status_code == 404
  ​
  def test_invalid_user_data(setup_database):
      response = client.post(
          "/users/",
          json={"email": "invalid-email", "password": "short"}
      )
      assert response.status_code == 422  # 验证错误
  ​
  # 异步测试
  import httpx
  import pytest_asyncio
  ​
  @pytest_asyncio.fixture
  async def async_client():
      async with httpx.AsyncClient(app=app, base_url="http://test") as ac:
          yield ac
  ​
  @pytest.mark.asyncio
  async def test_async_endpoint(async_client):
      response = await async_client.get("/")
      assert response.status_code == 200

Docker 部署

  # Dockerfile
  FROM python:3.11-slim
  ​
  WORKDIR /app
  ​
  # 安装依赖
  COPY requirements.txt .
  RUN pip install --no-cache-dir -r requirements.txt
  ​
  # 复制应用代码
  COPY . .
  ​
  # 非 root 用户运行
  RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
  USER appuser
  ​
  # 暴露端口
  EXPOSE 8000
  ​
  # 启动命令(生产环境使用 gunicorn + uvicorn)
  CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
  # docker-compose.yml
  version: '3.8'
  ​
  services:
    web:
      build: .
      ports:
        - "8000:8000"
      environment:
        - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db
        - SECRET_KEY=${SECRET_KEY}
      depends_on:
        - db
        - redis
      volumes:
        - ./:/app
      command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
  ​
    db:
      image: postgres:15
      environment:
        - POSTGRES_USER=postgres
        - POSTGRES_PASSWORD=password
        - POSTGRES_DB=fastapi_db
      volumes:
        - postgres_data:/var/lib/postgresql/data
      ports:
        - "5432:5432"
  ​
    redis:
      image: redis:7-alpine
      ports:
        - "6379:6379"
  ​
  volumes:
    postgres_data:

Kubernetes 部署配置

  # k8s-deployment.yaml
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: fastapi-app
  spec:
    replicas: 3
    selector:
      matchLabels:
        app: fastapi
    template:
      metadata:
        labels:
          app: fastapi
      spec:
        containers:
        - name: fastapi
          image: your-registry/fastapi-app:latest
          ports:
          - containerPort: 8000
          env:
          - name: DATABASE_URL
            valueFrom:
              secretKeyRef:
                name: db-secret
                key: url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
  ---
  apiVersion: v1
  kind: Service
  metadata:
    name: fastapi-service
  spec:
    selector:
      app: fastapi
    ports:
    - port: 80
      targetPort: 8000
    type: LoadBalancer

性能优化最佳实践

1. 异步数据库连接池

  from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
  from sqlalchemy.orm import sessionmaker
  import aioredis
  ​
  # 优化连接池配置
  async_engine = create_async_engine(
      "postgresql+asyncpg://user:pass@localhost/db",
      pool_size=20,              # 基础连接数
      max_overflow=10,           # 最大溢出连接
      pool_pre_ping=True,        # 连接前 ping 检查
      pool_recycle=3600,         # 连接回收时间
      echo=False                 # 生产环境关闭 SQL 日志
  )
  ​
  # Redis 连接池
  redis_pool = aioredis.ConnectionPool.from_url(
      "redis://localhost",
      max_connections=100,
      decode_responses=True
  )

2. 缓存策略

  from fastapi_cache import FastAPICache
  from fastapi_cache.backends.redis import RedisBackend
  from fastapi_cache.decorator import cache
  from redis import asyncio as aioredis
  ​
  @app.on_event("startup")
  async def startup():
      redis = aioredis.from_url("redis://localhost", encoding="utf8", decode_responses=True)
      FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
  ​
  @app.get("/heavy-computation")
  @cache(expire=3600)  # 缓存 1 小时
  async def heavy_computation():
      # 模拟耗时操作
      await asyncio.sleep(2)
      return {"result": "expensive data"}
  ​
  # 手动缓存控制
  from fastapi_cache.coder import JsonCoder
  ​
  @app.get("/users/{user_id}")
  async def get_user(user_id: int):
      cache_key = f"user:{user_id}"
      
      # 尝试从缓存获取
      cached = await FastAPICache.get_backend().get(cache_key)
      if cached:
          return json.loads(cached)
      
      # 查询数据库
      user = await fetch_user_from_db(user_id)
      
      # 写入缓存
      await FastAPICache.get_backend().set(
          cache_key, 
          json.dumps(user), 
          expire=300
      )
      return user

3. 响应模型优化

  from fastapi import Response
  from fastapi.responses import ORJSONResponse, StreamingResponse
  import orjson
  ​
  # 使用更快的 JSON 库
  @app.get("/items/", response_class=ORJSONResponse)
  async def get_items():
      return [{"id": i, "data": "value"} for i in range(1000)]
  ​
  # 流式响应(大文件)
  @app.get("/large-file/")
  async def get_large_file():
      def file_generator():
          with open("large_file.zip", "rb") as f:
              while chunk := f.read(8192):
                  yield chunk
      
      return StreamingResponse(
          file_generator(),
          media_type="application/zip",
          headers={"Content-Disposition": "attachment; filename=large_file.zip"}
      )
  ​
  # 分页优化
  from fastapi import Query
  from sqlalchemy import func
  ​
  @app.get("/items/paginated/")
  async def get_items_paginated(
      page: int = Query(1, ge=1),
      size: int = Query(20, ge=1, le=100),
      db: AsyncSession = Depends(get_async_db)
  ):
      offset = (page - 1) * size
      
      # 使用 count(*) OVER() 优化总数查询
      result = await db.execute(
          select(
              Item,
              func.count().over().label("total")
          )
          .offset(offset)
          .limit(size)
      )
      
      rows = result.all()
      total = rows[0].total if rows else 0
      
      return {
          "items": [row.Item for row in rows],
          "total": total,
          "page": page,
          "size": size,
          "pages": (total + size - 1) // size
      }

4. 性能监控

  from prometheus_client import Counter, Histogram, generate_latest
  from starlette.middleware.base import BaseHTTPMiddleware
  ​
  # Prometheus 指标
  REQUEST_COUNT = Counter(
      'http_requests_total', 
      'Total HTTP requests',
      ['method', 'endpoint', 'status']
  )
  ​
  REQUEST_LATENCY = Histogram(
      'http_request_duration_seconds',
      'HTTP request latency',
      ['method', 'endpoint']
  )
  ​
  class PrometheusMiddleware(BaseHTTPMiddleware):
      async def dispatch(self, request, call_next):
          start_time = time.time()
          response = await call_next(request)
          duration = time.time() - start_time
          
          REQUEST_COUNT.labels(
              method=request.method,
              endpoint=request.url.path,
              status=response.status_code
          ).inc()
          
          REQUEST_LATENCY.labels(
              method=request.method,
              endpoint=request.url.path
          ).observe(duration)
          
          return response
  ​
  app.add_middleware(PrometheusMiddleware)
  ​
  @app.get("/metrics")
  async def metrics():
      return Response(generate_latest(), media_type="text/plain")

总结

FastAPI 代表了 Python Web 开发的现代化方向,它通过以下特性成为构建高性能 API 的首选框架:

  1. 开发效率:类型提示带来的智能补全和自动文档生成

  2. 运行时性能:异步原生设计,媲美 Node.js 和 Go

  3. 工程规范:内置数据验证、依赖注入、认证授权等生产级功能

  4. 生态兼容:与 SQLAlchemy、Pydantic、Celery 等主流库无缝集成

学习路径建议

  1. 入门:掌握基础路由、请求参数、Pydantic 模型

  2. 进阶:深入依赖注入系统、数据库集成、异步编程

  3. 生产:学习 Docker 部署、Kubernetes 编排、监控告警

  4. 架构:微服务拆分、事件驱动架构、CQRS 模式

FastAPI 不仅是一个框架,更是一种现代 Python 工程实践的体现。无论你是构建小型微服务还是大型分布式系统,FastAPI 都能提供坚实的技术支撑。


本文代码示例基于 FastAPI 0.104+ 和 Python 3.11+,建议在实际项目中使用最新稳定版本。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐