Python后端开发之旅(三)

Python进阶——动态导入机制

通过反射机制
import
getattr

Python进阶——Protobuf (gRPC)服务通信

🌟 Protobuf (gRPC) 服务通信

💡 目标:使用 .proto 文件定义接口,通过 gRPC 实现跨语言的服务调用(如 Python → Python 或 Python → Go/Java 等)

✅ 什么是 Protobuf?

Protobuf(Protocol Buffers)是 Google 开发的一种数据序列化协议,类似于 JSON 或 XML,但更高效、更紧凑。

  • 它允许你用 .proto 文件描述数据结构。
  • 然后通过工具生成对应语言(Python、Java、Go 等)的类。
  • 常用于微服务之间的通信。

⚠️ 注意:Protobuf 是“数据格式”,而 gRPC 是基于它的“通信框架”。


✅ 什么是 gRPC?

gRPC 是一个高性能、开源的远程过程调用(RPC)框架,由 Google 发起,它:

  • 使用 HTTP/2 协议
  • 基于 Protobuf 定义服务接口
  • 支持多种语言(Python、Java、Go、C++ 等)
  • 支持双向流、单向流、请求-响应等模式

💡 类比:就像 REST API 用 JSON 传数据,gRPC 用 Protobuf 传数据,并且支持更复杂的通信方式。

在这里插入图片描述

实际案例

🔧 步骤一:安装依赖

pip install grpcio grpcio-tools protobuf

📄 步骤二:编写 .proto 文件

创建文件 helloworld.proto

syntax = "proto3";

package helloworld;

// 定义消息类型
message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

// 定义服务
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}
解释:
  • syntax = "proto3";:使用 Protobuf 第三版语法
  • package helloworld;:命名空间,防止冲突
  • message:定义数据结构
  • service:定义可以被调用的方法(类似接口)
  • rpc SayHello (HelloRequest) returns (HelloResponse);:表示这个服务有一个方法叫 SayHello,接收 HelloRequest 返回 HelloResponse

🛠️ 步骤三:生成 Python 代码

使用 protoc 编译器生成 Python 代码。

# 生成 Python 代码
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto

生成文件:
helloworld_pb2.py(消息类)
helloworld_pb2_grpc.py(服务接口)

核心参数说明
  1. -I.--proto_path=.

    • 作用:指定 .proto 文件的搜索路径(当前目录为 .)。
    • 示例:若文件在 src/protos 目录下,需改为 -Isrc/protos
  2. --python_out=.

    • 作用:生成 Protobuf 消息类的 Python 代码(*_pb2.py),输出到当前目录。
    • 文件内容:包含消息的序列化/反序列化方法和字段定义
  3. --grpc_python_out=.

    • 作用:生成 gRPC 服务接口的 Python 代码(*_pb2_grpc.py),输出到当前目录。
    • 文件内容:包含服务端 Servicer 基类和客户端 Stub
  4. helloworld.proto

    • 作用:指定输入的 Protocol Buffers 文件。支持通配符(如 *.proto)批量编译[1][5]
  • 依赖安装:需提前通过 pip install grpcio-tools 安装编译工具
helloworld_pd2.py(消息类)

它被用来创建 请求 和 响应 消息对象

class HelloRequest:
    def __init__(self):
        self.name = ""

class HelloReply:
    def __init__(self):
        self.message = ""

# 序列化和反序列化方法
def SerializeToString(self):
    # 序列化逻辑
    pass

def ParseFromString(self, data):
    # 反序列化逻辑
    pass
  • 定义消息类:
    • 包含了proto文件中定义的所有消息类型,这些消息类型被转换为Python类
  • 序列化和反序列化:
    • 提供了将消息对象序列化为字节流(用于网络传输)和从字节流反序列化为消息对象的功能
helloworld_pd2_grpc.py(服务接口)

