三层架构设计:前端+业务+AI层的职责划分
本文介绍了企业级AI Agent系统的三层架构设计,包括前端层、业务层和AI层。前端层采用Next.js+React技术栈,负责用户界面、身份认证和实时通信;业务层基于FastAPI实现核心逻辑,处理用户管理和数据存储;AI层使用LangChain等工具,专注于RAG检索和内容生成。各层通过HTTP/SSE协议通信,实现职责分离、独立扩展和团队并行开发。这种分层架构解决了传统单体应用的耦合问题,提
·
三层架构设计:前端+业务+AI层的职责划分
前言
在构建企业级AI Agent系统时,清晰的架构分层至关重要。本文将详细介绍本项目采用的三层架构设计,以及各层的职责划分和通信机制。
适合读者: 架构师、全栈工程师、技术Leader
一、为什么需要三层架构
1.1 单体架构的问题
传统单体架构:
Frontend + Backend + AI 混在一起
├── 代码耦合严重
├── 难以独立扩展
├── 技术栈受限
└── 团队协作困难
1.2 三层架构的优势
三层架构:
Frontend ← → Server ← → Agent
├── 职责清晰
├── 独立部署
├── 技术栈自由
├── 易于扩展
└── 团队并行开发
二、架构全景图
2.1 系统架构
┌─────────────────────────────────────────────────────────┐
│ 用户层 │
│ (浏览器/移动端) │
└────────────────────┬────────────────────────────────────┘
│ HTTPS
▼
┌─────────────────────────────────────────────────────────┐
│ Frontend Layer │
│ Next.js + React + TailwindCSS │
│ (Port: 3000) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 聊天界面 │ │ 用户认证 │ │ 对话管理 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────┬────────────────────────────────────┘
│ HTTP/SSE
▼
┌─────────────────────────────────────────────────────────┐
│ Server Layer │
│ FastAPI + PostgreSQL + Redis │
│ (Port: 8000) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 业务逻辑 │ │ 用户管理 │ │ 数据存储 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ JWT认证 │ │ 缓存管理 │ │ 日志记录 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────┬────────────────────────────────────┘
│ HTTP/SSE
▼
┌─────────────────────────────────────────────────────────┐
│ Agent Layer │
│ LangChain + Ollama + Weaviate │
│ (Port: 8001) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ RAG检索 │ │ Prompt组装│ │ 流式生成 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────┬────────────────────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌──────────┐
│Weaviate│ │ Ollama │ │PostgreSQL│
│ :8080 │ │:11434 │ │ :5432 │
└────────┘ └────────┘ └──────────┘
三、Frontend Layer(前端层)
3.1 核心职责
主要职责:
- 🎨 用户界面 - 提供友好的交互体验
- 🔐 身份认证 - Token管理和自动刷新
- 📡 实时通信 - SSE流式接收AI回复
- 💾 状态管理 - 对话历史和用户状态
- 🎯 路由管理 - 页面导航和权限控制
3.2 技术栈
// 技术选型
Frontend Stack:
├── Next.js 13.5.6 // React框架,支持SSR
├── React 18.2.0 // UI组件库
├── TypeScript 5.2.2 // 类型安全
├── TailwindCSS 3.3.5 // 原子化CSS
├── Axios 1.6.0 // HTTP客户端
└── @microsoft/fetch-event-source 2.0.1 // SSE支持
3.3 核心代码示例
// frontend/app/chat/page.tsx
'use client'
import { useState } from 'react'
import { streamChat } from '@/lib/api-client'
export default function ChatPage() {
const [messages, setMessages] = useState<Message[]>([])
const [inputValue, setInputValue] = useState('')
const [isStreaming, setIsStreaming] = useState(false)
const [currentAnswer, setCurrentAnswer] = useState('')
const handleSend = async () => {
if (!inputValue.trim() || isStreaming) return
// 添加用户消息
const userMessage = {
id: Date.now().toString(),
role: 'user',
content: inputValue
}
setMessages(prev => [...prev, userMessage])
setInputValue('')
setIsStreaming(true)
setCurrentAnswer('')
try {
// 调用Server API,接收流式响应
await streamChat(userMessage.content, {
onToken: (token) => {
setCurrentAnswer(prev => prev + token)
},
onDone: (data) => {
const aiMessage = {
id: Date.now().toString(),
role: 'assistant',
content: data.answer
}
setMessages(prev => [...prev, aiMessage])
setCurrentAnswer('')
setIsStreaming(false)
},
onError: (error) => {
console.error('错误:', error)
setIsStreaming(false)
}
})
} catch (error) {
console.error('发送失败:', error)
setIsStreaming(false)
}
}
return (
<div className="flex flex-col h-screen">
{/* 消息列表 */}
<div className="flex-1 overflow-y-auto p-4">
{messages.map((msg) => (
<MessageBubble key={msg.id} message={msg} />
))}
{/* 实时流式消息 */}
{currentAnswer && (
<div className="bg-gray-100 rounded-lg p-4">
{currentAnswer}
<span className="animate-pulse">▊</span>
</div>
)}
</div>
{/* 输入框 */}
<div className="p-4 border-t">
<input
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
onKeyPress={(e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault()
handleSend()
}
}}
placeholder="输入问题..."
disabled={isStreaming}
/>
<button onClick={handleSend} disabled={!inputValue.trim() || isStreaming}>
{isStreaming ? '生成中...' : '发送'}
</button>
</div>
</div>
)
}
四、Server Layer(业务层)
4.1 核心职责
主要职责:
- 🔒 认证授权 - JWT双Token机制
- 💼 业务逻辑 - 用户管理、对话管理
- 🗄️ 数据持久化 - PostgreSQL存储
- ⚡ 缓存管理 - Redis加速
- 🔗 服务编排 - 协调Frontend和Agent
- 📊 日志监控 - Loguru结构化日志
4.2 技术栈
# 技术选型
Server Stack:
├── FastAPI 0.100.0 # 现代化异步Web框架
├── Uvicorn 0.31.1 # ASGI服务器
├── SQLAlchemy 2.0.18+ # 异步ORM
├── PostgreSQL # 关系型数据库
├── Redis 4.6.0+ # 缓存和会话
├── Alembic 1.11.1+ # 数据库迁移
├── Pydantic 2.0.0+ # 数据验证
├── Python-Jose 3.3.0+ # JWT认证
├── Passlib[bcrypt] 1.7.4+ # 密码加密
└── Loguru 0.7.0+ # 结构化日志
4.3 核心代码示例
# server/api/agent.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
import httpx
import json
router = APIRouter(prefix="/api/agent", tags=["agent"])
AGENT_URL = "http://localhost:8001"
@router.post("/chat/stream")
async def agent_stream(
question: str,
current_user = Depends(get_current_user)
):
"""
代理Agent流式接口
Server作为中间层:
1. 验证用户身份(JWT)
2. 记录请求日志
3. 转发到Agent服务
4. 保存对话历史
"""
# 记录请求
logger.info(f"用户 {current_user.username} 发起问答: {question}")
async def event_generator():
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
f"{AGENT_URL}/stream",
json={"question": question},
timeout=60.0
) as response:
async for line in response.aiter_lines():
if line:
yield f"{line}\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
# server/api/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/login")
async def login(
username: str,
password: str,
db: AsyncSession = Depends(get_db)
):
"""用户登录"""
# 验证用户
user = await authenticate_user(db, username, password)
if not user:
raise HTTPException(status_code=401, detail="用户名或密码错误")
# 生成Token
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=30)
)
refresh_token = create_refresh_token(
data={"sub": user.username},
expires_delta=timedelta(days=7)
)
return {
"code": 0,
"msg": "登录成功",
"data": {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
}
# server/api/conversations.py
@router.post("")
async def create_conversation(
title: str,
current_user = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""创建新对话"""
conversation = Conversation(
user_id=current_user.id,
title=title
)
db.add(conversation)
await db.commit()
await db.refresh(conversation)
return {
"code": 0,
"msg": "创建成功",
"data": conversation
}
五、Agent Layer(AI层)
5.1 核心职责
主要职责:
- 🔍 向量检索 - Weaviate相似度搜索
- 🧠 AI推理 - Ollama LLM生成
- 📝 Prompt工程 - 动态组装上下文
- 🌊 流式生成 - 实时返回Token
- 📊 结果格式化 - 统一响应格式
5.2 技术栈
# 技术选型
Agent Stack:
├── LangChain 0.1.0+ # AI应用开发框架
├── LangChain-Community 0.0.10+ # 社区集成
├── Ollama 0.1.0+ # 本地LLM服务
├── Weaviate 1.27.1 # 向量数据库
├── FastAPI 0.104.0+ # HTTP服务框架
└── Pandas 2.0.0+ # 数据处理
AI模型:
├── llama3.2:latest # 对话生成模型
└── nomic-embed-text # 文本向量化模型
5.3 核心代码示例
# agent/http_service.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
from agent import ServiceTicketAgent
app = FastAPI(title="Service Ticket Agent")
class ChatRequest(BaseModel):
question: str
agent_instance = None
def get_agent():
global agent_instance
if agent_instance is None:
agent_instance = ServiceTicketAgent()
return agent_instance
@app.post("/stream")
async def stream_chat(request: ChatRequest):
"""
流式对话接口(SSE)
返回SSE格式的流式数据:
- event: thinking (思考状态)
- event: sources (检索来源)
- event: token (逐个token)
- event: done (完成)
- event: error (错误)
"""
async def event_generator():
agent = get_agent()
try:
async for event in agent.ask_stream(request.question):
event_type = event.get("type")
event_data = event.get("data", {})
# 格式化为SSE
yield f"event: {event_type}\n"
yield f"data: {json.dumps(event_data, ensure_ascii=False)}\n\n"
if event_type in ["done", "error"]:
break
except Exception as e:
error_event = {"code": 500, "msg": f"Agent错误: {str(e)}"}
yield f"event: error\n"
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
# agent/ticket_agent.py
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
class ServiceTicketAgent:
def _setup_qa_chain(self):
"""设置问答链(LCEL)"""
def retrieve_and_format(question):
docs = self._search_similar_documents(question)
return "\n\n".join(doc.page_content for doc in docs)
# LCEL链式调用
self.qa_chain = (
{
"context": retrieve_and_format,
"question": RunnablePassthrough()
}
| self.prompt_template
| self.llm
| StrOutputParser()
)
return self.qa_chain
async def ask_stream(self, question: str):
"""流式问答"""
# 1. 检索相关工单
yield {
"type": "thinking",
"data": {"status": "retrieving", "message": "正在检索相关工单..."}
}
source_docs = self._search_similar_documents(question)
# 2. 返回检索结果
yield {
"type": "sources",
"data": {"sources": [...], "count": len(source_docs)}
}
# 3. 流式生成答案
qa_chain = self._setup_qa_chain()
full_answer = ""
async for chunk in qa_chain.astream(question):
token = str(chunk)
full_answer += token
yield {
"type": "token",
"data": {"token": token}
}
# 4. 完成
yield {
"type": "done",
"data": {"answer": full_answer, "metadata": {...}}
}
六、层间通信协议
6.1 Frontend ↔ Server
// HTTP请求
POST /api/auth/login
Content-Type: application/json
{
"username": "user",
"password": "password"
}
// HTTP响应
{
"code": 0,
"msg": "success",
"data": {
"access_token": "eyJ...",
"refresh_token": "eyJ..."
}
}
// SSE流式响应
POST /api/agent/chat/stream
Authorization: Bearer eyJ...
event: token
data: {"token": "根据"}
event: token
data: {"token": "历史"}
event: done
data: {"answer": "...", "metadata": {...}}
6.2 Server ↔ Agent
# HTTP代理
POST http://localhost:8001/stream
Content-Type: application/json
{
"question": "物流信息5天没更新,怎么处理?"
}
# SSE响应
event: thinking
data: {"status": "retrieving", "message": "正在检索相关工单..."}
event: sources
data: {"sources": [...], "count": 5}
event: token
data: {"token": "根据"}
event: done
data: {"answer": "...", "metadata": {...}}
七、独立部署与扩展
7.1 独立部署
# docker-compose.yml
version: '3.8'
services:
frontend:
build: ./frontend
ports:
- "3000:3000"
depends_on:
- server
server:
build: ./server
ports:
- "8000:8000"
depends_on:
- postgres
- redis
- agent
agent:
build: ./agent
ports:
- "8001:8001"
depends_on:
- ollama
- weaviate
postgres:
image: postgres:15
ports:
- "5432:5432"
redis:
image: redis:7
ports:
- "6379:6379"
weaviate:
image: semitechnologies/weaviate:1.27.1
ports:
- "8080:8080"
ollama:
image: ollama/ollama:latest
ports:
- "11434:11434"
7.2 水平扩展
# 扩展Agent层
services:
agent:
build: ./agent
deploy:
replicas: 3 # 3个Agent实例
ports:
- "8001-8003:8001"
# 负载均衡
nginx:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
八、团队协作
8.1 职责分工
Frontend团队:
├── UI/UX设计
├── React组件开发
├── 状态管理
└── 前端性能优化
Server团队:
├── 业务逻辑开发
├── 数据库设计
├── API接口设计
└── 认证授权
Agent团队:
├── AI模型选型
├── Prompt工程
├── RAG优化
└── 向量数据库管理
8.2 并行开发
阶段1: 接口定义(1天)
├── Frontend定义需要的API
├── Server定义Agent接口
└── 三方达成一致
阶段2: 并行开发(2周)
├── Frontend: Mock数据开发UI
├── Server: 实现业务逻辑
└── Agent: 实现AI功能
阶段3: 联调测试(3天)
├── Frontend + Server联调
├── Server + Agent联调
└── 端到端测试
九、总结
三层架构的核心优势:
✅ 职责清晰 - 每层专注自己的领域
✅ 独立部署 - 可以单独扩展和升级
✅ 技术自由 - 每层选择最适合的技术栈
✅ 团队协作 - 多团队并行开发
✅ 易于维护 - 降低系统复杂度
下一篇预告: 《技术选型背后的思考:为什么选择Next.js+FastAPI+LangChain》
我们将深入分析每个技术栈的选型理由和替代方案对比。
作者简介: 资深开发者,创业者。专注于视频通讯技术领域。国内首本Flutter著作《Flutter技术入门与实战》作者,另著有《Dart语言实战》及《WebRTC音视频开发》等书籍。多年从事视频会议、远程教育等技术研发,对于Android、iOS以及跨平台开发技术有比较深入的研究和应用,作为主要程序员开发了多个应用项目,涉及医疗、交通、银行等领域。
学习资料:
欢迎交流: 如有问题欢迎在评论区讨论 🚀
更多推荐


所有评论(0)