041-Web应用开发实战

学习目标

  • 掌握现代Web应用开发的完整流程
  • 学会使用FastAPI构建高性能API服务
  • 理解前后端分离架构设计
  • 掌握数据库设计和ORM使用
  • 学会实现用户认证和权限管理
  • 掌握API文档生成和测试
  • 理解Web应用的部署和监控

1. 项目架构设计

1.1 整体架构

数据层
微服务架构
PostgreSQL
Redis
认证服务
业务API
后台任务
前端应用
API网关
数据访问层
缓存层
消息队列

1.2 项目结构

# 项目结构设计
from pathlib import Path
import os

class ProjectStructure:
    """
    Web应用项目结构管理器
    """
    
    def __init__(self, project_name: str):
        self.project_name = project_name
        self.base_path = Path(project_name)
    
    def create_structure(self):
        """
        创建标准的Web应用项目结构
        """
        structure = {
            'app': {
                '__init__.py': '',
                'main.py': self._get_main_content(),
                'config.py': self._get_config_content(),
                'database.py': self._get_database_content(),
                'models': {
                    '__init__.py': '',
                    'user.py': '',
                    'base.py': ''
                },
                'api': {
                    '__init__.py': '',
                    'v1': {
                        '__init__.py': '',
                        'endpoints': {
                            '__init__.py': '',
                            'auth.py': '',
                            'users.py': '',
                            'items.py': ''
                        }
                    }
                },
                'core': {
                    '__init__.py': '',
                    'security.py': '',
                    'dependencies.py': '',
                    'exceptions.py': ''
                },
                'services': {
                    '__init__.py': '',
                    'user_service.py': '',
                    'auth_service.py': ''
                },
                'schemas': {
                    '__init__.py': '',
                    'user.py': '',
                    'auth.py': ''
                },
                'utils': {
                    '__init__.py': '',
                    'helpers.py': '',
                    'validators.py': ''
                }
            },
            'tests': {
                '__init__.py': '',
                'conftest.py': '',
                'test_auth.py': '',
                'test_users.py': ''
            },
            'migrations': {
                'versions': {}
            },
            'docker': {
                'Dockerfile': self._get_dockerfile_content(),
                'docker-compose.yml': self._get_docker_compose_content()
            },
            'requirements.txt': self._get_requirements_content(),
            'README.md': self._get_readme_content(),
            '.env.example': self._get_env_example_content(),
            '.gitignore': self._get_gitignore_content()
        }
        
        return structure
    
    def _get_main_content(self):
        return '''
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.api import api_router
from app.core.config import settings

app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    openapi_url=f"{settings.API_V1_STR}/openapi.json"
)

# CORS中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_HOSTS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(api_router, prefix=settings.API_V1_STR)

@app.get("/")
async def root():
    return {"message": "Welcome to the API"}
'''
    
    def _get_config_content(self):
        return '''
from pydantic import BaseSettings
from typing import List, Optional

class Settings(BaseSettings):
    PROJECT_NAME: str = "Web Application"
    VERSION: str = "1.0.0"
    API_V1_STR: str = "/api/v1"
    
    # 数据库配置
    DATABASE_URL: str = "postgresql://user:password@localhost/dbname"
    
    # Redis配置
    REDIS_URL: str = "redis://localhost:6379"
    
    # 安全配置
    SECRET_KEY: str = "your-secret-key-here"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    
    # CORS配置
    ALLOWED_HOSTS: List[str] = ["*"]
    
    class Config:
        env_file = ".env"

settings = Settings()
'''
    
    def _get_database_content(self):
        return '''
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings

engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
'''
    
    def _get_requirements_content(self):
        return '''
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
alembic==1.12.1
pydantic[email]==2.5.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
redis==5.0.1
celery==5.3.4
pytest==7.4.3
pytest-asyncio==0.21.1
httpx==0.25.2
'''
    
    def _get_dockerfile_content(self):
        return '''
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
'''
    
    def _get_docker_compose_content(self):
        return '''
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/webapp
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
  
  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=webapp
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  postgres_data:
'''
    
    def _get_readme_content(self):
        return f'''
# {self.project_name}

现代Web应用开发实战项目

## 功能特性

- 用户认证和授权
- RESTful API设计
- 数据库操作和迁移
- 缓存和会话管理
- 异步任务处理
- API文档自动生成
- 单元测试和集成测试

## 快速开始

1. 克隆项目
2. 安装依赖:`pip install -r requirements.txt`
3. 配置环境变量:复制`.env.example`为`.env`
4. 运行数据库迁移:`alembic upgrade head`
5. 启动应用:`uvicorn app.main:app --reload`

## API文档

启动应用后访问:http://localhost:8000/docs
'''
    
    def _get_env_example_content(self):
        return '''
PROJECT_NAME="Web Application"
VERSION="1.0.0"
DATABASE_URL="postgresql://user:password@localhost/dbname"
REDIS_URL="redis://localhost:6379"
SECRET_KEY="your-secret-key-here"
ACCESS_TOKEN_EXPIRE_MINUTES=30
'''
    
    def _get_gitignore_content(self):
        return '''
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

.pytest_cache/
.coverage
htmlcov/

.DS_Store
.vscode/
.idea/
'''

# 使用示例
project_structure = ProjectStructure("web_app_demo")
structure = project_structure.create_structure()

print("Web应用项目结构:")
for key, value in structure.items():
    print(f"📁 {key}")
    if isinstance(value, dict):
        for subkey in value.keys():
            print(f"  📄 {subkey}")

2. 数据模型设计

2.1 基础模型

from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from datetime import datetime
from typing import Optional

Base = declarative_base()

class TimestampMixin:
    """
    时间戳混入类
    """
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

class User(Base, TimestampMixin):
    """
    用户模型
    """
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(50), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    full_name = Column(String(100))
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    avatar_url = Column(String(500))
    bio = Column(Text)
    
    # 关系
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")
    
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

class Category(Base, TimestampMixin):
    """
    分类模型
    """
    __tablename__ = "categories"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    description = Column(Text)
    slug = Column(String(50), unique=True, index=True)
    is_active = Column(Boolean, default=True)
    
    # 关系
    posts = relationship("Post", back_populates="category")
    
    def __repr__(self):
        return f"<Category(id={self.id}, name='{self.name}')>"

class Post(Base, TimestampMixin):
    """
    文章模型
    """
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False, index=True)
    slug = Column(String(200), unique=True, index=True)
    content = Column(Text, nullable=False)
    excerpt = Column(Text)
    featured_image = Column(String(500))
    is_published = Column(Boolean, default=False)
    view_count = Column(Integer, default=0)
    like_count = Column(Integer, default=0)
    
    # 外键
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"))
    
    # 关系
    author = relationship("User", back_populates="posts")
    category = relationship("Category", back_populates="posts")
    comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
    tags = relationship("Tag", secondary="post_tags", back_populates="posts")
    
    def __repr__(self):
        return f"<Post(id={self.id}, title='{self.title}')>"

class Comment(Base, TimestampMixin):
    """
    评论模型
    """
    __tablename__ = "comments"
    
    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    is_approved = Column(Boolean, default=False)
    
    # 外键
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
    parent_id = Column(Integer, ForeignKey("comments.id"))  # 支持回复
    
    # 关系
    author = relationship("User", back_populates="comments")
    post = relationship("Post", back_populates="comments")
    parent = relationship("Comment", remote_side=[id])
    
    def __repr__(self):
        return f"<Comment(id={self.id}, author_id={self.author_id})>"

class Tag(Base, TimestampMixin):
    """
    标签模型
    """
    __tablename__ = "tags"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    slug = Column(String(50), unique=True, index=True)
    color = Column(String(7), default="#007bff")  # 十六进制颜色
    
    # 关系
    posts = relationship("Post", secondary="post_tags", back_populates="tags")
    
    def __repr__(self):
        return f"<Tag(id={self.id}, name='{self.name}')>"

# 多对多关联表
from sqlalchemy import Table

post_tags = Table(
    'post_tags',
    Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)

2.2 数据库操作封装

