一、部署架构总览

我们将基于你之前的带人工干预的双智能体系统,构建一个完整的生产级部署方案,包含三个核心部分:

  1. FastAPI 接口层:封装 Agent 为标准 HTTP 接口,支持任务启动、人工干预、状态查询
  2. Redis 持久化层:替代 MemorySaver,实现状态持久化和服务重启恢复
  3. Docker 容器化:打包整个应用为镜像,实现一键部署和环境一致性

标准部署架构图

plaintext

客户端 → FastAPI 接口 → LangGraph Agent → Redis(状态持久化)

二、第一步:项目结构标准化

先将你的代码整理为生产级项目结构:

plaintext

langgraph-agent-deploy/
├── agent/                  # Agent核心逻辑
│   ├── __init__.py
│   ├── multi_agent.py      # 双智能体系统代码
│   └── state.py            # 状态定义
├── main.py                 # FastAPI入口
├── requirements.txt        # 依赖清单
├── Dockerfile              # 镜像构建文件
├── docker-compose.yml      # 服务编排
└── .env                    # 环境变量配置

三、第二步:FastAPI 接口封装

3.1 核心依赖安装

bash

运行

pip install fastapi uvicorn python-multipart langgraph-checkpoint-redis

3.2 完整 FastAPI 代码

# main.py
import asyncio
import traceback
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from langgraph.types import Command, interrupt
from pydantic import BaseModel
from agent.multi_agent import build_multi_agent_system
from agent.state import MultiAgentState

# 初始化FastAPI
app = FastAPI(title="LangGraph 双智能体写作系统 API", version="1.0.0", description="Language Graph Agent")

# 配置CORS 允许前端跨域访问
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],    # 生产环境改为具体域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 初始化 Agent 系统 (全局单例,避免重复初始化)
agent = build_multi_agent_system()

# 请求响应模型
class StartTaskRequest(BaseModel):
    thread_id: str
    task: str
    topic: str

class ResumeTaskRequest(BaseModel):
    thread_id: str
    human_input: str

class TaskStatusResponse(BaseModel):
    thread_id: str
    status: str # running interrupted   completed   failed
    current_step: Optional[str] = None
    interrupt_info: Optional[dict] = None
    final_article: Optional[str] = None
    error: Optional[str] = None

#    核心接口
@app.post("/api/task/start", response_model=TaskStatusResponse)
async def start_task(request: StartTaskRequest):
    """启动一个新的写作任务"""

    try:
        config = {"configurable" : {"thread_id": request.thread_id}}

        # 初始化任务状态
        initial_state: MultiAgentState = {
            "task": request.task,
            "topic": request.topic,
            "research_notes": "",
            "draft": "",
            "final_article": "",
            "current_agent": "researcher",
            "turn_count": 0,
            "max_turns": 5,
            "messages": [],
            "error": None,
            "human_feedback": ""
        }

        # 异步执行Agent 避免阻塞事件循环
        result = await  asyncio.to_thread(
            agent.invoke,
            initial_state,
            config
        )

        # 构建响应
        return await _build_status_response(request.thread_id, result)
    except Exception as e:
        print(f"❌ 详细错误:\n{traceback.format_exc()}")  # 打印完整堆栈
        raise HTTPException(status_code=500, detail=f'任务启动失败:{str(e)}')

@app.post("/api/task/resume", response_model=TaskStatusResponse)
async def resume_task(request: ResumeTaskRequest):
    """恢复被中断的任务,传入人类反馈"""
    try:
        config = {"configurable" : {"thread_id": request.thread_id}}

        # 用Command 恢复执行
        result = await asyncio.to_thread(
            agent.invoke,
            Command(resume=request.human_input),
            config=config
        )
        return await _build_status_response(request.thread_id, result)
    except Exception as e:
        print(f"❌ 详细错误:\n{traceback.format_exc()}")  # 打印完整堆栈
        raise HTTPException(status_code=500, detail=f"任务恢复失败{str(e)}")