由proto文件中的服务定义生成的,用于定义gRPC服务的接口和实现

  • 服务接口类 GreeterServicer

    • 包含了proto文件中定义的服务接口(service),并为每个服务方法生成了抽象类,自己写的服务端需要实现GreeterServicer类
    • 例如,Greeter服务会生成一个GreeterServicer抽象类,其中包含SayHello方法的定义。
  • 服务注册函数add_GreeterServicer_to_server

    • 提供了将服务实现绑定到gRPC服务器的方法。例如,add_GreeterServicer_to_server方法用于将服务实现注册到服务器。
  • 客户端存根类GreeterStub:

    • 为客户端生成了服务的存根(Stub),用于调用服务端的方法。例如,GreeterStub类允许客户端调用SayHello方法
class GreeterServicer:
    def SayHello(self, request, context):
        raise NotImplementedError()

def add_GreeterServicer_to_server(servicer, server):
    # 注册服务到服务器的逻辑
    pass

class GreeterStub:
    def SayHello(self, request):
        # 调用服务端方法的逻辑
        pass

🧪 步骤四:实现服务端(Server)

import grpc
from concurrent import futures
import helloworld_pb2
import helloworld_pb2_grpc

# 实现服务
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        # 接收 client 的 name,返回问候语
        return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")

# 启动服务器
def serve():
	# 创建线程池(最大10个工作线程)
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
	# 注册服务实现
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')  # 监听地址
    print("Server started on port 50051")
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()

🔍 模拟客户端调用前先运行此服务!

  • from concurrent import futures:导入concurrent.futures模块,用于创建线程池。gRPC服务器需要一个线程池来处理并发请求
  • request:客户端发送的HelloRequest对象,包含name字段
  • context:提供RPC上下文信息(如超时、元数据等)
  • 返回值必须是HelloReply类型,与Protobuf定义一致
  • '[::]:50051’表示监听所有网络接口的50051端口。insecure表示不使用加密(仅用于本地测试)
  • start():启动服务器,开始监听客户端请求。
  • server.wait_for_termination():阻塞主线程,直到服务器被关闭。这通常用于保持服务器运行
  • server.stop(0):如果捕获到KeyboardInterrupt(如用户按下Ctrl+C),则停止服务器。0表示立即停止

🧩 步骤五:实现客户端(Client)

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)# 创建一个GreeterStub对象,它是客户端代理,用于调用服务端定义的Greeter服务。
	    request = helloworld_pb2.HelloRequest(name='World')# 创建一个HelloRequest消息对象,并设置name字段为"World"。这是客户端发送给服务端的请求数据。
	    response = stub.SayHello(request)# 调用服务端的SayHello方法,将request对象作为参数传递。服务端处理后返回一个HelloReply消息对象。
	    print("Greeter client received: " + response.message)# 打印服务端返回的响应消息中的message字段。

if __name__ == '__main__':
    run()
  • 创建一个gRPC通道,连接到运行在localhost:50051的服务端
  • insecure_channel表示不使用加密连接(仅用于本地测试)