from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc, asc
from typing import List, Optional, Dict, Any, Type, TypeVar, Generic
from abc import ABC, abstractmethod

ModelType = TypeVar("ModelType", bound=Base)

class BaseRepository(Generic[ModelType], ABC):
    """
    基础仓储类
    """
    
    def __init__(self, model: Type[ModelType], db: Session):
        self.model = model
        self.db = db
    
    def create(self, obj_in: Dict[str, Any]) -> ModelType:
        """
        创建对象
        """
        db_obj = self.model(**obj_in)
        self.db.add(db_obj)
        self.db.commit()
        self.db.refresh(db_obj)
        return db_obj
    
    def get(self, id: int) -> Optional[ModelType]:
        """
        根据ID获取对象
        """
        return self.db.query(self.model).filter(self.model.id == id).first()
    
    def get_multi(
        self, 
        skip: int = 0, 
        limit: int = 100,
        filters: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None
    ) -> List[ModelType]:
        """
        获取多个对象
        """
        query = self.db.query(self.model)
        
        # 应用过滤器
        if filters:
            for key, value in filters.items():
                if hasattr(self.model, key):
                    query = query.filter(getattr(self.model, key) == value)
        
        # 应用排序
        if order_by:
            if order_by.startswith('-'):
                field = order_by[1:]
                if hasattr(self.model, field):
                    query = query.order_by(desc(getattr(self.model, field)))
            else:
                if hasattr(self.model, order_by):
                    query = query.order_by(asc(getattr(self.model, order_by)))
        
        return query.offset(skip).limit(limit).all()
    
    def update(self, id: int, obj_in: Dict[str, Any]) -> Optional[ModelType]:
        """
        更新对象
        """
        db_obj = self.get(id)
        if db_obj:
            for key, value in obj_in.items():
                if hasattr(db_obj, key):
                    setattr(db_obj, key, value)
            self.db.commit()
            self.db.refresh(db_obj)
        return db_obj
    
    def delete(self, id: int) -> bool:
        """
        删除对象
        """
        db_obj = self.get(id)
        if db_obj:
            self.db.delete(db_obj)
            self.db.commit()
            return True
        return False
    
    def count(self, filters: Optional[Dict[str, Any]] = None) -> int:
        """
        统计对象数量
        """
        query = self.db.query(self.model)
        
        if filters:
            for key, value in filters.items():
                if hasattr(self.model, key):
                    query = query.filter(getattr(self.model, key) == value)
        
        return query.count()

class UserRepository(BaseRepository[User]):
    """
    用户仓储类
    """
    
    def __init__(self, db: Session):
        super().__init__(User, db)
    
    def get_by_email(self, email: str) -> Optional[User]:
        """
        根据邮箱获取用户
        """
        return self.db.query(User).filter(User.email == email).first()
    
    def get_by_username(self, username: str) -> Optional[User]:
        """
        根据用户名获取用户
        """
        return self.db.query(User).filter(User.username == username).first()
    
    def get_active_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """
        获取活跃用户
        """
        return self.db.query(User).filter(User.is_active == True).offset(skip).limit(limit).all()
    
    def search_users(self, query: str, skip: int = 0, limit: int = 100) -> List[User]:
        """
        搜索用户
        """
        search_filter = or_(
            User.username.ilike(f"%{query}%"),
            User.full_name.ilike(f"%{query}%"),
            User.email.ilike(f"%{query}%")
        )
        return self.db.query(User).filter(search_filter).offset(skip).limit(limit).all()

class PostRepository(BaseRepository[Post]):
    """
    文章仓储类
    """
    
    def __init__(self, db: Session):
        super().__init__(Post, db)
    
    def get_published_posts(self, skip: int = 0, limit: int = 100) -> List[Post]:
        """
        获取已发布的文章
        """
        return (self.db.query(Post)
                .filter(Post.is_published == True)
                .order_by(desc(Post.created_at))
                .offset(skip)
                .limit(limit)
                .all())
    
    def get_by_slug(self, slug: str) -> Optional[Post]:
        """
        根据slug获取文章
        """
        return self.db.query(Post).filter(Post.slug == slug).first()
    
    def get_by_author(self, author_id: int, skip: int = 0, limit: int = 100) -> List[Post]:
        """
        获取指定作者的文章
        """
        return (self.db.query(Post)
                .filter(Post.author_id == author_id)
                .order_by(desc(Post.created_at))
                .offset(skip)
                .limit(limit)
                .all())
    
    def get_by_category(self, category_id: int, skip: int = 0, limit: int = 100) -> List[Post]:
        """
        获取指定分类的文章
        """
        return (self.db.query(Post)
                .filter(and_(Post.category_id == category_id, Post.is_published == True))
                .order_by(desc(Post.created_at))
                .offset(skip)
                .limit(limit)
                .all())
    
    def search_posts(self, query: str, skip: int = 0, limit: int = 100) -> List[Post]:
        """
        搜索文章
        """
        search_filter = or_(
            Post.title.ilike(f"%{query}%"),
            Post.content.ilike(f"%{query}%"),
            Post.excerpt.ilike(f"%{query}%")
        )
        return (self.db.query(Post)
                .filter(and_(search_filter, Post.is_published == True))
                .order_by(desc(Post.created_at))
                .offset(skip)
                .limit(limit)
                .all())
    
    def increment_view_count(self, post_id: int) -> bool:
        """
        增加文章浏览量
        """
        result = (self.db.query(Post)
                 .filter(Post.id == post_id)
                 .update({Post.view_count: Post.view_count + 1}))
        self.db.commit()
        return result > 0

# 使用示例
class DatabaseManager:
    """
    数据库管理器
    """
    
    def __init__(self, db: Session):
        self.db = db
        self.user_repo = UserRepository(db)
        self.post_repo = PostRepository(db)
    
    def create_sample_data(self):
        """
        创建示例数据
        """
        # 创建用户
        user_data = {
            'email': 'admin@example.com',
            'username': 'admin',
            'hashed_password': 'hashed_password_here',
            'full_name': 'Administrator',
            'is_superuser': True
        }
        user = self.user_repo.create(user_data)
        
        # 创建分类
        category_data = {
            'name': 'Technology',
            'description': 'Technology related posts',
            'slug': 'technology'
        }
        category = CategoryRepository(self.db).create(category_data)
        
        # 创建文章
        post_data = {
            'title': 'Welcome to Our Blog',
            'slug': 'welcome-to-our-blog',
            'content': 'This is the first post on our blog...',
            'excerpt': 'Welcome post excerpt',
            'is_published': True,
            'author_id': user.id,
            'category_id': category.id
        }
        post = self.post_repo.create(post_data)
        
        return {
            'user': user,
            'category': category,
            'post': post
        }
    
    def get_dashboard_stats(self) -> Dict[str, int]:
        """
        获取仪表板统计数据
        """
        return {
            'total_users': self.user_repo.count(),
            'active_users': self.user_repo.count({'is_active': True}),
            'total_posts': self.post_repo.count(),
            'published_posts': self.post_repo.count({'is_published': True})
        }

# 示例使用
print("数据模型设计完成")
print("包含以下模型:")
print("- User: 用户模型")
print("- Post: 文章模型")
print("- Category: 分类模型")
print("- Comment: 评论模型")
print("- Tag: 标签模型")
print("\n仓储模式实现:")
print("- BaseRepository: 基础仓储类")
print("- UserRepository: 用户仓储")
print("- PostRepository: 文章仓储")

3. API设计与实现

3.1 Pydantic模式定义

from pydantic import BaseModel, EmailStr, validator, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum

class UserRole(str, Enum):
    """
    用户角色枚举
    """
    ADMIN = "admin"
    MODERATOR = "moderator"
    USER = "user"

class PostStatus(str, Enum):
    """
    文章状态枚举
    """
    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"

# 基础模式
class BaseSchema(BaseModel):
    """
    基础模式类
    """
    class Config:
        from_attributes = True
        use_enum_values = True

