三层架构设计:前端+业务+AI层的职责划分

前言

在构建企业级AI Agent系统时,清晰的架构分层至关重要。本文将详细介绍本项目采用的三层架构设计,以及各层的职责划分和通信机制。

适合读者: 架构师、全栈工程师、技术Leader


一、为什么需要三层架构

1.1 单体架构的问题

传统单体架构:
Frontend + Backend + AI 混在一起
├── 代码耦合严重
├── 难以独立扩展
├── 技术栈受限
└── 团队协作困难

1.2 三层架构的优势

三层架构:
Frontend ← → Server ← → Agent
├── 职责清晰
├── 独立部署
├── 技术栈自由
├── 易于扩展
└── 团队并行开发

二、架构全景图

2.1 系统架构

┌─────────────────────────────────────────────────────────┐
│                      用户层                              │
│                   (浏览器/移动端)                         │
└────────────────────┬────────────────────────────────────┘
                     │ HTTPS
                     ▼
┌─────────────────────────────────────────────────────────┐
│                   Frontend Layer                         │
│              Next.js + React + TailwindCSS               │
│                   (Port: 3000)                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ 聊天界面 │  │ 用户认证 │  │ 对话管理 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
└────────────────────┬────────────────────────────────────┘
                     │ HTTP/SSE
                     ▼
┌─────────────────────────────────────────────────────────┐
│                   Server Layer                           │
│          FastAPI + PostgreSQL + Redis                    │
│                   (Port: 8000)                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ 业务逻辑 │  │ 用户管理 │  │ 数据存储 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ JWT认证  │  │ 缓存管理 │  │ 日志记录 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
└────────────────────┬────────────────────────────────────┘
                     │ HTTP/SSE
                     ▼
┌─────────────────────────────────────────────────────────┐
│                    Agent Layer                           │
│         LangChain + Ollama + Weaviate                    │
│                   (Port: 8001)                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ RAG检索  │  │ Prompt组装│  │ 流式生成 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
└────────────────────┬────────────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        ▼            ▼            ▼
   ┌────────┐  ┌────────┐  ┌──────────┐
   │Weaviate│  │ Ollama │  │PostgreSQL│
   │ :8080  │  │:11434  │  │  :5432   │
   └────────┘  └────────┘  └──────────┘

三、Frontend Layer(前端层)

3.1 核心职责

主要职责:

  • 🎨 用户界面 - 提供友好的交互体验
  • 🔐 身份认证 - Token管理和自动刷新
  • 📡 实时通信 - SSE流式接收AI回复
  • 💾 状态管理 - 对话历史和用户状态
  • 🎯 路由管理 - 页面导航和权限控制

3.2 技术栈

// 技术选型
Frontend Stack:
├── Next.js 13.5.6      // React框架,支持SSR
├── React 18.2.0        // UI组件库
├── TypeScript 5.2.2    // 类型安全
├── TailwindCSS 3.3.5   // 原子化CSS
├── Axios 1.6.0         // HTTP客户端
└── @microsoft/fetch-event-source 2.0.1  // SSE支持

3.3 核心代码示例

// frontend/app/chat/page.tsx
'use client'

import { useState } from 'react'
import { streamChat } from '@/lib/api-client'

export default function ChatPage() {
  const [messages, setMessages] = useState<Message[]>([])
  const [inputValue, setInputValue] = useState('')
  const [isStreaming, setIsStreaming] = useState(false)
  const [currentAnswer, setCurrentAnswer] = useState('')

  const handleSend = async () => {
    if (!inputValue.trim() || isStreaming) return

    // 添加用户消息
    const userMessage = {
      id: Date.now().toString(),
      role: 'user',
      content: inputValue
    }
    setMessages(prev => [...prev, userMessage])
    setInputValue('')
    setIsStreaming(true)
    setCurrentAnswer('')

    try {
      // 调用Server API,接收流式响应
      await streamChat(userMessage.content, {
        onToken: (token) => {
          setCurrentAnswer(prev => prev + token)
        },
        onDone: (data) => {
          const aiMessage = {
            id: Date.now().toString(),
            role: 'assistant',
            content: data.answer
          }
          setMessages(prev => [...prev, aiMessage])
          setCurrentAnswer('')
          setIsStreaming(false)
        },
        onError: (error) => {
          console.error('错误:', error)
          setIsStreaming(false)
        }
      })
    } catch (error) {
      console.error('发送失败:', error)
      setIsStreaming(false)
    }
  }

  return (
    <div className="flex flex-col h-screen">
      {/* 消息列表 */}
      <div className="flex-1 overflow-y-auto p-4">
        {messages.map((msg) => (
          <MessageBubble key={msg.id} message={msg} />
        ))}
        
        {/* 实时流式消息 */}
        {currentAnswer && (
          <div className="bg-gray-100 rounded-lg p-4">
            {currentAnswer}
            <span className="animate-pulse"></span>
          </div>
        )}
      </div>

      {/* 输入框 */}
      <div className="p-4 border-t">
        <input
          value={inputValue}
          onChange={(e) => setInputValue(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter' && !e.shiftKey) {
              e.preventDefault()
              handleSend()
            }
          }}
          placeholder="输入问题..."
          disabled={isStreaming}
        />
        <button onClick={handleSend} disabled={!inputValue.trim() || isStreaming}>
          {isStreaming ? '生成中...' : '发送'}
        </button>
      </div>
    </div>
  )
}