@app.get("/api/task/status/{thread_id}", response_model=TaskStatusResponse)
async def get_task_status(thread_id: str):
    """查询任务当前状态"""

    try:
        config = {"configurable" : {"thread_id": thread_id}}

        #获取最新状态
        state = await asyncio.to_thread(agent.get_state, config)
        result = state.values
        # 检查是否有中断(从 tasks 获取)
        if state.tasks:
            # 有待处理的任务,说明被中断了
            task = state.tasks[0]
            if hasattr(task, 'interrupts') and task.interrupts:
                interrupt_info = task.interrupts[0].value
                return TaskStatusResponse(
                    thread_id=thread_id,
                    status="interrupt",
                    current_step=interrupt_info.get("type"),
                    interrupt_info=interrupt_info,
                    error=interrupt_info.get("error"),
                )
        return await _build_status_response(thread_id, result)
    except Exception as e:
        print(f"❌ 详细错误:\n{traceback.format_exc()}")  # 打印完整堆栈
        raise HTTPException(status_code=500, detail=f"状态查询失败:{str(e)}")

# 辅助函数
async def _build_status_response(thread_id: str, result: dict) -> TaskStatusResponse:
    """构建统一的响应状态"""

    # 检查是否有中断
    if "__interrupt" in result:
        interrupt_info = result["__interrupt"][0].value
        return TaskStatusResponse(
            thread_id=thread_id,
            status="interrupt",
            current_step=interrupt_info.get("type"),
            interrupt_info=interrupt_info,
            error=interrupt_info.get("error"),
        )

    # 检查是否完整
    if result.get("final_article"):
        return TaskStatusResponse(
            thread_id=thread_id,
            status="completed",
            final_article=result["final_article"],
            error=result.get("error"),
        )

    # 检查是否出错
    if result.get("error"):
        return TaskStatusResponse(
            thread_id=thread_id,
            status="failed",
            error=result["error"],
        )

    # 运行中
    return TaskStatusResponse(
        thread_id=thread_id,
        status="running",
        current_step=result.get("current_agent"),
        error=result.get("error")
    )

# 启动服务
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.3 修改 Agent 代码,使用 Redis 持久化

将之前的MemorySaver替换为RedisSaver,实现状态持久化:

# agent/multi_agent.py
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", message=".*InsecureKeyLengthWarning.*")
from typing import Literal

from dotenv import load_dotenv
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import START, END
from langgraph.checkpoint.redis import RedisSaver
from langgraph.graph import StateGraph
from langgraph.types import interrupt
from pydantic_settings import BaseSettings, SettingsConfigDict

from agent.state import MultiAgentState

# 加载环境变量
load_dotenv()

class Settings(BaseSettings):
    ZHIPU_API_KEY: str
    ZHIPU_BASE_URL: str
    LLM_MODEL: str
    LLM_BACKUP_MODEL: str
    LLM_TIMEOUT: int=600
    MAX_TURNS: int=3

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore"
    )

settings = Settings()

# 初始化LLM
llm = ChatZhipuAI(
    api_key=settings.ZHIPU_API_KEY,
    model=settings.LLM_BACKUP_MODEL,
    temperature=0.7
)