# 用户相关模式
class UserBase(BaseSchema):
    """
    用户基础模式
    """
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50)
    full_name: Optional[str] = Field(None, max_length=100)
    bio: Optional[str] = Field(None, max_length=500)
    avatar_url: Optional[str] = None
    
    @validator('username')
    def validate_username(cls, v):
        if not v.isalnum() and '_' not in v:
            raise ValueError('用户名只能包含字母、数字和下划线')
        return v

class UserCreate(UserBase):
    """
    用户创建模式
    """
    password: str = Field(..., min_length=8, max_length=100)
    
    @validator('password')
    def validate_password(cls, v):
        if not any(c.isupper() for c in v):
            raise ValueError('密码必须包含至少一个大写字母')
        if not any(c.islower() for c in v):
            raise ValueError('密码必须包含至少一个小写字母')
        if not any(c.isdigit() for c in v):
            raise ValueError('密码必须包含至少一个数字')
        return v

class UserUpdate(BaseSchema):
    """
    用户更新模式
    """
    email: Optional[EmailStr] = None
    username: Optional[str] = Field(None, min_length=3, max_length=50)
    full_name: Optional[str] = Field(None, max_length=100)
    bio: Optional[str] = Field(None, max_length=500)
    avatar_url: Optional[str] = None
    is_active: Optional[bool] = None

class UserInDB(UserBase):
    """
    数据库中的用户模式
    """
    id: int
    is_active: bool
    is_superuser: bool
    created_at: datetime
    updated_at: Optional[datetime] = None

class UserResponse(UserInDB):
    """
    用户响应模式
    """
    posts_count: Optional[int] = 0
    comments_count: Optional[int] = 0

# 文章相关模式
class PostBase(BaseSchema):
    """
    文章基础模式
    """
    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1)
    excerpt: Optional[str] = Field(None, max_length=500)
    featured_image: Optional[str] = None
    category_id: Optional[int] = None
    tags: Optional[List[str]] = []

class PostCreate(PostBase):
    """
    文章创建模式
    """
    slug: Optional[str] = None
    is_published: bool = False
    
    @validator('slug', pre=True, always=True)
    def generate_slug(cls, v, values):
        if not v and 'title' in values:
            # 简单的slug生成逻辑
            import re
            slug = re.sub(r'[^\w\s-]', '', values['title']).strip().lower()
            slug = re.sub(r'[-\s]+', '-', slug)
            return slug
        return v

class PostUpdate(BaseSchema):
    """
    文章更新模式
    """
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    content: Optional[str] = Field(None, min_length=1)
    excerpt: Optional[str] = Field(None, max_length=500)
    featured_image: Optional[str] = None
    category_id: Optional[int] = None
    is_published: Optional[bool] = None
    tags: Optional[List[str]] = None

class PostInDB(PostBase):
    """
    数据库中的文章模式
    """
    id: int
    slug: str
    is_published: bool
    view_count: int
    like_count: int
    author_id: int
    created_at: datetime
    updated_at: Optional[datetime] = None

class PostResponse(PostInDB):
    """
    文章响应模式
    """
    author: Optional[UserResponse] = None
    category: Optional['CategoryResponse'] = None
    tags: Optional[List['TagResponse']] = []
    comments_count: Optional[int] = 0

# 分类相关模式
class CategoryBase(BaseSchema):
    """
    分类基础模式
    """
    name: str = Field(..., min_length=1, max_length=50)
    description: Optional[str] = Field(None, max_length=500)
    slug: Optional[str] = None

class CategoryCreate(CategoryBase):
    """
    分类创建模式
    """
    pass

class CategoryUpdate(BaseSchema):
    """
    分类更新模式
    """
    name: Optional[str] = Field(None, min_length=1, max_length=50)
    description: Optional[str] = Field(None, max_length=500)
    is_active: Optional[bool] = None

class CategoryResponse(CategoryBase):
    """
    分类响应模式
    """
    id: int
    is_active: bool
    posts_count: Optional[int] = 0
    created_at: datetime

# 评论相关模式
class CommentBase(BaseSchema):
    """
    评论基础模式
    """
    content: str = Field(..., min_length=1, max_length=1000)
    parent_id: Optional[int] = None

class CommentCreate(CommentBase):
    """
    评论创建模式
    """
    post_id: int

class CommentUpdate(BaseSchema):
    """
    评论更新模式
    """
    content: Optional[str] = Field(None, min_length=1, max_length=1000)
    is_approved: Optional[bool] = None

class CommentResponse(CommentBase):
    """
    评论响应模式
    """
    id: int
    is_approved: bool
    author: Optional[UserResponse] = None
    post_id: int
    created_at: datetime
    replies: Optional[List['CommentResponse']] = []

# 标签相关模式
class TagBase(BaseSchema):
    """
    标签基础模式
    """
    name: str = Field(..., min_length=1, max_length=50)
    color: Optional[str] = Field("#007bff", regex=r'^#[0-9A-Fa-f]{6}$')

class TagCreate(TagBase):
    """
    标签创建模式
    """
    slug: Optional[str] = None

class TagResponse(TagBase):
    """
    标签响应模式
    """
    id: int
    slug: str
    posts_count: Optional[int] = 0
    created_at: datetime

# 认证相关模式
class Token(BaseSchema):
    """
    令牌模式
    """
    access_token: str
    token_type: str = "bearer"
    expires_in: int
    user: UserResponse

class TokenData(BaseSchema):
    """
    令牌数据模式
    """
    user_id: Optional[int] = None
    username: Optional[str] = None

class LoginRequest(BaseSchema):
    """
    登录请求模式
    """
    username: str
    password: str

class PasswordReset(BaseSchema):
    """
    密码重置模式
    """
    email: EmailStr

class PasswordResetConfirm(BaseSchema):
    """
    密码重置确认模式
    """
    token: str
    new_password: str = Field(..., min_length=8, max_length=100)

# 通用响应模式
class PaginatedResponse(BaseSchema):
    """
    分页响应模式
    """
    items: List[Any]
    total: int
    page: int
    size: int
    pages: int
    has_next: bool
    has_prev: bool

class MessageResponse(BaseSchema):
    """
    消息响应模式
    """
    message: str
    success: bool = True
    data: Optional[Dict[str, Any]] = None

class ErrorResponse(BaseSchema):
    """
    错误响应模式
    """
    error: str
    detail: Optional[str] = None
    code: Optional[str] = None

# 统计相关模式
class DashboardStats(BaseSchema):
    """
    仪表板统计模式
    """
    total_users: int
    active_users: int
    total_posts: int
    published_posts: int
    total_comments: int
    approved_comments: int
    total_categories: int
    total_tags: int

class PostStats(BaseSchema):
    """
    文章统计模式
    """
    views_today: int
    views_week: int
    views_month: int
    likes_today: int
    comments_today: int

# 更新前向引用
PostResponse.model_rebuild()
CommentResponse.model_rebuild()

print("Pydantic模式定义完成")
print("包含以下模式类别:")
print("- 用户相关: UserBase, UserCreate, UserUpdate, UserResponse")
print("- 文章相关: PostBase, PostCreate, PostUpdate, PostResponse")
print("- 分类相关: CategoryBase, CategoryCreate, CategoryResponse")
print("- 评论相关: CommentBase, CommentCreate, CommentResponse")
print("- 标签相关: TagBase, TagCreate, TagResponse")
print("- 认证相关: Token, LoginRequest, PasswordReset")
print("- 通用响应: PaginatedResponse, MessageResponse, ErrorResponse")

3.2 API路由实现

from fastapi import APIRouter, Depends, HTTPException, status, Query, Path
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from typing import List, Optional
from app.database import get_db
from app.models import User, Post, Category, Comment, Tag
from app.schemas import (
    UserCreate, UserUpdate, UserResponse,
    PostCreate, PostUpdate, PostResponse,
    CategoryCreate, CategoryResponse,
    CommentCreate, CommentResponse,
    PaginatedResponse, MessageResponse
)
from app.core.security import get_current_user, get_current_active_user
from app.services.user_service import UserService
from app.services.post_service import PostService
from app.services.auth_service import AuthService

# 安全依赖
security = HTTPBearer()