✅ 运行顺序

  1. 先运行服务端脚本(server.py
  2. 再运行客户端脚本(client.py
  3. 输出:
    Server started on port 50051
    Received: Hello, Alice!
    

✅ 优点 & 场景

特性 说明
高性能 序列化快,二进制传输,比 JSON 快很多
跨语言 可以用 Python、Java、Go 互相调用
流式支持 支持双向流(适合实时聊天、日志收集)
接口清晰 所有接口都在 .proto 里明确定义

🚀 典型用途:微服务间通信、API 网关、分布式系统调用

Python进阶——Celery 处理异步任务调度

🌟 Celery 处理异步任务调度

💡 目标:将耗时任务(如发送邮件、处理图片)放入队列,由后台 worker 异步执行,不阻塞主程序

✅ 什么是 Celery?

Celery 是一个分布式任务队列系统,常用于:

  • 异步任务执行(如发送邮件、生成报表)
  • 定时任务(定时清理缓存)
  • 分布式任务处理(多个 worker 并行工作)

核心组件:

组件 作用
Broker(消息中间件) 存储任务,比如 RabbitMQ、Redis
Worker 消费任务并执行
Result Backend 保存任务结果(可选)

🔄 工作流程:
主程序 → 发送任务到 Broker → Worker 拿取 → 执行 → 结果返回(可选)
在这里插入图片描述

  • Version Requirements
  • Celery本身不支持Windows,因此可能会遇到 挺多的问题的呢!
    celery本身不具备任务存储的能力,做不到任务存储的功能,因此在使用Celery时还需要搭配一些具备存储、访问的工具,如消息队列,Redies等等。(Tower线上:broker用RabbitMQ、Backend用redies|线下都用redies)
    如果没有配置 backend,那么获取结果的时候会报错

🧰 步骤一:准备消息中间件(推荐用 Redis 或 RabbitMQ)

方法一:使用 Redis(简单)
# 启动 Redis 服务(本地)
redis-server
方法二:使用 RabbitMQ
# 安装 RabbitMQ(macOS)
brew install rabbitmq
rabbitmq-server

📦 步骤二:安装 Celery

pip install celery[redis]  # 如果用 Redis
# 或者用 RabbitMQ
pip install celery amqp

🧪 步骤三:定义任务(task.py)

from celery import Celery
import time

# 创建 Celery 应用实例
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    """这是一个耗时任务"""
    time.sleep(5)  # 模拟耗时操作
    return x + y

@app.task
def send_email(to, subject):
    print(f"Sending email to {to}, subject: {subject}")
    time.sleep(3)
    return f"Email sent to {to}"

💡 @app.task 装饰器表示这是一个可被 Celery 执行的任务


🛠️ 步骤四:启动 Celery Worker

在终端运行:

celery -A task.app worker --loglevel=info
  1. -A task.celery_app

    • -A--app 指定 Celery 应用模块的路径。这里的 task.celery_app 表示从 task.py 模块中加载名为 app 的 Celery 应用实例
  2. worker

  3. --loglevel=info

    • 设置日志级别为 info,控制日志输出的详细程度。info 级别会记录常规的运行信息(如任务接收、执行完成等),适合生产环境调试。其他常见级别包括:
      • debug:最详细,包含内部调试信息
      • warning/error:仅记录警告或错误

输出示例:

[2024-04-05 10:00:00,000: INFO/ForkPoolWorker-1] Task add[xxx] received

🧪 步骤五:调用任务(main.py)

from task import add, send_email

# 同步调用(等待结果)
result = add.delay(4, 5)  # 异步执行
print("Task ID:", result.id)
print("Result:", result.get())  # 获取结果(会阻塞直到完成)

# 异步发送邮件
task = send_email.delay("user@example.com", "Welcome!")
print("Email task sent with ID:", task.id)

🔁 delay() 是异步发起任务,不会立即执行。
get() 会阻塞直到任务完成并返回结果。

直接查询任务执行信息

在这里插入图片描述

from app import task
res = task.delay("古明地觉", 17)
print(type(res))
""""""
# 直接打印,显示任务的 id
print(res)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 获取状态, 显然此刻没有执行完
# 因此结果是PENDING, 表示等待状态
print(res.status)
"""
PENDING
"""
# 获取 id,两种方式均可
print(res.task_id)
print(res.id)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 获取任务执行结束时的时间
# 任务还没有结束, 所以返回None
print(res.date_done)
"""
None
"""
# 获取任务的返回值, 可以通过 result 或者 get()
# 注意: 如果是 result, 那么任务还没有执行完的话会直接返回 None
# 如果是 get(), 那么会阻塞直到任务完成
print(res.result)
print(res.get())
"""
None
name is 古明地觉, age is 17
"""
# 再次查看状态和执行结束时的时间
# 发现 status 变成SUCCESS
# date_done 变成了执行结束时的时间
print(res.status)
# 但显示的是 UTC 时间
print(res.date_done)
"""
SUCCESS
2022-09-08 06:40:34.525492
"""

# 1. ready():查看任务状态,返回布尔值
# 任务执行完成返回 True,否则为 False
# 那么问题来了,它和 successful() 有什么区别呢?
# successful() 是在任务执行成功之后返回 True, 否则返回 False
# 而 ready() 只要是任务没有处于阻塞状态就会返回 True
# 比如执行成功、执行失败、被 worker 拒收都看做是已经 ready 了
print(res.ready())
"""
False
"""
# 2. wait():和之前的 get 一样, 因为在源码中写了: wait = get
# 所以调用哪个都可以, 不过 wait 可能会被移除,建议直接用 get 就行
print(res.wait())
print(res.get())
"""
name is 古明地觉, age is 17
name is 古明地觉, age is 17
"""
# 3. trackback:如果任务抛出了一个异常,可以获取原始的回溯信息
# 执行成功就是 None
print(res.traceback)
"""
None
"""

⏱️ 步骤六:定时任务(Cron 任务)

也可以设置定时任务(每分钟执行一次):

@app.task
def every_minute_task():
    print("This runs every minute!")

# 在 main.py 中配置定时任务
from celery.schedules import crontab

app.conf.beat_schedule = {
    'every-minute': {
        'task': 'task.every_minute_task',
        'schedule': crontab(minute='*'),  # 每分钟执行
    },
}

# 启动 beat 服务
# celery -A task.celery_app beat --loglevel=info

✅ 优势与使用场景

场景 说明
发送邮件 用户注册后发送欢迎邮件,不卡页面
图片处理 用户上传图片 → 后台压缩/加水印
数据分析 大量数据计算,避免前端等待
定时任务 清理日志、统计报表生成
分布式部署 多个 worker 并行处理任务

Python进阶——Celery + Web

✅ 第一部分:Flask + Celery 示例

📦 准备工作

1. 安装依赖
pip install flask celery[redis] redis
2. 启动 Redis(本地运行)
redis-server

如果你用的是远程 Redis,请确保地址正确


🧪 步骤一:创建核心文件结构

project/
│
├── app.py              # Flask 主程序
├── tasks.py            # Celery 任务定义
└── requirements.txt

🧩 步骤二:定义 Celery 任务(tasks.py)

from celery import Celery
import time
import random

# 创建 Celery 应用实例
celery_app = Celery('tasks', broker='redis://100.25.69.56:6379/0', backend='redis://100.25.69.56:6379/0')

@celery_app.task
def send_email(to, subject):
    """模拟发送邮件"""
    print(f"Sending email to {to} with subject: {subject}")
    time.sleep(5)  # 模拟耗时操作
    success = random.choice([True, False])
    if success:
        return f"Email sent successfully to {to}"
    else:
        raise Exception("Failed to send email")

@celery_app.task
def long_process(data):
    """长时间处理任务"""
    time.sleep(10)
    return f"Processed: {data.upper()}"

💡 注意:

  • celery_app = Celery(...) 是标准做法
  • broker:Redis 地址(消息中间件)
  • backend:用于保存任务结果(可选但推荐)

🧩 步骤三:编写 Flask 主程序(app.py)

from flask import Flask, jsonify, request
from tasks import celery_app, send_email, long_process
from celery.result import AsyncResult
from datetime import datetime
import uuid

# 初始化 Flask 应用
app = Flask(__name__)

# 配置 Celery
app.config['CELERY_BROKER_URL'] = 'redis://10.25.69.56:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://10.25.69.56:6379/0'

# 配置 CORS(允许跨域访问)
from flask_cors import CORS
CORS(app)

# 注册 Celery 到 Flask 上下文
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
    return celery

# 使用自定义函数绑定 Celery
celery = make_celery(app)

# ================================
# API 接口定义
# ================================

@app.route('/api/send-email', methods=['POST'])
def send_email_task():
    data = request.get_json()
    to = data.get('to')
    subject = data.get('subject', 'Welcome!')

    if not to:
        return jsonify({'error': 'Missing "to" field'}), 400

    # 提交任务到 Celery
    task = send_email.delay(to, subject)
    
    return jsonify({
        'task_id': task.id,
        'status': 'PENDING',
        'message': 'Email task submitted!'
    })

@app.route('/api/check-task/<task_id>', methods=['GET'])
def check_task(task_id):
    """检查任务状态"""
    result = AsyncResult(task_id, app=celery)
    status = result.status
    if status == 'SUCCESS':
        return jsonify({
            'task_id': task_id,
            'status': status,
            'result': result.result
        })
    elif status == 'PENDING':
        return jsonify({
            'task_id': task_id,
            'status': status,
            'message': 'Task is still running...'
        })
    elif status == 'FAILURE':
        return jsonify({
            'task_id': task_id,
            'status': status,
            'error': str(result.info)
        })
    else:
        return jsonify({
            'task_id': task_id,
            'status': status,
            'message': 'Unknown status'
        })

@app.route('/api/long-process', methods=['POST'])
def long_process_task():
    data = request.get_json()
    task = long_process.delay(data.get('text', 'default'))
    return jsonify({
        'task_id': task.id,
        'status': 'PENDING',
        'message': 'Long process started!'
    })

# ================================
# 启动服务
# ================================

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

🧠 关键点解析

🔹 app = Flask(__name__)
  • Flask(__name__):创建 Flask 应用对象
  • __name__ 是当前模块名(通常为 app.py),用于定位资源路径
  • 这是 Flask 的“入口”——所有路由都挂载在这个对象上
🔹 app.config['CELERY_BROKER_URL']
  • 指定 消息队列服务器地址
  • 必须和 .py 中的 broker 一致
  • 格式:redis://host:port/dbamqp://user:pass@host:port/vhost
🔹 app.config['CELERY_RESULT_BACKEND']
  • 用来存储任务结果的地方(必须支持读写)
  • Redis 是最常用的(也可用数据库、RabbitMQ 等)
  • 作用:当你调用 result.get() 获取返回值时,就是从这里拿数据
  • Flask应用和Celery是两个独立的实例,但它们可以共享配置
  • Flask的app.config是一个字典对象,用于存储应用程序配置,这是一种配置集中化管理的最佳实践
  • Celery实例创建时从app.config读取相关配置
🔹 CORS(app)
  • 来自 flask_cors
  • 允许浏览器跨域请求(否则前端会报错)
  • 举个例子:前端在 http://localhost:3000 访问 http://localhost:5000 就会被阻止 → 加上 CORS 即可解决
🔹 make_celery(app) 函数
  • 解决 Flask 与 Celery 的集成问题
  • 因为 Celery 需要知道如何加载它的配置(比如 Broker URL)
  • 我们把 Flask 的 config 传给 Celery 实例

🧪 启动方式

  1. 启动 Redis(确保能连接)
  2. 启动 Celery Worker(单独终端)
# 启动 Worker
celery -A tasks.celery_app worker --loglevel=info &> ./logs/celery.log &
  1. 启动 Flask 服务
python app.py

📡 测试接口

使用 Postman 或 curl 测试:

1. 提交任务
curl -X POST http://localhost:5000/api/send-email \
  -H "Content-Type: application/json" \
  -d '{"to": "test@example.com", "subject": "Hello"}'

响应:

{
  "task_id": "a1b2c3d4-e5f6-789g-hijk-lmnopqrstuv",
  "status": "PENDING",
  "message": "Email task submitted!"
}
2. 查询任务状态
curl http://localhost:5000/api/check-task/a1b2c3d4-e5f6-789g-hijk-lmnopqrstuv

响应(等5秒后):

{
  "task_id": "a1b2c3d4-e5f6-789g-hijk-lmnopqrstuv",
  "status": "SUCCESS",
  "result": "Email sent successfully to test@example.com"
}

✅ 第二部分:FastAPI + Celery 示例

FastAPI 更现代,性能强,适合微服务架构。

📦 依赖安装

pip install fastapi uvicorn celery[redis] redis

🧩 文件结构

project/
│
├── main.py             # FastAPI 主程序
├── tasks.py            # Celery 任务
└── requirements.txt

🧩 任务定义(tasks.py)同上 ✅

不需要改!


🧩 FastAPI 主程序(main.py)

from fastapi import FastAPI, HTTPException, BackgroundTasks
from typing import Dict, Any
from tasks import celery_app, send_email, long_process
from celery.result import AsyncResult
import uuid
import asyncio

app = FastAPI(title="FastAPI + Celery Demo")

# 配置 Celery
app.state.celery = celery_app

# 添加 CORS 支持
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 允许所有 origin(生产环境建议限制)
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ================================
# API 接口
# ================================

@app.post("/api/send-email")
async def send_email_task(to: str, subject: str = "Welcome"):
    if not to:
        raise HTTPException(status_code=400, detail="Missing 'to' field")

    # 提交任务
    task = send_email.delay(to, subject)
    return {
        "task_id": task.id,
        "status": "PENDING",
        "message": "Email task queued"
    }

@app.get("/api/check-task/{task_id}")
async def check_task(task_id: str):
    result = AsyncResult(task_id, app=app.state.celery)
    if result.ready():
        if result.successful():
            return {
                "task_id": task_id,
                "status": "SUCCESS",
                "result": result.result
            }
        else:
            return {
                "task_id": task_id,
                "status": "FAILURE",
                "error": str(result.info)
            }
    else:
        return {
            "task_id": task_id,
            "status": "PENDING",
            "message": "Task still running..."
        }

@app.post("/api/long-process")
async def long_process_task(text: str = "default"):
    task = long_process.delay(text)
    return {
        "task_id": task.id,
        "status": "PENDING",
        "message": "Long process started!"
    }

# ================================
# 启动服务(可选)
# ================================

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

🧠 关键点解析

🔹 app = FastAPI()
  • 与 Flask 类似,但更简洁、自动文档化(Swagger UI)
  • 自带 JSON 请求体解析、路径参数、类型提示
🔹 app.state.celery
  • FastAPI 使用 state 字典共享全局对象
  • 把 Celery 实例放进 app.state.celery 方便其他地方访问
🔹 BackgroundTasks
  • 有时我们会用它来添加后台任务(非阻塞)
  • 但在本例中我们直接使用 Celery,所以没用到
🔹 CORSMiddleware
  • FastAPI 的方式,比 Flask 更灵活
  • allow_origins=["*"] 表示允许任何域名访问(开发时可用)

🚀 启动测试

  1. 启动 Redis
  2. 启动 Celery Worker:
celery -A tasks.celery_app worker --loglevel=info &> ./logs/celery.log &
  1. 启动 FastAPI:
uvicorn main:app --reload --port 8000
  1. 前端访问:
    • http://localhost:8000/docs → 查看 Swagger 文档
    • 调用 /api/send-email 接口

🎯 总结对比表

特性 Flask + Celery FastAPI + Celery
是否支持异步 ❌(需手动处理) ✅(内置 async)
开发速度 快(简单) 较快(带类型提示)
文档生成 需要额外工具 内置 Swagger UI
并发处理 一般 强大(基于 Starlette)
错误处理 手动写 更规范
适合场景 小型项目、传统后端 微服务、高性能 API

✅ 最佳实践建议

✅ 1. 环境变量管理(推荐)

避免硬编码 Redis 地址:

import os

broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

celery_app = Celery('tasks', broker=broker_url, backend=result_backend)

然后创建 .env 文件:

CELERY_BROKER_URL=redis://10.25.69.56:6379/0
CELERY_RESULT_BACKEND=redis://10.25.69.56:6379/0

使用 python-dotenv 加载。


✅ 2. 日志记录与监控

  • worker 启动时加上日志输出
  • 使用 flower 监控 Celery(图形界面)
pip install flower
celery -A tasks.celery_app flower --port=5555

访问:http://localhost:5555


✅ 3. 任务重试机制

tasks.py 中添加重试:

@celery_app.task(bind=True, max_retries=3, default_retry_delay=5)
def send_email(self, to, subject):
    try:
        # ... 发送邮件逻辑
        pass
    except Exception as exc:
        self.retry(countdown=5, exc=exc)

📘 下一步提升

如果你想深入:

  1. 将 Redis 替换为 RabbitMQ(适合高吞吐)
  2. 使用 Docker 部署整个系统
  3. 结合 Flask/FastAPI + WebSockets 实现实时通知
  4. 学习 Celery 多队列、优先级调度
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