# 智能体1:研究员(带人工确认)
def create_researcher_agent():
    researcher_prompt = ChatPromptTemplate.from_messages([
        ("system", """
        你是专业的资深研究员,擅长围绕特定主题收集、整理、提炼准确、全面的资料。
        如果有人类提供的反馈,请优先根据反馈补充或修改研究笔记。
        """),
        ("human", """
        写作任务:{task}
        写作主题:{topic}
        人类反馈:{human_feedback}
        请提供详细的结构化研究笔记。"""
         )
    ])
    def researcher_node(state: MultiAgentState):
        print("\n-- 🔬 研究员开始工作 --")
        try:
            # 调用LLM生成研究员笔记
            response = llm.invoke(researcher_prompt.format(
                task=state["task"],
                topic=state["topic"],
                human_feedback=state.get("human_feedback", "无")
            ))
            research_notes = response.content
            print("✅️ 研究员完成研究笔记")


        except Exception as e:
            print(f"❌️ 研究员工作失败:{str(e)}")
            return {
                "error": str(e),
                "turn_count": state["turn_count"] + 1,
            }
        # ✅️ 关键:出发人工干预 等待人类确认
        print("\n-- 🔄 等待人类确认研究笔记...")
        human_approval = interrupt({
            "type": "research_approval",
            "topic": state["topic"],
            "research_notes": research_notes,
            "prompt": "请确认研究笔记是否合格?输入'通过'继续,或输入意见让研究院补充:"
        })

        # 当人类输入后,代码从这里继续执行
        print(f"\n 📝收到人类反馈:{human_approval}")

        # 如果人类输入“通过”,进入下一步
        if human_approval.strip() == "通过":
            return {
                "research_notes": research_notes,
                "current_agent": "writer",
                "turn_count": state["turn_count"] + 1,
                "human_feedback": "",
                "messages": [AIMessage(content=f"研究员已完成研究笔记:\n{research_notes}")]
            }
        # 否则,研究员根据人类反馈修改研究笔记
        else:
            print(f'🔄 研究员根据人类反馈修改研究笔记...')
            return {
                "human_feedback": human_approval,
                "turn_count": state["turn_count"] + 1,
                "current_agent": "researcher",  # 回到研究员节点,重新生成
            }

    builder = StateGraph(MultiAgentState)
    builder.add_node("researcher", researcher_node)
    builder.set_entry_point("researcher")
    return builder.compile()

# 智能体2: 作家 带人工审核
def create_writer_agent():
    writer_prompt = ChatPromptTemplate.from_messages([
        ("system", """
        你是专业的资深作家,擅长基于研究员提供的资料,创作高质量、结构清晰、语言流畅的文章。
        如果有人类提供的修改意见,请优先根据意见修改文章。
        """),
        ("human", """
        写作任务:{task}
        写作主题:{topic}
        研究笔记:{research_notes}
        人类修改意见:{human_feedback}
        请写一篇完整的文章初稿。""")
    ])

    def writer_node(state: MultiAgentState):
        print("\n-- ✍️ 作家开始工作 --")
        try:
            # 调用LLM生成初稿
            response = llm.invoke(writer_prompt.format(
                task=state["task"],
                topic=state["topic"],
                research_notes=state["research_notes"],
                human_feedback=state.get("human_feedback", "无")
            ))
            draft = response.content
            print("✅️ 作家完成文章初稿")


        except Exception as e:
            print(f"❌️ 作家工作失败:{str(e)}")
            return {
                "error": str(e),
                "turn_count": state["turn_count"] + 1,
            }
        # ✅️ 关键:出发人工干预,等待人类审核
        print("\n 🔄 等待人类审核文章初稿...")
        human_approval = interrupt({
            "type": "writer_approval",
            "topic": state["topic"],
            "draft": draft,
            "prompt": "请审核文章初稿:输入'通过' 生成最终文章,或输入修改意见让作家修改:"
        })

        # 人类输入后继续执行
        print(f"\n 📝 收到人类审核意见:{human_approval}")

        if human_approval.strip() == "通过":
            return {
                "draft": draft,
                "current_agent": "finalize",
                "turn_count": state["turn_count"] + 1,
                "human_feedback": "",
                "messages": [AIMessage(content=f"作家已完成文章初稿:\n{draft}")]
            }
        else:
            print("🔄 作家根据人类意见修改文章")
            return {
                "human_feedback": human_approval,
                "turn_count": state["turn_count"] + 1,
                "current_agent": "writer",  # 回到作家起点 重新生成
            }
    builder = StateGraph(MultiAgentState)
    builder.add_node("writer", writer_node)
    builder.set_entry_point("writer")
    return builder.compile()