# 用户路由
user_router = APIRouter(prefix="/users", tags=["users"])

@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_in: UserCreate,
    db: Session = Depends(get_db)
):
    """
    创建新用户
    """
    user_service = UserService(db)
    
    # 检查用户是否已存在
    if user_service.get_by_email(user_in.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="邮箱已被注册"
        )
    
    if user_service.get_by_username(user_in.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户名已被使用"
        )
    
    user = user_service.create_user(user_in)
    return user

@user_router.get("/me", response_model=UserResponse)
async def get_current_user_info(
    current_user: User = Depends(get_current_active_user)
):
    """
    获取当前用户信息
    """
    return current_user

@user_router.put("/me", response_model=UserResponse)
async def update_current_user(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    更新当前用户信息
    """
    user_service = UserService(db)
    updated_user = user_service.update_user(current_user.id, user_update)
    return updated_user

@user_router.get("/", response_model=PaginatedResponse)
async def get_users(
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(20, ge=1, le=100, description="每页数量"),
    search: Optional[str] = Query(None, description="搜索关键词"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """
    获取用户列表(需要管理员权限)
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足"
        )
    
    user_service = UserService(db)
    users, total = user_service.get_users_paginated(page, size, search)
    
    return PaginatedResponse(
        items=users,
        total=total,
        page=page,
        size=size,
        pages=(total + size - 1) // size,
        has_next=page * size < total,
        has_prev=page > 1
    )

@user_router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int = Path(..., description="用户ID"),
    db: Session = Depends(get_db)
):
    """
    根据ID获取用户信息
    """
    user_service = UserService(db)
    user = user_service.get_user(user_id)
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="用户不存在"
        )
    
    return user

# 文章路由
post_router = APIRouter(prefix="/posts", tags=["posts"])

@post_router.post("/", response_model=PostResponse, status_code=status.HTTP_201_CREATED)
async def create_post(
    post_in: PostCreate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    创建新文章
    """
    post_service = PostService(db)
    post = post_service.create_post(post_in, current_user.id)
    return post

@post_router.get("/", response_model=PaginatedResponse)
async def get_posts(
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(20, ge=1, le=100, description="每页数量"),
    category_id: Optional[int] = Query(None, description="分类ID"),
    author_id: Optional[int] = Query(None, description="作者ID"),
    search: Optional[str] = Query(None, description="搜索关键词"),
    published_only: bool = Query(True, description="仅显示已发布"),
    db: Session = Depends(get_db)
):
    """
    获取文章列表
    """
    post_service = PostService(db)
    posts, total = post_service.get_posts_paginated(
        page=page,
        size=size,
        category_id=category_id,
        author_id=author_id,
        search=search,
        published_only=published_only
    )
    
    return PaginatedResponse(
        items=posts,
        total=total,
        page=page,
        size=size,
        pages=(total + size - 1) // size,
        has_next=page * size < total,
        has_prev=page > 1
    )

@post_router.get("/{post_id}", response_model=PostResponse)
async def get_post(
    post_id: int = Path(..., description="文章ID"),
    db: Session = Depends(get_db)
):
    """
    根据ID获取文章详情
    """
    post_service = PostService(db)
    post = post_service.get_post(post_id)
    
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 增加浏览量
    post_service.increment_view_count(post_id)
    
    return post

@post_router.get("/slug/{slug}", response_model=PostResponse)
async def get_post_by_slug(
    slug: str = Path(..., description="文章slug"),
    db: Session = Depends(get_db)
):
    """
    根据slug获取文章详情
    """
    post_service = PostService(db)
    post = post_service.get_post_by_slug(slug)
    
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 增加浏览量
    post_service.increment_view_count(post.id)
    
    return post

@post_router.put("/{post_id}", response_model=PostResponse)
async def update_post(
    post_id: int = Path(..., description="文章ID"),
    post_update: PostUpdate = ...,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    更新文章
    """
    post_service = PostService(db)
    post = post_service.get_post(post_id)
    
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 检查权限:只有作者或管理员可以编辑
    if post.author_id != current_user.id and not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足"
        )
    
    updated_post = post_service.update_post(post_id, post_update)
    return updated_post

