Python金丹突破:实战精进
一、专项技术方向选择与实践
1. Web开发方向
一、Web开发技术栈全景图
1. 后端框架选择
Python Web开发主要有两大阵营:
| 框架 | 特点 | 适用场景 |
|---|---|---|
| Flask | 微框架,轻量灵活,扩展性强 | 小型项目、API服务、快速原型 |
| Django | 全功能框架,内置丰富组件 | 企业级应用、内容管理系统 |
2. 前端技术配合
虽然Python主要用于后端,但现代Web开发需要前后端配合:
- **传统模板渲染**:Jinja2(Flask)、Django模板
- **前后端分离**:REST API + Vue.js/React
- **全栈Python**:使用PyScript在浏览器运行Python
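以"前后端分离"为例,后端只需要提供返回 JSON 的数据接口,页面渲染完全交给前端框架。下面是一个最小示意(路由名 `/api/hello` 与返回字段均为假设性示例,并非固定约定):

```python
from flask import Flask, jsonify

app = Flask(__name__)

# 前后端分离模式下,后端只提供数据接口,不渲染页面
@app.route('/api/hello')
def hello_api():
    return jsonify({'message': 'hello', 'framework': 'flask'})
```

前端用 `fetch('/api/hello')` 取回 JSON 后自行渲染即可,Flask 与 Vue/React 之间只通过这类接口通信。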
二、Flask:轻量级Web开发入门
1. Flask基础应用
from flask import Flask, render_template, request, jsonify, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 初始化扩展
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# 数据模型
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(200), nullable=False)
posts = db.relationship('Post', backref='author', lazy=True)
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
content = db.Column(db.Text, nullable=False)
created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# 路由定义
@app.route('/')
def index():
"""首页 - 显示最新文章"""
posts = Post.query.order_by(Post.created_at.desc()).limit(10).all()
return render_template('index.html', posts=posts)
@app.route('/login', methods=['GET', 'POST'])
def login():
"""用户登录"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password_hash, password):
login_user(user)
return redirect(url_for('dashboard'))
return render_template('login.html', error='用户名或密码错误')
return render_template('login.html')
@app.route('/dashboard')
@login_required
def dashboard():
    """用户仪表板"""
    user_posts = Post.query.filter_by(user_id=current_user.id).all()
    return render_template('dashboard.html', posts=user_posts)
@app.route('/logout')
@login_required
def logout():
    """用户退出登录"""
    logout_user()
    return redirect(url_for('index'))
@app.route('/api/posts', methods=['GET'])
def get_posts_api():
"""REST API:获取文章列表"""
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
posts = Post.query.order_by(Post.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
return jsonify({
'posts': [{
'id': post.id,
'title': post.title,
'content': post.content[:100] + '...' if len(post.content) > 100 else post.content,
'author': post.author.username,
'created_at': post.created_at.isoformat()
} for post in posts.items],
'total': posts.total,
'page': posts.page,
'pages': posts.pages
})
# 模板示例:templates/index.html
"""
<!DOCTYPE html>
<html>
<head>
<title>我的博客</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container">
<a class="navbar-brand" href="{{ url_for('index') }}">我的博客</a>
<div class="navbar-nav">
{% if current_user.is_authenticated %}
<a class="nav-link" href="{{ url_for('dashboard') }}">仪表板</a>
<a class="nav-link" href="{{ url_for('logout') }}">退出</a>
{% else %}
<a class="nav-link" href="{{ url_for('login') }}">登录</a>
<a class="nav-link" href="{{ url_for('register') }}">注册</a>
{% endif %}
</div>
</div>
</nav>
<div class="container mt-4">
<h1>最新文章</h1>
<div class="row">
{% for post in posts %}
<div class="col-md-6 mb-4">
<div class="card">
<div class="card-body">
<h5 class="card-title">{{ post.title }}</h5>
<p class="card-text">{{ post.content[:150] }}...</p>
<p class="text-muted">
作者: {{ post.author.username }} |
发布时间: {{ post.created_at.strftime('%Y-%m-%d %H:%M') }}
</p>
<a href="{{ url_for('post_detail', post_id=post.id) }}" class="btn btn-primary">阅读更多</a>
</div>
</div>
</div>
{% endfor %}
</div>
</div>
</body>
</html>
"""
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True)
2. Flask RESTful API开发
from flask import Flask, request, jsonify
from flask_restful import Api, Resource, reqparse
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'super-secret-key'
api = Api(app)
jwt = JWTManager(app)
# 模拟数据库
users = []
products = []
# 请求参数解析器
user_parser = reqparse.RequestParser()
user_parser.add_argument('username', type=str, required=True, help='用户名不能为空')
user_parser.add_argument('password', type=str, required=True, help='密码不能为空')
user_parser.add_argument('email', type=str, required=True, help='邮箱不能为空')
product_parser = reqparse.RequestParser()
product_parser.add_argument('name', type=str, required=True, help='商品名称不能为空')
product_parser.add_argument('price', type=float, required=True, help='价格不能为空')
product_parser.add_argument('stock', type=int, required=True, help='库存不能为空')
class UserRegistration(Resource):
"""用户注册"""
def post(self):
args = user_parser.parse_args()
# 检查用户是否已存在
if any(u['username'] == args['username'] for u in users):
return {'message': '用户名已存在'}, 400
# 创建新用户
user = {
'id': len(users) + 1,
'username': args['username'],
'email': args['email'],
'password_hash': generate_password_hash(args['password'])
}
users.append(user)
return {'message': '用户注册成功', 'user_id': user['id']}, 201
class UserLogin(Resource):
"""用户登录"""
def post(self):
        # 登录只需要用户名和密码,不复用要求 email 的 user_parser
        login_parser = reqparse.RequestParser()
        login_parser.add_argument('username', type=str, required=True, help='用户名不能为空')
        login_parser.add_argument('password', type=str, required=True, help='密码不能为空')
        args = login_parser.parse_args()
# 查找用户
user = next((u for u in users if u['username'] == args['username']), None)
if user and check_password_hash(user['password_hash'], args['password']):
# 生成访问令牌
access_token = create_access_token(identity=user['id'])
return {
'access_token': access_token,
'user_id': user['id'],
'username': user['username']
}, 200
return {'message': '用户名或密码错误'}, 401
class ProductList(Resource):
"""商品列表"""
@jwt_required()
def get(self):
return {'products': products}, 200
@jwt_required()
def post(self):
args = product_parser.parse_args()
product = {
'id': len(products) + 1,
'name': args['name'],
'price': args['price'],
'stock': args['stock'],
'created_by': get_jwt_identity()
}
products.append(product)
return {'message': '商品创建成功', 'product': product}, 201
class ProductDetail(Resource):
"""商品详情"""
@jwt_required()
def get(self, product_id):
product = next((p for p in products if p['id'] == product_id), None)
if not product:
return {'message': '商品不存在'}, 404
return {'product': product}, 200
@jwt_required()
def put(self, product_id):
product = next((p for p in products if p['id'] == product_id), None)
if not product:
return {'message': '商品不存在'}, 404
# 检查权限
if product['created_by'] != get_jwt_identity():
return {'message': '无权修改此商品'}, 403
args = product_parser.parse_args()
product.update({
'name': args['name'],
'price': args['price'],
'stock': args['stock']
})
return {'message': '商品更新成功', 'product': product}, 200
@jwt_required()
def delete(self, product_id):
global products
product = next((p for p in products if p['id'] == product_id), None)
if not product:
return {'message': '商品不存在'}, 404
# 检查权限
if product['created_by'] != get_jwt_identity():
return {'message': '无权删除此商品'}, 403
products = [p for p in products if p['id'] != product_id]
return {'message': '商品删除成功'}, 200
# 注册路由
api.add_resource(UserRegistration, '/api/register')
api.add_resource(UserLogin, '/api/login')
api.add_resource(ProductList, '/api/products')
api.add_resource(ProductDetail, '/api/products/<int:product_id>')
if __name__ == '__main__':
app.run(debug=True)
三、Django:企业级Web开发
1. Django项目结构
myproject/
├── manage.py
├── myproject/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── blog/
│ ├── migrations/
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── models.py
│ ├── views.py
│ ├── urls.py
│ └── templates/
└── requirements.txt
2. Django模型与视图
# blog/models.py
from django.db import models
from django.contrib.auth.models import User
from django.urls import reverse
class Category(models.Model):
"""文章分类"""
name = models.CharField('分类名称', max_length=100)
slug = models.SlugField('URL标识', unique=True)
description = models.TextField('描述', blank=True)
class Meta:
verbose_name = '分类'
verbose_name_plural = '分类'
def __str__(self):
return self.name
class Tag(models.Model):
"""文章标签"""
name = models.CharField('标签名称', max_length=100)
slug = models.SlugField('URL标识', unique=True)
class Meta:
verbose_name = '标签'
verbose_name_plural = '标签'
def __str__(self):
return self.name
class Post(models.Model):
"""文章"""
STATUS_CHOICES = [
('draft', '草稿'),
('published', '已发布'),
]
title = models.CharField('标题', max_length=200)
slug = models.SlugField('URL标识', unique_for_date='publish_date')
author = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='作者')
category = models.ForeignKey(Category, on_delete=models.SET_NULL,
null=True, verbose_name='分类')
tags = models.ManyToManyField(Tag, blank=True, verbose_name='标签')
content = models.TextField('内容')
excerpt = models.TextField('摘要', max_length=500, blank=True)
status = models.CharField('状态', max_length=10, choices=STATUS_CHOICES, default='draft')
publish_date = models.DateTimeField('发布时间', null=True, blank=True)
created_at = models.DateTimeField('创建时间', auto_now_add=True)
updated_at = models.DateTimeField('更新时间', auto_now=True)
views = models.PositiveIntegerField('浏览量', default=0)
class Meta:
verbose_name = '文章'
verbose_name_plural = '文章'
ordering = ['-publish_date', '-created_at']
def __str__(self):
return self.title
def get_absolute_url(self):
return reverse('blog:post_detail', args=[
self.publish_date.year,
self.publish_date.month,
self.publish_date.day,
self.slug
])
def increase_views(self):
"""增加浏览量"""
self.views += 1
self.save(update_fields=['views'])
class Comment(models.Model):
"""评论"""
post = models.ForeignKey(Post, on_delete=models.CASCADE, related_name='comments', verbose_name='文章')
user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='用户')
content = models.TextField('评论内容')
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True,
related_name='replies', verbose_name='父评论')
created_at = models.DateTimeField('创建时间', auto_now_add=True)
is_active = models.BooleanField('是否有效', default=True)
class Meta:
verbose_name = '评论'
verbose_name_plural = '评论'
ordering = ['created_at']
def __str__(self):
return f'{self.user.username} 的评论'
# blog/views.py
from django.shortcuts import render, get_object_or_404, redirect
from django.views.generic import ListView, DetailView, CreateView, UpdateView, DeleteView
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.urls import reverse_lazy
from django.db.models import Q, Count
from django.core.paginator import Paginator
from .models import Post, Category, Tag, Comment
from .forms import PostForm, CommentForm
class PostListView(ListView):
"""文章列表视图"""
model = Post
template_name = 'blog/post_list.html'
context_object_name = 'posts'
paginate_by = 10
def get_queryset(self):
queryset = Post.objects.filter(status='published')
# 搜索功能
search_query = self.request.GET.get('q')
if search_query:
queryset = queryset.filter(
Q(title__icontains=search_query) |
Q(content__icontains=search_query) |
Q(excerpt__icontains=search_query)
)
# 分类过滤
category_slug = self.request.GET.get('category')
if category_slug:
queryset = queryset.filter(category__slug=category_slug)
# 标签过滤
tag_slug = self.request.GET.get('tag')
if tag_slug:
queryset = queryset.filter(tags__slug=tag_slug)
return queryset.select_related('author', 'category').prefetch_related('tags')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['categories'] = Category.objects.annotate(
post_count=Count('post')
).filter(post_count__gt=0)
context['tags'] = Tag.objects.annotate(
post_count=Count('post')
).filter(post_count__gt=0)
return context
class PostDetailView(DetailView):
"""文章详情视图"""
model = Post
template_name = 'blog/post_detail.html'
context_object_name = 'post'
def get_object(self, queryset=None):
# 获取文章对象并增加浏览量
post = super().get_object(queryset)
post.increase_views()
return post
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['comment_form'] = CommentForm()
context['comments'] = self.object.comments.filter(is_active=True, parent=None)
return context
class PostCreateView(LoginRequiredMixin, CreateView):
"""创建文章视图"""
model = Post
form_class = PostForm
template_name = 'blog/post_form.html'
def form_valid(self, form):
form.instance.author = self.request.user
return super().form_valid(form)
def get_success_url(self):
messages.success(self.request, '文章创建成功!')
return reverse_lazy('blog:post_detail', kwargs={'pk': self.object.pk})
class PostUpdateView(LoginRequiredMixin, UserPassesTestMixin, UpdateView):
"""更新文章视图"""
model = Post
form_class = PostForm
template_name = 'blog/post_form.html'
def test_func(self):
post = self.get_object()
return self.request.user == post.author
def get_success_url(self):
messages.success(self.request, '文章更新成功!')
return reverse_lazy('blog:post_detail', kwargs={'pk': self.object.pk})
class PostDeleteView(LoginRequiredMixin, UserPassesTestMixin, DeleteView):
"""删除文章视图"""
model = Post
template_name = 'blog/post_confirm_delete.html'
success_url = reverse_lazy('blog:post_list')
def test_func(self):
post = self.get_object()
return self.request.user == post.author
def delete(self, request, *args, **kwargs):
messages.success(request, '文章删除成功!')
return super().delete(request, *args, **kwargs)
@login_required
def add_comment(request, pk):
"""添加评论"""
post = get_object_or_404(Post, pk=pk)
if request.method == 'POST':
form = CommentForm(request.POST)
if form.is_valid():
comment = form.save(commit=False)
comment.post = post
comment.user = request.user
comment.save()
messages.success(request, '评论发布成功!')
return redirect('blog:post_detail', pk=pk)
3. Django REST Framework API
# api/serializers.py
from rest_framework import serializers
from blog.models import Post, Category, Tag, Comment
from django.contrib.auth.models import User
class UserSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ['id', 'username', 'email', 'date_joined']
read_only_fields = ['date_joined']
class CategorySerializer(serializers.ModelSerializer):
post_count = serializers.IntegerField(source='post_set.count', read_only=True)
class Meta:
model = Category
fields = ['id', 'name', 'slug', 'description', 'post_count']
class TagSerializer(serializers.ModelSerializer):
class Meta:
model = Tag
fields = ['id', 'name', 'slug']
class CommentSerializer(serializers.ModelSerializer):
user = UserSerializer(read_only=True)
replies = serializers.SerializerMethodField()
class Meta:
model = Comment
fields = ['id', 'user', 'content', 'parent', 'replies', 'created_at']
read_only_fields = ['user', 'created_at']
def get_replies(self, obj):
if obj.replies.exists():
return CommentSerializer(obj.replies.filter(is_active=True), many=True).data
return []
def create(self, validated_data):
validated_data['user'] = self.context['request'].user
return super().create(validated_data)
class PostSerializer(serializers.ModelSerializer):
author = UserSerializer(read_only=True)
category = CategorySerializer(read_only=True)
tags = TagSerializer(many=True, read_only=True)
comments = CommentSerializer(many=True, read_only=True)
comment_count = serializers.IntegerField(source='comments.count', read_only=True)
class Meta:
model = Post
fields = [
'id', 'title', 'slug', 'author', 'category', 'tags',
'content', 'excerpt', 'status', 'publish_date',
'created_at', 'updated_at', 'views', 'comments', 'comment_count'
]
read_only_fields = ['author', 'views', 'created_at', 'updated_at']
def create(self, validated_data):
validated_data['author'] = self.context['request'].user
return super().create(validated_data)
# api/views.py
from rest_framework import viewsets, permissions, filters, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.pagination import PageNumberPagination
from django_filters.rest_framework import DjangoFilterBackend
from blog.models import Post, Category, Tag, Comment
from .serializers import (
PostSerializer, CategorySerializer,
TagSerializer, CommentSerializer
)
class StandardResultsSetPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 100
class PostViewSet(viewsets.ModelViewSet):
"""文章视图集"""
queryset = Post.objects.filter(status='published')
serializer_class = PostSerializer
pagination_class = StandardResultsSetPagination
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['category', 'tags', 'author']
search_fields = ['title', 'content', 'excerpt']
ordering_fields = ['publish_date', 'created_at', 'views']
ordering = ['-publish_date']
def get_permissions(self):
"""根据动作设置权限"""
if self.action in ['create', 'update', 'partial_update', 'destroy']:
permission_classes = [permissions.IsAuthenticated]
else:
permission_classes = [permissions.AllowAny]
return [permission() for permission in permission_classes]
def perform_create(self, serializer):
serializer.save(author=self.request.user)
@action(detail=True, methods=['post'])
def like(self, request, pk=None):
"""点赞文章"""
post = self.get_object()
user = request.user
if user.is_authenticated:
# 这里可以添加点赞逻辑
return Response({'message': '点赞成功'})
return Response({'error': '需要登录'}, status=status.HTTP_401_UNAUTHORIZED)
@action(detail=True, methods=['get'])
def comments(self, request, pk=None):
"""获取文章评论"""
post = self.get_object()
comments = post.comments.filter(is_active=True, parent=None)
page = self.paginate_queryset(comments)
if page is not None:
serializer = CommentSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = CommentSerializer(comments, many=True)
return Response(serializer.data)
class CategoryViewSet(viewsets.ReadOnlyModelViewSet):
"""分类视图集"""
queryset = Category.objects.all()
serializer_class = CategorySerializer
pagination_class = StandardResultsSetPagination
class TagViewSet(viewsets.ReadOnlyModelViewSet):
"""标签视图集"""
queryset = Tag.objects.all()
serializer_class = TagSerializer
pagination_class = StandardResultsSetPagination
class CommentViewSet(viewsets.ModelViewSet):
"""评论视图集"""
queryset = Comment.objects.filter(is_active=True)
serializer_class = CommentSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
def get_queryset(self):
queryset = super().get_queryset()
post_id = self.request.query_params.get('post')
if post_id:
queryset = queryset.filter(post_id=post_id)
return queryset
def perform_create(self, serializer):
serializer.save(user=self.request.user)
四、现代Web开发实践
1. 异步Web开发(FastAPI)
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.sql import func
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing import Optional, List
import os
# 配置
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 数据库配置
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
# 密码加密
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI(title="异步博客API", version="1.0.0")
# CORS配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 数据模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
content = Column(String)
author_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Pydantic模型
class UserCreate(BaseModel):
username: str
email: str
password: str
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
class Config:
orm_mode = True
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class PostCreate(BaseModel):
title: str
content: str
class PostResponse(BaseModel):
id: int
title: str
content: str
author_id: int
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True
# 依赖项
async def get_db():
async with AsyncSessionLocal() as session:
yield session
async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
from sqlalchemy import select
result = await db.execute(select(User).where(User.username == token_data.username))
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return user
# 工具函数
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 路由
@app.post("/register", response_model=UserResponse)
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
"""用户注册"""
from sqlalchemy import select
# 检查用户是否存在
result = await db.execute(select(User).where(User.username == user.username))
existing_user = result.scalar_one_or_none()
if existing_user:
raise HTTPException(status_code=400, detail="用户名已存在")
# 创建新用户
db_user = User(
username=user.username,
email=user.email,
hashed_password=get_password_hash(user.password)
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
"""用户登录"""
from sqlalchemy import select
result = await db.execute(select(User).where(User.username == form_data.username))
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=UserResponse)
async def read_users_me(current_user: User = Depends(get_current_user)):
"""获取当前用户信息"""
return current_user
@app.post("/posts", response_model=PostResponse)
async def create_post(
post: PostCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""创建文章"""
db_post = Post(**post.dict(), author_id=current_user.id)
db.add(db_post)
await db.commit()
await db.refresh(db_post)
return db_post
@app.get("/posts", response_model=List[PostResponse])
async def read_posts(
skip: int = 0,
limit: int = 10,
db: AsyncSession = Depends(get_db)
):
"""获取文章列表"""
from sqlalchemy import select
result = await db.execute(
select(Post)
.order_by(Post.created_at.desc())
.offset(skip)
.limit(limit)
)
posts = result.scalars().all()
return posts
@app.get("/posts/{post_id}", response_model=PostResponse)
async def read_post(post_id: int, db: AsyncSession = Depends(get_db)):
"""获取单篇文章"""
from sqlalchemy import select
result = await db.execute(select(Post).where(Post.id == post_id))
post = result.scalar_one_or_none()
if post is None:
raise HTTPException(status_code=404, detail="文章不存在")
return post
# 启动时创建数据库表
@app.on_event("startup")
async def startup():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
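上面的 create_access_token 依赖 python-jose 完成 JWT 编码。为了说明 JWT 本质上就是 "header.payload.signature" 三段 base64url 加 HMAC 签名,下面用标准库手工实现一个 HS256 令牌的生成与校验(仅示意原理,生产环境请继续使用 python-jose 等成熟库):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    """JWT 使用去掉填充符的 base64url 编码"""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode()

def make_jwt(payload: dict, secret: str) -> str:
    """手工拼出 header.payload.signature 三段式的 HS256 JWT"""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        b64url(json.dumps(header, separators=(',', ':')).encode()),
        b64url(json.dumps(payload, separators=(',', ':')).encode()),
    ]
    signing_input = '.'.join(segments).encode()
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    segments.append(b64url(signature))
    return '.'.join(segments)

def verify_jwt(token: str, secret: str) -> dict:
    """校验签名并还原 payload;签名不符则抛出 ValueError"""
    header_b64, payload_b64, sig_b64 = token.split('.')
    signing_input = f'{header_b64}.{payload_b64}'.encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), sig_b64):
        raise ValueError('签名校验失败')
    padded = payload_b64 + '=' * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))

token = make_jwt({"sub": "alice"}, "your-secret-key")
print(verify_jwt(token, "your-secret-key"))  # {'sub': 'alice'}
```

真实的 JWT 还会携带 `exp` 等标准声明并在校验时检查过期时间,这正是上文 `to_encode.update({"exp": expire})` 做的事。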
2. WebSocket实时应用
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List
import json
import asyncio
app = FastAPI()
class ConnectionManager:
"""WebSocket连接管理器"""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
"""聊天室页面"""
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket聊天室</title>
<style>
body { font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; }
#messages { border: 1px solid #ddd; height: 300px; overflow-y: scroll; padding: 10px; }
#messageInput { width: 80%; padding: 10px; }
button { padding: 10px 20px; background: #007bff; color: white; border: none; cursor: pointer; }
.message { margin: 5px 0; padding: 8px; background: #f1f1f1; border-radius: 5px; }
.my-message { background: #007bff; color: white; text-align: right; }
</style>
</head>
<body>
<h1>WebSocket聊天室</h1>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>
<script>
const ws = new WebSocket("ws://localhost:8000/ws");
const messagesDiv = document.getElementById("messages");
const messageInput = document.getElementById("messageInput");
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const messageDiv = document.createElement("div");
messageDiv.className = data.type === "my" ? "message my-message" : "message";
messageDiv.textContent = `${data.username}: ${data.message}`;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
function sendMessage() {
const message = messageInput.value;
if (message.trim()) {
ws.send(JSON.stringify({
"type": "my",
"message": message,
"username": "用户" + Math.floor(Math.random() * 1000)
}));
messageInput.value = "";
}
}
messageInput.addEventListener("keypress", function(event) {
if (event.key === "Enter") {
sendMessage();
}
});
</script>
</body>
</html>
"""
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket端点"""
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
# 广播消息给所有连接
await manager.broadcast(json.dumps({
"type": message_data.get("type", "other"),
"message": message_data["message"],
"username": message_data.get("username", "匿名用户")
}))
    except WebSocketDisconnect:
        manager.disconnect(websocket)
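ConnectionManager 的广播逻辑不依赖网络也能验证:下面用假的 WebSocket 对象模拟两个连接,演示 broadcast 的扇出效果(FakeWebSocket 是测试用的假设性桩对象;为便于独立运行,这里内联了一份与上文等价的最小 ConnectionManager):

```python
import asyncio

class FakeWebSocket:
    """测试用的桩对象,只记录发给它的消息"""
    def __init__(self):
        self.sent = []
    async def accept(self):
        pass
    async def send_text(self, message):
        self.sent.append(message)

class ConnectionManager:
    """与上文等价的最小连接管理器(内联以便独立运行)"""
    def __init__(self):
        self.active_connections = []
    async def connect(self, websocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    def disconnect(self, websocket):
        self.active_connections.remove(websocket)
    async def broadcast(self, message):
        for connection in self.active_connections:
            await connection.send_text(message)

async def demo():
    manager = ConnectionManager()
    a, b = FakeWebSocket(), FakeWebSocket()
    await manager.connect(a)
    await manager.connect(b)
    await manager.broadcast("hello")   # a、b 都收到
    manager.disconnect(b)
    await manager.broadcast("world")   # 只有 a 收到
    return a.sent, b.sent

sent_a, sent_b = asyncio.run(demo())
print(sent_a)  # ['hello', 'world']
print(sent_b)  # ['hello']
```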
2. 数据分析与科学计算方向
数据分析与科学计算是Python应用最广泛的领域之一,它结合了统计学、编程和领域知识,帮助我们从数据中提取有价值的信息。下面我将结合具体样例,系统介绍这个方向的核心技术栈和学习路径。
一、数据分析与科学计算的技术栈全景
1. 核心工具库概览
| 工具库 | 主要用途 | 特点 |
|---|---|---|
| NumPy | 数值计算基础 | 提供多维数组对象和数学函数,是科学计算的基础 |
| Pandas | 数据处理与分析 | 提供DataFrame数据结构,支持数据清洗、转换、分析 |
| Matplotlib | 数据可视化 | 基础的绘图库,高度可定制化 |
| Seaborn | 统计可视化 | 基于Matplotlib,提供更美观的统计图表 |
| SciPy | 科学计算 | 提供优化、线性代数、积分、插值等高级数学函数 |
| scikit-learn | 机器学习 | 提供分类、回归、聚类等机器学习算法 |
二、NumPy:科学计算的基础
1. NumPy数组操作基础样例
NumPy是Python科学计算的基础库,它提供了高效的N维数组对象和丰富的数学函数。
import numpy as np
# 1. 创建数组
# 创建一维数组
arr1d = np.array([1, 2, 3, 4, 5])
print(f"一维数组: {arr1d}")
# 创建二维数组
arr2d = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print(f"二维数组:\n{arr2d}")
# 2. 特殊数组创建
zeros_arr = np.zeros((3, 4)) # 3x4的全0数组
ones_arr = np.ones((2, 3)) # 2x3的全1数组
identity = np.eye(3) # 3x3的单位矩阵
random_arr = np.random.rand(3, 3) # 3x3的随机数组
# 3. 数组运算
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
# 基本运算
print(f"加法: {a + b}") # [5 7 9]
print(f"乘法: {a * b}") # [4 10 18]
print(f"点积: {np.dot(a, b)}") # 32
# 4. 广播机制(重要特性)
# 数组与标量的运算
print(f"数组乘以标量: {a * 2}") # [2 4 6]
# 不同形状数组的运算
matrix = np.array([[1, 2, 3], [4, 5, 6]])
vector = np.array([10, 20, 30])
print(f"广播加法:\n{matrix + vector}")
# 5. 数组索引和切片
arr = np.array([[1, 2, 3, 4],
[5, 6, 7, 8],
[9, 10, 11, 12]])
print(f"第二行: {arr[1]}") # [5 6 7 8]
print(f"第二行第三列: {arr[1, 2]}") # 7
print(f"前两行: \n{arr[:2]}") # [[1 2 3 4], [5 6 7 8]]
print(f"所有行的第二列: {arr[:, 1]}") # [2 6 10]
# 6. 数组形状操作
arr = np.arange(12).reshape(3, 4)
print(f"原始形状: {arr.shape}") # (3, 4)
print(f"转置:\n{arr.T}")
print(f"展平: {arr.flatten()}") # [0 1 2 3 4 5 6 7 8 9 10 11]
# 7. 统计计算
data = np.array([23, 45, 67, 89, 12, 34, 56, 78, 90, 21])
print(f"平均值: {np.mean(data):.2f}")
print(f"中位数: {np.median(data):.2f}")
print(f"标准差: {np.std(data):.2f}")
print(f"方差: {np.var(data):.2f}")
print(f"最小值: {np.min(data)}")
print(f"最大值: {np.max(data)}")
print(f"25%分位数: {np.percentile(data, 25)}")
print(f"75%分位数: {np.percentile(data, 75)}")
# 8. 线性代数运算
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
print(f"矩阵乘法:\n{np.dot(A, B)}")
print(f"矩阵行列式: {np.linalg.det(A):.2f}")
print(f"矩阵逆:\n{np.linalg.inv(A)}")
print(f"特征值: {np.linalg.eigvals(A)}")
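行列式与逆矩阵最常见的用途是解线性方程组。相比"先求逆再相乘",np.linalg.solve 在数值上更稳定,补一个对照示例:

```python
import numpy as np

A = np.array([[1.0, 2.0], [3.0, 4.0]])
b = np.array([5.0, 6.0])

# 方法一:先求逆再相乘(直观,但数值误差较大)
x_inv = np.linalg.inv(A) @ b
# 方法二:直接解 Ax = b(推荐)
x_solve = np.linalg.solve(A, b)   # 解为 x1 = -4, x2 = 4.5

print(np.allclose(x_inv, x_solve))   # True
print(np.allclose(A @ x_solve, b))   # True
```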
2. NumPy实战:图像处理基础
NumPy数组可以表示图像数据,让我们看看如何用NumPy进行基本的图像处理:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
# 创建简单的图像数据
# 创建一个100x100的随机噪声图像
noise_image = np.random.rand(100, 100)
# 创建一个渐变图像
x = np.linspace(0, 1, 100)
y = np.linspace(0, 1, 100)
X, Y = np.meshgrid(x, y)
gradient_image = np.sin(2 * np.pi * X) * np.cos(2 * np.pi * Y)
# 图像处理操作
# 1. 调整亮度
brightened = gradient_image * 1.5
brightened = np.clip(brightened, 0, 1) # 限制在0-1范围内
# 2. 添加高斯噪声
def add_gaussian_noise(image, mean=0, sigma=0.1):
"""添加高斯噪声"""
noise = np.random.normal(mean, sigma, image.shape)
noisy_image = image + noise
return np.clip(noisy_image, 0, 1)
noisy_gradient = add_gaussian_noise(gradient_image)
# 3. 图像卷积(模糊效果)
def simple_blur(image, kernel_size=3):
"""简单的均值模糊"""
kernel = np.ones((kernel_size, kernel_size)) / (kernel_size ** 2)
blurred = np.zeros_like(image)
# 边界填充
pad = kernel_size // 2
padded = np.pad(image, pad, mode='edge')
# 卷积操作
    for i in range(image.shape[0]):
        for j in range(image.shape[1]):
            region = padded[i:i+kernel_size, j:j+kernel_size]
            blurred[i, j] = np.sum(region * kernel)
return blurred
blurred_image = simple_blur(gradient_image, kernel_size=5)
# 4. 边缘检测(Sobel算子)
def convolve2d(image, kernel):
    """二维卷积(边缘填充)"""
    k = kernel.shape[0]
    pad = k // 2
    padded = np.pad(image, pad, mode='edge')
    out = np.zeros_like(image)
    for i in range(image.shape[0]):
        for j in range(image.shape[1]):
            out[i, j] = np.sum(padded[i:i+k, j:j+k] * kernel)
    return out
def sobel_edge_detection(image):
    """Sobel边缘检测"""
    sobel_x = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
    sobel_y = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]])
    edges_x = convolve2d(image, sobel_x)  # 水平方向梯度
    edges_y = convolve2d(image, sobel_y)  # 垂直方向梯度
    # 计算梯度幅度并归一化
    gradient_magnitude = np.sqrt(edges_x**2 + edges_y**2)
    return gradient_magnitude / np.max(gradient_magnitude)
edges = sobel_edge_detection(gradient_image)
# 可视化结果
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
images = [
("原始渐变", gradient_image),
("加亮", brightened),
("加噪声", noisy_gradient),
("模糊处理", blurred_image),
("边缘检测", edges),
("噪声图像", noise_image)
]
for idx, (title, img) in enumerate(images):
ax = axes[idx // 3, idx % 3]
ax.imshow(img, cmap='gray')
ax.set_title(title)
ax.axis('off')
plt.tight_layout()
plt.show()
# 5. 图像统计信息
print("图像统计信息:")
print(f"原始图像 - 均值: {gradient_image.mean():.3f}, 标准差: {gradient_image.std():.3f}")
print(f"模糊图像 - 均值: {blurred_image.mean():.3f}, 标准差: {blurred_image.std():.3f}")
print(f"边缘图像 - 均值: {edges.mean():.3f}, 标准差: {edges.std():.3f}")
# 6. 直方图分析
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
axes[0].hist(gradient_image.flatten(), bins=50, alpha=0.7, color='blue')
axes[0].set_title('原始图像直方图')
axes[0].set_xlabel('像素值')
axes[0].set_ylabel('频数')
axes[1].hist(blurred_image.flatten(), bins=50, alpha=0.7, color='green')
axes[1].set_title('模糊图像直方图')
axes[1].set_xlabel('像素值')
axes[2].hist(edges.flatten(), bins=50, alpha=0.7, color='red')
axes[2].set_title('边缘图像直方图')
axes[2].set_xlabel('像素值')
plt.tight_layout()
plt.show()
三、Pandas:数据分析的核心工具
1. Pandas基础操作样例
Pandas是Python数据分析的核心库,提供了DataFrame和Series两种主要数据结构。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# 1. 创建DataFrame
# 从字典创建
data = {
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 35, 28],
'城市': ['北京', '上海', '广州', '深圳'],
'工资': [50000, 60000, 55000, 65000],
'部门': ['技术部', '市场部', '技术部', '人事部']
}
df = pd.DataFrame(data)
print("原始DataFrame:")
print(df)
print("\n基本信息:")
print(f"形状: {df.shape}")
print(f"列名: {list(df.columns)}")
print(f"索引: {list(df.index)}")
# 2. 数据查看和基本信息
print("\n前3行数据:")
print(df.head(3))
print("\n后2行数据:")
print(df.tail(2))
print("\n数据类型:")
print(df.dtypes)
print("\n描述性统计:")
print(df.describe())
print("\n基本信息汇总:")
print(df.info())
# 3. 数据选择与过滤
print("\n选择单列(返回Series):")
print(df['姓名'])
print("\n选择多列(返回DataFrame):")
print(df[['姓名', '年龄', '工资']])
print("\n按位置选择(iloc):")
print(df.iloc[0])  # 第一行
print(df.iloc[1:3]) # 第2-3行
print(df.iloc[0, 1]) # 第一行第二列
print(df.iloc[[0, 2], [0, 2]]) # 第1、3行的第1、3列
print("\n按标签选择(loc):")
print(df.loc[0])  # 索引为0的行
print(df.loc[0:2]) # 索引0到2的行
print(df.loc[0, '姓名']) # 索引0的姓名列
print(df.loc[[0, 2], ['姓名', '城市']]) # 选择特定行和列
# 4. 条件过滤
print("\n年龄大于28的员工:")
print(df[df['年龄'] > 28])
print("\n技术部的员工:")
print(df[df['部门'] == '技术部'])
print("\n复合条件(年龄>28且工资>55000):")
print(df[(df['年龄'] > 28) & (df['工资'] > 55000)])
print("\n城市在北京或上海:")
print(df[df['城市'].isin(['北京', '上海'])])
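布尔索引之外,条件过滤还可以用 df.query 写成更接近 SQL 的字符串表达式。下面是一个独立的小示例(数据沿用上面的员工表结构;对非 ASCII 列名,显式指定 engine='python' 更稳妥,这是一个保守假设):

```python
import pandas as pd

df = pd.DataFrame({
    '姓名': ['张三', '李四', '王五', '赵六'],
    '年龄': [25, 30, 35, 28],
    '工资': [50000, 60000, 55000, 65000],
})

# query 用字符串表达式描述条件,等价于 df[(df['年龄'] > 28) & (df['工资'] > 55000)]
result = df.query('年龄 > 28 and 工资 > 55000', engine='python')
print(result)
```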
# 5. 数据排序
print("\n按年龄升序排序:")
print(df.sort_values('年龄'))
print("\n按工资降序排序:")
print(df.sort_values('工资', ascending=False))
print("\n多列排序(先按部门,再按工资降序):")
print(df.sort_values(['部门', '工资'], ascending=[True, False]))
# 6. 分组聚合
print("\n按部门分组统计:")
grouped = df.groupby('部门')
print("平均工资:")
print(grouped['工资'].mean())
print("\n各部门统计信息:")
print(grouped.agg({
'年龄': ['mean', 'min', 'max'],
'工资': ['mean', 'sum', 'count']
}))
print("\n透视表(按城市和部门统计平均工资):")
pivot_table = pd.pivot_table(df,
values='工资',
index='城市',
columns='部门',
aggfunc='mean',
fill_value=0)
print(pivot_table)
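与 pivot_table 类似,pd.crosstab 可以直接统计两个分类变量的交叉频数。沿用上面的员工数据补充一个小示例:

```python
import pandas as pd

df = pd.DataFrame({
    '城市': ['北京', '上海', '广州', '深圳'],
    '部门': ['技术部', '市场部', '技术部', '人事部'],
})

# crosstab 默认统计频数:每个"城市-部门"组合出现的次数
ct = pd.crosstab(df['城市'], df['部门'])
print(ct)
```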
# 7. 数据清洗
# 创建有缺失值的数据
df_with_na = df.copy()
df_with_na.loc[1, '年龄'] = np.nan
df_with_na.loc[2, '工资'] = np.nan
df_with_na.loc[3, '城市'] = None
print("\n包含缺失值的数据:")
print(df_with_na)
print("\n检查缺失值:")
print(df_with_na.isnull())
print("\n每列缺失值数量:")
print(df_with_na.isnull().sum())
print("\n删除包含缺失值的行:")
print(df_with_na.dropna())
print("\n填充缺失值(用均值填充年龄,用中位数填充工资):")
df_filled = df_with_na.copy()
df_filled['年龄'] = df_filled['年龄'].fillna(df_filled['年龄'].mean())
df_filled['工资'] = df_filled['工资'].fillna(df_filled['工资'].median())
df_filled['城市'] = df_filled['城市'].fillna('未知')
print(df_filled)
# 8. 数据转换
print("\n添加新列(年薪):")
df['年薪'] = df['工资'] * 12
print(df)
print("\n工资等级(使用apply函数):")
def salary_level(salary):
if salary < 55000:
return '初级'
elif salary < 60000:
return '中级'
else:
return '高级'
df['工资等级'] = df['工资'].apply(salary_level)
print(df)
print("\n使用lambda函数(工资增加10%):")
df['调整后工资'] = df['工资'].apply(lambda x: x * 1.1)
print(df)
# 9. 时间序列处理
# 创建时间序列数据
dates = pd.date_range('2024-01-01', periods=10, freq='D')
time_series = pd.DataFrame({
'日期': dates,
'销售额': np.random.randint(1000, 5000, 10),
'客户数': np.random.randint(50, 200, 10)
})
print("\n时间序列数据:")
print(time_series)
print("\n设置日期为索引:")
time_series.set_index('日期', inplace=True)
print(time_series)
print("\n按周重采样(计算每周平均值):")
weekly_mean = time_series.resample('W').mean()
print(weekly_mean)
print("\n移动平均(3天窗口):")
time_series['销售额_3天移动平均'] = time_series['销售额'].rolling(window=3).mean()
print(time_series[['销售额', '销售额_3天移动平均']])
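移动平均之外,时间序列里还常用 shift 与 pct_change 计算环比变化。下面是一个独立的小示例(数据为假设的演示数据):

```python
import pandas as pd

s = pd.Series([100, 110, 99, 121],
              index=pd.date_range('2024-01-01', periods=4, freq='D'))

# shift(1) 取前一期的值;pct_change 计算相对前一期的变化率
prev = s.shift(1)
growth = s.pct_change()
print(prev)
print(growth)  # 第一期为 NaN,第二期为 0.10
```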
# 10. 数据合并
# 创建第二个DataFrame
df2 = pd.DataFrame({
'姓名': ['张三', '李四', '钱七'],
'入职年份': [2020, 2019, 2021],
'绩效等级': ['A', 'B', 'A']
})
print("\n第二个DataFrame:")
print(df2)
print("\n内连接(inner join):")
merged_inner = pd.merge(df, df2, on='姓名', how='inner')
print(merged_inner)
print("\n左连接(left join):")
merged_left = pd.merge(df, df2, on='姓名', how='left')
print(merged_left)
print("\n外连接(outer join):")
merged_outer = pd.merge(df, df2, on='姓名', how='outer')
print(merged_outer)
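merge 是按键做关联,而 pd.concat 则按行或列方向直接拼接结构相同的表,适合合并分批数据。补充一个简短示例(jan、feb 为假设的演示数据):

```python
import pandas as pd

jan = pd.DataFrame({'姓名': ['张三', '李四'], '销售额': [100, 200]})
feb = pd.DataFrame({'姓名': ['王五'], '销售额': [300]})

# 纵向拼接两个月的数据,ignore_index=True 重新生成连续索引
all_data = pd.concat([jan, feb], ignore_index=True)
print(all_data)
print(f"总行数: {len(all_data)}")  # 3
```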
# 11. 数据导出和导入
# 保存到CSV
df.to_csv('employee_data.csv', index=False, encoding='utf-8-sig')
# 保存到Excel
df.to_excel('employee_data.xlsx', index=False)
# 从CSV读取
df_from_csv = pd.read_csv('employee_data.csv')
print("\n从CSV读取的数据:")
print(df_from_csv)
# 从Excel读取
df_from_excel = pd.read_excel('employee_data.xlsx')
print("\n从Excel读取的数据:")
print(df_from_excel)
2. Pandas实战:销售数据分析
让我们通过一个实际的销售数据分析案例来展示Pandas的强大功能:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 1. 创建模拟销售数据
np.random.seed(42) # 设置随机种子确保可重复性
# 生成日期范围
dates = pd.date_range('2024-01-01', '2024-12-31', freq='D')
# 生成销售数据
n_days = len(dates)
data = {
'日期': dates,
'产品类别': np.random.choice(['电子产品', '服装', '食品', '家居', '图书'], n_days),
'销售额': np.random.randint(1000, 10000, n_days),
'销售量': np.random.randint(10, 100, n_days),
'客户评分': np.random.uniform(3.0, 5.0, n_days).round(1),
'促销活动': np.random.choice([True, False], n_days, p=[0.3, 0.7]),
'地区': np.random.choice(['华北', '华东', '华南', '华中', '西北', '西南'], n_days)
}
# 添加周末效应
weekend_mask = data['日期'].dayofweek >= 5  # 5和6是周六周日(此处data['日期']是DatetimeIndex,直接取dayofweek即可)
data['销售额'][weekend_mask] = data['销售额'][weekend_mask] * 1.5
# 添加季节性趋势(模拟节假日效应)
holiday_periods = [
('2024-01-01', '2024-01-03'), # 元旦
('2024-02-10', '2024-02-17'), # 春节
('2024-05-01', '2024-05-03'), # 劳动节
('2024-10-01', '2024-10-07') # 国庆节
]
for start, end in holiday_periods:
holiday_mask = (data['日期'] >= start) & (data['日期'] <= end)
data['销售额'][holiday_mask] = data['销售额'][holiday_mask] * 2.0
data['销售量'][holiday_mask] = data['销售量'][holiday_mask] * 1.8
# 创建DataFrame
sales_df = pd.DataFrame(data)
print("销售数据概览:")
print(sales_df.head())
print(f"\n数据形状: {sales_df.shape}")
print(f"\n数据类型:\n{sales_df.dtypes}")
print(f"\n描述性统计:\n{sales_df.describe()}")
# 2. 数据清洗和预处理
print("\n=== 数据清洗 ===")
# 检查缺失值
print("缺失值统计:")
print(sales_df.isnull().sum())
# 检查重复值
print(f"\n重复行数量: {sales_df.duplicated().sum()}")
# 检查异常值(使用IQR方法)
def detect_outliers_iqr(data, column):
"""使用IQR方法检测异常值"""
Q1 = data[column].quantile(0.25)
Q3 = data[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = data[(data[column] < lower_bound) | (data[column] > upper_bound)]
return outliers, lower_bound, upper_bound
# 检测销售额异常值
sales_outliers, lower, upper = detect_outliers_iqr(sales_df, '销售额')
print(f"\n销售额异常值数量: {len(sales_outliers)}")
print(f"异常值范围: < {lower:.2f} 或 > {upper:.2f}")
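IQR 之外,另一种常见做法是 Z-score 法:假设数据近似正态分布,|z| 超过阈值(常取 3,小样本演示时用 2)视为异常。独立小示例如下(数据为假设):

```python
import numpy as np

data = np.array([10, 12, 11, 13, 12, 11, 10, 120])  # 120 是明显的离群点

# Z-score:每个点偏离均值多少个标准差
z_scores = (data - data.mean()) / data.std()
outliers = data[np.abs(z_scores) > 2]  # 小样本演示用 2 作阈值
print(f"Z-score: {np.round(z_scores, 2)}")
print(f"异常值: {outliers}")  # [120]
```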
# 3. 探索性数据分析(EDA)
print("\n=== 探索性数据分析 ===")
# 按产品类别分析
category_stats = sales_df.groupby('产品类别').agg({
'销售额': ['sum', 'mean', 'count'],
'销售量': ['sum', 'mean'],
'客户评分': 'mean'
}).round(2)
category_stats.columns = ['销售额_总和', '销售额_均值', '交易次数',
'销售量_总和', '销售量_均值', '平均评分']
print("按产品类别统计:")
print(category_stats)
# 按地区分析
region_stats = sales_df.groupby('地区').agg({
'销售额': 'sum',
'销售量': 'sum',
'客户评分': 'mean'
}).round(2)
region_stats = region_stats.sort_values('销售额', ascending=False)
print("\n按地区统计(按销售额排序):")
print(region_stats)
# 促销活动效果分析
promotion_stats = sales_df.groupby('促销活动').agg({
'销售额': ['mean', 'sum', 'count'],
'销售量': 'mean',
'客户评分': 'mean'
}).round(2)
promotion_stats.columns = ['平均销售额', '总销售额', '交易次数', '平均销售量', '平均评分']
print("\n促销活动效果分析:")
print(promotion_stats)
# 4. 时间序列分析
print("\n=== 时间序列分析 ===")
# 设置日期为索引
sales_df_time = sales_df.set_index('日期')
# 按周重采样
weekly_sales = sales_df_time.resample('W').agg({
'销售额': 'sum',
'销售量': 'sum',
'客户评分': 'mean'
})
print("周度销售数据(前5周):")
print(weekly_sales.head())
# 按月重采样
monthly_sales = sales_df_time.resample('M').agg({
'销售额': 'sum',
'销售量': 'sum',
'客户评分': 'mean'
})
print("\n月度销售数据:")
print(monthly_sales)
# 计算移动平均
weekly_sales['销售额_4周移动平均'] = weekly_sales['销售额'].rolling(window=4).mean()
weekly_sales['销售量_4周移动平均'] = weekly_sales['销售量'].rolling(window=4).mean()
print("\n添加移动平均后的周度数据:")
print(weekly_sales.tail())
# 5. 数据可视化
print("\n=== 数据可视化 ===")
# 设置图形风格
sns.set_style("whitegrid")
plt.figure(figsize=(15, 10))
# 子图1:月度销售额趋势
plt.subplot(2, 3, 1)
monthly_sales['销售额'].plot(kind='bar', color='skyblue')
plt.title('月度销售额趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.xticks(rotation=45)
# 子图2:产品类别销售额分布
plt.subplot(2, 3, 2)
category_sales = sales_df.groupby('产品类别')['销售额'].sum().sort_values(ascending=False)
category_sales.plot(kind='pie', autopct='%1.1f%%', startangle=90)
plt.title('产品类别销售额分布')
plt.ylabel('')
# 子图3:地区销售额对比
plt.subplot(2, 3, 3)
region_sales = sales_df.groupby('地区')['销售额'].sum().sort_values()
region_sales.plot(kind='barh', color='lightgreen')
plt.title('地区销售额对比')
plt.xlabel('销售额')
# 子图4:促销活动效果对比
plt.subplot(2, 3, 4)
promotion_comparison = sales_df.groupby('促销活动').agg({
'销售额': 'mean',
'销售量': 'mean'
})
x = np.arange(len(promotion_comparison.index))
width = 0.35
plt.bar(x - width/2, promotion_comparison['销售额'], width, label='平均销售额', color='orange')
plt.bar(x + width/2, promotion_comparison['销售量'], width, label='平均销售量', color='purple')
plt.title('促销活动效果对比')
plt.xlabel('促销活动')
plt.ylabel('平均值')
plt.xticks(x, ['无促销', '有促销'])
plt.legend()
# 子图5:周度销售额趋势
plt.subplot(2, 3, 5)
plt.plot(weekly_sales.index, weekly_sales['销售额'], label='周销售额', alpha=0.7)
plt.plot(weekly_sales.index, weekly_sales['销售额_4周移动平均'],
label='4周移动平均', linewidth=2, color='red')
plt.title('周度销售额趋势')
plt.xlabel('日期')
plt.ylabel('销售额')
plt.legend()
plt.xticks(rotation=45)
# 子图6:销售额与评分散点图
plt.subplot(2, 3, 6)
plt.scatter(sales_df['销售额'], sales_df['客户评分'], alpha=0.6, color='teal')
plt.title('销售额与客户评分关系')
plt.xlabel('销售额')
plt.ylabel('客户评分')
plt.tight_layout()
plt.show()
# 6. 高级分析:相关性分析
print("\n=== 相关性分析 ===")
# 计算数值列的相关性矩阵
numeric_cols = ['销售额', '销售量', '客户评分']
correlation_matrix = sales_df[numeric_cols].corr()
print("相关性矩阵:")
print(correlation_matrix)
# 可视化相关性矩阵
plt.figure(figsize=(8, 6))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
square=True, linewidths=1, cbar_kws={"shrink": 0.8})
plt.title('数值特征相关性热图')
plt.tight_layout()
plt.show()
# 7. 分组深入分析
print("\n=== 分组深入分析 ===")
# 按产品和地区分组分析
product_region_analysis = sales_df.groupby(['产品类别', '地区']).agg({
'销售额': ['sum', 'mean', 'count'],
'销售量': 'sum',
'客户评分': 'mean'
}).round(2)
product_region_analysis.columns = ['销售额_总和', '销售额_均值', '交易次数',
'销售量_总和', '平均评分']
print("按产品和地区分组分析(前10行):")
print(product_region_analysis.head(10))
# 找出每个地区最畅销的产品
best_selling_by_region = sales_df.groupby(['地区', '产品类别'])['销售额'].sum().unstack()
print("\n各地区产品销售额矩阵:")
print(best_selling_by_region)
# 找出每个地区的畅销产品
top_product_by_region = best_selling_by_region.idxmax(axis=1)
top_sales_by_region = best_selling_by_region.max(axis=1)
print("\n各地区最畅销产品:")
for region in top_product_by_region.index:
product = top_product_by_region[region]
sales = top_sales_by_region[region]
print(f"{region}: {product} (销售额: {sales:,.0f})")
# 8. 时间序列分解
print("\n=== 时间序列分解 ===")
# 使用移动平均法分解时间序列
from statsmodels.tsa.seasonal import seasonal_decompose
# 准备时间序列数据(按天聚合)
daily_sales = sales_df_time.resample('D')['销售额'].sum()
# 进行季节性分解(假设周期为7天,每周重复)
decomposition = seasonal_decompose(daily_sales, model='additive', period=7)
# 可视化分解结果
fig, axes = plt.subplots(4, 1, figsize=(12, 10))
axes[0].plot(decomposition.observed)
axes[0].set_title('原始序列')
axes[0].set_ylabel('销售额')
axes[1].plot(decomposition.trend)
axes[1].set_title('趋势成分')
axes[1].set_ylabel('销售额')
axes[2].plot(decomposition.seasonal)
axes[2].set_title('季节性成分')
axes[2].set_ylabel('销售额')
axes[3].plot(decomposition.resid)
axes[3].set_title('残差成分')
axes[3].set_ylabel('销售额')
axes[3].set_xlabel('日期')
plt.tight_layout()
plt.show()
# 9. 预测分析(简单线性回归)
print("\n=== 简单预测分析 ===")
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
# 准备特征和目标变量
sales_df['日期序号'] = range(len(sales_df))
X = sales_df[['日期序号', '促销活动', '客户评分']].copy()
X['促销活动'] = X['促销活动'].astype(int) # 将布尔值转换为0/1
y = sales_df['销售额']
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练线性回归模型
model = LinearRegression()
model.fit(X_train, y_train)
# 预测
y_pred = model.predict(X_test)
# 评估模型
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"模型性能评估:")
print(f"均方误差 (MSE): {mse:.2f}")
print(f"R²分数: {r2:.2f}")
print(f"系数: {model.coef_}")
print(f"截距: {model.intercept_:.2f}")
# 可视化预测结果
plt.figure(figsize=(10, 6))
plt.scatter(y_test, y_pred, alpha=0.6, color='blue')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()],
'r--', lw=2, label='完美预测线')
plt.xlabel('实际销售额')
plt.ylabel('预测销售额')
plt.title('销售额预测 vs 实际值')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 10. 生成分析报告
print("\n=== 分析报告总结 ===")
# 计算关键指标
total_sales = sales_df['销售额'].sum()
total_quantity = sales_df['销售量'].sum()
avg_rating = sales_df['客户评分'].mean()
avg_sales_per_transaction = sales_df['销售额'].mean()
best_selling_category = category_sales.idxmax()
best_selling_region = region_stats.index[0]  # region_stats已按销售额降序排序,取第一个
promotion_effect = (promotion_stats.loc[True, '平均销售额'] /
promotion_stats.loc[False, '平均销售额'] - 1) * 100
print(f"年度销售总结报告")
print("=" * 50)
print(f"1. 总体业绩:")
print(f" 总销售额: ¥{total_sales:,.0f}")
print(f" 总销售量: {total_quantity:,} 件")
print(f" 平均客户评分: {avg_rating:.1f}/5.0")
print(f" 单笔交易平均销售额: ¥{avg_sales_per_transaction:,.0f}")
print()
print(f"2. 产品表现:")
print(f" 最畅销品类: {best_selling_category}")
print(f" 品类销售额占比: {category_sales.max()/total_sales*100:.1f}%")
print()
print(f"3. 地区表现:")
print(f" 销售额最高地区: {best_selling_region}")
print(f" 该地区销售额: ¥{region_stats['销售额'].iloc[0]:,.0f}")
print()
print(f"4. 促销效果:")
print(f" 促销期间平均销售额提升: {promotion_effect:.1f}%")
print(f" 促销交易占比: {promotion_stats.loc[True, '交易次数']/len(sales_df)*100:.1f}%")
print()
print(f"5. 时间趋势:")
print(f" 最高月销售额: ¥{monthly_sales['销售额'].max():,.0f}")
print(f" 最低月销售额: ¥{monthly_sales['销售额'].min():,.0f}")
print(f" 销售额月均增长率: {(monthly_sales['销售额'].pct_change().mean()*100):.1f}%")
print("=" * 50)
# 保存分析结果
analysis_results = {
'总体业绩': {
'总销售额': total_sales,
'总销售量': total_quantity,
'平均评分': avg_rating,
'平均交易额': avg_sales_per_transaction
},
'产品分析': category_stats.to_dict(),
'地区分析': region_stats.to_dict(),
'促销分析': promotion_stats.to_dict(),
'时间序列': monthly_sales.rename(index=str).to_dict()  # 时间索引转为字符串,否则json.dump无法序列化Timestamp键
}
# 保存到JSON文件
import json
with open('sales_analysis_report.json', 'w', encoding='utf-8') as f:
json.dump(analysis_results, f, ensure_ascii=False, indent=2)
print("\n分析报告已保存到 'sales_analysis_report.json'")
四、数据可视化:Matplotlib和Seaborn
1. Matplotlib基础可视化样例
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建示例数据
np.random.seed(42)
categories = ['电子产品', '服装', '食品', '家居', '图书', '体育用品']
sales_data = {
'第一季度': np.random.randint(100, 500, 6),
'第二季度': np.random.randint(150, 600, 6),
'第三季度': np.random.randint(200, 700, 6),
'第四季度': np.random.randint(250, 800, 6)
}
df_sales = pd.DataFrame(sales_data, index=categories)
# 1. 基本图表类型
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# 柱状图
axes[0, 0].bar(categories, df_sales['第一季度'])
axes[0, 0].set_title('第一季度销售额柱状图')
axes[0, 0].set_xlabel('产品类别')
axes[0, 0].set_ylabel('销售额')
axes[0, 0].tick_params(axis='x', rotation=45)
# 折线图
quarters = ['Q1', 'Q2', 'Q3', 'Q4']
for i, category in enumerate(categories[:3]): # 只显示前3个类别
axes[0, 1].plot(quarters, df_sales.loc[category], marker='o', label=category)
axes[0, 1].set_title('各季度销售额趋势')
axes[0, 1].set_xlabel('季度')
axes[0, 1].set_ylabel('销售额')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 散点图
x = np.random.randn(100)
y = 2 * x + np.random.randn(100) * 0.5
axes[0, 2].scatter(x, y, alpha=0.6, c='green', edgecolors='black')
axes[0, 2].set_title('散点图示例')
axes[0, 2].set_xlabel('X轴')
axes[0, 2].set_ylabel('Y轴')
# 饼图
axes[1, 0].pie(df_sales['第一季度'], labels=categories, autopct='%1.1f%%')
axes[1, 0].set_title('第一季度销售额分布')
# 箱线图
data_to_plot = [df_sales[quarter] for quarter in sales_data.keys()]
axes[1, 1].boxplot(data_to_plot, labels=sales_data.keys())
axes[1, 1].set_title('各季度销售额箱线图')
axes[1, 1].set_xlabel('季度')
axes[1, 1].set_ylabel('销售额')
# 直方图
data = np.random.randn(1000)
axes[1, 2].hist(data, bins=30, alpha=0.7, color='purple', edgecolor='black')
axes[1, 2].set_title('数据分布直方图')
axes[1, 2].set_xlabel('数值')
axes[1, 2].set_ylabel('频数')
plt.tight_layout()
plt.show()
# 2. 高级图表:子图布局和样式定制
fig = plt.figure(figsize=(14, 10))
# 创建网格布局
gs = fig.add_gridspec(3, 3)
# 子图1:堆叠柱状图
ax1 = fig.add_subplot(gs[0, :2])
bottom_values = np.zeros(len(categories))
for quarter in sales_data.keys():
ax1.bar(categories, df_sales[quarter], bottom=bottom_values, label=quarter)
bottom_values += df_sales[quarter].values
ax1.set_title('各季度销售额堆叠柱状图')
ax1.set_xlabel('产品类别')
ax1.set_ylabel('累计销售额')
ax1.legend()
ax1.tick_params(axis='x', rotation=45)
# 子图2:面积图
ax2 = fig.add_subplot(gs[0, 2])
x = np.linspace(0, 10, 100)
y1 = np.sin(x)
y2 = np.cos(x)
ax2.fill_between(x, y1, alpha=0.5, label='sin(x)')
ax2.fill_between(x, y2, alpha=0.5, label='cos(x)')
ax2.set_title('面积图示例')
ax2.set_xlabel('x')
ax2.set_ylabel('y')
ax2.legend()
# 子图3:热力图
ax3 = fig.add_subplot(gs[1, :])
im = ax3.imshow(df_sales.values, cmap='YlOrRd', aspect='auto')
ax3.set_title('销售额热力图')
ax3.set_xlabel('季度')
ax3.set_ylabel('产品类别')
ax3.set_xticks(range(len(sales_data.keys())))
ax3.set_xticklabels(sales_data.keys())
ax3.set_yticks(range(len(categories)))
ax3.set_yticklabels(categories)
plt.colorbar(im, ax=ax3)
# 子图4:雷达图
ax4 = fig.add_subplot(gs[2, 0], projection='polar')
angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
values = df_sales['第一季度'].values.tolist()
values += values[:1] # 闭合图形
angles += angles[:1]
ax4.plot(angles, values, 'o-', linewidth=2)
ax4.fill(angles, values, alpha=0.25)
ax4.set_title('第一季度销售额雷达图')
ax4.set_xticks(angles[:-1])
ax4.set_xticklabels(categories)
# 子图5:3D散点图
from mpl_toolkits.mplot3d import Axes3D
ax5 = fig.add_subplot(gs[2, 1:], projection='3d')
x = np.random.rand(50)
y = np.random.rand(50)
z = np.random.rand(50)
colors = np.random.rand(50)
ax5.scatter(x, y, z, c=colors, cmap='viridis', s=100, alpha=0.6)
ax5.set_title('3D散点图')
ax5.set_xlabel('X轴')
ax5.set_ylabel('Y轴')
ax5.set_zlabel('Z轴')
plt.tight_layout()
plt.show()
# 3. 时间序列可视化
# 创建时间序列数据
dates = pd.date_range('2024-01-01', periods=365, freq='D')
time_series = pd.DataFrame({
'日期': dates,
'销售额': np.cumsum(np.random.randn(365) * 1000 + 5000),
'访问量': np.cumsum(np.random.randn(365) * 50 + 200),
'转化率': np.random.uniform(0.01, 0.05, 365)
})
time_series.set_index('日期', inplace=True)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 折线图:销售额趋势
axes[0, 0].plot(time_series.index, time_series['销售额'], color='blue', linewidth=2)
axes[0, 0].set_title('每日销售额趋势')
axes[0, 0].set_xlabel('日期')
axes[0, 0].set_ylabel('销售额')
axes[0, 0].grid(True, alpha=0.3)
axes[0, 0].tick_params(axis='x', rotation=45)
# 双Y轴图:销售额和访问量
ax1 = axes[0, 1]
ax2 = ax1.twinx()
color1 = 'tab:blue'
color2 = 'tab:red'
ax1.plot(time_series.index, time_series['销售额'], color=color1, label='销售额')
ax1.set_xlabel('日期')
ax1.set_ylabel('销售额', color=color1)
ax1.tick_params(axis='y', labelcolor=color1)
ax2.plot(time_series.index, time_series['访问量'], color=color2, label='访问量')
ax2.set_ylabel('访问量', color=color2)
ax2.tick_params(axis='y', labelcolor=color2)
axes[0, 1].set_title('销售额与访问量对比')
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
# 面积图:移动平均
window_size = 30
time_series['销售额_移动平均'] = time_series['销售额'].rolling(window=window_size).mean()
time_series['销售额_标准差'] = time_series['销售额'].rolling(window=window_size).std()
axes[1, 0].fill_between(time_series.index,
time_series['销售额_移动平均'] - time_series['销售额_标准差'],
time_series['销售额_移动平均'] + time_series['销售额_标准差'],
alpha=0.3, color='gray', label='标准差范围')
axes[1, 0].plot(time_series.index, time_series['销售额_移动平均'],
color='green', linewidth=2, label=f'{window_size}天移动平均')
axes[1, 0].set_title('销售额移动平均与波动范围')
axes[1, 0].set_xlabel('日期')
axes[1, 0].set_ylabel('销售额')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
axes[1, 0].tick_params(axis='x', rotation=45)
# 散点图矩阵:scatter_matrix会自建子图网格,无法绘制在单个子图中,需单独成图
axes[1, 1].axis('off')  # 关闭占位子图
plt.tight_layout()
plt.show()
from pandas.plotting import scatter_matrix
scatter_matrix(time_series[['销售额', '访问量', '转化率']],
               alpha=0.8, figsize=(8, 8), diagonal='hist')
plt.suptitle('特征散点图矩阵')
plt.show()
# 4. 高级定制:多图组合和样式
# 创建专业报告样式图表
fig = plt.figure(figsize=(16, 12))
# 使用GridSpec创建复杂布局
gs = fig.add_gridspec(3, 4, hspace=0.3, wspace=0.3)
# 主图:销售趋势
ax_main = fig.add_subplot(gs[:2, :2])
months = time_series.resample('M').mean().index.strftime('%Y-%m')
monthly_sales = time_series.resample('M')['销售额'].sum()
monthly_visits = time_series.resample('M')['访问量'].sum()
bar_width = 0.35
x = np.arange(len(months))
ax_main.bar(x - bar_width/2, monthly_sales, bar_width, label='销售额', color='skyblue')
ax_main.bar(x + bar_width/2, monthly_visits, bar_width, label='访问量', color='lightcoral')
ax_main.set_title('月度销售与访问量对比', fontsize=14, fontweight='bold')
ax_main.set_xlabel('月份', fontsize=12)
ax_main.set_ylabel('数值', fontsize=12)
ax_main.set_xticks(x)
ax_main.set_xticklabels(months, rotation=45, ha='right')
ax_main.legend()
ax_main.grid(True, alpha=0.3, axis='y')
# 右上角:转化率分布
ax_top_right = fig.add_subplot(gs[0, 2:])
conversion_data = time_series['转化率']
ax_top_right.hist(conversion_data, bins=30, alpha=0.7, color='purple', edgecolor='black')
ax_top_right.axvline(conversion_data.mean(), color='red', linestyle='--', linewidth=2,
label=f'均值: {conversion_data.mean():.3f}')
ax_top_right.axvline(conversion_data.median(), color='green', linestyle='--', linewidth=2,
label=f'中位数: {conversion_data.median():.3f}')
ax_top_right.set_title('转化率分布', fontsize=14, fontweight='bold')
ax_top_right.set_xlabel('转化率', fontsize=12)
ax_top_right.set_ylabel('频数', fontsize=12)
ax_top_right.legend()
# 中间右侧:相关性热图
ax_mid_right = fig.add_subplot(gs[1, 2:])
corr_matrix = time_series[['销售额', '访问量', '转化率']].corr()
im = ax_mid_right.imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
ax_mid_right.set_title('特征相关性热图', fontsize=14, fontweight='bold')
ax_mid_right.set_xticks(range(len(corr_matrix.columns)))
ax_mid_right.set_yticks(range(len(corr_matrix.columns)))
ax_mid_right.set_xticklabels(corr_matrix.columns, rotation=45)
ax_mid_right.set_yticklabels(corr_matrix.columns)
# 添加相关性数值
for i in range(len(corr_matrix.columns)):
for j in range(len(corr_matrix.columns)):
text = ax_mid_right.text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
ha="center", va="center", color="black", fontsize=10)
plt.colorbar(im, ax=ax_mid_right)
# 底部:箱线图对比
ax_bottom = fig.add_subplot(gs[2, :])
data_to_plot = [time_series[col] for col in ['销售额', '访问量', '转化率']]
bp = ax_bottom.boxplot(data_to_plot, patch_artist=True, labels=['销售额', '访问量', '转化率'])
colors = ['lightblue', 'lightgreen', 'lightpink']
for patch, color in zip(bp['boxes'], colors):
patch.set_facecolor(color)
ax_bottom.set_title('特征分布箱线图', fontsize=14, fontweight='bold')
ax_bottom.set_ylabel('数值', fontsize=12)
ax_bottom.grid(True, alpha=0.3, axis='y')
# 添加统计信息
stats_text = []
for i, col in enumerate(['销售额', '访问量', '转化率']):
stats = time_series[col].describe()
text = f"{col}:\n均值: {stats['mean']:.2f}\n标准差: {stats['std']:.2f}"
stats_text.append(text)
# 在图表右侧添加统计信息
fig.text(0.95, 0.5, '\n\n'.join(stats_text),
fontsize=10, va='center', ha='left',
bbox=dict(boxstyle="round,pad=0.5", facecolor="lightyellow", alpha=0.8))
plt.suptitle('销售数据分析报告', fontsize=18, fontweight='bold', y=0.98)
plt.tight_layout(rect=[0, 0, 0.9, 0.96]) # 为suptitle留出空间
plt.show()
# 5. 保存图表
# 保存为高清图片
fig.savefig('sales_analysis_report.png', dpi=300, bbox_inches='tight', facecolor='white')
print("图表已保存为 'sales_analysis_report.png'")
2. Seaborn统计可视化样例
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# 设置Seaborn样式
sns.set_style("whitegrid")
sns.set_palette("husl")
# 创建示例数据
np.random.seed(42)
n_samples = 1000
data = pd.DataFrame({
'年龄': np.random.normal(35, 10, n_samples).astype(int),
'收入': np.random.normal(50000, 15000, n_samples),
'教育年限': np.random.choice([12, 16, 18, 21], n_samples, p=[0.3, 0.4, 0.2, 0.1]),
'性别': np.random.choice(['男', '女'], n_samples),
'城市': np.random.choice(['北京', '上海', '广州', '深圳'], n_samples),
'消费类别': np.random.choice(['食品', '服装', '电子产品', '娱乐', '交通'], n_samples),
'消费金额': np.random.exponential(500, n_samples),
'满意度': np.random.randint(1, 6, n_samples)
})
# 添加一些相关性
data['收入'] = data['收入'] + data['教育年限'] * 2000 + np.random.normal(0, 5000, n_samples)
data['消费金额'] = data['消费金额'] + data['收入'] * 0.001 + np.random.normal(0, 100, n_samples)
data['满意度'] = data['满意度'] + (data['消费金额'] > data['消费金额'].median()).astype(int)
# 限制范围
data['年龄'] = data['年龄'].clip(18, 70)
data['收入'] = data['收入'].clip(20000, 100000)
data['消费金额'] = data['消费金额'].clip(100, 2000)
data['满意度'] = data['满意度'].clip(1, 5)
print("数据集概览:")
print(data.head())
print(f"\n数据集形状: {data.shape}")
print(f"\n数据类型:\n{data.dtypes}")
# 1. 分布可视化
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# 直方图 + KDE
sns.histplot(data=data, x='收入', kde=True, ax=axes[0, 0], bins=30)
axes[0, 0].set_title('收入分布')
axes[0, 0].set_xlabel('收入')
axes[0, 0].set_ylabel('频数')
# 箱线图
sns.boxplot(data=data, x='城市', y='收入', ax=axes[0, 1])
axes[0, 1].set_title('各城市收入分布')
axes[0, 1].set_xlabel('城市')
axes[0, 1].set_ylabel('收入')
axes[0, 1].tick_params(axis='x', rotation=45)
# 小提琴图
sns.violinplot(data=data, x='性别', y='收入', ax=axes[0, 2])
axes[0, 2].set_title('性别收入分布')
axes[0, 2].set_xlabel('性别')
axes[0, 2].set_ylabel('收入')
# 核密度估计图
sns.kdeplot(data=data, x='收入', hue='性别', fill=True, ax=axes[1, 0])
axes[1, 0].set_title('收入分布的性别差异')
axes[1, 0].set_xlabel('收入')
axes[1, 0].set_ylabel('密度')
# 计数图(分类数据)
sns.countplot(data=data, x='消费类别', ax=axes[1, 1])
axes[1, 1].set_title('消费类别分布')
axes[1, 1].set_xlabel('消费类别')
axes[1, 1].set_ylabel('计数')
axes[1, 1].tick_params(axis='x', rotation=45)
# 经验累积分布函数图
sns.ecdfplot(data=data, x='年龄', ax=axes[1, 2])
axes[1, 2].set_title('年龄累积分布')
axes[1, 2].set_xlabel('年龄')
axes[1, 2].set_ylabel('累积比例')
plt.tight_layout()
plt.show()
# 2. 关系可视化
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 散点图
sns.scatterplot(data=data, x='年龄', y='收入', hue='性别', alpha=0.6, ax=axes[0, 0])
axes[0, 0].set_title('年龄与收入关系(按性别)')
axes[0, 0].set_xlabel('年龄')
axes[0, 0].set_ylabel('收入')
# 带回归线的散点图
sns.regplot(data=data, x='教育年限', y='收入', scatter_kws={'alpha': 0.5},
line_kws={'color': 'red'}, ax=axes[0, 1])
axes[0, 1].set_title('教育年限与收入关系')
axes[0, 1].set_xlabel('教育年限')
axes[0, 1].set_ylabel('收入')
# 热力图(相关性矩阵)
correlation_matrix = data[['年龄', '收入', '教育年限', '消费金额', '满意度']].corr()
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
square=True, linewidths=1, cbar_kws={"shrink": 0.8}, ax=axes[1, 0])
axes[1, 0].set_title('数值特征相关性热图')
# 成对关系图(小矩阵)
numeric_cols = ['年龄', '收入', '消费金额', '满意度']
pairplot_data = data[numeric_cols + ['性别']]
sns.pairplot(pairplot_data, hue='性别', diag_kind='kde',
plot_kws={'alpha': 0.6}, height=2)
plt.suptitle('特征成对关系图(按性别着色)', y=1.02)
plt.show()
# 3. 分类数据可视化
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 分类散点图
sns.stripplot(data=data, x='城市', y='消费金额', hue='性别',
dodge=True, alpha=0.5, ax=axes[0, 0])
axes[0, 0].set_title('各城市消费金额分布(按性别)')
axes[0, 0].set_xlabel('城市')
axes[0, 0].set_ylabel('消费金额')
axes[0, 0].tick_params(axis='x', rotation=45)
axes[0, 0].legend(title='性别')
# 分类箱线图
sns.boxplot(data=data, x='消费类别', y='消费金额', hue='性别', ax=axes[0, 1])
axes[0, 1].set_title('消费类别与金额关系(按性别)')
axes[0, 1].set_xlabel('消费类别')
axes[0, 1].set_ylabel('消费金额')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].legend(title='性别')
# 分类条形图(带误差线)
category_stats = data.groupby(['城市', '消费类别'])['消费金额'].agg(['mean', 'std', 'count']).reset_index()
category_stats['ci'] = 1.96 * category_stats['std'] / np.sqrt(category_stats['count'])
sns.barplot(data=category_stats, x='城市', y='mean', hue='消费类别',
ax=axes[1, 0], errorbar=None)
axes[1, 0].set_title('各城市不同消费类别的平均消费金额')
axes[1, 0].set_xlabel('城市')
axes[1, 0].set_ylabel('平均消费金额')
axes[1, 0].tick_params(axis='x', rotation=45)
axes[1, 0].legend(title='消费类别', bbox_to_anchor=(1.05, 1), loc='upper left')
# 分类点图
sns.pointplot(data=data, x='城市', y='满意度', hue='性别',
dodge=True, capsize=0.1, errwidth=1.5, ax=axes[1, 1])
axes[1, 1].set_title('各城市满意度(按性别)')
axes[1, 1].set_xlabel('城市')
axes[1, 1].set_ylabel('平均满意度')
axes[1, 1].tick_params(axis='x', rotation=45)
axes[1, 1].legend(title='性别')
plt.tight_layout()
plt.show()
# 4. 多变量分析
# 创建更大的画布
fig = plt.figure(figsize=(16, 12))
# 使用GridSpec创建复杂布局
gs = fig.add_gridspec(3, 3, hspace=0.4, wspace=0.3)
# 子图1:联合分布图
ax_joint = fig.add_subplot(gs[0, 0])
ax_marg_x = fig.add_subplot(gs[0, 1])
ax_marg_y = fig.add_subplot(gs[1, 0])
# 手动创建联合分布图
scatter = ax_joint.scatter(data['年龄'], data['收入'],
c=data['满意度'], cmap='viridis', alpha=0.6)
ax_joint.set_xlabel('年龄')
ax_joint.set_ylabel('收入')
# 边际分布
ax_marg_x.hist(data['年龄'], bins=30, alpha=0.7, color='skyblue')
ax_marg_x.set_xlabel('年龄分布')
ax_marg_y.hist(data['收入'], bins=30, orientation='horizontal', alpha=0.7, color='lightgreen')
ax_marg_y.set_ylabel('收入分布')
# 添加颜色条
cbar = plt.colorbar(scatter, ax=[ax_joint, ax_marg_x, ax_marg_y],
orientation='vertical', fraction=0.05)
cbar.set_label('满意度')
# 子图2:分面网格(FacetGrid)
g = sns.FacetGrid(data, col='城市', col_wrap=2, height=4, aspect=1.2)
g.map_dataframe(sns.scatterplot, x='年龄', y='收入', hue='性别', alpha=0.6)
g.add_legend()
g.set_titles(col_template="{col_name}")
g.set_axis_labels("年龄", "收入")
# 调整FacetGrid位置
g.fig.set_size_inches(10, 8)
g.fig.subplots_adjust(top=0.9)
g.fig.suptitle('各城市年龄与收入关系(按性别)', fontsize=16)
# 子图3:聚类热图
# 准备数据:计算每个城市-消费类别的平均消费金额
pivot_data = data.pivot_table(values='消费金额', index='城市', columns='消费类别', aggfunc='mean')
ax_heatmap = fig.add_subplot(gs[1:, 1:])
sns.heatmap(pivot_data, annot=True, fmt='.0f', cmap='YlOrRd',
linewidths=0.5, ax=ax_heatmap)
ax_heatmap.set_title('各城市不同消费类别的平均消费金额', fontsize=14)
ax_heatmap.set_xlabel('消费类别')
ax_heatmap.set_ylabel('城市')
plt.tight_layout()
plt.show()
# 5. 高级统计图表
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 子图1:回归残差图
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
# 准备数据
X = data[['年龄', '教育年限']].values
y = data['收入'].values
# 训练线性回归模型
model = LinearRegression()
model.fit(X, y)
y_pred = model.predict(X)
residuals = y - y_pred
# 绘制残差图
axes[0, 0].scatter(y_pred, residuals, alpha=0.6)
axes[0, 0].axhline(y=0, color='red', linestyle='--')
axes[0, 0].set_title('线性回归残差图')
axes[0, 0].set_xlabel('预测值')
axes[0, 0].set_ylabel('残差')
axes[0, 0].grid(True, alpha=0.3)
# 子图2:QQ图(检验正态性)
from scipy import stats
stats.probplot(residuals, dist="norm", plot=axes[0, 1])
axes[0, 1].set_title('残差QQ图(检验正态性)')
axes[0, 1].set_xlabel('理论分位数')
axes[0, 1].set_ylabel('样本分位数')
axes[0, 1].grid(True, alpha=0.3)
# 子图3:累积分布函数比较
sns.ecdfplot(data=data, x='收入', hue='性别', ax=axes[1, 0])
axes[1, 0].set_title('收入累积分布函数(按性别)')
axes[1, 0].set_xlabel('收入')
axes[1, 0].set_ylabel('累积比例')
axes[1, 0].legend(title='性别')
# 子图4:2D核密度估计
scatter = axes[1, 1].scatter(data['年龄'], data['收入'],
c=data['满意度'], cmap='viridis', alpha=0.6, s=20)
sns.kdeplot(data=data, x='年龄', y='收入', ax=axes[1, 1],
levels=5, color='red', linewidths=1)
axes[1, 1].set_title('年龄与收入的2D分布(颜色表示满意度)')
axes[1, 1].set_xlabel('年龄')
axes[1, 1].set_ylabel('收入')
plt.colorbar(scatter, ax=axes[1, 1], label='满意度')
plt.tight_layout()
plt.show()
# 6. 时间序列可视化(使用Seaborn)
# 创建时间序列数据
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='D')
time_data = pd.DataFrame({
'日期': dates,
'销售额': np.cumsum(np.random.randn(100) * 1000 + 5000),
'访问量': np.cumsum(np.random.randn(100) * 50 + 200),
'促销活动': np.random.choice([0, 1], 100, p=[0.7, 0.3])
})
time_data['星期'] = time_data['日期'].dt.day_name()
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 子图1:时间序列线图
sns.lineplot(data=time_data, x='日期', y='销售额', ax=axes[0, 0])
axes[0, 0].set_title('销售额时间序列')
axes[0, 0].set_xlabel('日期')
axes[0, 0].set_ylabel('销售额')
axes[0, 0].tick_params(axis='x', rotation=45)
# 子图2:带置信区间的线图
sns.lineplot(data=time_data, x='日期', y='销售额',
hue='促销活动', style='促销活动', markers=True,
dashes=False, ax=axes[0, 1])
axes[0, 1].set_title('销售额时间序列(按促销活动)')
axes[0, 1].set_xlabel('日期')
axes[0, 1].set_ylabel('销售额')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].legend(title='促销活动', labels=['无促销', '有促销'])
# 子图3:星期效应箱线图
sns.boxplot(data=time_data, x='星期', y='销售额', ax=axes[1, 0])
axes[1, 0].set_title('销售额的星期效应')
axes[1, 0].set_xlabel('星期')
axes[1, 0].set_ylabel('销售额')
axes[1, 0].tick_params(axis='x', rotation=45)
# 子图4:留空
axes[1, 1].axis('off')
plt.tight_layout()
plt.show()
3. 自动化与爬虫方向
一、Python爬虫的核心概念与工作流程
1. 爬虫的基本原理
网络爬虫本质上是一个自动化程序,它模拟人类浏览网页的行为,按照预设规则自动抓取互联网信息。爬虫的工作流程通常包括四个关键步骤:
- **模拟浏览器发起请求**:通过HTTP/HTTPS协议向目标网站发送请求,获取网页源代码
- **获取响应内容**:接收服务器返回的HTML、JSON等格式的数据
- **解析内容**:从获取的内容中提取所需的结构化数据
- **保存数据**:将提取的数据存储到数据库或Excel文件中
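上述四个步骤可以串成一个极简示意。下面的草图只用标准库(`html.parser` 做解析、`csv` 模拟保存),并用本地HTML字符串代替真实的网络请求与响应;`LinkTextParser`、`crawl_once` 均为示意用的假设名称:

```python
import csv
import io
from html.parser import HTMLParser  # 全部来自标准库


class LinkTextParser(HTMLParser):
    """步骤三:解析内容——提取所有<a>标签内的文本"""
    def __init__(self):
        super().__init__()
        self.links = []
        self._in_a = False

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            self._in_a = True

    def handle_endtag(self, tag):
        if tag == 'a':
            self._in_a = False

    def handle_data(self, data):
        if self._in_a and data.strip():
            self.links.append(data.strip())


def crawl_once(html_text):
    # 步骤一、二(发起请求/获取响应)此处用本地HTML字符串代替
    parser = LinkTextParser()
    parser.feed(html_text)               # 步骤三:解析内容
    buf = io.StringIO()                  # 步骤四:保存数据(写入内存CSV)
    csv.writer(buf).writerows([[t] for t in parser.links])
    return parser.links, buf.getvalue()


links, saved = crawl_once(
    '<ul><li><a href="/a">页面A</a></li><li><a href="/b">页面B</a></li></ul>'
)
print(links)  # ['页面A', '页面B']
```

真实爬虫只需把本地字符串换成请求返回的网页源代码,流程完全一致。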
2. 爬虫的两种实现方式
根据技术复杂度和灵活性,爬虫实现主要分为两类:
| 实现方式 | 特点 | 适用场景 |
|---|---|---|
| 傻瓜式爬虫工具 | 可视化界面操作,上手快,无需编程 | 简单数据采集、非技术用户 |
| Python编程爬虫 | 灵活性强,功能丰富,需要学习成本 | 复杂数据采集、定制化需求 |
常见的傻瓜式爬虫工具包括后羿采集器(http://houyicaiji.com)和集搜客(http://gooseeker.com)。
二、Python爬虫技术栈详解
1. 基础库与框架
1.1 Requests库:网络请求基础
import requests
from requests.exceptions import RequestException
import time
import random
class BasicCrawler:
"""基础爬虫类,演示requests库的基本使用"""
def __init__(self):
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
}
def get_with_retry(self, url, max_retries=3, timeout=10):
"""带重试机制的GET请求"""
for attempt in range(max_retries):
try:
response = self.session.get(
url,
headers=self.headers,
timeout=timeout,
verify=False # 仅用于测试,生产环境应设为True
)
response.raise_for_status() # 检查HTTP状态码
return response
except RequestException as e:
print(f"第{attempt+1}次请求失败: {e}")
if attempt < max_retries - 1:
wait_time = random.uniform(1, 3) * (attempt + 1)
print(f"等待{wait_time:.2f}秒后重试...")
time.sleep(wait_time)
else:
print(f"请求失败,已达到最大重试次数{max_retries}")
return None
def post_request(self, url, data=None, json=None):
"""POST请求示例"""
try:
response = self.session.post(
url,
headers=self.headers,
data=data,
json=json,
timeout=10
)
response.raise_for_status()
return response
except RequestException as e:
print(f"POST请求失败: {e}")
return None
def download_file(self, url, save_path):
"""下载文件(如图片、PDF等)"""
try:
response = self.session.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"文件已保存到: {save_path}")
return True
except Exception as e:
print(f"文件下载失败: {e}")
return False
# 使用示例
if __name__ == "__main__":
crawler = BasicCrawler()
# 示例1:获取网页内容
response = crawler.get_with_retry("https://www.example.com")
if response:
print(f"状态码: {response.status_code}")
print(f"编码: {response.encoding}")
print(f"内容长度: {len(response.text)}字符")
# 示例2:设置代理(如果需要)
proxies = {
'http': 'http://127.0.0.1:8080',
'https': 'http://127.0.0.1:8080',
}
# response = requests.get(url, proxies=proxies)
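`get_with_retry` 中的等待时间按 `random.uniform(1, 3) * (attempt + 1)` 近似线性增长;另一种常见做法是带上限的指数退避加随机抖动。下面是一个极简示意(函数名为说明用的假设):

```python
import random


def backoff_schedule(max_retries, base=1.0, cap=30.0):
    """指数退避:第i次重试前的基准等待为 min(base * 2**i, cap) 秒"""
    return [min(base * 2 ** i, cap) for i in range(max_retries)]


def backoff_with_jitter(max_retries, base=1.0, cap=30.0):
    """在基准值上叠加随机抖动,避免多个爬虫在同一时刻集中重试"""
    return [w * random.uniform(0.5, 1.5)
            for w in backoff_schedule(max_retries, base, cap)]


print(backoff_schedule(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

指数退避在服务端临时限流(如429)时尤其有用:重试间隔快速拉开,给服务器恢复留出时间。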
1.2 BeautifulSoup4:HTML解析利器
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
class HTMLParser:
"""HTML解析器,演示BeautifulSoup4的使用"""
def __init__(self, html_content=None):
self.soup = BeautifulSoup(html_content, 'html.parser') if html_content else None
def parse_from_response(self, response):
"""从requests响应对象创建BeautifulSoup对象"""
if response and response.content:
self.soup = BeautifulSoup(response.content, 'html.parser')
return self.soup
return None
def find_elements_by_tag(self, tag_name, attrs=None, limit=None):
"""按标签名查找元素"""
if not self.soup:
return []
elements = self.soup.find_all(tag_name, attrs=attrs, limit=limit)
return elements
def find_elements_by_css(self, css_selector):
"""使用CSS选择器查找元素"""
if not self.soup:
return []
elements = self.soup.select(css_selector)
return elements
def extract_text(self, element):
"""提取元素的文本内容"""
if element:
return element.get_text(strip=True)
return ""
def extract_attribute(self, element, attr_name):
"""提取元素的属性值"""
if element and element.has_attr(attr_name):
return element[attr_name]
return None
def extract_links(self, base_url=None):
"""提取页面中的所有链接"""
if not self.soup:
return []
links = []
for a_tag in self.soup.find_all('a', href=True):
href = a_tag['href']
if base_url and not href.startswith(('http://', 'https://')):
href = urljoin(base_url, href)
links.append({
'text': self.extract_text(a_tag),
'url': href,
'title': a_tag.get('title', '')
})
return links
def extract_images(self):
"""提取页面中的所有图片"""
if not self.soup:
return []
images = []
for img_tag in self.soup.find_all('img', src=True):
images.append({
'src': img_tag['src'],
'alt': img_tag.get('alt', ''),
'title': img_tag.get('title', ''),
'width': img_tag.get('width'),
'height': img_tag.get('height')
})
return images
def extract_tables(self):
"""提取页面中的所有表格数据"""
if not self.soup:
return []
tables = []
for table_tag in self.soup.find_all('table'):
table_data = []
# 提取表头
headers = []
thead = table_tag.find('thead')
if thead:
for th in thead.find_all('th'):
headers.append(self.extract_text(th))
else:
# 如果没有thead,尝试从第一行获取表头
first_row = table_tag.find('tr')
if first_row:
for th in first_row.find_all(['th', 'td']):
headers.append(self.extract_text(th))
# 提取表格内容
tbody = table_tag.find('tbody') or table_tag
for row in tbody.find_all('tr'):
row_data = []
for cell in row.find_all(['td', 'th']):
row_data.append(self.extract_text(cell))
if row_data:
table_data.append(row_data)
tables.append({
'headers': headers,
'data': table_data
})
return tables
def find_by_regex(self, pattern, text_only=True):
"""使用正则表达式查找内容"""
if not self.soup:
return []
if text_only:
text = self.soup.get_text()
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
else:
html_str = str(self.soup)
matches = re.findall(pattern, html_str, re.IGNORECASE | re.DOTALL)
return matches
def clean_html(self, keep_tags=None):
"""清理HTML,只保留指定标签"""
if not self.soup:
return ""
if keep_tags:
# 只保留指定的标签
for tag in self.soup.find_all(True):
if tag.name not in keep_tags:
tag.decompose()
# 移除所有属性
for tag in self.soup.find_all(True):
tag.attrs = {}
return str(self.soup)
# 使用示例
if __name__ == "__main__":
# 示例HTML内容
html_content = """
<html>
<head>
<title>示例页面</title>
</head>
<body>
<h1 class="title">欢迎来到示例页面</h1>
<div id="content">
<p class="intro">这是一个演示BeautifulSoup用法的页面。</p>
<ul class="list">
<li><a href="/page1.html">页面1</a></li>
<li><a href="/page2.html">页面2</a></li>
<li><a href="https://external.com/page3">外部页面</a></li>
</ul>
<table>
<tr><th>姓名</th><th>年龄</th></tr>
<tr><td>张三</td><td>25</td></tr>
<tr><td>李四</td><td>30</td></tr>
</table>
<img src="image.jpg" alt="示例图片">
</div>
</body>
</html>
"""
parser = HTMLParser(html_content)
# 1. 按标签查找
h1_elements = parser.find_elements_by_tag('h1')
print("H1标签:", parser.extract_text(h1_elements[0]) if h1_elements else "未找到")
# 2. 按CSS选择器查找
intro_elements = parser.find_elements_by_css('p.intro')
print("介绍段落:", parser.extract_text(intro_elements[0]) if intro_elements else "未找到")
# 3. 提取所有链接
links = parser.extract_links("https://example.com")
print("\n页面链接:")
for link in links:
print(f" 文本: {link['text']}, URL: {link['url']}")
# 4. 提取表格数据
tables = parser.extract_tables()
print("\n表格数据:")
for table in tables:
print(f" 表头: {table['headers']}")
for row in table['data']:
print(f" 行数据: {row}")
# 5. 使用正则表达式查找
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
emails = parser.find_by_regex(email_pattern)
print(f"\n找到的邮箱地址: {emails}")
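`extract_links` 中用 `urljoin` 把相对链接补全为绝对链接,它的拼接规则值得单独记一下(标准库,结果确定):

```python
from urllib.parse import urljoin

base = 'https://example.com/a/b.html'
# 相对路径:基于base所在目录拼接
print(urljoin(base, 'c.html'))               # https://example.com/a/c.html
# 以/开头:替换整个路径
print(urljoin(base, '/c.html'))              # https://example.com/c.html
# 完整URL:直接替换base
print(urljoin(base, 'https://other.com/x'))  # https://other.com/x
```

这也解释了为什么提取链接前要先判断 `href` 是否以 `http://`/`https://` 开头:完整URL无需拼接。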
1.3 lxml与XPath:高效XML/HTML解析
from lxml import etree
from lxml.html import fromstring, tostring
import cssselect
class XPathParser:
"""使用lxml和XPath进行高效解析"""
def __init__(self, html_content=None):
self.tree = None
if html_content:
self.parse_html(html_content)
def parse_html(self, html_content):
"""解析HTML内容"""
try:
self.tree = fromstring(html_content)
return True
except Exception as e:
print(f"HTML解析失败: {e}")
return False
def parse_from_response(self, response):
"""从requests响应对象解析"""
if response and response.content:
return self.parse_html(response.content)
return False
def xpath_query(self, xpath_expression):
"""执行XPath查询"""
if not self.tree:
return []
try:
elements = self.tree.xpath(xpath_expression)
return elements
except Exception as e:
print(f"XPath查询失败: {e}")
return []
def css_query(self, css_selector):
"""执行CSS选择器查询"""
if not self.tree:
return []
try:
# 将CSS选择器转换为XPath
xpath_expression = cssselect.HTMLTranslator().css_to_xpath(css_selector)
return self.xpath_query(xpath_expression)
except Exception as e:
print(f"CSS选择器查询失败: {e}")
return []
def extract_text(self, elements):
"""提取元素的文本内容"""
if isinstance(elements, list):
return [self._get_element_text(elem) for elem in elements if elem is not None]
else:
return self._get_element_text(elements)
def _get_element_text(self, element):
"""获取单个元素的文本"""
if element is None:
return ""
if isinstance(element, str):
return element.strip()
# 对于lxml元素,itertext()会依次产出自身及所有子元素的文本
# (注意:不要再单独拼接element.text,否则首段文本会重复)
text_parts = []
if hasattr(element, 'itertext'):
for text in element.itertext():
if text.strip():
text_parts.append(text.strip())
return ' '.join(text_parts)
def extract_attributes(self, elements, attr_name):
"""提取元素的属性值"""
if isinstance(elements, list):
return [self._get_attribute(elem, attr_name) for elem in elements]
else:
return self._get_attribute(elements, attr_name)
def _get_attribute(self, element, attr_name):
"""获取单个元素的属性"""
if element is None:
return None
if hasattr(element, 'get'):
return element.get(attr_name)
return None
def to_html(self, element=None, pretty_print=False):
"""将元素转换回HTML字符串"""
if element is None:
element = self.tree
if element is None:
return ""
try:
html_str = tostring(element, encoding='unicode', pretty_print=pretty_print)
return html_str
except Exception as e:
print(f"HTML转换失败: {e}")
return ""
def remove_elements(self, xpath_expression):
"""删除匹配的元素"""
if not self.tree:
return False
elements = self.xpath_query(xpath_expression)
for elem in elements:
if elem.getparent() is not None:
elem.getparent().remove(elem)
return len(elements) > 0
def find_all_links(self, base_url=None):
"""查找所有链接"""
if not self.tree:
return []
links = []
for a in self.tree.xpath('//a[@href]'):
href = a.get('href')
text = self.extract_text(a)
if base_url and href and not href.startswith(('http://', 'https://', 'mailto:', 'tel:')):
from urllib.parse import urljoin
href = urljoin(base_url, href)
links.append({
'text': text,
'url': href,
'title': a.get('title', '')
})
return links
def extract_form_data(self):
"""提取表单数据"""
if not self.tree:
return []
forms = []
for form in self.tree.xpath('//form'):
form_data = {
'action': form.get('action', ''),
'method': form.get('method', 'get').upper(),
'inputs': []
}
# 提取所有输入字段
for input_elem in form.xpath('.//input | .//textarea | .//select'):
input_type = input_elem.tag
name = input_elem.get('name')
value = input_elem.get('value', '')
if input_type == 'select':
# 对于下拉框,获取选中的选项
selected = input_elem.xpath('.//option[@selected]')
if selected:
value = selected[0].get('value', '')
if name: # 只包含有name属性的字段
form_data['inputs'].append({
'type': input_type,
'name': name,
'value': value,
'required': 'required' in input_elem.attrib
})
forms.append(form_data)
return forms
# 使用示例
if __name__ == "__main__":
# 示例HTML
html = """
<html>
<body>
<div class="product-list">
<div class="product">
<h3 class="name">iPhone 15</h3>
<p class="price">¥6999</p>
<p class="description">最新款苹果手机</p>
<a href="/product/iphone15">查看详情</a>
</div>
<div class="product">
<h3 class="name">小米14</h3>
<p class="price">¥3999</p>
<p class="description">性价比高的安卓手机</p>
<a href="/product/xiaomi14">查看详情</a>
</div>
</div>
<form action="/search" method="GET">
<input type="text" name="q" placeholder="搜索...">
<input type="submit" value="搜索">
</form>
</body>
</html>
"""
parser = XPathParser(html)
# 1. 使用XPath提取产品信息
products = parser.xpath_query('//div[@class="product"]')
print("找到的产品数量:", len(products))
for i, product in enumerate(products, 1):
name = parser.extract_text(product.xpath('.//h3[@class="name"]')[0])
price = parser.extract_text(product.xpath('.//p[@class="price"]')[0])
desc = parser.extract_text(product.xpath('.//p[@class="description"]')[0])
link = parser.extract_attributes(product.xpath('.//a')[0], 'href')
print(f"\n产品{i}:")
print(f" 名称: {name}")
print(f" 价格: {price}")
print(f" 描述: {desc}")
print(f" 链接: {link}")
# 2. 使用CSS选择器
product_names = parser.css_query('.product .name')
print("\n使用CSS选择器找到的产品名称:")
for name_elem in product_names:
print(f" - {parser.extract_text(name_elem)}")
# 3. 提取表单数据
forms = parser.extract_form_data()
print("\n表单数据:")
for form in forms:
print(f" 表单动作: {form['action']}")
print(f" 请求方法: {form['method']}")
print(" 输入字段:")
for inp in form['inputs']:
print(f" {inp['name']}: {inp['value']} ({inp['type']})")
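完整的XPath语法需要lxml,但标准库的 `xml.etree.ElementTree` 也支持一个常用子集(标签名、`.//` 相对路径、`[@attr="value"]` 谓词等),足够做小规模的解析实验:

```python
import xml.etree.ElementTree as ET

root = ET.fromstring(
    '<div><p class="name">iPhone 15</p><p class="price">¥6999</p></div>'
)
# 按属性谓词查找(ElementTree支持的XPath子集)
price = root.find('.//p[@class="price"]')
# findall 按文档顺序返回所有匹配元素
names = [e.text for e in root.findall('.//p')]
print(price.text)  # ¥6999
```

需要 `contains()`、轴(axis)等高级XPath特性时,仍应使用lxml。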
2. 实战项目:电商网站商品信息爬取
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import json
import os
from datetime import datetime
from urllib.parse import urljoin, urlparse, parse_qs
import logging
import re
class EcommerceCrawler:
"""电商网站商品信息爬虫"""
def __init__(self, base_url, output_dir='data'):
self.base_url = base_url
self.session = requests.Session()
self.output_dir = output_dir
self.products = []
# 设置请求头,模拟浏览器
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(output_dir, 'crawler.log')),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def get_page(self, url, params=None, retry_count=3):
"""获取页面内容,带重试机制"""
for attempt in range(retry_count):
try:
response = self.session.get(
url,
headers=self.headers,
params=params,
timeout=30,
verify=False
)
# 检查响应状态
if response.status_code == 200:
self.logger.info(f"成功获取页面: {url}")
return response
elif response.status_code == 403:
self.logger.warning(f"访问被拒绝: {url}, 状态码: {response.status_code}")
self._rotate_user_agent()
elif response.status_code == 404:
self.logger.warning(f"页面不存在: {url}")
return None
elif response.status_code == 429:
self.logger.warning(f"请求过多,等待后重试: {url}")
time.sleep(random.uniform(5, 10))
else:
self.logger.error(f"请求失败: {url}, 状态码: {response.status_code}")
except requests.exceptions.RequestException as e:
self.logger.error(f"请求异常: {e}")
# 重试前等待
if attempt < retry_count - 1:
wait_time = random.uniform(2, 5) * (attempt + 1)
self.logger.info(f"等待{wait_time:.2f}秒后重试...")
time.sleep(wait_time)
self.logger.error(f"达到最大重试次数,放弃: {url}")
return None
def _rotate_user_agent(self):
"""更换User-Agent"""
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
]
self.headers['User-Agent'] = random.choice(user_agents)
self.logger.info(f"更换User-Agent为: {self.headers['User-Agent'][:50]}...")
def parse_product_list(self, html_content, page_url):
"""解析商品列表页"""
soup = BeautifulSoup(html_content, 'html.parser')
products = []
# 根据网站结构调整选择器
product_items = soup.select('.product-item, .goods-item, .item, [data-product-id]')
if not product_items:
# 尝试其他常见的选择器
product_items = soup.find_all('div', class_=lambda x: x and 'product' in x.lower())
self.logger.info(f"找到{len(product_items)}个商品项")
for item in product_items:
product_info = self._extract_product_info(item, page_url)
if product_info:
products.append(product_info)
return products
def _extract_product_info(self, item, page_url):
"""从商品项提取信息"""
try:
# 商品名称
name_elem = item.select_one('.product-name, .name, .title, h3, h4')
name = name_elem.get_text(strip=True) if name_elem else '未知商品'
# 价格
price_elem = item.select_one('.price, .current-price, .money, .current')
price = price_elem.get_text(strip=True) if price_elem else '价格未知'
# 商品链接
link_elem = item.select_one('a[href]')
if link_elem and link_elem.get('href'):
product_url = urljoin(page_url, link_elem['href'])
else:
product_url = ''
# 图片链接
img_elem = item.select_one('img[src]')
if img_elem and img_elem.get('src'):
img_url = urljoin(page_url, img_elem['src'])
else:
img_url = ''
# 商品ID(如果存在)
product_id = item.get('data-product-id', '') or item.get('data-id', '')
# 评分
rating_elem = item.select_one('.rating, .score, .star-rating')
rating = rating_elem.get_text(strip=True) if rating_elem else '无评分'
# 评论数
review_elem = item.select_one('.review-count, .comments, .reviews')
review_count = review_elem.get_text(strip=True) if review_elem else '0'
product_info = {
'name': name,
'price': price,
'url': product_url,
'image_url': img_url,
'product_id': product_id,
'rating': rating,
'review_count': review_count,
'source_page': page_url,
'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return product_info
except Exception as e:
self.logger.error(f"提取商品信息失败: {e}")
return None
def parse_product_detail(self, product_url):
"""解析商品详情页"""
response = self.get_page(product_url)
if not response:
return None
soup = BeautifulSoup(response.content, 'html.parser')
detail_info = {}
try:
# 商品标题
title_elem = soup.select_one('h1.product-title, h1.title, h1.product-name')
detail_info['title'] = title_elem.get_text(strip=True) if title_elem else ''
# 商品描述
desc_elem = soup.select_one('.product-description, .description, #description')
detail_info['description'] = desc_elem.get_text(strip=True) if desc_elem else ''
# 详细价格信息
price_elems = soup.select('.price, .current-price, .original-price, .discount-price')
prices = [elem.get_text(strip=True) for elem in price_elems if elem]
detail_info['prices'] = prices
# 商品规格
specs = {}
spec_elems = soup.select('.spec-item, .attribute, .property')
for elem in spec_elems:
key_elem = elem.select_one('.spec-key, .attr-name, .prop-name')
value_elem = elem.select_one('.spec-value, .attr-value, .prop-value')
if key_elem and value_elem:
key = key_elem.get_text(strip=True).rstrip(':')
value = value_elem.get_text(strip=True)
specs[key] = value
detail_info['specifications'] = specs
# 商品图片
images = []
img_elems = soup.select('.product-image, .main-image, .gallery img')
for img in img_elems:
if img.get('src'):
img_url = urljoin(product_url, img['src'])
images.append(img_url)
detail_info['images'] = images
# 库存状态
stock_elem = soup.select_one('.stock-status, .inventory, .availability')
detail_info['stock_status'] = stock_elem.get_text(strip=True) if stock_elem else '未知'
# 商家信息
seller_elem = soup.select_one('.seller-info, .store-info, .merchant')
detail_info['seller'] = seller_elem.get_text(strip=True) if seller_elem else ''
# 提取SKU
sku_patterns = ['SKU', '货号', '商品编号']
for pattern in sku_patterns:
sku_elem = soup.find(text=re.compile(pattern))
if sku_elem:
detail_info['sku'] = sku_elem.find_next().get_text(strip=True) if sku_elem.find_next() else ''
break
detail_info['detail_url'] = product_url
detail_info['detail_crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.logger.info(f"成功解析商品详情: {detail_info.get('title', '未知商品')}")
except Exception as e:
self.logger.error(f"解析商品详情失败: {e}")
return detail_info
def crawl_category(self, category_url, max_pages=10):
"""爬取整个商品分类"""
all_products = []
current_page = 1
while current_page <= max_pages:
self.logger.info(f"正在爬取第{current_page}页: {category_url}")
# 构建分页URL(根据网站结构调整)
if '?' in category_url:
page_url = f"{category_url}&page={current_page}"
else:
page_url = f"{category_url}?page={current_page}"
response = self.get_page(page_url)
if not response:
break
# 解析商品列表
products = self.parse_product_list(response.content, page_url)
all_products.extend(products)
self.logger.info(f"第{current_page}页找到{len(products)}个商品")
# 检查是否有下一页
soup = BeautifulSoup(response.content, 'html.parser')
next_button = soup.select_one('.next-page, .next, [rel="next"]') or soup.find('a', string='下一页')  # CSS选择器不支持:contains,按文本匹配改用find
if not next_button:
self.logger.info("没有更多页面")
break
# 随机延迟,避免请求过快
time.sleep(random.uniform(1, 3))
current_page += 1
self.logger.info(f"分类爬取完成,共找到{len(all_products)}个商品")
return all_products
def save_to_csv(self, products, filename=None):
"""保存商品数据到CSV文件"""
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'products_{timestamp}.csv'
filepath = os.path.join(self.output_dir, filename)
# 转换为DataFrame
df = pd.DataFrame(products)
# 保存到CSV
df.to_csv(filepath, index=False, encoding='utf-8-sig')
self.logger.info(f"数据已保存到: {filepath}")
return filepath
def save_to_json(self, products, filename=None):
"""保存商品数据到JSON文件"""
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'products_{timestamp}.json'
filepath = os.path.join(self.output_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(products, f, ensure_ascii=False, indent=2)
self.logger.info(f"数据已保存到: {filepath}")
return filepath
def save_to_excel(self, products, filename=None):
"""保存商品数据到Excel文件"""
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'products_{timestamp}.xlsx'
filepath = os.path.join(self.output_dir, filename)
# 转换为DataFrame
df = pd.DataFrame(products)
# 保存到Excel
with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
df.to_excel(writer, index=False, sheet_name='商品数据')
# 可以添加多个sheet
summary = {
'统计时间': [datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
'商品总数': [len(products)],
'数据来源': [self.base_url],
'文件保存路径': [filepath]
}
pd.DataFrame(summary).to_excel(writer, index=False, sheet_name='统计信息')
self.logger.info(f"数据已保存到: {filepath}")
return filepath
def crawl_with_details(self, category_url, max_products=50):
"""爬取商品列表并获取详细信息"""
# 先爬取商品列表
products = self.crawl_category(category_url, max_pages=5)
if not products:
self.logger.warning("未找到商品")
return []
# 限制商品数量
products = products[:max_products]
detailed_products = []
for i, product in enumerate(products, 1):
self.logger.info(f"正在获取商品详情 ({i}/{len(products)}): {product.get('name', '未知商品')}")
if product.get('url'):
detail = self.parse_product_detail(product['url'])
if detail:
# 合并基本信息与详细信息
full_info = {**product, **detail}
detailed_products.append(full_info)
# 随机延迟,避免请求过快
time.sleep(random.uniform(0.5, 2))
self.logger.info(f"详情爬取完成,共获取{len(detailed_products)}个商品的详细信息")
return detailed_products
def generate_report(self, products):
"""生成数据报告"""
if not products:
return "没有数据可生成报告"
df = pd.DataFrame(products)
report = {
'统计摘要': {
'商品总数': len(products),
'数据来源': self.base_url,
'爬取时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'数据字段数': len(df.columns),
'数据完整性': f"{df.notnull().sum().sum() / (len(df) * len(df.columns)) * 100:.1f}%"
},
'价格分析': {
'平均价格': '需要数值化处理',
'价格区间': '需要数值化处理',
'最高价商品': df.loc[df['price'].apply(lambda x: self._extract_price(x)).idxmax()]['name'] if 'price' in df.columns else '未知',
'最低价商品': df.loc[df['price'].apply(lambda x: self._extract_price(x)).idxmin()]['name'] if 'price' in df.columns else '未知'
},
'商品分类统计': {
'不同商家数量': df['seller'].nunique() if 'seller' in df.columns else 0,
'有图片商品数': df['image_url'].notnull().sum() if 'image_url' in df.columns else 0,
'有评分商品数': df[df['rating'] != '无评分'].shape[0] if 'rating' in df.columns else 0
}
}
return report
def _extract_price(self, price_str):
"""从价格字符串中提取数值"""
if not isinstance(price_str, str):
return 0
# 提取数字
import re
numbers = re.findall(r'\d+\.?\d*', price_str)
if numbers:
return float(numbers[0])
return 0
# 使用示例
if __name__ == "__main__":
# 示例:爬取某个电商网站的商品
base_url = "https://example-ecommerce.com"
crawler = EcommerceCrawler(base_url, output_dir='ecommerce_data')
# 1. 爬取某个分类的商品
category_url = f"{base_url}/electronics"
products = crawler.crawl_with_details(category_url, max_products=20)
if products:
# 保存数据
csv_file = crawler.save_to_csv(products)
json_file = crawler.save_to_json(products)
excel_file = crawler.save_to_excel(products)
# 生成报告
report = crawler.generate_report(products)
print("爬取报告:")
for section, data in report.items():
print(f"\n{section}:")
for key, value in data.items():
print(f" {key}: {value}")
# 显示前几个商品
print(f"\n前5个商品信息:")
for i, product in enumerate(products[:5], 1):
print(f"\n商品{i}:")
print(f" 名称: {product.get('name', 'N/A')}")
print(f" 价格: {product.get('price', 'N/A')}")
print(f" 评分: {product.get('rating', 'N/A')}")
print(f" 链接: {product.get('url', 'N/A')}")
else:
print("未爬取到商品数据")
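`generate_report` 中的"平均价格"标注了"需要数值化处理"。一个可行的处理思路如下(`parse_price` 为示意用的假设函数,覆盖常见的 `¥6,999.00`、`3999元` 等写法):

```python
import re


def parse_price(text):
    """从价格字符串中提取数值;无法解析时返回None"""
    if not isinstance(text, str):
        return None
    # 匹配首个数字串,允许千分位逗号与小数点
    m = re.search(r'\d[\d,]*\.?\d*', text)
    if not m:
        return None
    return float(m.group().replace(',', ''))


print(parse_price('¥6,999.00'))  # 6999.0
print(parse_price('3999元'))     # 3999.0
print(parse_price('价格未知'))   # None
```

数值化之后,平均价格、价格区间等统计就可以直接用 `pandas` 的 `mean()`、`min()`/`max()` 计算。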
3. 反爬虫策略与应对措施
随着网站对爬虫的防御越来越严格,爬虫开发者需要了解常见的反爬虫机制并采取相应措施。
3.1 常见反爬虫机制及应对策略
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import random
import time
from datetime import datetime
import hashlib
import json
class AntiAntiCrawler:
"""反反爬虫策略实现"""
def __init__(self):
self.session = requests.Session()
self.ua = UserAgent()
self.proxies = []
self.cookies = {}
self.request_count = 0
self.last_request_time = time.time()
def rotate_user_agent(self):
"""随机更换User-Agent"""
user_agent = self.ua.random
self.session.headers.update({'User-Agent': user_agent})
return user_agent
def set_referer(self, referer):
"""设置Referer头"""
self.session.headers.update({'Referer': referer})
def set_cookies_from_response(self, response):
"""从响应中更新cookies"""
if response.cookies:
self.cookies.update(response.cookies.get_dict())
self.session.cookies.update(self.cookies)
def add_proxy(self, proxy_list):
"""添加代理IP列表"""
self.proxies = proxy_list
def get_with_proxy_rotation(self, url, **kwargs):
"""使用代理轮询发送请求"""
if not self.proxies:
return self.session.get(url, **kwargs)
proxy = random.choice(self.proxies)
proxies = {
'http': proxy,
'https': proxy
}
try:
response = self.session.get(url, proxies=proxies, **kwargs)
return response
except requests.exceptions.RequestException:
# 如果代理失败,尝试下一个
self.proxies.remove(proxy)
if self.proxies:
return self.get_with_proxy_rotation(url, **kwargs)
else:
return self.session.get(url, **kwargs)
def random_delay(self, min_seconds=1, max_seconds=3):
"""随机延迟,模拟人类操作"""
delay = random.uniform(min_seconds, max_seconds)
time.sleep(delay)
def respect_robots_txt(self, base_url):
"""遵守robots.txt协议"""
try:
robots_url = f"{base_url.rstrip('/')}/robots.txt"
response = self.session.get(robots_url, timeout=5)
if response.status_code == 200:
robots_content = response.text
print("robots.txt内容:")
print(robots_content)
return robots_content
except requests.exceptions.RequestException:
pass
return None
def handle_verification_code(self, image_url):
"""处理验证码(需要人工或OCR识别)"""
# 这里可以集成OCR服务或人工识别
# 示例:下载验证码图片
response = self.session.get(image_url)
with open('captcha.jpg', 'wb') as f:
f.write(response.content)
# 这里可以调用OCR API或显示给用户手动输入
captcha_code = input("请输入验证码图片中的文字: ")
return captcha_code
def simulate_human_behavior(self):
"""模拟人类浏览行为"""
# 随机鼠标移动模式(用于需要行为检测的网站)
behaviors = [
{'action': 'scroll', 'amount': random.randint(100, 500)},
{'action': 'click', 'x': random.randint(100, 800), 'y': random.randint(100, 600)},
{'action': 'wait', 'duration': random.uniform(0.5, 2.0)}
]
# 在实际爬虫中,可以使用Selenium来模拟这些行为
return random.choice(behaviors)
def bypass_cloudflare(self, url):
"""绕过Cloudflare等防护"""
# 一些网站使用Cloudflare防护,需要特殊处理
headers = {
'User-Agent': self.ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Cache-Control': 'max-age=0',
}
response = self.session.get(url, headers=headers)
# 检查是否被重定向到验证页面
if 'challenge' in response.url or 'captcha' in response.url:
print("检测到验证页面,需要人工干预")
# 这里可以集成自动验证码解决服务
return response
def monitor_request_rate(self, max_requests_per_minute=60):
"""监控请求频率,避免被封IP"""
current_time = time.time()
time_diff = current_time - self.last_request_time
# 计算每分钟请求数
if self.request_count > 0 and time_diff < 60:
requests_per_minute = self.request_count / (time_diff / 60)
if requests_per_minute > max_requests_per_minute:
wait_time = 60 - time_diff
print(f"请求频率过高,等待{wait_time:.2f}秒")
time.sleep(wait_time)
self.request_count = 0
self.last_request_time = time.time()
self.request_count += 1
def save_session_state(self, filename='session_state.json'):
"""保存会话状态(cookies等)"""
state = {
'cookies': self.session.cookies.get_dict(),
'headers': dict(self.session.headers),
'last_updated': datetime.now().isoformat()
}
with open(filename, 'w') as f:
json.dump(state, f)
def load_session_state(self, filename='session_state.json'):
"""加载会话状态"""
try:
with open(filename, 'r') as f:
state = json.load(f)
self.session.cookies.update(state.get('cookies', {}))
self.session.headers.update(state.get('headers', {}))
return True
except (OSError, json.JSONDecodeError):
return False
# 使用示例:应对各种反爬措施
def advanced_crawling_example():
"""高级爬虫示例,展示各种反爬应对策略"""
anti_crawler = AntiAntiCrawler()
# 1. 遵守robots协议
base_url = "https://example.com"
robots_content = anti_crawler.respect_robots_txt(base_url)
# 2. 设置随机User-Agent
user_agent = anti_crawler.rotate_user_agent()
print(f"使用User-Agent: {user_agent}")
# 3. 设置Referer(模拟从搜索引擎访问)
anti_crawler.set_referer("https://www.google.com/")
# 4. 使用代理IP(如果有的话)
proxy_list = [
'http://proxy1.example.com:8080',
'http://proxy2.example.com:8080',
'http://proxy3.example.com:8080'
]
anti_crawler.add_proxy(proxy_list)
# 5. 监控请求频率
urls_to_crawl = [
f"{base_url}/page{i}" for i in range(1, 11)
]
results = []
for url in urls_to_crawl:
# 监控请求频率
anti_crawler.monitor_request_rate(max_requests_per_minute=30)
# 随机延迟
anti_crawler.random_delay(min_seconds=2, max_seconds=5)
# 发送请求(带代理轮询)
try:
response = anti_crawler.get_with_proxy_rotation(url, timeout=10)
if response.status_code == 200:
# 更新cookies
anti_crawler.set_cookies_from_response(response)
# 解析内容
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.title.string if soup.title else "无标题"
results.append({
'url': url,
'title': title,
'status': 'success',
'timestamp': datetime.now().isoformat()
})
print(f"成功爬取: {url} - {title}")
elif response.status_code == 403:
print(f"访问被拒绝: {url}")
# 更换User-Agent重试
anti_crawler.rotate_user_agent()
elif response.status_code == 429:
print(f"请求过多: {url}")
# 等待更长时间
time.sleep(10)
except Exception as e:
print(f"请求失败 {url}: {e}")
results.append({
'url': url,
'error': str(e),
'status': 'failed',
'timestamp': datetime.now().isoformat()
})
# 6. 保存会话状态
anti_crawler.save_session_state()
return results
if __name__ == "__main__":
results = advanced_crawling_example()
print(f"\n爬取完成,成功{len([r for r in results if r['status'] == 'success'])}个,失败{len([r for r in results if r['status'] == 'failed'])}个")
Python
4. Scrapy框架实战
Scrapy是一个快速、高层次的网页爬取框架,用于抓取网站并从页面中提取结构化数据。
4.1 基础Scrapy爬虫项目结构
my_scrapy_project/
├── scrapy.cfg
└── my_scrapy_project/
├── __init__.py
├── items.py # 定义数据结构
├── middlewares.py # 中间件
├── pipelines.py # 数据处理管道
├── settings.py # 配置文件
└── spiders/ # 爬虫文件目录
├── __init__.py
└── example_spider.py
Plain Text
4.2 完整的Scrapy爬虫示例
# items.py - 定义要爬取的数据结构
import scrapy
class ProductItem(scrapy.Item):
"""商品数据项"""
# 定义字段
name = scrapy.Field() # 商品名称
price = scrapy.Field() # 价格
description = scrapy.Field() # 描述
url = scrapy.Field() # 商品链接
image_url = scrapy.Field() # 图片链接
category = scrapy.Field() # 分类
sku = scrapy.Field() # SKU编号
brand = scrapy.Field() # 品牌
rating = scrapy.Field() # 评分
review_count = scrapy.Field() # 评论数
stock_status = scrapy.Field() # 库存状态
specifications = scrapy.Field() # 规格参数
crawl_time = scrapy.Field() # 爬取时间
# pipelines.py - 数据处理管道
import json
import csv
import pymongo
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class ProductPipeline:
"""商品数据处理管道"""
def process_item(self, item, spider):
"""处理每个item"""
# 数据清洗
if not item.get('name'):
raise DropItem("缺少商品名称")
if not item.get('price'):
item['price'] = '价格未知'
# 格式化价格
if 'price' in item and item['price']:
item['price'] = self._format_price(item['price'])
# 添加爬取时间戳
from datetime import datetime
item['crawl_time'] = datetime.now().isoformat()
return item
def _format_price(self, price_str):
"""格式化价格字符串"""
import re
# 提取数字
numbers = re.findall(r'\d+\.?\d*', str(price_str))
if numbers:
            return float(numbers[0])
return price_str
class JsonWriterPipeline:
"""将数据写入JSON文件"""
def open_spider(self, spider):
self.file = open(f'{spider.name}_products.json', 'w', encoding='utf-8')
self.file.write('[\n')
self.first_item = True
def close_spider(self, spider):
self.file.write('\n]')
self.file.close()
def process_item(self, item, spider):
line = json.dumps(
ItemAdapter(item).asdict(),
ensure_ascii=False,
indent=2
)
if not self.first_item:
self.file.write(',\n')
self.first_item = False
self.file.write(line)
return item
class CsvWriterPipeline:
"""将数据写入CSV文件"""
def __init__(self):
self.csv_file = None
self.writer = None
def open_spider(self, spider):
self.csv_file = open(f'{spider.name}_products.csv', 'w', newline='', encoding='utf-8-sig')
self.writer = csv.DictWriter(self.csv_file, fieldnames=[
'name', 'price', 'description', 'url', 'image_url',
'category', 'sku', 'brand', 'rating', 'review_count',
'stock_status', 'specifications', 'crawl_time'
])
self.writer.writeheader()
def close_spider(self, spider):
if self.csv_file:
self.csv_file.close()
def process_item(self, item, spider):
self.writer.writerow(ItemAdapter(item).asdict())
return item
class MongoDBPipeline:
"""将数据存储到MongoDB"""
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
    @classmethod
    def from_crawler(cls, crawler):
        """从Scrapy settings读取MongoDB连接配置"""
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_items')
        )
    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
    def close_spider(self, spider):
        self.client.close()
    def process_item(self, item, spider):
        self.db['products'].insert_one(ItemAdapter(item).asdict())
        return item
Python
三、高阶拓展与持续成长
1. 性能优化与底层原理
Python作为一门高级解释型语言,其简洁的语法和丰富的生态使其广受欢迎,但性能问题也常被诟病。要真正优化Python程序,必须深入理解其底层运行机制。本文将从解释器执行模型、内存管理、GIL机制、性能优化技巧等多个维度,系统阐述Python性能优化的核心原理与实践策略。
一、Python解释器核心执行机制
1.1 从源代码到字节码的执行流程
Python代码的执行并非直接由CPU执行,而是经历了一个多阶段的转换过程:
- **词法分析**:将源代码字符串分解为有意义的令牌(Tokens)
- **语法分析**:根据Python语法规则检查令牌序列,构建抽象语法树(AST)
- **字节码编译**:将AST转换为平台无关的中间表示——字节码
- **虚拟机执行**:Python虚拟机(PVM)解释执行字节码指令
使用 dis 模块可以查看函数的字节码表示,帮助理解底层执行过程:
import dis
def add(a, b):
return a + b
dis.dis(add) # 输出字节码指令序列
Python
1.2 CPython解释器的特殊地位
CPython是Python的官方参考实现,使用C语言编写,其执行过程包括解析、编译和执行三个阶段。理解CPython的执行流程对性能优化至关重要,因为很多性能问题在字节码阶段就已埋下伏笔。
二、内存管理机制深度剖析
2.1 引用计数:实时内存回收
Python主要采用引用计数管理内存,每个对象都有一个引用计数器:
- 当引用增加时,计数器+1
- 当引用减少时,计数器-1
- 计数器为0时,对象被立即回收
import sys
a = [1, 2, 3]
print(sys.getrefcount(a)) # 注意:getrefcount()本身会增加一次引用
Python
引用计数的优势在于实时性和确定性,内存释放与对象生命周期严格同步。但无法处理循环引用问题,这是其主要的局限性。
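下面用一个最小示例验证这一局限:两个互相引用的对象在del之后引用计数仍不为零,只能由后文介绍的分代回收器回收:

```python
import gc

class Node:
    """简单节点,用于构造循环引用"""
    def __init__(self):
        self.partner = None

a, b = Node(), Node()
a.partner, b.partner = b, a   # 互相引用,形成引用环
del a, b                      # 名字删除后,两个对象的引用计数仍不为0

collected = gc.collect()      # 分代回收器扫描并回收不可达的引用环
print(f"回收了{collected}个对象")
```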
2.2 垃圾回收:解决循环引用
为解决循环引用问题,Python引入了分代垃圾回收机制:
- **分代策略**:对象按存活时间分为三代(0代、1代、2代)
- **标记清除**:从根对象出发标记所有可达对象,回收不可达对象
- **阈值触发**:当某代对象数量超过阈值时,触发对应代的回收
import gc
# 手动触发垃圾回收
collected = gc.collect()
print(f"回收了{collected}个对象")
Python
2.3 内存池优化:小对象高效分配
Python采用分层内存分配策略提升小对象分配效率:
- **小整数池**:[-5, 256]区间的整数共享同一对象
- **字符串驻留**:符合规范的字符串自动缓存
- **自由列表**:维护空闲内存块列表,减少内存碎片
- **PyMalloc机制**:针对小于512字节的对象使用内存池
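小整数池的效果可以直接观察到。下面用int()在运行时构造整数,避开编译期常量折叠的干扰(这是CPython的实现细节,业务代码不应依赖对象同一性):

```python
import sys

# 运行时构造整数,避开编译期常量折叠
a = int("256")
b = int("256")
print(a is b)    # True:[-5, 256]区间的整数取自小整数池,共享同一对象

c = int("257")
d = int("257")
print(c is d)    # False:池外整数每次都创建新对象

# sys.intern可显式驻留字符串,保证同值字符串共享对象
s1 = sys.intern("hello python")
s2 = sys.intern("hello python")
print(s1 is s2)  # True
```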
三、GIL(全局解释器锁)的影响与应对
3.1 GIL的本质与影响
GIL是CPython解释器中的互斥锁,确保同一时刻只有一个线程执行Python字节码。其主要影响包括:
- **CPU密集型任务**:多线程无法利用多核CPU,性能提升有限
- **I/O密集型任务**:线程在等待I/O时会释放GIL,多线程仍能提升并发性能
3.2 突破GIL限制的策略
- **使用多进程(multiprocessing)**:绕过GIL,真正利用多核CPU
- **使用C扩展**:C扩展在执行期间可以释放GIL,如numpy、pandas
- **异步编程(asyncio)**:适用于I/O密集型任务,避免线程切换开销
- **更换解释器**:如PyPy使用JIT技术,在某些场景下能显著提升性能
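以多进程方案为例的最小示例(count_primes是演示用的朴素实现):每个任务在独立的解释器进程中执行,互不受GIL约束:

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit):
    """CPU密集型任务:朴素地统计limit以内的质数个数"""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

if __name__ == "__main__":
    # 4个任务分布到4个进程,可真正并行利用多核
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(count_primes, [5_000] * 4))
    print(results)
```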
四、性能优化核心技巧
4.1 算法与数据结构优化
选择合适的数据结构对性能影响巨大:
- **集合 vs 列表**:集合的in操作是O(1),列表是O(n)
- **字典访问**:键访问也是O(1),但需注意哈希冲突
- **避免不必要操作**:在循环中进行高成本操作会显著降低性能
# 低效:列表查找O(n)
result = [i for i in items if i in [2, 4, 6, 8]]
# 高效:集合查找O(1)
target_set = {2, 4, 6, 8}
result = [i for i in items if i in target_set]
Python
4.2 局部变量访问加速
Python中局部变量访问比全局变量快得多:
- **LOAD_FAST vs LOAD_GLOBAL**:局部变量访问使用LOAD_FAST字节码,比LOAD_GLOBAL快3-5倍
- **缓存频繁访问的变量**:将全局变量或类属性赋值给局部变量
# 优化前:频繁访问全局变量
for _ in range(1_000_000):
if global_var == "test": # LOAD_GLOBAL指令
pass
# 优化后:使用局部变量
local_var = global_var
for _ in range(1_000_000):
if local_var == "test": # LOAD_FAST指令
pass
Python
4.3 利用内置函数和库
Python内置函数通常由C实现,比纯Python循环更快:
- **内置函数**:如sum、map、filter等
- **数值计算库**:如NumPy、pandas进行向量化计算
- **集合操作**:使用collections模块中的Counter、defaultdict等
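一个collections的最小示例(示例数据为演示而设),Counter和defaultdict都是C实现,比手写dict循环更快也更清晰:

```python
from collections import Counter, defaultdict

words = ["spam", "egg", "spam", "ham", "egg", "spam"]

# Counter:一行完成词频统计
freq = Counter(words)
print(freq.most_common(2))   # [('spam', 3), ('egg', 2)]

# defaultdict:自动初始化缺失键,省去"if key in dict"判断
groups = defaultdict(list)
for w in words:
    groups[len(w)].append(w)
print(dict(groups))
```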
4.4 生成器与惰性求值
处理大数据时,生成器能显著减少内存占用:
# 低效:一次性加载所有数据
def process_large_file_bad(filename):
with open(filename) as f:
lines = f.readlines() # 可能占用大量内存
return [line.strip().upper() for line in lines]
# 高效:逐行处理
def process_large_file_good(filename):
with open(filename) as f:
for line in f:
yield line.strip().upper()
Python
五、高级优化技术
5.1 JIT编译:Numba的魔法
对于数值密集型循环,Numba通过LLVM将Python函数即时编译为机器码,往往能带来数十倍甚至上百倍的性能提升:
from numba import jit
import math
@jit(nopython=True)
def numba_sqrt(n):
result = []
for i in range(n):
result.append(math.sqrt(i))
return result
Python
5.2 Cython:混合Python与C
Cython允许在Python代码中添加C语言类型标注,然后编译成C扩展模块,特别适合数值计算场景:
# cython_test.pyx
def cython_fib(int n):
cdef int a=0, b=1, i
for i in range(n):
a, b = b, a + b
return a
Cython
5.3 __slots__魔法:减少对象内存开销
当创建大量实例时,使用 __slots__ 可以禁用实例字典,直接预分配固定的属性空间,内存占用通常可减少40-50%:
class RegularUser:
def __init__(self, uid, name):
self.uid = uid
self.name = name
class SlotUser:
__slots__ = ['uid', 'name'] # 禁用实例字典
def __init__(self, uid, name):
self.uid = uid
self.name = name
Python
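下面的小实验可以直接验证这一差异(具体字节数因Python版本而异,这里只比较结构性差异):

```python
import sys

class RegularUser:
    def __init__(self, uid, name):
        self.uid = uid
        self.name = name

class SlotUser:
    __slots__ = ('uid', 'name')
    def __init__(self, uid, name):
        self.uid = uid
        self.name = name

r = RegularUser(1, "alice")
s = SlotUser(1, "alice")
print(hasattr(r, '__dict__'))   # True:普通实例的属性存放在实例字典里
print(hasattr(s, '__dict__'))   # False:__slots__禁用了实例字典
# 普通实例的总开销 = 实例本身 + 实例字典;__slots__实例只有前者
print(sys.getsizeof(r) + sys.getsizeof(r.__dict__), sys.getsizeof(s))
```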
5.4 Memoryview与零拷贝操作
处理二进制数据时,memoryview对象允许零拷贝访问底层内存缓冲区,在大量切片的场景下可将额外内存占用减少90%以上:
data = bytearray(b"x" * 10_000_000)
# 低效切片(复制数据)
slices = [data[i:i+100] for i in range(0, len(data), 100)]
# memoryview优化
mv = memoryview(data)
slices_mv = [mv[i:i+100] for i in range(0, len(data), 100)]
Python
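下面的小实验对比两种切片的开销,并演示视图与底层缓冲区共享内存(数据规模为演示而设):

```python
import sys

data = bytearray(b"x" * 1_000_000)

copy_slice = data[0:1000]             # bytearray切片:复制出1000字节的新对象
mv_slice = memoryview(data)[0:1000]   # 零拷贝视图:只记录偏移和长度

print(sys.getsizeof(copy_slice), sys.getsizeof(mv_slice))

# 视图与底层缓冲区共享内存,写视图即写原数据
mv_slice[0:3] = b"abc"
print(bytes(data[0:5]))               # b'abcxx'
```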
六、性能分析工具链
6.1 分层性能验证体系
性能优化必须建立科学的验证体系:
- **宏观测量**:使用timeit测量小段代码执行时间
- **中观分析**:使用cProfile找出热点函数
- **微观剖析**:使用line_profiler查看函数内每行代码耗时
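以"宏观测量"为例,用timeit对比list与set的成员查找(数据规模为演示而设):

```python
import timeit

setup = "data = list(range(10_000)); target = set(data)"
# 分别测量list和set中查找同一元素的耗时
t_list = timeit.timeit("9_999 in data", setup=setup, number=1_000)
t_set = timeit.timeit("9_999 in target", setup=setup, number=1_000)
print(f"list查找: {t_list:.5f}s  set查找: {t_set:.5f}s")
```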
6.2 常用性能分析工具
- **cProfile**:性能分析器,统计函数调用次数和时间
- **timeit**:精确测量代码执行时间
- **memory_profiler**:内存使用分析
- **objgraph**:可视化对象引用关系,检测内存泄漏
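以cProfile为例的最小用法(hot_function是演示用的假想热点函数),按累计耗时排序即可定位瓶颈:

```python
import cProfile
import io
import pstats

def hot_function():
    """被频繁调用的热点函数"""
    return sum(i * i for i in range(100_000))

def main():
    for _ in range(5):
        hot_function()

profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()

# 按累计耗时排序,打印前5条记录
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
print(report)
```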
七、实战优化案例
7.1 日志分析脚本优化:从2.8秒到0.17秒
原始脚本逐行读日志,用str.split()解析,再用list.count()统计IP频次,耗时2.8秒:
- **第一步**:用collections.Counter替代手动计数 → 降为1.4秒
- **第二步**:用正则预编译 + re.finditer提取IP → 降到0.65秒
- **第三步**:用map() + 生成器表达式替代for循环 → 最终0.17秒
7.2 质数计算优化:PyPy vs CPython
对于循环密集型任务,PyPy的JIT编译能带来显著性能提升:
- **CPython**:计算100万以内质数约12秒
- **PyPy**:相同任务约0.8秒,提速15倍
八、优化原则与最佳实践
8.1 优化原则
- **先测量,后优化**:使用性能分析工具找到真正的瓶颈
- **理解权衡**:优化可能牺牲可读性,确保收益大于成本
- **关注热点代码**:优先优化被频繁调用的函数
- **避免过早优化**:在代码可读性和性能间取得平衡
8.2 持续学习建议
- **学习CPython源码**:深入理解对象模型、内存分配等底层机制
- **掌握性能分析工具**:熟练使用cProfile、memory_profiler等工具
- **了解不同解释器特性**:根据场景选择合适的Python解释器
- **实践优化案例**:通过实际项目积累优化经验
九、总结
Python性能优化是一个系统工程,需要从底层机制理解入手,结合具体场景选择合适的优化策略。关键要点包括:
- **深入理解CPython执行机制**,特别是字节码生成和执行过程
- **掌握内存管理原理**,避免循环引用和内存泄漏
- **合理应对GIL限制**,根据任务类型选择并发方案
- **善用性能分析工具**,科学定位性能瓶颈
- **采用分层优化策略**,从算法、数据结构到编译优化全面考虑
通过系统性的性能优化,Python程序完全能够满足高性能计算的需求,在保持开发效率的同时获得可观的性能提升。
2. 测试驱动开发与质量保障
本文将深入讲解Python测试驱动开发(TDD)的核心概念、实践方法与完整的质量保障体系,帮助您构建可靠、可维护的软件。
第一部分:测试驱动开发(TDD)基础
1.1 TDD的核心循环与理念
**测试驱动开发**是一种以测试为驱动的软件开发方法论,其核心流程遵循“红-绿-重构”循环:
编写测试(红) → 实现功能(绿) → 重构优化(重构) → 再次测试(绿)
Plain Text
TDD的三大法则(Robert C. Martin提出):
- 在编写一个失败的单元测试之前,不编写任何产品代码
- 单元测试只写到刚好失败为止(编译不通过也算失败)
- 产品代码只写到刚好让当前失败的测试通过为止
# TDD实践示例:开发一个简单的计算器
# 步骤1:先写测试(失败-红)
def test_add():
"""测试加法功能"""
calculator = Calculator() # 类还不存在
result = calculator.add(2, 3)
assert result == 5, f"预期5,实际得到{result}"
# 步骤2:编写最小实现(通过-绿)
class Calculator:
def add(self, a, b):
return a + b # 最简单的实现
# 步骤3:重构(优化代码结构,保持测试通过)
# 现在可以安全地重构,因为有测试保障
class Calculator:
def add(self, a, b):
# 添加输入验证等,但功能不变
if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
raise TypeError("参数必须是数字")
return a + b
Python
1.2 为什么需要TDD?
主要优势:
- **设计引导**:通过测试用例定义接口,驱动良好设计
- **安全性**:每次修改都有测试保护,减少回归bug
- **可维护性**:简洁的实现,避免过度设计
- **文档化**:测试即文档,说明代码如何使用
- **节奏感**:小步快跑,持续获得成就感
TDD适用场景:
- 核心业务逻辑
- 公共组件库
- 算法实现
- 需要高可靠性的模块
第二部分:Python测试框架深入
2.1 pytest核心功能
pytest是现代Python测试的事实标准,提供丰富功能和简洁语法:
# test_calculator.py
import pytest
from calculator import Calculator
@pytest.fixture
def calculator():
"""创建计算器实例的fixture"""
return Calculator()
# 基础测试
def test_addition(calculator):
"""测试加法"""
assert calculator.add(2, 3) == 5
assert calculator.add(-1, 1) == 0
assert calculator.add(0, 0) == 0
# 参数化测试
@pytest.mark.parametrize("a,b,expected", [
(2, 3, 5),
(-1, 1, 0),
(0, 0, 0),
(2.5, 3.5, 6.0),
])
def test_add_multiple_cases(calculator, a, b, expected):
"""使用参数化测试多个场景"""
result = calculator.add(a, b)
assert result == expected, f"{a}+{b}应该等于{expected}"
# 异常测试
def test_add_type_error(calculator):
"""测试类型错误"""
with pytest.raises(TypeError, match="参数必须是数字"):
calculator.add("2", 3)
with pytest.raises(TypeError):
calculator.add(2, "3")
# 测试标记
@pytest.mark.slow
def test_complex_calculation():
"""标记为慢测试,可单独运行或跳过"""
# 复杂计算测试...
pass
@pytest.mark.skip(reason="功能尚未实现")
def test_future_feature():
"""暂时跳过的测试"""
assert False
Python
2.2 高级fixture用法
fixture是pytest的核心特性之一,用于测试资源的生命周期管理:
# conftest.py - 项目级fixture配置
import pytest
import tempfile
import json
from datetime import datetime
@pytest.fixture
def temp_config_file():
"""创建临时配置文件"""
config = {
"app": "test_app",
"version": "1.0.0",
"timestamp": datetime.now().isoformat()
}
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(config, f)
temp_path = f.name
yield temp_path # 提供给测试使用
# 测试后清理
import os
os.unlink(temp_path)
@pytest.fixture
def db_connection(request):
"""数据库连接fixture"""
connection = DatabaseConnection(
host=request.config.getoption("--db-host"),
database="test_db"
)
connection.connect()
yield connection
connection.disconnect()
# 会话级fixture(整个测试会话只创建一次)
@pytest.fixture(scope="session")
def redis_client():
"""Redis客户端会话级fixture"""
client = Redis(host='localhost', port=6379, decode_responses=True)
yield client
client.close()
# 自动使用fixture
@pytest.fixture(autouse=True)
def setup_logging():
"""自动应用的fixture,为每个测试设置日志"""
import logging
logging.basicConfig(level=logging.DEBUG)
yield
# 清理日志处理器
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
@pytest.fixture(params=[True, False])
def use_cache(request):
"""参数化fixture"""
return request.param
def test_with_caching(db_connection, use_cache):
"""使用参数化fixture"""
result = fetch_data(db_connection, cache=use_cache)
assert result is not None
Python
2.3 unittest模块使用
Python标准库中的unittest模块在某些场景仍然有用:
import unittest
from unittest.mock import Mock, patch, MagicMock
class TestStringMethods(unittest.TestCase):
def setUp(self):
"""测试前的准备工作"""
self.test_string = "hello"
def tearDown(self):
"""测试后的清理工作"""
pass
def test_upper(self):
"""测试大写转换"""
self.assertEqual(self.test_string.upper(), "HELLO")
def test_split(self):
"""测试分割字符串"""
s = 'hello world'
self.assertEqual(s.split(), ['hello', 'world'])
# 检查分割符是否生效
with self.assertRaises(TypeError):
s.split(2)
# 跳过测试
@unittest.skip("演示跳过测试")
def test_nothing(self):
pass
# 条件跳过
@unittest.skipIf(not has_feature(), "缺少必要功能")
def test_feature(self):
pass
# Mock和patch使用
class TestExternalService(unittest.TestCase):
@patch('requests.get') # 替换requests.get函数
def test_fetch_data(self, mock_get):
"""测试调用外部API"""
# 配置mock返回值
mock_response = Mock()
mock_response.json.return_value = {'data': 'test'}
mock_response.status_code = 200
mock_get.return_value = mock_response
# 调用被测试的函数
result = fetch_from_api()
# 验证行为
self.assertEqual(result, 'test')
        mock_get.assert_called_once_with('https://api.example.com/data')
if __name__ == '__main__':
unittest.main()
Python
第三部分:不同类型的测试策略
3.1 单元测试(Unit Tests)
单元测试专注于最小可测试单元——通常是单个函数或方法:
# 被测试的函数
def calculate_statistics(numbers):
"""计算统计指标"""
if not numbers:
raise ValueError("数字列表不能为空")
total = sum(numbers)
mean = total / len(numbers)
sorted_nums = sorted(numbers)
mid = len(sorted_nums) // 2
if len(sorted_nums) % 2 == 0:
median = (sorted_nums[mid-1] + sorted_nums[mid]) / 2
else:
median = sorted_nums[mid]
return {
'total': total,
'mean': mean,
'median': median,
'count': len(numbers),
'min': min(numbers),
'max': max(numbers)
}
# 对应的单元测试
class TestStatistics(unittest.TestCase):
def test_basic_statistics(self):
"""测试基础统计功能"""
result = calculate_statistics([1, 2, 3, 4, 5])
expected = {
'total': 15,
'mean': 3,
'median': 3,
'count': 5,
'min': 1,
'max': 5
}
self.assertEqual(result, expected)
def test_empty_list(self):
"""测试边界情况"""
with self.assertRaises(ValueError):
calculate_statistics([])
def test_single_element(self):
"""测试单个元素的特殊情况"""
        result = calculate_statistics([42])
expected = {
'total': 42,
'mean': 42,
'median': 42,
'count': 1,
'min': 42,
'max': 42
}
self.assertEqual(result, expected)
def test_negative_numbers(self):
"""测试负数情况"""
result = calculate_statistics([-5, -3, 0, 3, 5])
self.assertEqual(result['total'], 0)
self.assertEqual(result['mean'], 0)
self.assertEqual(result['median'], 0)
Python
3.2 集成测试(Integration Tests)
集成测试验证多个组件协同工作:
# 集成测试示例:测试数据库和业务逻辑集成
class TestUserRegistration(unittest.TestCase):
def setUp(self):
"""设置测试数据库"""
self.db = TestDatabase()
self.db.create_tables()
self.user_service = UserService(self.db)
self.email_service = MockEmailService()
def tearDown(self):
"""清理测试数据库"""
self.db.drop_tables()
def test_user_registration_flow(self):
"""测试完整的用户注册流程"""
# 注册新用户
user_data = {
'username': 'testuser',
'email': 'test@example.com',
'password': 'secure123'
}
# 调用注册流程
user_id = self.user_service.register_user(user_data)
# 验证结果
# 1. 用户是否保存到数据库
db_user = self.db.get_user(user_id)
self.assertEqual(db_user.username, 'testuser')
# 2. 是否发送了验证邮件
self.assertTrue(self.email_service.was_called())
# 3. 用户状态是否正确
self.assertEqual(db_user.status, 'pending_verification')
def test_duplicate_registration(self):
"""测试重复注册"""
user_data = {
'username': 'testuser',
'email': 'test@example.com',
'password': 'secure123'
}
# 第一次注册成功
self.user_service.register_user(user_data)
# 第二次注册应该失败
with self.assertRaises(DuplicateUserError):
self.user_service.register_user(user_data)
Python
3.3 端到端测试(End-to-End Tests)
端到端测试模拟真实用户场景:
# 使用Selenium进行Web应用E2E测试
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import pytest
@pytest.fixture
def browser():
"""浏览器fixture"""
driver = webdriver.Chrome() # 或Firefox(), Safari()
driver.implicitly_wait(10)
yield driver
driver.quit()
def test_user_login_flow(browser, live_server):
"""测试用户登录流程"""
# 1. 访问网站
browser.get(live_server.url)
# 2. 点击登录按钮
login_button = browser.find_element(By.XPATH, "//button[text()='登录']")
login_button.click()
# 3. 填写登录表单
username_input = browser.find_element(By.ID, "username")
password_input = browser.find_element(By.ID, "password")
username_input.send_keys("testuser")
password_input.send_keys("testpass123")
# 4. 提交表单
submit_button = browser.find_element(By.XPATH, "//button[@type='submit']")
submit_button.click()
# 5. 验证登录成功
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.ID, "user-greeting"))
)
greeting = browser.find_element(By.ID, "user-greeting")
assert "欢迎" in greeting.text
# 6. 验证用户菜单显示
user_menu = browser.find_element(By.CLASS_NAME, "user-menu")
assert user_menu.is_displayed()
def test_shopping_cart_flow(browser, live_server):
"""测试购物车流程"""
# 登录用户
login_user(browser, live_server)
# 浏览商品
product_link = browser.find_element(By.LINK_TEXT, "产品详情")
product_link.click()
# 添加到购物车
add_to_cart_button = browser.find_element(By.ID, "add-to-cart")
add_to_cart_button.click()
# 查看购物车
cart_link = browser.find_element(By.ID, "cart-link")
cart_link.click()
# 验证购物车内容
cart_items = browser.find_elements(By.CLASS_NAME, "cart-item")
assert len(cart_items) > 0
# 结账流程
checkout_button = browser.find_element(By.ID, "checkout")
checkout_button.click()
# 验证页面跳转
assert "checkout" in browser.current_url
Python
第四部分:Mock与测试替身策略
4.1 何时使用Mock
测试替身主要包括以下几种类型:
| 类型 | 用途 | 例子 |
|---|---|---|
| Dummy | 仅占位,不会被使用 | 空对象或None |
| Stub | 提供预设的响应 | 返回固定数据的对象 |
| Spy | 记录调用信息,验证行为 | 记录函数调用次数 |
| Mock | 预设响应+行为验证 | 验证方法被调用 |
| Fake | 简化实现,用于测试 | 内存数据库 |
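下面用unittest.mock演示Stub与Mock这两种角色的差别(rate_service是假设的汇率服务依赖):

```python
from unittest.mock import Mock

# Stub角色:只提供预设响应,测试不关心它被如何调用
rate_service = Mock()
rate_service.get_rate.return_value = 7.2

price_in_cny = 100 * rate_service.get_rate("USD")
print(price_in_cny)   # 720.0

# Mock角色:除了预设响应,还要验证调用行为
rate_service.get_rate.assert_called_once_with("USD")
print(rate_service.get_rate.call_count)   # 1
```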
4.2 unittest.mock详细用法
from unittest.mock import Mock, MagicMock, patch, call
import requests
# 基本Mock使用
def test_api_client():
"""测试API客户端"""
# 创建Mock对象
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {'success': True}
# Mock requests.get
with patch('requests.get', return_value=mock_response) as mock_get:
api_client = APIClient()
result = api_client.fetch_data()
# 验证调用
        mock_get.assert_called_once_with('https://api.example.com/data')
# 验证结果
assert result['success'] is True
# Mock多个调用
def test_retry_logic():
"""测试重试逻辑"""
mock_session = Mock()
# 第一次调用失败,第二次成功
mock_session.get.side_effect = [
requests.exceptions.ConnectionError,
Mock(status_code=200, json=lambda: {'data': 'success'})
]
# 测试重试逻辑
with patch('my_module.requests.Session', return_value=mock_session):
service = MyService()
result = service.fetch_with_retry()
# 验证重试次数
assert mock_session.get.call_count == 2
assert result == 'success'
# Mock属性访问
def test_property_mocking():
"""Mock属性的高级用法"""
mock_obj = Mock()
# 配置属性链式访问
mock_obj.parent.child.grandchild.name = "test"
# 也可以使用MagicMock
magic_mock = MagicMock()
magic_mock.complex.path.to.attribute = "value"
# 验证
assert mock_obj.parent.child.grandchild.name == "test"
assert magic_mock.complex.path.to.attribute == "value"
# patch装饰器用法
class TestPaymentService:
@patch('payment_service.stripe.Charge.create')
@patch('payment_service.send_receipt_email')
def test_process_payment(self, mock_email, mock_stripe):
"""测试支付处理(patch两个依赖)"""
# 配置mock
mock_stripe.return_value.id = "ch_123"
mock_stripe.return_value.status = "succeeded"
# 调用被测试方法
service = PaymentService()
result = service.process_payment(100, "usd", "tok_123")
# 验证stripe调用
mock_stripe.assert_called_once_with(
amount=100,
currency="usd",
source="tok_123"
)
# 验证邮件发送(检查是否调用,不关心具体参数)
mock_email.assert_called_once()
# 验证结果
assert result.payment_id == "ch_123"
assert result.status == "success"
# 部分Mock
def test_partial_mock():
"""部分Mock,只替换对象的特定方法"""
real_object = RealClass()
# Mock其中一个方法,其他方法保持原样
with patch.object(real_object, 'expensive_method') as mock_method:
mock_method.return_value = "cached_result"
# 调用
result = real_object.some_operation()
# 验证
assert mock_method.called
# 其他方法还是真实调用
Python
第五部分:持续集成与质量检查
5.1 GitHub Actions自动化流水线
# .github/workflows/ci.yml
name: Python CI
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Lint with flake8
run: |
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=100 --statistics
- name: Type checking with mypy
run: |
mypy --ignore-missing-imports --follow-imports=silent --show-column-numbers src/
- name: Format check with black
run: |
black --check --diff src/ tests/
- name: Test with pytest
run: |
pytest tests/ --cov=src --cov-report=xml --cov-report=html
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
flags: unittests
- name: Security scan
run: |
pip install bandit
bandit -r src/ -f json -o bandit_report.json
- name: Dependency vulnerability check
run: |
pip install safety
safety check --full-report
integration-test:
needs: test
runs-on: ubuntu-latest
services:
postgres:
image: postgres:14
env:
POSTGRES_PASSWORD: testpassword
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Run integration tests
env:
DATABASE_URL: postgresql://postgres:testpassword@postgres:5432/postgres
run: |
pytest tests/integration/ -v
deploy:
needs: [test, integration-test]
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Deploy to staging
run: |
echo "部署到预发布环境..."
# 实际的部署脚本
YAML
5.2 pre-commit钩子配置
# .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-ast
- id: check-json
- id: check-merge-conflict
- id: debug-statements
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
args: [--line-length=100]
language_version: python3
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", "--line-length=100"]
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
hooks:
- id: flake8
args:
- "--max-line-length=100"
- "--max-complexity=10"
additional_dependencies: [flake8-docstrings]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
hooks:
- id: mypy
args: [--ignore-missing-imports, --follow-imports=silent]
additional_dependencies:
- types-requests
- types-pyyaml
- types-python-dateutil
- repo: https://github.com/PyCQA/bandit
rev: 1.7.5
hooks:
- id: bandit
args: ["-c", "pyproject.toml"]
- repo: local
hooks:
- id: pytest-check
name: pytest
entry: pytest --tb=short -v
language: system
pass_filenames: false
always_run: true
stages: [push]
YAML
5.3 覆盖率报告与质量门控
# pyproject.toml 或 pytest.ini 配置
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q --strict-markers"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
[tool.coverage.run]
source = ["src"]
omit = ["*/__pycache__/*", "setup.py"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"raise AssertionError",
"raise NotImplementedError",
"except:",
"except Exception:",
"finally:",
"^\\s*except\\s.*:",
"^\\s*with.*:"
]
fail_under = 90
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
ignore_missing_imports = true
[tool.black]
line-length = 100
target-version = ['py310']
[tool.isort]
profile = "black"
line_length = 100
known_first_party = ["myapp"]
TOML
第六部分:测试金字塔与策略
6.1 测试金字塔模型
        /\
       /  \      少量端到端测试(E2E)
      /    \
     /      \    更多集成测试
    /        \
   /          \  大量单元测试
  /            \
 /______________\
Plain Text
**理想分布**:
- **单元测试**:70-80% - 快速、隔离、易于维护
- **集成测试**:15-20% - 验证组件协作
- **端到端测试**:5-10% - 验证完整业务流程
6.2 测试策略选择
# 测试策略工厂模式
from enum import Enum
from abc import ABC, abstractmethod
class TestType(Enum):
UNIT = "unit"
INTEGRATION = "integration"
E2E = "e2e"
class TestStrategy(ABC):
"""测试策略基类"""
@abstractmethod
def setup(self):
"""测试环境设置"""
pass
@abstractmethod
def teardown(self):
"""测试环境清理"""
pass
@abstractmethod
def should_run(self, test_path):
"""判断是否应该运行该测试"""
pass
class UnitTestStrategy(TestStrategy):
"""单元测试策略"""
def setup(self):
"""单元测试环境简单,无需复杂设置"""
pass
def teardown(self):
pass
def should_run(self, test_path):
        # 单元测试:文件名以_test.py结尾,或位于unit目录下
        return test_path.endswith("_test.py") or "unit" in test_path
class IntegrationTestStrategy(TestStrategy):
"""集成测试策略"""
def setup(self):
"""需要设置数据库、外部服务等"""
self.db = setup_test_database()
self.redis = setup_redis()
def teardown(self):
self.db.cleanup()
self.redis.cleanup()
def should_run(self, test_path):
# 集成测试:特定的目录或文件名
return "integration" in test_path or "integration_test" in test_path
class E2ETestStrategy(TestStrategy):
"""端到端测试策略"""
def setup(self):
"""需要完整的部署环境"""
self.server = start_test_server()
self.browser = setup_selenium()
def teardown(self):
self.browser.quit()
self.server.shutdown()
def should_run(self, test_path):
# E2E测试:特定的标记或文件名
return "e2e" in test_path or test_path.endswith("_e2e_test.py")
# 使用策略模式选择测试
class TestRunner:
"""测试运行器,根据策略执行不同测试"""
def __init__(self, strategy: TestStrategy):
self.strategy = strategy
def run_tests(self, test_files):
"""根据策略运行测试"""
self.strategy.setup()
try:
# 筛选并运行测试
for test_file in test_files:
if self.strategy.should_run(test_file):
self._run_single_test(test_file)
finally:
self.strategy.teardown()
def _run_single_test(self, test_file):
"""运行单个测试文件"""
import subprocess
result = subprocess.run(["pytest", test_file, "-v"])
return result.returncode == 0
Python
第七部分:性能测试与基准测试
7.1 pytest-benchmark使用
import pytest
import time
from functools import lru_cache
def fibonacci_recursive(n):
"""递归实现斐波那契 - O(2^n)"""
if n <= 1:
return n
return fibonacci_recursive(n-1) + fibonacci_recursive(n-2)
def fibonacci_iterative(n):
"""迭代实现斐波那契 - O(n)"""
if n <= 1:
return n
a, b = 0, 1
for _ in range(n-1):
a, b = b, a + b
return b
@lru_cache(maxsize=None)
def fibonacci_memoized(n):
"""带缓存的斐波那契 - O(n)"""
if n <= 1:
return n
return fibonacci_memoized(n-1) + fibonacci_memoized(n-2)
# 基准测试
# 基准测试(注意:pytest-benchmark的benchmark fixture在每个测试中只能调用一次)
@pytest.mark.benchmark
def test_fibonacci_iterative_performance(benchmark):
    """基准测试迭代实现"""
    result = benchmark(fibonacci_iterative, 30)
    assert result == 832040

@pytest.mark.benchmark
def test_fibonacci_memoized_performance(benchmark):
    """基准测试缓存实现"""
    fibonacci_memoized.cache_clear()
    result = benchmark(fibonacci_memoized, 30)
    assert result == 832040

# 递归实现是O(2^n),n=30时极慢,此处不做基准测试
# 参数化基准测试
@pytest.mark.parametrize("n", [10, 20, 30])
def test_fibonacci_scales(benchmark, n):
"""测试不同输入规模的性能"""
result = benchmark(fibonacci_iterative, n)
assert result > 0
# 内存使用测试
def test_memory_usage():
"""测试内存使用"""
import sys
import numpy as np
# 创建大数组
array_size = 1000000
array = np.random.rand(array_size)
# 计算内存使用
memory_usage = sys.getsizeof(array) / 1024 / 1024 # MB
print(f"数组内存使用: {memory_usage:.2f} MB")
# 验证内存使用在合理范围内
assert memory_usage < 10.0 # 应该小于10MB
# 清理
del array
# 并发性能测试
@pytest.mark.benchmark(warmup=True, min_rounds=10)
def test_concurrent_requests(benchmark):
"""测试并发请求性能"""
import asyncio
import aiohttp
async def make_request(session, url):
async with session.get(url) as response:
return await response.text()
async def make_multiple_requests(url, count):
async with aiohttp.ClientSession() as session:
tasks = []
for _ in range(count):
task = make_request(session, url)
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
# 基准测试
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def run_benchmark():
        return loop.run_until_complete(make_multiple_requests('http://localhost:8080/test', 10))
# 运行基准测试
results = benchmark(run_benchmark)
assert len(results) == 10
loop.close()
Python
第八部分:测试最佳实践与模式
8.1 测试设计模式
# 1. 工厂模式创建测试数据
class TestDataFactory:
"""测试数据工厂"""
@staticmethod
def create_user(**overrides):
"""创建用户测试数据"""
default = {
'id': 1,
'username': 'testuser',
'email': 'test@example.com',
'is_active': True,
'created_at': '2023-01-01T00:00:00Z'
}
default.update(overrides)
return default
@staticmethod
def create_product(**overrides):
"""创建产品测试数据"""
default = {
'id': 100,
'name': '测试产品',
'price': 99.99,
'stock': 50,
'category': 'electronics'
}
default.update(overrides)
return default
# 2. 建造者模式创建复杂对象
class UserBuilder:
"""用户对象建造者"""
def __init__(self):
self.user = TestDataFactory.create_user()
def with_id(self, user_id):
self.user['id'] = user_id
return self
def with_email(self, email):
self.user['email'] = email
return self
def as_inactive(self):
self.user['is_active'] = False
return self
def build(self):
return self.user.copy()
# 测试中使用
def test_user_activation():
"""测试用户激活"""
# 使用建造者创建测试用户
inactive_user = UserBuilder().with_id(123).as_inactive().build()
user_service = UserService()
activated_user = user_service.activate_user(inactive_user)
assert activated_user['is_active'] is True
assert activated_user['id'] == 123
# 3. 测试数据清理器
class DatabaseCleaner:
"""测试数据库清理器"""
def __init__(self, connection):
self.connection = connection
self.created_tables = []
def clean_all(self):
"""清理所有测试数据"""
cursor = self.connection.cursor()
# 禁用外键约束
cursor.execute("PRAGMA foreign_keys = OFF")
# 清理所有表
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
for table in tables:
            table_name = table[0]  # fetchall()返回元组列表,取出表名
if table_name != 'sqlite_sequence': # 跳过自增表
cursor.execute(f"DELETE FROM {table_name}")
# 启用外键约束
cursor.execute("PRAGMA foreign_keys = ON")
self.connection.commit()
cursor.close()
def clean_table(self, table_name):
"""清理特定表"""
cursor = self.connection.cursor()
cursor.execute(f"DELETE FROM {table_name}")
self.connection.commit()
cursor.close()
# 4. 测试上下文管理器
class TestTransaction:
"""测试事务上下文管理器"""
def __init__(self, connection):
self.connection = connection
def __enter__(self):
"""开始事务"""
self.connection.execute("BEGIN")
return self.connection
def __exit__(self, exc_type, exc_val, exc_tb):
"""回滚事务(测试中不提交)"""
self.connection.execute("ROLLBACK")
if exc_type is not None:
# 有异常时记录
print(f"测试事务中发生异常: {exc_type.__name__}: {exc_val}")
@classmethod
def with_rollback(cls, connection):
"""使用装饰器模式"""
def decorator(func):
def wrapper(*args, **kwargs):
with cls(connection):
return func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
def test_with_transaction():
"""在事务中运行测试"""
db = get_test_database()
with TestTransaction(db) as conn:
# 这些操作会在测试后回滚
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name) VALUES ('test')")
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
assert len(results) == 1
# 数据不会被真正提交
Python
8.2 测试命名与组织规范
"""
测试文件组织规范
tests/
├── __init__.py
├── conftest.py # 全局fixture和配置
├── unit/ # 单元测试
│ ├── test_models.py # 测试数据模型
│ ├── test_services.py # 测试业务逻辑
│ └── test_utils.py # 测试工具函数
├── integration/ # 集成测试
│ ├── test_database.py # 数据库集成测试
│ └── test_api.py # API集成测试
├── e2e/ # 端到端测试
│ └── test_user_flow.py
├── fixtures/ # 测试数据fixture
│ └── factories.py
└── perf/ # 性能测试
└── benchmark_tests.py
"""
# 测试类命名规范(用...占位,保证示例可以通过语法检查)
class TestUserModel: ...              # 测试用户模型
class TestAuthentication: ...         # 测试认证功能
class TestPaymentProcessor: ...       # 测试支付处理器
class TestCalculateStatistics: ...    # 测试统计计算
# 测试方法命名规范
def test_user_can_be_created(): ...                       # 测试用户创建
def test_user_login_with_valid_credentials(): ...         # 测试有效凭据登录
def test_user_cannot_login_with_invalid_password(): ...   # 测试无效密码登录
def test_payment_fails_when_card_declined(): ...          # 测试支付失败
def test_statistics_are_calculated_correctly(): ...       # 测试统计计算
# BDD风格测试命名(使用given-when-then模式)
def test_when_user_registers_then_account_is_created():
"""GIVEN 新用户信息
WHEN 用户注册
THEN 账户应该被创建"""
# 设置(GIVEN)
user_data = {'email': 'test@example.com', 'password': 'secret'}
# 执行(WHEN)
user_service = UserService()
user = user_service.register(user_data)
# 验证(THEN)
assert user.id is not None
assert user.email == user_data['email']
assert user.is_active is True
Python
第九部分:质量保障完整示例
9.1 完整的TDD实践:开发温度转换器
"""
完整TDD流程示例:开发温度转换功能
遵循红-绿-重构循环
"""
# 第1轮:摄氏转华氏
# ==================
# 第1步:写测试(红)
def test_celsius_to_fahrenheit():
    """Celsius to Fahrenheit"""
    # Requirement: 32°C should convert to 89.6°F
    result = TemperatureConverter.celsius_to_fahrenheit(32)
    assert result == 89.6

# Step 2: minimal implementation (green)
class TemperatureConverter:
    @staticmethod
    def celsius_to_fahrenheit(celsius):
        # Formula: F = C × 9/5 + 32
        return celsius * 9/5 + 32

# Step 3: refactor (add more test cases)
def test_celsius_to_fahrenheit_edge_cases():
    """Edge cases"""
    # 0°C = 32°F
    result = TemperatureConverter.celsius_to_fahrenheit(0)
    assert result == 32
    # -40°C = -40°F
    result = TemperatureConverter.celsius_to_fahrenheit(-40)
    assert result == -40
    # 100°C = 212°F
    result = TemperatureConverter.celsius_to_fahrenheit(100)
    assert result == 212
# Round 2: Fahrenheit to Celsius
# ==============================
import pytest

def test_fahrenheit_to_celsius():
    """Fahrenheit to Celsius"""
    result = TemperatureConverter.fahrenheit_to_celsius(89.6)
    # (89.6 - 32) * 5/9 is not exactly 32 in floating point, so compare approximately
    assert result == pytest.approx(32)
# Implementation
class TemperatureConverter:
    @staticmethod
    def celsius_to_fahrenheit(celsius):
        return celsius * 9/5 + 32

    @staticmethod
    def fahrenheit_to_celsius(fahrenheit):
        # Formula: C = (F - 32) × 5/9
        return (fahrenheit - 32) * 5/9
# More tests
def test_fahrenheit_to_celsius_edge_cases():
    """Fahrenheit-to-Celsius edge cases"""
    # 32°F = 0°C
    result = TemperatureConverter.fahrenheit_to_celsius(32)
    assert result == 0
    # -40°F = -40°C
    result = TemperatureConverter.fahrenheit_to_celsius(-40)
    assert result == -40
    # 212°F = 100°C
    result = TemperatureConverter.fahrenheit_to_celsius(212)
    assert result == 100
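The repeated edge-case assertions above can be written more compactly with pytest's parametrization (assuming pytest is installed; the converter is restated so the snippet stands alone):

```python
import pytest

class TemperatureConverter:
    """Minimal restatement so this snippet is self-contained."""
    @staticmethod
    def celsius_to_fahrenheit(celsius):
        return celsius * 9 / 5 + 32

    @staticmethod
    def fahrenheit_to_celsius(fahrenheit):
        return (fahrenheit - 32) * 5 / 9

@pytest.mark.parametrize("celsius, fahrenheit", [
    (0, 32),
    (-40, -40),
    (100, 212),
])
def test_celsius_fahrenheit_round_trip(celsius, fahrenheit):
    # Check both directions, tolerating floating-point error with approx
    assert TemperatureConverter.celsius_to_fahrenheit(celsius) == pytest.approx(fahrenheit)
    assert TemperatureConverter.fahrenheit_to_celsius(fahrenheit) == pytest.approx(celsius)
```

Each tuple in the list becomes one independent test case, so a single failing temperature is reported on its own instead of aborting one big test function.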
# Round 3: a temperature conversion utility class
# ===============================================
import pytest  # pytest.raises requires importing pytest first

def test_temperature_converter_validation():
    """Input validation"""
    # Floats should be accepted
    result = TemperatureConverter.celsius_to_fahrenheit(32.5)
    assert isinstance(result, float)
    # Non-numeric input should be rejected
    with pytest.raises(TypeError):
        TemperatureConverter.celsius_to_fahrenheit("32")
# Refactored implementation class
class TemperatureConverter:
    @staticmethod
    def celsius_to_fahrenheit(celsius):
        """Celsius to Fahrenheit"""
        if not isinstance(celsius, (int, float)):
            raise TypeError("Temperature must be a number")
        return celsius * 9/5 + 32

    @staticmethod
    def fahrenheit_to_celsius(fahrenheit):
        """Fahrenheit to Celsius"""
        if not isinstance(fahrenheit, (int, float)):
            raise TypeError("Temperature must be a number")
        return (fahrenheit - 32) * 5/9
# Round 4: a new feature - Kelvin conversion
# (add the following two methods to TemperatureConverter)
    @staticmethod
    def celsius_to_kelvin(celsius):
        """Celsius to Kelvin"""
        if not isinstance(celsius, (int, float)):
            raise TypeError("Temperature must be a number")
        return celsius + 273.15

    @staticmethod
    def kelvin_to_celsius(kelvin):
        """Kelvin to Celsius"""
        if not isinstance(kelvin, (int, float)):
            raise TypeError("Temperature must be a number")
        return kelvin - 273.15
# Testing the new feature
def test_celsius_kelvin_conversion():
    """Celsius-Kelvin conversion"""
    # 0°C = 273.15K
    result = TemperatureConverter.celsius_to_kelvin(0)
    assert result == 273.15
    # 273.15K = 0°C
    result = TemperatureConverter.kelvin_to_celsius(273.15)
    assert result == 0
    # Absolute zero
    result = TemperatureConverter.celsius_to_kelvin(-273.15)
    assert result == 0
# Final refactor: a convenient general conversion method
class TemperatureConverter:
    @staticmethod
    def convert(value, from_unit, to_unit):
        """
        Generic temperature conversion.
        Args:
            value: the temperature value
            from_unit: source unit ('C', 'F', 'K')
            to_unit: target unit ('C', 'F', 'K')
        Returns:
            the converted temperature value
        """
        if not isinstance(value, (int, float)):
            raise TypeError("Temperature must be a number")
        valid_units = {'C', 'F', 'K'}
        if from_unit not in valid_units or to_unit not in valid_units:
            raise ValueError("Units must be one of 'C', 'F', or 'K'")
        # Normalize to Celsius first, then convert to the target unit
        to_celsius = {
            'C': lambda v: v,
            'F': lambda v: (v - 32) * 5 / 9,
            'K': lambda v: v - 273.15,
        }
        from_celsius = {
            'C': lambda v: v,
            'F': lambda v: v * 9 / 5 + 32,
            'K': lambda v: v + 273.15,
        }
        return from_celsius[to_unit](to_celsius[from_unit](value))
Python
II. Containerization and Cloud-Native Deployment
Containerization and cloud-native deployment have become core paradigms of modern application development and operations. Through standardized packaging, automated orchestration, and elastic scaling, they have fundamentally changed how software is delivered and run. This section uses concrete examples to introduce the core concepts, toolchain, and practical approaches of containerized, cloud-native deployment.
1. Containerization Basics: Starting with Docker
1.1 Why Containerize?
Traditional deployment suffers from environment drift, dependency conflicts, and complex release processes. Containerization packages an application together with all of its dependencies into a self-contained runtime environment, achieving the goal of "build once, run anywhere".
Containerization is especially important for AI model deployment. A trained model must reach production quickly, reliably, and elastically; containers guarantee cross-environment consistency, dependency isolation, and fast deployment and rollback.
1.2 Docker Core Concepts and Practice
**Docker's core concepts** are the image, the container, the Dockerfile, and the image registry. Below is an example of containerizing a Python Flask application:
# Dockerfile example: Python Flask application
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir
COPY . .
EXPOSE 5000
CMD ["python", "app.py"]
dockerfile
The corresponding Flask application code:
from flask import Flask

app = Flask(__name__)

@app.route('/')
def home():
    return "Hello from Flask! 🌟"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Python
Build and run commands:
# Build the image
docker build -t my-flask-app .
# Run the container
docker run -d -p 5000:5000 my-flask-app
Bash
For a Java Spring Boot application, the Dockerfile differs slightly:
FROM openjdk:11-jre-slim
COPY target/myapp.jar /app.jar
CMD ["java", "-jar", "/app.jar"]
dockerfile
1.3 Managing Multi-Service Applications with Docker Compose
When an application consists of multiple services, Docker Compose simplifies their management. Below is a configuration for an application with a separate frontend and backend:
# docker-compose.yml
version: '3'
services:
  frontend:
    image: node:14-alpine
    working_dir: /app
    volumes:
      - ./frontend:/app
    command: npm start
    ports:
      - "3000:3000"
  backend:
    image: my-spring-app:latest
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=postgresql://db:5432/mydb
    depends_on:
      - db
  db:
    image: postgres:13
    environment:
      POSTGRES_PASSWORD: example
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
YAML
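The backend service above receives its database address through the `DATABASE_URL` environment variable injected by Compose (in a real backend it would be read from `os.environ`). A standard-library sketch of how a backend might parse such a connection string, as one common approach, not something Compose prescribes:

```python
from urllib.parse import urlparse

# Value mirrors what the compose file above injects into the backend container
database_url = "postgresql://db:5432/mydb"

parsed = urlparse(database_url)
db_config = {
    "scheme": parsed.scheme,              # "postgresql"
    "host": parsed.hostname,              # "db" - the Compose service name doubles as the hostname
    "port": parsed.port,                  # 5432
    "database": parsed.path.lstrip("/"),  # "mydb"
}
assert db_config["host"] == "db"
assert db_config["port"] == 5432
```

Note that the hostname is simply the Compose service name (`db`); Compose's internal DNS resolves it to the database container.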
2. Cloud-Native Deployment: Kubernetes Orchestration
2.1 Kubernetes Core Concepts
Kubernetes (K8s) is an open-source container orchestration platform that automates application deployment, scaling, load balancing, and operations. Its core concepts include:
- **Pod**: the smallest deployable unit in Kubernetes, containing one or more containers
- **Deployment**: manages Pod rollout and updates, keeping the specified number of Pod replicas running
- **Service**: provides Pods with a stable network endpoint
- **Namespace**: partitions cluster resources into separate logical groups
2.2 Setting Up a Local Kubernetes Environment
For learning and testing, Minikube can quickly create a single-node Kubernetes cluster locally:
# Install kubectl (the Kubernetes command-line tool)
sudo apt-get update && sudo apt-get install -y apt-transport-https ca-certificates curl
curl -fsSLo /usr/share/keyrings/kubernetes-archive-keyring.gpg https://packages.cloud.google.com/apt/doc/apt-key.gpg
echo "deb [signed-by=/usr/share/keyrings/kubernetes-archive-keyring.gpg] https://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee /etc/apt/sources.list.d/kubernetes.list
sudo apt-get update
sudo apt-get install -y kubectl
# Install Minikube
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
# Start the cluster
minikube start
Bash
2.3 Kubernetes Deployment Examples
Example 1: deploying a basic web application
# nginx-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:latest
        ports:
        - containerPort: 80
# nginx-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: nginx-service
spec:
  selector:
    app: nginx
  ports:
  - protocol: TCP
    port: 80
    targetPort: 80
  type: LoadBalancer
YAML
Apply the configuration:
kubectl apply -f nginx-deployment.yaml
kubectl apply -f nginx-service.yaml
Bash
Example 2: deploying an AI model inference service
For AI inference services that need GPU resources, the Kubernetes configuration is more involved:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-inference
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ai-inference
  template:
    metadata:
      labels:
        app: ai-inference
    spec:
      containers:
      - name: inference
        image: myrepo/ai-inference:1.0
        resources:
          limits:
            nvidia.com/gpu: 1  # request GPU resources
        ports:
        - containerPort: 8080
YAML
2.4 Helm: the Kubernetes Package Manager
Helm simplifies deploying and managing Kubernetes applications. A typical Helm chart is laid out as follows:
my-node-app/
├── Chart.yaml        # Basic chart metadata
├── values.yaml       # Default configuration values
└── templates/        # Kubernetes resource templates
    └── deployment.yaml
Plain Text
An example Chart.yaml:
apiVersion: v2
name: my-node-app
description: A Helm chart for Kubernetes
version: 0.1.0
YAML
Installing a Helm chart:
# Create a chart
helm create myapp
# Deploy the application
helm install myapp ./myapp
Bash
3. Cloud Platform Deployment in Practice
3.1 Cloud-Native Deployment on AWS
AWS provides a complete cloud-native service stack:
- **Container services**:
  - Amazon EKS: managed Kubernetes service
  - Amazon ECS: AWS-native container orchestration
  - Fargate: serverless compute engine
- **CI/CD toolchain**:
  - CodePipeline + CodeBuild + CodeDeploy
  - Integration with GitHub Actions, GitLab CI, and others
- **Example deployment flow (EKS + GitOps)**:
# Create an EKS cluster with Terraform
terraform apply
# Build the image and push it to ECR
docker build -t myapp .
docker tag myapp:latest 123456789.dkr.ecr.us-east-1.amazonaws.com/myapp:latest
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/myapp:latest
# Use ArgoCD for GitOps-style deployment
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-cd/stable/manifests/install.yaml
Bash
3.2 Cloud-Native Deployment on Azure
Azure's cloud-native services include:
- **Container services**:
  - Azure Kubernetes Service (AKS): managed Kubernetes service
  - Azure Container Apps: serverless container platform
  - Azure Container Instances (ACI): quickly run individual containers
- **Example deployment pipeline**:
# Azure Pipelines configuration
trigger:
- main
pool:
  vmImage: ubuntu-latest
steps:
- task: Docker@2
  inputs:
    containerRegistry: 'Azure Container Registry'
    repository: 'myapp'
    command: 'buildAndPush'
    Dockerfile: '**/Dockerfile'
- task: KubernetesManifest@0
  inputs:
    action: 'deploy'
    kubernetesServiceConnection: 'aks-connection'
    manifests: '**/deployment.yaml'
YAML
4. Cloud-Native Practice On-Premises
4.1 Challenges and Solutions for On-Premises Deployment
Deploying cloud-native applications on-premises poses distinctive challenges:
- **Infrastructure fit**: no elastic resource pool like a public cloud's
  - Solution: configure Kubernetes node autoscaling groups for the local environment
- **Service discovery and load balancing**: a replacement for public-cloud load balancers is needed
  - Solution: use MetalLB for BGP- or layer-2-mode load balancing on-premises
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: metallb-system
  name: config
data:
  config: |
    address-pools:
    - name: default
      protocol: layer2
      addresses:
      - 192.168.1.240-192.168.1.250
YAML
- **Persistent storage management**: dynamic provisioning of local storage
  - Solution: deploy distributed storage with Rook + Ceph
4.2 Edge Computing Scenarios
For industrial IoT edge deployments, a lightweight K3s cluster works well:
# Deploy K3s on a Raspberry Pi
curl -sfL https://get.k3s.io | INSTALL_K3S_EXEC="--no-deploy servicelb --no-deploy traefik" sh -
Bash
5. A High-Performance Inference Platform: NVIDIA Triton
For AI model deployment, NVIDIA Triton Inference Server offers a specialized solution:
5.1 Triton's Architectural Strengths
- **Multi-framework support**: PyTorch, TensorFlow, ONNX, TensorRT, and more
- **Batching**: dynamic batching improves throughput
- **Concurrent model serving**: serves multiple models at once
- **Dynamic scaling**: adjusts resources automatically based on load
5.2 A Triton Deployment Example
apiVersion: apps/v1
kind: Deployment
metadata:
  name: triton-inference
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triton-inference
  template:
    metadata:
      labels:
        app: triton-inference
    spec:
      containers:
      - name: triton
        image: nvcr.io/nvidia/tritonserver:22.12-py3
        args: ["tritonserver", "--model-repository=/models"]
        resources:
          limits:
            nvidia.com/gpu: 2
        volumeMounts:
        - name: model-storage
          mountPath: /models
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: model-pvc
YAML
6. Best Practices and Recommendations
6.1 Security Best Practices
- **Principle of least privilege**: control access to Kubernetes resources with RBAC
- **Network policies**: achieve micro-segmentation with NetworkPolicy
- **Image scanning**: scan images for vulnerabilities with tools such as Trivy and Clair
- **Security context**: run containers as a non-root user
6.2 Cost Optimization Strategies
- **Use spot capacity**: AWS Spot Instances or Azure low-priority VMs
- **Autoscaling**: configure the HPA (Horizontal Pod Autoscaler) and VPA (Vertical Pod Autoscaler)
- **Resource quota management**: set sensible resource requests and limits
6.3 Monitoring and Observability
A complete monitoring stack includes:
- **Prometheus**: metrics collection and alerting
- **Grafana**: data visualization
- **Loki**: log aggregation
- **Jaeger**: distributed tracing
An example alerting rule:
groups:
- name: node-memory
  rules:
  - alert: HighMemoryUsage
    expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 85
    for: 5m
    labels:
      severity: warning
YAML
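What the PromQL expression computes can be spelled out in a few lines of Python (the threshold of 85 matches the alert condition above):

```python
def memory_usage_percent(mem_available_bytes, mem_total_bytes):
    """Equivalent of the alert expression: used memory % = (1 - available/total) * 100."""
    return (1 - mem_available_bytes / mem_total_bytes) * 100

# With 2 GiB of 16 GiB available, usage is 87.5% - above the 85 threshold, so the alert fires
usage = memory_usage_percent(2 * 1024**3, 16 * 1024**3)
assert usage == 87.5
assert usage > 85
```

The `for: 5m` clause means the condition must hold continuously for five minutes before the alert actually fires, which filters out short spikes.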
7. Summary
Containerization and cloud-native deployment are not out-of-reach "rocket science"; they are essential skills for modern software development. Starting with simple Docker containerization, progressing to Kubernetes orchestration, and finally arriving at a full cloud-native architecture is a path that suits teams of every size.
Key takeaways:
- **Containerization guarantees environment consistency**, solving the "works on my machine" problem
- **Kubernetes provides automated orchestration**, giving applications high availability and elastic scaling
- **Cloud platform services simplify operations**, though data sovereignty and cost control still need attention
- **On-premises deployment meets specific needs**, such as compliance requirements and edge computing scenarios
- **A complete toolchain and best practices** keep deployments secure, reliable, and maintainable
With step-by-step practice, even small and mid-sized teams can pull off a cloud-native transformation and enjoy the efficiency gains of automation, elastic scaling, and rapid iteration.
IV. Learning Advice and Resource Planning
1. Learning Methodology
- **Project-driven learning**: pair every stage with a hands-on project, stepping up from simple scripts to complex systems. The goal of learning Python should be to finish projects, not to study the language for its own sake.
- **Code review habits**: revisit your own code regularly and think about how to refactor it; take part in code reviews on open-source projects.
- **Building a knowledge system**: use note-taking tools (such as Obsidian or Notion) to build a personal knowledge base and connect related concepts.
- **Community involvement**: follow PyCon talks, subscribe to Python Weekly, and join local tech community events.
2. Suggested Schedule
A full advanced-learning cycle can be paced roughly as follows:
- **Months 1-2**: complete stages one through three, building a solid foundation in the language and in engineering skills.
- **Month 3**: go deep in a chosen specialization and finish one or two mid-sized projects.
- **Month 4 onward**: contribute to open source, study advanced topics, and stay current.
3. Recommended Learning Resources
- **Official documentation**: the Python docs and the docs of the major libraries (reading source code is the best teacher).
- **Classic books**: Fluent Python, Effective Python, Python Cookbook.
- **Practice platforms**: LeetCode (algorithms), Kaggle (data science), Exercism (coding exercises).
- **Online tutorials and help**: sites such as Runoob (菜鸟教程) are good for getting started. When you get stuck, make good use of search engines (especially in English) and community Q&A.
V. Conclusion
This advanced Python syllabus lays out a complete path from mastering language features, to engineering practice, to deep specialization. The key is to stick to the principle of learning by doing: consolidate every concept with code, and ship a project at every stage. There is no finish line; the Python ecosystem keeps evolving, so keep learning, stay involved in the community, and you will grow from a Python user into a true Python developer.
Remember that programming skill does not improve linearly; it spirals upward. When you hit a wall, go back to the fundamentals and rethink; when you pick up a new skill, try applying it in a project. Good luck, and may you become a Python expert soon!