四、Server Layer(业务层)

4.1 核心职责

主要职责:

  • 🔒 认证授权 - JWT双Token机制
  • 💼 业务逻辑 - 用户管理、对话管理
  • 🗄️ 数据持久化 - PostgreSQL存储
  • 缓存管理 - Redis加速
  • 🔗 服务编排 - 协调Frontend和Agent
  • 📊 日志监控 - Loguru结构化日志

4.2 技术栈

# 技术选型
Server Stack:
├── FastAPI 0.100.0         # 现代化异步Web框架
├── Uvicorn 0.31.1          # ASGI服务器
├── SQLAlchemy 2.0.18+      # 异步ORM
├── PostgreSQL              # 关系型数据库
├── Redis 4.6.0+            # 缓存和会话
├── Alembic 1.11.1+         # 数据库迁移
├── Pydantic 2.0.0+         # 数据验证
├── Python-Jose 3.3.0+      # JWT认证
├── Passlib[bcrypt] 1.7.4+  # 密码加密
└── Loguru 0.7.0+           # 结构化日志

4.3 核心代码示例

# server/api/agent.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
import httpx
import json

router = APIRouter(prefix="/api/agent", tags=["agent"])

AGENT_URL = "http://localhost:8001"

@router.post("/chat/stream")
async def agent_stream(
    question: str,
    current_user = Depends(get_current_user)
):
    """
    代理Agent流式接口
    
    Server作为中间层:
    1. 验证用户身份(JWT)
    2. 记录请求日志
    3. 转发到Agent服务
    4. 保存对话历史
    """
    
    # 记录请求
    logger.info(f"用户 {current_user.username} 发起问答: {question}")
    
    async def event_generator():
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{AGENT_URL}/stream",
                json={"question": question},
                timeout=60.0
            ) as response:
                async for line in response.aiter_lines():
                    if line:
                        yield f"{line}\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )


# server/api/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix="/api/auth", tags=["auth"])

@router.post("/login")
async def login(
    username: str,
    password: str,
    db: AsyncSession = Depends(get_db)
):
    """用户登录"""
    # 验证用户
    user = await authenticate_user(db, username, password)
    if not user:
        raise HTTPException(status_code=401, detail="用户名或密码错误")
    
    # 生成Token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=30)
    )
    refresh_token = create_refresh_token(
        data={"sub": user.username},
        expires_delta=timedelta(days=7)
    )
    
    return {
        "code": 0,
        "msg": "登录成功",
        "data": {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer"
        }
    }


# server/api/conversations.py
@router.post("")
async def create_conversation(
    title: str,
    current_user = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """创建新对话"""
    conversation = Conversation(
        user_id=current_user.id,
        title=title
    )
    db.add(conversation)
    await db.commit()
    await db.refresh(conversation)
    
    return {
        "code": 0,
        "msg": "创建成功",
        "data": conversation
    }

五、Agent Layer(AI层)

5.1 核心职责

主要职责:

  • 🔍 向量检索 - Weaviate相似度搜索
  • 🧠 AI推理 - Ollama LLM生成
  • 📝 Prompt工程 - 动态组装上下文
  • 🌊 流式生成 - 实时返回Token
  • 📊 结果格式化 - 统一响应格式

5.2 技术栈

# 技术选型
Agent Stack:
├── LangChain 0.1.0+           # AI应用开发框架
├── LangChain-Community 0.0.10+ # 社区集成
├── Ollama 0.1.0+              # 本地LLM服务
├── Weaviate 1.27.1            # 向量数据库
├── FastAPI 0.104.0+           # HTTP服务框架
└── Pandas 2.0.0+              # 数据处理

AI模型:
├── llama3.2:latest     # 对话生成模型
└── nomic-embed-text    # 文本向量化模型

5.3 核心代码示例

# agent/http_service.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json

from agent import ServiceTicketAgent

app = FastAPI(title="Service Ticket Agent")

class ChatRequest(BaseModel):
    question: str

agent_instance = None

def get_agent():
    global agent_instance
    if agent_instance is None:
        agent_instance = ServiceTicketAgent()
    return agent_instance


@app.post("/stream")
async def stream_chat(request: ChatRequest):
    """
    流式对话接口(SSE)
    
    返回SSE格式的流式数据:
    - event: thinking (思考状态)
    - event: sources (检索来源)
    - event: token (逐个token)
    - event: done (完成)
    - event: error (错误)
    """
    async def event_generator():
        agent = get_agent()
        
        try:
            async for event in agent.ask_stream(request.question):
                event_type = event.get("type")
                event_data = event.get("data", {})
                
                # 格式化为SSE
                yield f"event: {event_type}\n"
                yield f"data: {json.dumps(event_data, ensure_ascii=False)}\n\n"
                
                if event_type in ["done", "error"]:
                    break
                    
        except Exception as e:
            error_event = {"code": 500, "msg": f"Agent错误: {str(e)}"}
            yield f"event: error\n"
            yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )


# agent/ticket_agent.py
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

class ServiceTicketAgent:
    def _setup_qa_chain(self):
        """设置问答链(LCEL)"""
        
        def retrieve_and_format(question):
            docs = self._search_similar_documents(question)
            return "\n\n".join(doc.page_content for doc in docs)
        
        # LCEL链式调用
        self.qa_chain = (
            {
                "context": retrieve_and_format,
                "question": RunnablePassthrough()
            }
            | self.prompt_template
            | self.llm
            | StrOutputParser()
        )
        
        return self.qa_chain
    
    async def ask_stream(self, question: str):
        """流式问答"""
        # 1. 检索相关工单
        yield {
            "type": "thinking",
            "data": {"status": "retrieving", "message": "正在检索相关工单..."}
        }
        
        source_docs = self._search_similar_documents(question)
        
        # 2. 返回检索结果
        yield {
            "type": "sources",
            "data": {"sources": [...], "count": len(source_docs)}
        }
        
        # 3. 流式生成答案
        qa_chain = self._setup_qa_chain()
        
        full_answer = ""
        async for chunk in qa_chain.astream(question):
            token = str(chunk)
            full_answer += token
            
            yield {
                "type": "token",
                "data": {"token": token}
            }
        
        # 4. 完成
        yield {
            "type": "done",
            "data": {"answer": full_answer, "metadata": {...}}
        }

六、层间通信协议

6.1 Frontend ↔ Server

// HTTP请求
POST /api/auth/login
Content-Type: application/json

{
  "username": "user",
  "password": "password"
}

// HTTP响应
{
  "code": 0,
  "msg": "success",
  "data": {
    "access_token": "eyJ...",
    "refresh_token": "eyJ..."
  }
}

// SSE流式响应
POST /api/agent/chat/stream
Authorization: Bearer eyJ...

event: token
data: {"token": "根据"}

event: token
data: {"token": "历史"}

event: done
data: {"answer": "...", "metadata": {...}}

6.2 Server ↔ Agent

# HTTP代理
POST http://localhost:8001/stream
Content-Type: application/json

{
  "question": "物流信息5天没更新,怎么处理?"
}

# SSE响应
event: thinking
data: {"status": "retrieving", "message": "正在检索相关工单..."}

event: sources
data: {"sources": [...], "count": 5}

event: token
data: {"token": "根据"}

event: done
data: {"answer": "...", "metadata": {...}}

七、独立部署与扩展

7.1 独立部署

# docker-compose.yml
version: '3.8'

services:
  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    depends_on:
      - server

  server:
    build: ./server
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
      - agent

  agent:
    build: ./agent
    ports:
      - "8001:8001"
    depends_on:
      - ollama
      - weaviate

  postgres:
    image: postgres:15
    ports:
      - "5432:5432"

  redis:
    image: redis:7
    ports:
      - "6379:6379"

  weaviate:
    image: semitechnologies/weaviate:1.27.1
    ports:
      - "8080:8080"

  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"

7.2 水平扩展

# 扩展Agent层
services:
  agent:
    build: ./agent
    deploy:
      replicas: 3  # 3个Agent实例
    ports:
      - "8001-8003:8001"

  # 负载均衡
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

八、团队协作

8.1 职责分工

Frontend团队:
├── UI/UX设计
├── React组件开发
├── 状态管理
└── 前端性能优化

Server团队:
├── 业务逻辑开发
├── 数据库设计
├── API接口设计
└── 认证授权

Agent团队:
├── AI模型选型
├── Prompt工程
├── RAG优化
└── 向量数据库管理

8.2 并行开发

阶段1: 接口定义(1天)
├── Frontend定义需要的API
├── Server定义Agent接口
└── 三方达成一致

阶段2: 并行开发(2周)
├── Frontend: Mock数据开发UI
├── Server: 实现业务逻辑
└── Agent: 实现AI功能

阶段3: 联调测试(3天)
├── Frontend + Server联调
├── Server + Agent联调
└── 端到端测试

九、总结

三层架构的核心优势:

职责清晰 - 每层专注自己的领域
独立部署 - 可以单独扩展和升级
技术自由 - 每层选择最适合的技术栈
团队协作 - 多团队并行开发
易于维护 - 降低系统复杂度

下一篇预告: 《技术选型背后的思考:为什么选择Next.js+FastAPI+LangChain》

我们将深入分析每个技术栈的选型理由和替代方案对比。


作者简介: 资深开发者,创业者。专注于视频通讯技术领域。国内首本Flutter著作《Flutter技术入门与实战》作者,另著有《Dart语言实战》及《WebRTC音视频开发》等书籍。多年从事视频会议、远程教育等技术研发,对于Android、iOS以及跨平台开发技术有比较深入的研究和应用,作为主要程序员开发了多个应用项目,涉及医疗、交通、银行等领域。

学习资料:

欢迎交流: 如有问题欢迎在评论区讨论 🚀

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