@post_router.delete("/{post_id}", response_model=MessageResponse)
async def delete_post(
    post_id: int = Path(..., description="文章ID"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    删除文章
    """
    post_service = PostService(db)
    post = post_service.get_post(post_id)
    
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 检查权限:只有作者或管理员可以删除
    if post.author_id != current_user.id and not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足"
        )
    
    success = post_service.delete_post(post_id)
    
    if not success:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="删除失败"
        )
    
    return MessageResponse(message="文章删除成功")

# 分类路由
category_router = APIRouter(prefix="/categories", tags=["categories"])

@category_router.get("/", response_model=List[CategoryResponse])
async def get_categories(
    db: Session = Depends(get_db)
):
    """
    获取所有分类
    """
    from app.services.category_service import CategoryService
    category_service = CategoryService(db)
    categories = category_service.get_all_categories()
    return categories

@category_router.post("/", response_model=CategoryResponse, status_code=status.HTTP_201_CREATED)
async def create_category(
    category_in: CategoryCreate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    创建新分类(需要管理员权限)
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足"
        )
    
    from app.services.category_service import CategoryService
    category_service = CategoryService(db)
    category = category_service.create_category(category_in)
    return category

# 评论路由
comment_router = APIRouter(prefix="/comments", tags=["comments"])

@comment_router.post("/", response_model=CommentResponse, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment_in: CommentCreate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    创建新评论
    """
    from app.services.comment_service import CommentService
    comment_service = CommentService(db)
    comment = comment_service.create_comment(comment_in, current_user.id)
    return comment

@comment_router.get("/post/{post_id}", response_model=List[CommentResponse])
async def get_post_comments(
    post_id: int = Path(..., description="文章ID"),
    db: Session = Depends(get_db)
):
    """
    获取文章的所有评论
    """
    from app.services.comment_service import CommentService
    comment_service = CommentService(db)
    comments = comment_service.get_post_comments(post_id)
    return comments

# 主路由
api_router = APIRouter()
api_router.include_router(user_router)
api_router.include_router(post_router)
api_router.include_router(category_router)
api_router.include_router(comment_router)

print("API路由实现完成")
print("包含以下路由组:")
print("- /users: 用户管理")
print("- /posts: 文章管理")
print("- /categories: 分类管理")
print("- /comments: 评论管理")
print("\n主要功能:")
print("- CRUD操作")
print("- 分页查询")
print("- 权限控制")
print("- 参数验证")
print("- 错误处理")

4. 用户认证与权限管理

4.1 JWT认证实现

from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.core.config import settings
from app.database import get_db
from app.models import User
from app.schemas import TokenData

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT配置
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES

# HTTP Bearer认证
security = HTTPBearer()

class SecurityManager:
    """
    安全管理器
    """
    
    def __init__(self):
        self.pwd_context = pwd_context
        self.secret_key = settings.SECRET_KEY
        self.algorithm = ALGORITHM
    
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """
        验证密码
        """
        return self.pwd_context.verify(plain_password, hashed_password)
    
    def get_password_hash(self, password: str) -> str:
        """
        获取密码哈希
        """
        return self.pwd_context.hash(password)
    
    def create_access_token(
        self, 
        data: Dict[str, Any], 
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """
        创建访问令牌
        """
        to_encode = data.copy()
        
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
        return encoded_jwt
    
    def verify_token(self, token: str) -> Optional[TokenData]:
        """
        验证令牌
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            user_id: int = payload.get("sub")
            username: str = payload.get("username")
            
            if user_id is None or token_type != "refresh":
                return None
            
            return int(user_id)
        except JWTError:
            return None

# 全局安全管理器实例
security_manager = SecurityManager()

# 依赖函数
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """
    获取当前用户
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    token = credentials.credentials
    token_data = security_manager.verify_token(token)
    
    if token_data is None:
        raise credentials_exception
    
    user = db.query(User).filter(User.id == token_data.user_id).first()
    
    if user is None:
        raise credentials_exception
    
    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """
    获取当前活跃用户
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, 
            detail="用户账户已被禁用"
        )
    return current_user

async def get_current_superuser(
    current_user: User = Depends(get_current_active_user)
) -> User:
    """
    获取当前超级用户
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足"
        )
    return current_user

class AuthService:
    """
    认证服务类
    """
    
    def __init__(self, db: Session):
        self.db = db
        self.security_manager = security_manager
    
    def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """
        用户认证
        """
        # 支持用户名或邮箱登录
        user = (self.db.query(User)
                .filter(
                    (User.username == username) | (User.email == username)
                )
                .first())
        
        if not user:
            return None
        
        if not self.security_manager.verify_password(password, user.hashed_password):
            return None
        
        return user
    
    def create_user_tokens(self, user: User) -> Dict[str, Any]:
        """
        为用户创建访问令牌和刷新令牌
        """
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = self.security_manager.create_access_token(
            data={"sub": str(user.id), "username": user.username},
            expires_delta=access_token_expires
        )
        
        refresh_token = self.security_manager.create_refresh_token(user.id)
        
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            "user": user
        }
    
    def refresh_access_token(self, refresh_token: str) -> Optional[Dict[str, Any]]:
        """
        使用刷新令牌获取新的访问令牌
        """
        user_id = self.security_manager.verify_refresh_token(refresh_token)
        
        if not user_id:
            return None
        
        user = self.db.query(User).filter(User.id == user_id).first()
        
        if not user or not user.is_active:
            return None
        
        return self.create_user_tokens(user)

print("JWT认证实现完成")
print("包含以下组件:")
print("- SecurityManager: 安全管理器")
print("- AuthService: 认证服务")
print("- 依赖函数: get_current_user, get_current_active_user, get_current_superuser")
print("\n主要功能:")
print("- 密码加密和验证")
print("- JWT令牌生成和验证")
print("- 用户认证和授权")
print("- 刷新令牌机制")

4.2 权限控制系统

from enum import Enum
from typing import List, Set, Callable, Any
from functools import wraps
from fastapi import HTTPException, status, Depends
from sqlalchemy.orm import Session
from app.models import User
from app.core.security import get_current_active_user

class Permission(str, Enum):
    """
    权限枚举
    """
    # 用户权限
    USER_READ = "user:read"
    USER_WRITE = "user:write"
    USER_DELETE = "user:delete"
    
    # 文章权限
    POST_READ = "post:read"
    POST_WRITE = "post:write"
    POST_DELETE = "post:delete"
    POST_PUBLISH = "post:publish"
    
    # 评论权限
    COMMENT_READ = "comment:read"
    COMMENT_WRITE = "comment:write"
    COMMENT_DELETE = "comment:delete"
    COMMENT_MODERATE = "comment:moderate"
    
    # 分类权限
    CATEGORY_READ = "category:read"
    CATEGORY_WRITE = "category:write"
    CATEGORY_DELETE = "category:delete"
    
    # 管理员权限
    ADMIN_ACCESS = "admin:access"
    SYSTEM_CONFIG = "system:config"

class Role(str, Enum):
    """
    角色枚举
    """
    GUEST = "guest"
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"

# 角色权限映射
ROLE_PERMISSIONS = {
    Role.GUEST: {
        Permission.POST_READ,
        Permission.COMMENT_READ,
        Permission.CATEGORY_READ
    },
    Role.USER: {
        Permission.POST_READ,
        Permission.POST_WRITE,
        Permission.COMMENT_READ,
        Permission.COMMENT_WRITE,
        Permission.CATEGORY_READ,
        Permission.USER_READ
    },
    Role.MODERATOR: {
        Permission.POST_READ,
        Permission.POST_WRITE,
        Permission.POST_DELETE,
        Permission.COMMENT_READ,
        Permission.COMMENT_WRITE,
        Permission.COMMENT_DELETE,
        Permission.COMMENT_MODERATE,
        Permission.CATEGORY_READ,
        Permission.USER_READ
    },
    Role.ADMIN: {
        Permission.POST_READ,
        Permission.POST_WRITE,
        Permission.POST_DELETE,
        Permission.POST_PUBLISH,
        Permission.COMMENT_READ,
        Permission.COMMENT_WRITE,
        Permission.COMMENT_DELETE,
        Permission.COMMENT_MODERATE,
        Permission.CATEGORY_READ,
        Permission.CATEGORY_WRITE,
        Permission.CATEGORY_DELETE,
        Permission.USER_READ,
        Permission.USER_WRITE,
        Permission.USER_DELETE,
        Permission.ADMIN_ACCESS
    },
    Role.SUPER_ADMIN: set(Permission)  # 所有权限
}

class PermissionChecker:
    """
    权限检查器
    """
    
    def __init__(self):
        self.role_permissions = ROLE_PERMISSIONS
    
    def get_user_role(self, user: User) -> Role:
        """
        获取用户角色
        """
        if user.is_superuser:
            return Role.SUPER_ADMIN
        
        # 这里可以根据实际业务逻辑确定用户角色
        # 例如从数据库中查询用户角色
        if hasattr(user, 'role'):
            return Role(user.role)
        
        # 默认为普通用户
        return Role.USER
    
    def get_user_permissions(self, user: User) -> Set[Permission]:
        """
        获取用户权限
        """
        role = self.get_user_role(user)
        return self.role_permissions.get(role, set())
    
    def has_permission(self, user: User, permission: Permission) -> bool:
        """
        检查用户是否有指定权限
        """
        user_permissions = self.get_user_permissions(user)
        return permission in user_permissions
    
    def has_any_permission(self, user: User, permissions: List[Permission]) -> bool:
        """
        检查用户是否有任意一个权限
        """
        user_permissions = self.get_user_permissions(user)
        return any(perm in user_permissions for perm in permissions)
    
    def has_all_permissions(self, user: User, permissions: List[Permission]) -> bool:
        """
        检查用户是否有所有权限
        """
        user_permissions = self.get_user_permissions(user)
        return all(perm in user_permissions for perm in permissions)
    
    def check_resource_ownership(self, user: User, resource_owner_id: int) -> bool:
        """
        检查资源所有权
        """
        # 超级管理员可以访问所有资源
        if user.is_superuser:
            return True
        
        # 用户只能访问自己的资源
        return user.id == resource_owner_id

# 全局权限检查器实例
permission_checker = PermissionChecker()

print("权限控制系统实现完成")
print("包含以下组件:")
print("- Permission: 权限枚举")
print("- Role: 角色枚举")
print("- PermissionChecker: 权限检查器")
print("\n主要功能:")
print("- 基于角色的权限控制(RBAC)")
print("- 资源所有权检查")
print("- 灵活的权限组合")

5. 服务层实现

5.1 用户服务

from typing import Optional, List, Tuple, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc
from app.models import User
from app.schemas import UserCreate, UserUpdate, UserResponse
from app.core.security import security_manager
from app.repositories.user_repository import UserRepository

class UserService:
    """
    用户服务类
    """
    
    def __init__(self, db: Session):
        self.db = db
        self.user_repo = UserRepository(db)
    
    def create_user(self, user_create: UserCreate) -> User:
        """
        创建新用户
        """
        # 加密密码
        hashed_password = security_manager.get_password_hash(user_create.password)
        
        # 创建用户数据
        user_data = {
            "email": user_create.email,
            "username": user_create.username,
            "hashed_password": hashed_password,
            "full_name": user_create.full_name,
            "bio": user_create.bio,
            "avatar_url": user_create.avatar_url,
            "is_active": True,
            "is_superuser": False
        }
        
        return self.user_repo.create(user_data)
    
    def get_user(self, user_id: int) -> Optional[User]:
        """
        根据ID获取用户
        """
        return self.user_repo.get(user_id)
    
    def get_by_email(self, email: str) -> Optional[User]:
        """
        根据邮箱获取用户
        """
        return self.user_repo.get_by_email(email)
    
    def get_by_username(self, username: str) -> Optional[User]:
        """
        根据用户名获取用户
        """
        return self.user_repo.get_by_username(username)
    
    def update_user(self, user_id: int, user_update: UserUpdate) -> Optional[User]:
        """
        更新用户信息
        """
        update_data = user_update.dict(exclude_unset=True)
        return self.user_repo.update(user_id, update_data)
    
    def delete_user(self, user_id: int) -> bool:
        """
        删除用户
        """
        return self.user_repo.delete(user_id)
    
    def get_users_paginated(
        self, 
        page: int = 1, 
        size: int = 20, 
        search: Optional[str] = None
    ) -> Tuple[List[User], int]:
        """
        分页获取用户列表
        """
        skip = (page - 1) * size
        
        if search:
            users = self.user_repo.search_users(search, skip, size)
            # 这里简化处理,实际应该有专门的计数查询
            total = len(self.user_repo.search_users(search, 0, 1000))
        else:
            users = self.user_repo.get_multi(skip, size)
            total = self.user_repo.count()
        
        return users, total
    
    def activate_user(self, user_id: int) -> bool:
        """
        激活用户
        """
        return bool(self.user_repo.update(user_id, {"is_active": True}))
    
    def deactivate_user(self, user_id: int) -> bool:
        """
        停用用户
        """
        return bool(self.user_repo.update(user_id, {"is_active": False}))
    
    def change_password(self, user_id: int, new_password: str) -> bool:
        """
        修改用户密码
        """
        hashed_password = security_manager.get_password_hash(new_password)
        return bool(self.user_repo.update(user_id, {"hashed_password": hashed_password}))
    
    def get_user_stats(self, user_id: int) -> Dict[str, int]:
        """
        获取用户统计信息
        """
        user = self.get_user(user_id)
        if not user:
            return {}
        
        # 这里应该查询相关的统计数据
        # 简化处理,返回模拟数据
        return {
            "posts_count": len(user.posts) if user.posts else 0,
            "comments_count": len(user.comments) if user.comments else 0,
            "followers_count": 0,  # 需要实现关注功能
            "following_count": 0   # 需要实现关注功能
        }

print("用户服务实现完成")
print("主要功能:")
print("- 用户CRUD操作")
print("- 用户认证相关")
print("- 分页查询")
print("- 用户统计")

5.2 文章服务

from typing import Optional, List, Tuple, Dict, Any
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, desc, func
from app.models import Post, User, Category, Tag
from app.schemas import PostCreate, PostUpdate, PostResponse
from app.repositories.post_repository import PostRepository

class PostService:
    """
    文章服务类
    """
    
    def __init__(self, db: Session):
        self.db = db
        self.post_repo = PostRepository(db)
    
    def create_post(self, post_create: PostCreate, author_id: int) -> Post:
        """
        创建新文章
        """
        # 生成唯一的slug
        slug = self._generate_unique_slug(post_create.title, post_create.slug)
        
        post_data = {
            "title": post_create.title,
            "slug": slug,
            "content": post_create.content,
            "excerpt": post_create.excerpt or self._generate_excerpt(post_create.content),
            "featured_image": post_create.featured_image,
            "is_published": post_create.is_published,
            "author_id": author_id,
            "category_id": post_create.category_id,
            "view_count": 0,
            "like_count": 0
        }
        
        post = self.post_repo.create(post_data)
        
        # 处理标签
        if post_create.tags:
            self._handle_post_tags(post.id, post_create.tags)
        
        return post
    
    def get_post(self, post_id: int) -> Optional[Post]:
        """
        根据ID获取文章
        """
        return (self.db.query(Post)
                .options(
                    joinedload(Post.author),
                    joinedload(Post.category),
                    joinedload(Post.tags)
                )
                .filter(Post.id == post_id)
                .first())
    
    def get_post_by_slug(self, slug: str) -> Optional[Post]:
        """
        根据slug获取文章
        """
        return (self.db.query(Post)
                .options(
                    joinedload(Post.author),
                    joinedload(Post.category),
                    joinedload(Post.tags)
                )
                .filter(Post.slug == slug)
                .first())
    
    def update_post(self, post_id: int, post_update: PostUpdate) -> Optional[Post]:
        """
        更新文章
        """
        update_data = post_update.dict(exclude_unset=True)
        
        # 处理标签
        if "tags" in update_data:
            tags = update_data.pop("tags")
            self._handle_post_tags(post_id, tags)
        
        # 如果更新了标题,可能需要更新slug
        if "title" in update_data:
            current_post = self.get_post(post_id)
            if current_post:
                new_slug = self._generate_unique_slug(
                    update_data["title"], 
                    exclude_post_id=post_id
                )
                update_data["slug"] = new_slug
        
        return self.post_repo.update(post_id, update_data)
    
    def delete_post(self, post_id: int) -> bool:
        """
        删除文章
        """
        return self.post_repo.delete(post_id)
    
    def get_posts_paginated(
        self,
        page: int = 1,
        size: int = 20,
        category_id: Optional[int] = None,
        author_id: Optional[int] = None,
        search: Optional[str] = None,
        published_only: bool = True
    ) -> Tuple[List[Post], int]:
        """
        分页获取文章列表
        """
        skip = (page - 1) * size
        
        # 构建查询条件
        filters = {}
        if published_only:
            filters["is_published"] = True
        if category_id:
            filters["category_id"] = category_id
        if author_id:
            filters["author_id"] = author_id
        
        if search:
            posts = self.post_repo.search_posts(search, skip, size)
            # 简化处理总数计算
            total = len(self.post_repo.search_posts(search, 0, 1000))
        else:
            posts = self.post_repo.get_multi(
                skip=skip,
                limit=size,
                filters=filters,
                order_by="-created_at"
            )
            total = self.post_repo.count(filters)
        
        return posts, total
    
    def increment_view_count(self, post_id: int) -> bool:
        """
        增加文章浏览量
        """
        return self.post_repo.increment_view_count(post_id)
    
    def toggle_like(self, post_id: int, user_id: int) -> Dict[str, Any]:
        """
        切换文章点赞状态
        """
        # 这里需要实现点赞表的逻辑
        # 简化处理,直接更新计数
        post = self.get_post(post_id)
        if not post:
            return {"success": False, "message": "文章不存在"}
        
        # 实际应该检查用户是否已点赞
        # 这里简化处理
        new_count = post.like_count + 1
        self.post_repo.update(post_id, {"like_count": new_count})
        
        return {
            "success": True,
            "liked": True,
            "like_count": new_count
        }
    
    def get_popular_posts(self, limit: int = 10) -> List[Post]:
        """
        获取热门文章
        """
        return (self.db.query(Post)
                .filter(Post.is_published == True)
                .order_by(desc(Post.view_count))
                .limit(limit)
                .all())
    
    def get_recent_posts(self, limit: int = 10) -> List[Post]:
        """
        获取最新文章
        """
        return (self.db.query(Post)
                .filter(Post.is_published == True)
                .order_by(desc(Post.created_at))
                .limit(limit)
                .all())
    
    def _generate_unique_slug(
        self, 
        title: str, 
        base_slug: Optional[str] = None,
        exclude_post_id: Optional[int] = None
    ) -> str:
        """
        生成唯一的slug
        """
        import re
        
        if base_slug:
            slug = base_slug
        else:
            # 从标题生成slug
            slug = re.sub(r'[^\w\s-]', '', title).strip().lower()
            slug = re.sub(r'[-\s]+', '-', slug)
        
        # 检查slug是否已存在
        original_slug = slug
        counter = 1
        
        while True:
            query = self.db.query(Post).filter(Post.slug == slug)
            if exclude_post_id:
                query = query.filter(Post.id != exclude_post_id)
            
            if not query.first():
                break
            
            slug = f"{original_slug}-{counter}"
            counter += 1
        
        return slug
    
    def _generate_excerpt(self, content: str, max_length: int = 200) -> str:
        """
        从内容生成摘要
        """
        import re
        
        # 移除HTML标签
        clean_content = re.sub(r'<[^>]+>', '', content)
        
        # 截取指定长度
        if len(clean_content) <= max_length:
            return clean_content
        
        # 在单词边界截取
        excerpt = clean_content[:max_length]
        last_space = excerpt.rfind(' ')
        
        if last_space > 0:
            excerpt = excerpt[:last_space]
        
        return excerpt + "..."
    
    def _handle_post_tags(self, post_id: int, tag_names: List[str]):
        """
        处理文章标签
        """
        post = self.db.query(Post).filter(Post.id == post_id).first()
        if not post:
            return
        
        # 清除现有标签关联
        post.tags.clear()
        
        # 添加新标签
        for tag_name in tag_names:
            tag = self.db.query(Tag).filter(Tag.name == tag_name).first()
            
            if not tag:
                # 创建新标签
                tag_slug = tag_name.lower().replace(' ', '-')
                tag = Tag(
                    name=tag_name,
                    slug=tag_slug
                )
                self.db.add(tag)
                self.db.flush()  # 获取ID
            
            post.tags.append(tag)
        
        self.db.commit()

print("文章服务实现完成")
print("主要功能:")
print("- 文章CRUD操作")
print("- 分页查询和搜索")
print("- 标签管理")
print("- 浏览量和点赞")
print("- 热门和最新文章")

6. API测试与文档

6.1 单元测试

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import get_db, Base
from app.core.security import security_manager
from app.models import User, Post, Category

# 测试数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, 
    connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建测试数据库表
Base.metadata.create_all(bind=engine)

def override_get_db():
    """
    测试数据库依赖覆盖
    """
    try:
        db = TestingSessionLocal()
        yield db
    finally:
        db.close()

# 覆盖数据库依赖
app.dependency_overrides[get_db] = override_get_db

# 测试客户端
client = TestClient(app)

class TestAuth:
    """
    认证相关测试
    """
    
    def test_register_user(self):
        """
        测试用户注册
        """
        user_data = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "TestPassword123",
            "full_name": "Test User"
        }
        
        response = client.post("/api/v1/users/", json=user_data)
        assert response.status_code == 201
        
        data = response.json()
        assert data["email"] == user_data["email"]
        assert data["username"] == user_data["username"]
        assert "id" in data
    
    def test_register_duplicate_email(self):
        """
        测试重复邮箱注册
        """
        user_data = {
            "email": "test@example.com",
            "username": "testuser2",
            "password": "TestPassword123",
            "full_name": "Test User 2"
        }
        
        response = client.post("/api/v1/users/", json=user_data)
        assert response.status_code == 400
        assert "邮箱已被注册" in response.json()["detail"]
    
    def test_login_success(self):
        """
        测试登录成功
        """
        login_data = {
            "username": "testuser",
            "password": "TestPassword123"
        }
        
        response = client.post("/api/v1/auth/login", json=login_data)
        assert response.status_code == 200
        
        data = response.json()
        assert "access_token" in data
        assert "refresh_token" in data
        assert data["token_type"] == "bearer"
    
    def test_login_invalid_credentials(self):
        """
        测试无效凭据登录
        """
        login_data = {
            "username": "testuser",
            "password": "wrongpassword"
        }
        
        response = client.post("/api/v1/auth/login", json=login_data)
        assert response.status_code == 401
    
    def test_get_current_user(self):
        """
        测试获取当前用户信息
        """
        # 先登录获取token
        login_data = {
            "username": "testuser",
            "password": "TestPassword123"
        }
        
        login_response = client.post("/api/v1/auth/login", json=login_data)
        token = login_response.json()["access_token"]
        
        # 使用token获取用户信息
        headers = {"Authorization": f"Bearer {token}"}
        response = client.get("/api/v1/users/me", headers=headers)
        
        assert response.status_code == 200
        data = response.json()
        assert data["username"] == "testuser"
    
    def test_unauthorized_access(self):
        """
        测试未授权访问
        """
        response = client.get("/api/v1/users/me")
        assert response.status_code == 401