def build_multi_agent_system():
    researcher_agent = create_researcher_agent()
    writer_agent = create_writer_agent()

    builder = StateGraph(MultiAgentState)

    # 把子图作为节点加入主图
    builder.add_node("researcher", researcher_agent)
    builder.add_node("writer", writer_agent)

    # 最终汇成总节点
    def finalize_node(state: MultiAgentState):
        print("\n-- 🎉 任务完成 生成最终文章 --")
        return {
            "final_article": state["draft"],
            "messages": [AIMessage(content=f"最终文章:\n{state['draft']}")],
        }

    builder.add_node("finalize", finalize_node)

    # 条件路由
    def router(state: MultiAgentState) -> Literal["researcher", "writer", "finalize", "end"]:
        if state.get("error"):
            print(f"❌️ 系统出错:{state['error']}, 任务结束")
            return "end"

        if state.get("turn_count") >= state["max_turns"]:
            print("⚠️ 超过最大交互轮次,强制结束任务")
            return "finalize"

        current_agent = state["current_agent"]
        if current_agent == "researcher":
            return "researcher"
        elif current_agent == "writer":
            return "writer"
        elif current_agent == "finalize":
            return "finalize"
        else:
            return "finalize"

    builder.add_edge(START, "researcher")
    builder.add_conditional_edges(
        "researcher",
        router,
        {
            "writer": "writer",
            "finalize": "finalize",
            "end": END
        }
    )

    builder.add_conditional_edges(
        "writer",
        router,
        {
            "researcher": "researcher",
            "finalize": "finalize",
            "end": END
        }
    )
    builder.add_edge("finalize", END)

    # 从环境变量获取Redis连接地址
    checkpointer = RedisSaver("redis://localhost:6379/0")
    checkpointer.setup()
    return builder.compile(checkpointer=checkpointer)
state.py
from typing import TypedDict, Literal, Annotated, Sequence

from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages


# 全局状态定义
class MultiAgentState(TypedDict):
    task: str
    topic: str
    research_notes: str
    draft: str
    final_article: str
    current_agent: Literal["researcher", "writer", "human"]
    turn_count: int
    max_turns: int
    messages: Annotated[Sequence[BaseMessage], add_messages]
    error: str | None
    human_feedback: str  # ✅️ 新增: 存储人类反馈

四、第三步:Docker 容器化

4.1 编写 requirements.txt

# requirements.txt
fastapi==0.115.0
uvicorn==0.30.6
python-multipart==0.0.12
langgraph==0.2.39
langchain==0.2.16
langchain-openai==0.1.25
pydantic-settings==2.5.2
python-dotenv==1.0.1
langgraph-checkpoint-redis==1.0.11
redis==5.2.1

4.2 编写 Dockerfile(多阶段构建,优化镜像体积)

dockerfile

# Dockerfile
# 第一阶段:构建阶段
FROM python:3.11-slim AS builder

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# 创建虚拟环境
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# 复制依赖文件并安装
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements.txt

# 第二阶段:运行阶段
FROM python:3.11-slim

# 设置工作目录
WORKDIR /app

# 复制虚拟环境
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# 创建非root用户,增强安全性
RUN useradd -m appuser
USER appuser

# 复制应用代码
COPY --chown=appuser:appuser . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

4.3 编写 docker-compose.yml

yaml

# docker-compose.yml
version: '3.8'

volumes:
  redis-data:
    driver: local