class TestPosts:
    """
    文章相关测试
    """
    
    @pytest.fixture
    def auth_headers(self):
        """
        获取认证头
        """
        login_data = {
            "username": "testuser",
            "password": "TestPassword123"
        }
        
        login_response = client.post("/api/v1/auth/login", json=login_data)
        token = login_response.json()["access_token"]
        
        return {"Authorization": f"Bearer {token}"}
    
    def test_create_post(self, auth_headers):
        """
        测试创建文章
        """
        post_data = {
            "title": "Test Post",
            "content": "This is a test post content.",
            "excerpt": "Test excerpt",
            "is_published": True
        }
        
        response = client.post(
            "/api/v1/posts/", 
            json=post_data, 
            headers=auth_headers
        )
        
        assert response.status_code == 201
        data = response.json()
        assert data["title"] == post_data["title"]
        assert data["slug"] == "test-post"
        assert "id" in data
    
    def test_get_posts(self):
        """
        测试获取文章列表
        """
        response = client.get("/api/v1/posts/")
        assert response.status_code == 200
        
        data = response.json()
        assert "items" in data
        assert "total" in data
        assert "page" in data
    
    def test_get_post_by_id(self):
        """
        测试根据ID获取文章
        """
        # 假设文章ID为1
        response = client.get("/api/v1/posts/1")
        
        if response.status_code == 200:
            data = response.json()
            assert "id" in data
            assert "title" in data
        else:
            assert response.status_code == 404
    
    def test_update_post(self, auth_headers):
        """
        测试更新文章
        """
        update_data = {
            "title": "Updated Test Post",
            "content": "Updated content"
        }
        
        response = client.put(
            "/api/v1/posts/1", 
            json=update_data, 
            headers=auth_headers
        )
        
        if response.status_code == 200:
            data = response.json()
            assert data["title"] == update_data["title"]
        else:
            # 可能是权限不足或文章不存在
            assert response.status_code in [403, 404]
    
    def test_delete_post(self, auth_headers):
        """
        测试删除文章
        """
        response = client.delete("/api/v1/posts/1", headers=auth_headers)
        
        # 可能成功删除或权限不足
        assert response.status_code in [200, 403, 404]

class TestValidation:
    """
    数据验证测试
    """
    
    def test_invalid_email_format(self):
        """
        测试无效邮箱格式
        """
        user_data = {
            "email": "invalid-email",
            "username": "testuser3",
            "password": "TestPassword123",
            "full_name": "Test User 3"
        }
        
        response = client.post("/api/v1/users/", json=user_data)
        assert response.status_code == 422
    
    def test_weak_password(self):
        """
        测试弱密码
        """
        user_data = {
            "email": "test3@example.com",
            "username": "testuser3",
            "password": "weak",  # 太短且没有大写字母和数字
            "full_name": "Test User 3"
        }
        
        response = client.post("/api/v1/users/", json=user_data)
        assert response.status_code == 422
    
    def test_missing_required_fields(self):
        """
        测试缺少必需字段
        """
        user_data = {
            "email": "test4@example.com"
            # 缺少username和password
        }
        
        response = client.post("/api/v1/users/", json=user_data)
        assert response.status_code == 422

class TestPerformance:
    """
    性能测试
    """
    
    def test_concurrent_requests(self):
        """
        测试并发请求
        """
        import concurrent.futures
        import time
        
        def make_request():
            return client.get("/api/v1/posts/")
        
        start_time = time.time()
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(make_request) for _ in range(50)]
            results = [future.result() for future in futures]
        
        end_time = time.time()
        
        # 检查所有请求都成功
        for response in results:
            assert response.status_code == 200
        
        # 检查响应时间
        total_time = end_time - start_time
        assert total_time < 10  # 50个请求应该在10秒内完成
        
        print(f"50个并发请求耗时: {total_time:.2f}秒")

# 运行测试
if __name__ == "__main__":
    pytest.main(["-v", __file__])

print("API测试实现完成")
print("包含以下测试类别:")
print("- TestAuth: 认证相关测试")
print("- TestPosts: 文章相关测试")
print("- TestValidation: 数据验证测试")
print("- TestPerformance: 性能测试")
print("\n测试覆盖:")
print("- 用户注册和登录")
print("- 文章CRUD操作")
print("- 权限控制")
print("- 数据验证")
print("- 并发性能")

7. 部署与监控

7.1 Docker部署

# Dockerfile内容已在项目结构中定义
# 这里提供部署脚本和配置

import os
import subprocess
from typing import Dict, List, Optional

class DeploymentManager:
    """
    部署管理器
    """
    
    def __init__(self, project_name: str = "web-app"):
        self.project_name = project_name
        self.docker_compose_file = "docker-compose.yml"
        self.env_file = ".env"
    
    def create_env_file(self, config: Dict[str, str]):
        """
        创建环境变量文件
        """
        env_content = []
        for key, value in config.items():
            env_content.append(f"{key}={value}")
        
        with open(self.env_file, 'w') as f:
            f.write('\n'.join(env_content))
        
        print(f"环境变量文件 {self.env_file} 已创建")
    
    def build_images(self):
        """
        构建Docker镜像
        """
        try:
            result = subprocess.run(
                ["docker-compose", "build"],
                check=True,
                capture_output=True,
                text=True
            )
            print("Docker镜像构建成功")
            return True
        except subprocess.CalledProcessError as e:
            print(f"Docker镜像构建失败: {e.stderr}")
            return False
    
    def start_services(self, detached: bool = True):
        """
        启动服务
        """
        try:
            cmd = ["docker-compose", "up"]
            if detached:
                cmd.append("-d")
            
            result = subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                text=True
            )
            print("服务启动成功")
            return True
        except subprocess.CalledProcessError as e:
            print(f"服务启动失败: {e.stderr}")
            return False
    
    def stop_services(self):
        """
        停止服务
        """
        try:
            result = subprocess.run(
                ["docker-compose", "down"],
                check=True,
                capture_output=True,
                text=True
            )
            print("服务已停止")
            return True
        except subprocess.CalledProcessError as e:
            print(f"服务停止失败: {e.stderr}")
            return False
    
    def get_service_status(self) -> List[Dict[str, str]]:
        """
        获取服务状态
        """
        try:
            result = subprocess.run(
                ["docker-compose", "ps"],
                check=True,
                capture_output=True,
                text=True
            )
            
            # 解析输出(简化处理)
            lines = result.stdout.strip().split('\n')
            services = []
            
            for line in lines[2:]:  # 跳过标题行
                if line.strip():
                    parts = line.split()
                    if len(parts) >= 4:
                        services.append({
                            "name": parts[0],
                            "status": parts[3],
                            "ports": parts[4] if len(parts) > 4 else ""
                        })
            
            return services
        except subprocess.CalledProcessError as e:
            print(f"获取服务状态失败: {e.stderr}")
            return []
    
    def view_logs(self, service_name: Optional[str] = None, tail: int = 100):
        """
        查看服务日志
        """
        try:
            cmd = ["docker-compose", "logs", "--tail", str(tail)]
            if service_name:
                cmd.append(service_name)
            
            result = subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                text=True
            )
            
            print(result.stdout)
            return True
        except subprocess.CalledProcessError as e:
            print(f"查看日志失败: {e.stderr}")
            return False
    
    def run_migrations(self):
        """
        运行数据库迁移
        """
        try:
            result = subprocess.run(
                ["docker-compose", "exec", "web", "alembic", "upgrade", "head"],
                check=True,
                capture_output=True,
                text=True
            )
            print("数据库迁移完成")
            return True
        except subprocess.CalledProcessError as e:
            print(f"数据库迁移失败: {e.stderr}")
            return False
    
    def backup_database(self, backup_file: str):
        """
        备份数据库
        """
        try:
            result = subprocess.run([
                "docker-compose", "exec", "db", 
                "pg_dump", "-U", "postgres", "-d", "webapp", 
                "-f", f"/tmp/{backup_file}"
            ], check=True, capture_output=True, text=True)
            
            # 复制备份文件到主机
            subprocess.run([
                "docker", "cp", 
                f"$(docker-compose ps -q db):/tmp/{backup_file}", 
                backup_file
            ], check=True)
            
            print(f"数据库备份完成: {backup_file}")
            return True
        except subprocess.CalledProcessError as e:
            print(f"数据库备份失败: {e.stderr}")
            return False

# 部署配置
DEPLOYMENT_CONFIG = {
    "development": {
        "PROJECT_NAME": "Web App Dev",
        "DATABASE_URL": "postgresql://postgres:password@db:5432/webapp_dev",
        "REDIS_URL": "redis://redis:6379/0",
        "SECRET_KEY": "dev-secret-key-change-in-production",
        "ACCESS_TOKEN_EXPIRE_MINUTES": "60",
        "DEBUG": "true"
    },
    "production": {
        "PROJECT_NAME": "Web App",
        "DATABASE_URL": "postgresql://postgres:secure_password@db:5432/webapp",
        "REDIS_URL": "redis://redis:6379/0",
        "SECRET_KEY": "super-secret-key-for-production",
        "ACCESS_TOKEN_EXPIRE_MINUTES": "30",
        "DEBUG": "false"
    }
}

# 使用示例
def deploy_application(environment: str = "development"):
    """
    部署应用程序
    """
    deployment = DeploymentManager()
    
    print(f"开始部署 {environment} 环境...")
    
    # 1. 创建环境变量文件
    config = DEPLOYMENT_CONFIG.get(environment, DEPLOYMENT_CONFIG["development"])
    deployment.create_env_file(config)
    
    # 2. 构建镜像
    if not deployment.build_images():
        print("部署失败:镜像构建失败")
        return False
    
    # 3. 启动服务
    if not deployment.start_services():
        print("部署失败:服务启动失败")
        return False
    
    # 4. 等待服务启动
    import time
    print("等待服务启动...")
    time.sleep(10)
    
    # 5. 运行数据库迁移
    if not deployment.run_migrations():
        print("警告:数据库迁移失败")
    
    # 6. 检查服务状态
    services = deployment.get_service_status()
    print("\n服务状态:")
    for service in services:
        print(f"- {service['name']}: {service['status']} {service['ports']}")
    
    print(f"\n{environment} 环境部署完成!")
    print("应用访问地址: http://localhost:8000")
    print("API文档地址: http://localhost:8000/docs")
    
    return True

print("部署管理器实现完成")
print("主要功能:")
print("- Docker镜像构建")
print("- 服务启动和停止")
print("- 环境配置管理")
print("- 数据库迁移")
print("- 服务状态监控")
print("- 数据库备份")

总结

本章节详细介绍了现代Web应用开发的完整流程,包括:

核心技术栈

  • FastAPI: 高性能异步Web框架
  • SQLAlchemy: 强大的ORM工具
  • PostgreSQL: 可靠的关系型数据库
  • Redis: 高性能缓存和会话存储
  • JWT: 无状态认证机制
  • Docker: 容器化部署

架构设计

  • 分层架构: 清晰的代码组织结构
  • 仓储模式: 数据访问层抽象
  • 服务层: 业务逻辑封装
  • 依赖注入: 松耦合设计

安全特性

  • 密码加密: bcrypt哈希算法
  • JWT认证: 访问令牌和刷新令牌
  • 权限控制: 基于角色的访问控制(RBAC)
  • 输入验证: Pydantic数据验证

性能优化

  • Redis缓存: 多层缓存策略
  • 数据库优化: 查询优化和索引
  • 异步处理: 高并发支持
  • 连接池: 数据库连接管理

开发实践

  • API设计: RESTful风格
  • 自动文档: OpenAPI/Swagger
  • 单元测试: 全面的测试覆盖
  • 错误处理: 统一的异常处理

部署运维

  • 容器化: Docker和Docker Compose
  • 环境管理: 多环境配置
  • 监控日志: 服务状态监控
  • 数据备份: 自动化备份策略

通过本章节的学习,你将掌握构建生产级Web应用的完整技能,包括架构设计、安全实现、性能优化和部署运维等各个方面。这些知识和技能将为你的Web开发职业生涯奠定坚实的基础。

下一步

建议继续学习:

  • 微服务架构设计
  • 消息队列和异步任务
  • 监控和日志系统
  • CI/CD自动化部署
  • 云原生技术栈
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