services:
  # Redis服务:状态持久化
  redis:
    image: redis:7-alpine
    container_name: langgraph-redis
    restart: unless-stopped
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 1s
      retries: 5

  # LangGraph Agent API服务
  langgraph-agent:
    build: .
    container_name: langgraph-agent
    restart: unless-stopped
    ports:
      - "8000:8000"
    depends_on:
      redis:
        condition: service_healthy
    env_file:
      - .env
    environment:
      - REDIS_URI=redis://redis:6379/0
      - PYTHONUNBUFFERED=1  # 禁用Python输出缓冲,确保日志实时显示

4.4 编写 .env 配置文件

env

# .env
# 智谱API配置
ZHIPU_API_KEY=你的智谱API密钥
ZHIPU_BASE_URL=https://open.bigmodel.cn/api/paas/v4
LLM_MODEL=glm-4.6
LLM_BACKUP_MODEL=glm-4-flash
LLM_TIMEOUT=30

# Redis配置(docker-compose中会覆盖这个值)
REDIS_URI=redis://localhost:6379/0

五、第四步:部署与运行

5.1 一键启动所有服务

# 构建镜像并启动服务
docker-compose up -d --build

# 查看服务状态
docker-compose ps

# 查看日志
docker-compose logs -f langgraph-agent

5.2 测试 API 接口

1. 启动任务
curl -X POST http://localhost:8000/api/task/start \
  -H "Content-Type: application/json" \
  -d '{
    "thread_id": "test_task_001",
    "task": "写一篇关于LangGraph多智能体系统的技术文章",
    "topic": "LangGraph多智能体系统的原理、架构与实现"
  }'

2. 查询任务状态

bash

curl http://localhost:8000/api/task/status/test_task_001

3. 恢复中断任务(当任务等待人工确认时)

bash

curl -X POST http://localhost:8000/api/task/resume \
  -H "Content-Type: application/json" \
  -d '{
    "thread_id": "test_task_001",
    "human_input": "通过"
  }'

4. 访问自动生成的 API 文档

六、生产环境优化最佳实践

6.1 性能优化

  1. 增加 Worker 进程数:在 Dockerfile 的 CMD 中调整--workers参数,通常设置为 CPU 核心数的 2-4 倍
  2. 使用 Gunicorn + Uvicorn:生产环境推荐用 Gunicorn 管理 Uvicorn Worker,提高并发能力

    dockerfile

    CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
    
  3. 添加 Nginx 反向代理:处理静态资源、SSL 终止、负载均衡

6.2 安全优化

  1. 限制 CORS 来源:将allow_origins=["*"]改为具体的前端域名
  2. 添加 API 密钥认证:给接口添加 API 密钥验证,防止未授权访问
  3. 使用 Docker Secrets:生产环境不要用.env 文件存储敏感信息,改用 Docker Secrets 或 Kubernetes Secrets
  4. 定期更新基础镜像:及时修复安全漏洞

6.3 可观测性

  1. 添加日志配置:配置结构化日志,方便日志收集和分析
  2. 添加健康检查接口

    python

    @app.get("/health")
    async def health_check():
        return {"status": "healthy", "timestamp": datetime.now().isoformat()}
    
  3. 集成监控系统:用 Prometheus + Grafana 监控服务指标

6.4 高可用部署

  1. 多实例部署:启动多个 Agent 实例,用 Nginx 或 Kubernetes 进行负载均衡
  2. Redis 集群:生产环境使用 Redis 集群,保证数据高可用
  3. 自动扩缩容:根据请求量自动调整实例数量

七、常见问题排查

  1. Redis 连接失败:检查 docker-compose 中的 Redis 服务是否正常启动,REDIS_URI 是否正确
  2. 接口超时:增加 Uvicorn 的超时时间,或优化 Agent 执行逻辑
  3. 状态丢失:确保使用了 RedisSaver,并且 Redis 数据卷配置正确
  4. 镜像体积过大:使用多阶段构建,删除不必要的依赖和文件

按照这个方案部署后,你的 LangGraph Agent 系统就具备了生产级的稳定性、可扩展性和可维护性,可以安全地对外提供服务。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