用 Python 重写 Go IM 系统:FastAPI + LangGraph + asyncio 全栈实践
Python 的@tool+ 闭包完美对应 Go 的函数:"""工厂函数:把 username + services 注入到每个工具的闭包中。"""@tool"""创建一个新群聊。"""return f"群聊创建成功,ID={group['id']}"@tool"""查询城市天气信息。"""# ... 共 13 个工具LangChain 1.x 的@tool原生支持 async 函数,直接awai
用 Python 重写 Go IM 系统:FastAPI + LangGraph + asyncio 全栈实践
关键词: Python / FastAPI / asyncio / SQLAlchemy / LangChain / LangGraph / WebSocket / SSE / JWT
一、项目背景:为什么要用 Python 重写
我之前用 Go(Gin + go-zero + GORM)实现过一套完整的 IM 系统,覆盖了 WebSocket 实时通信、AI 工具调用、管理后台等功能。这次想用 Python 把它完整重写一遍,原因有三:
-
AI 生态:Python 的 LangChain/LangGraph 生态比 Go 成熟得多。Go 版本需要自己封装 HTTP 客户端做 Function Calling 的多轮循环,Python 版本可以直接用
create_react_agent一行搞定。 -
验证等价性:同一套业务逻辑用两种语言实现,能看清楚 Go 的 goroutine/channel 和 Python 的 asyncio/Queue 在设计层面的对应关系。
-
前端完全复用:与 Go 版本保持完全相同的 API 路径和数据格式(端口 8080),Vue3 前端无需改动。
最终项目的完整功能链路:
Vue3 前端 (go-zero-im-ui) ↕ HTTP REST (FastAPI) ← JWT 鉴权、限流、日志脱敏 ↕ WebSocket ← 实时消息、心跳、ACK、离线推送 ↕ SSE 流式 ← AI 对话逐 token 输出
数据层:MySQL(SQLAlchemy 2.0 async)+ Redis(缓存)
二、技术栈总览
| 分层 | 技术选型 | 对应 Go 版本 |
|---|---|---|
| Web 框架 | FastAPI 0.115 | Gin v1.12 |
| 实时通信 | FastAPI WebSocket (Starlette) | gorilla/websocket |
| ORM | SQLAlchemy 2.0 async + aiomysql | GORM v1.31 |
| 缓存 | redis-py 5.x async | go-redis/v9 |
| 认证 | python-jose (JWT) | golang-jwt/jwt v5 |
| 密码安全 | bcrypt 5.x(直接使用) | golang.org/x/crypto/bcrypt |
| AI 框架 | LangChain 1.x + LangGraph 1.x | 自封装 HTTP 客户端 |
| 日志 | loguru | go.uber.org/zap |
| 限流 | slowapi(基于 limits) | golang.org/x/time/rate |
| 配置 | PyYAML + dataclass | spf13/viper |
| 并发 | asyncio(单线程事件循环) | goroutine + channel |
Python 版本最大的技术亮点是 AI 层:用 LangGraph 的 create_react_agent 替代了 Go 版本手写的多轮 Function Calling 循环,代码量从 200 行降到 30 行,且支持原生流式输出。
三、项目架构:分层 + 依赖注入
3.1 目录结构
py-im/ ├── main.py # 程序入口(Windows asyncio 修复) ├── config.yaml # 配置文件 ├── requirements.txt └── app/ ├── __init__.py # FastAPI 工厂函数 + lifespan ├── config.py # 配置加载(dataclass) ├── database.py # SQLAlchemy 2.0 async engine ├── models/ # SQLAlchemy ORM 模型(6 张表) ├── schemas/ # Pydantic 请求/响应 schema ├── repositories/ # 数据访问层(SQL 封装) ├── services/ # 业务逻辑层 ├── api/ # FastAPI 路由(user/message/group/ai/admin) ├── ws/ # WebSocket(Hub + Client + protocol) ├── ai/ # AI 服务(LangGraph agent + 工具) ├── middleware/ # JWT、日志脱敏、限流 └── utils/ # bcrypt、JWT 工具、响应封装
3.2 应用工厂与生命周期
FastAPI 的 lifespan 对应 Go 的 main.go 启动/关闭钩子:
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("py-im starting up...")
await init_db() # 自动建表(相当于 AutoMigrate)
yield
# 主动释放连接池,避免 aiomysql 在事件循环关闭后触发 RuntimeError
engine = get_engine()
await engine.dispose()
logger.info("Database connections closed")
这里有个 Windows 特有的坑:aiomysql 的 __del__ 析构器会在 Python GC 时异步关闭连接,而此时 asyncio 事件循环可能已经关闭,导致 RuntimeError: Event loop is closed。解决方案是双管齐下:
# main.py:强制使用 SelectorEventLoop(Windows 默认是 ProactorEventLoop) if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# lifespan 关闭时主动 dispose,不留给 __del__ 处理 await engine.dispose()
四、WebSocket 核心:asyncio 重写 Hub/Client 模型
Go 版本的 Hub/Client 依赖 goroutine 和 channel,Python 版本用 asyncio 原语做了完整对应:
| Go 并发原语 | Python 等价物 | 用途 |
|---|---|---|
goroutine |
asyncio.Task / coroutine |
并发执行 |
chan []byte |
asyncio.Queue(maxsize=256) |
消息发送队列 |
sync.RWMutex |
asyncio.Lock |
保护 clients 字典 |
go func() |
asyncio.create_task(...) |
非阻塞派生子任务 |
select {} |
await asyncio.gather(...) |
同时等待多个协程 |
4.1 Hub:在线连接注册表
class Hub:
def __init__(self):
self._clients: dict[str, Client] = {}
self._lock = asyncio.Lock() # 替代 Go 的 sync.RWMutex
async def register(self, client: Client) -> None:
async with self._lock:
old = self._clients.get(client.username)
if old:
# 同一用户二次登录:踢掉旧连接(对应 Go 的 close(old.send))
old.kicked = True
await old.close()
self._clients[client.username] = client
async def unregister(self, client: Client) -> None:
async with self._lock:
current = self._clients.get(client.username)
# 防止新连接上线后旧连接的 unregister 误删新连接
if current is client:
del self._clients[client.username]
注意 current is client 的身份检查——这是处理"旧连接断开时新连接已经上线"竞态的关键,Go 版本同样有这个逻辑。
4.2 Client:read/write 双任务模型
Go 版本每个连接启动两个 goroutine(readPump/writePump),Python 版本改为两个 coroutine,用 asyncio.gather 并发运行:
async def run(self, message_svc, group_svc, ai_svc): await self.hub.register(self) await self._push_offline(message_svc, group_svc) # 上线推离线消息 try: await asyncio.gather( self._read_pump(message_svc, group_svc, ai_svc), self._write_pump(), ) finally: # 断线时更新 last_read_at(对应 Go 的 defer) group_ids = await group_svc.get_my_group_ids(self.username) if group_ids: await group_svc.update_last_read_at(self.username, group_ids) await self.hub.unregister(self)
心跳保活:writePump 用 asyncio.wait_for 实现定时 ping,超时无消息则发心跳帧:
async def _write_pump(self):
while True:
try:
msg = await asyncio.wait_for(
self.send_queue.get(), timeout=self.PING_INTERVAL # 54秒
)
await self.ws.send_text(msg)
except asyncio.TimeoutError:
# 超时无消息 → 发心跳,对应 Go 的 ticker.C → WriteMessage(PingMessage)
await self.ws.send_text(json.dumps({"type": 1, "msg_id": "ping"}))
4.3 消息协议
# protocol.py MSG_TYPE_HEARTBEAT = 1 MSG_TYPE_PRIVATE = 2 MSG_TYPE_ACK = 3 MSG_TYPE_GROUP = 4 MSG_TYPE_SYSTEM = 5 ACK_DELIVERED = "delivered" # 对方在线,已推送 ACK_STORED = "stored" # 对方离线,已存库 ACK_RECEIVED = "received" # 心跳确认 ACK_FAILED = "failed" # 非群成员,拒绝
from 字段由服务端在 readPump 中强制写入 client.username,客户端发来的 from 字段被完全忽略,无法伪造发送方。
五、消息存储与离线推送
5.1 写入流程
客户端发消息
↓
_read_pump 接收 JSON,强制设置 from = client.username
↓
message_svc.save_private_message() # 先写 MySQL
↓
hub.send_to_user(to, out_msg) # 尝试投递
├── 在线 → put 到 send_queue → ACK=delivered
└── 离线 → 消息已在库 → ACK=stored
↓
给发送方回 ACK
单聊消息同时也推给发送方自己的 send_queue,这样多设备场景下发送方也能看到自己发的消息实时出现。
5.2 上线拉取离线消息
连接建立后立即调用 _push_offline,两类消息分开处理:
单聊离线消息:查 status='stored' AND to_user=?,推送后批量标记 delivered。
群聊离线消息:采用 per-user 读取位点 方案,避免共享 status 字段导致的漏推/重推问题。每个群成员的 group_members 表维护 last_read_at 时间戳:
-- 查询某用户在所有群的离线消息(JOIN group_members 按 last_read_at 过滤)
SELECT m.* FROM messages m
JOIN group_members gm ON m.to_user = CONCAT('group_', gm.group_id)
AND gm.username = :username
AND m.created_at > COALESCE(gm.last_read_at, gm.joined_at)
WHERE m.chat_type = 'group'
ORDER BY m.created_at ASC
断线时 finally 块更新位点:
finally:
group_ids = await group_svc.get_my_group_ids(self.username)
if group_ids:
await group_svc.update_last_read_at(self.username, group_ids)
5.3 会话列表统一排序
会话列表混合单聊和群聊,查询后统一按最后消息时间倒序:
# message_service.py conversations = private_convs + group_convs conversations.sort(key=lambda x: x["last_time"], reverse=True)
六、AI 集成:LangGraph 替代手写 Agentic 循环
这是整个项目里 Python 版本相比 Go 版本提升最大的部分。
6.1 从 AgentExecutor 到 create_react_agent
Go 版本需要手写多轮 Function Calling 循环(约 200 行),Python 生态经历了几次迭代:
-
旧 LangChain (0.3.x):
AgentExecutor+initialize_agent,使用 Tool 对象 -
LangChain 1.x(当前):
AgentExecutor已废弃,推荐langgraph.prebuilt.create_react_agent
正确使用姿势:
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(
model=llm,
tools=tools, # @tool 装饰的异步函数列表
prompt=SystemMessage(content=system_content),
)
# stream_mode="messages" 逐步输出每条消息(AIMessage / ToolMessage)
async for event in agent.astream(
{"messages": history_msgs},
stream_mode="messages",
):
msg, meta = event
LangGraph 内部自动处理工具调用的多轮循环(ReAct 模式),不需要手写 for i in range(max_turns) 的循环。
6.2 SSE 事件流拆解
前端能实时看到每一步的执行过程:
async for event in agent.astream(..., stream_mode="messages"):
msg, meta = event
if isinstance(msg, AIMessage):
if msg.tool_calls:
# AI 决定调用工具
for tc in msg.tool_calls:
yield {"type": "tool_call", "name": tc["name"], "args": ...}
elif msg.content:
# AI 生成文字 token
yield {"type": "chunk", "content": msg.content}
elif isinstance(msg, ToolMessage):
# 工具执行结果
yield {"type": "tool_result", "name": msg.name, "result": msg.content}
yield {"type": "done"}
前端 SSE 事件流示例(帮用户建群并邀请成员的完整过程):
{"type":"tool_call","name":"create_group","args":"{\"name\":\"Go学习小组\"}"}
{"type":"tool_result","name":"create_group","result":"群聊创建成功,ID=42","success":true}
{"type":"tool_call","name":"invite_member","args":"{\"group_id\":42,\"target_username\":\"alice\"}"}
{"type":"tool_result","name":"invite_member","result":"邀请成功","success":true}
{"type":"chunk","content":"好的,我已经帮你创建了「Go学习小组」"}
{"type":"chunk","content":",并成功邀请了 alice 加入。"}
{"type":"done","content":""}
6.3 工具定义:闭包注入模式
Python 的 @tool + 闭包完美对应 Go 的 buildUserToolExecutor 函数:
def build_user_tools(username: str, group_svc, user_svc) -> list:
"""工厂函数:把 username + services 注入到每个工具的闭包中。"""
@tool
async def create_group(name: str, max_count: int = 200) -> str:
"""创建一个新群聊。"""
group = await group_svc.create_group(name, username, max_count)
return f"群聊创建成功,ID={group['id']}"
@tool
async def get_weather(city: str) -> str:
"""查询城市天气信息。"""
return await fetch_weather(city)
# ... 共 13 个工具
return [create_group, get_weather, ...]
LangChain 1.x 的 @tool 原生支持 async 函数,直接 await 调用业务 service,无需任何 run_sync 包装。工具函数的 docstring 就是传给 LLM 的工具描述。
内置工具清单(13个):
| 工具名 | 功能 |
|---|---|
search_user |
按关键词搜索用户 |
get_my_groups |
查询已加入的群聊 |
search_group |
按名称搜索群聊 |
create_group |
创建群聊 |
join_group |
加入群聊 |
leave_group |
退出群聊 |
invite_member |
邀请成员入群 |
get_contacts |
获取联系人列表 |
add_contact |
添加联系人 |
delete_contact |
删除联系人 |
get_weather |
查询城市天气 |
get_news |
获取最新新闻(按分类) |
get_route |
查询两地路线 |
6.4 上下文管理:滚动摘要压缩
每个用户独立维护两层存储,避免 token 无限增长:
@dataclass
class UserContext:
summary: str = "" # 已压缩的历史摘要
recent: List[dict] = ... # 最近未压缩的对话
发给 LLM 的消息列表构建逻辑:
system_content = CHAT_SYSTEM
if ctx.summary:
system_content += f"\n\n历史对话摘要:{ctx.summary}" # 摘要并入 system 消息
messages = [SystemMessage(content=system_content)]
messages += [HumanMessage/AIMessage 交替...] # 近期原始对话
messages.append(HumanMessage(content=question)) # 当前问题
超过阈值时异步压缩(不阻塞当前回复),对应 Go 的 go func():
async def _maybe_compress(self, username, ctx, ctx_type):
if len(ctx.recent) <= threshold:
return
async def _do_compress():
# 调 LLM 生成新摘要
new_summary = await llm.ainvoke([...])
async with self._lock:
ctx.summary = new_summary
ctx.recent = ctx.recent[removed_count:] # 只删已压缩部分
await self._persist_context(...)
asyncio.create_task(_do_compress()) # 非阻塞 fire-and-forget
上下文持久化到 ai_context 表,进程重启后首次访问时从 DB 恢复,对话历史不会因重启而丢失。
6.5 主备 AI 切换
try:
async for chunk in llm.astream(messages):
yield chunk.content
except Exception:
backup = self._make_backup_llm(streaming=True)
if backup:
async for chunk in backup.astream(messages):
yield chunk.content
else:
raise
config.yaml 分别配置主力(如 DeepSeek)和备用(如 Qwen)模型,主力失败时无缝切换,上下文不丢失。
七、中间件:安全与可观测性
7.1 JWT 鉴权
app/middleware/auth.py 封装 FastAPI 的 Depends 依赖注入:
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
) -> str:
try:
payload = jwt.decode(
credentials.credentials,
cfg.jwt.secret,
algorithms=["HS256"],
)
username = payload.get("username")
if not username:
raise HTTPException(status_code=401, detail="unauthorized")
return username
except JWTError:
raise HTTPException(status_code=401, detail="unauthorized")
WebSocket 升级时同样校验 JWT:
@router.websocket("/api/v1/ws")
async def ws_endpoint(websocket: WebSocket, token: str = Query(...)):
username = verify_ws_token(token) # 失败则 websocket.close(4001)
client = Client(websocket, username, hub)
await client.run(message_svc, group_svc, ai_svc)
7.2 请求日志 + 敏感字段脱敏
LoggingMiddleware 继承 Starlette 的 BaseHTTPMiddleware,在打印 request body 前先脱敏:
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
body_bytes = await request.body()
body_str = mask_sensitive(body_bytes.decode("utf-8"))
# 关键:重建 request body stream 让后续 handler 还能读到
async def receive():
return {"type": "http.request", "body": body_bytes}
request._receive = receive
response = await call_next(request)
# 按状态码分级:5xx→ERROR,4xx→WARNING,2xx→INFO
logger.log(level, f"{method} {path} {status} {elapsed:.1f}ms | body={body_str[:200]}")
脱敏正则覆盖 password、secret、api_key、token、old_password、new_password:
_PATTERNS = [
re.compile(r'("password"\s*:\s*)"[^"]*"'),
re.compile(r'("api_key"\s*:\s*)"[^"]*"'),
# ...
]
def mask_sensitive(body: str) -> str:
for p in _PATTERNS:
body = p.sub(r'\1"***"', body)
return body
7.3 密码哈希:直接用 bcrypt,不用 passlib
这里有个容易踩的坑:passlib 不兼容 bcrypt v5+。
# 错误方式(passlib 在 bcrypt v5+ 会报 AttributeError)
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"])
# 正确方式:直接用 bcrypt
import bcrypt
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
八、群聊的安全设计
群聊部分有几处容易忽略的安全校验:
1. 发消息前验证群成员身份:_handle_group 收到消息后先查库验证 username 是否在群里,非成员返回 ACK_FAILED 并拒绝广播:
if not await group_svc.is_member(group_id, self.username):
await self.send_queue.put(json.dumps(make_ack(msg_id, ACK_FAILED)))
return
2. 查看群历史需校验成员身份:GET /api/v1/message/history?group_id=xxx 先确认当前用户是该群成员,防止任意用户查任意群历史。
3. 查看群成员需校验:GET /api/v1/group/members?group_id=xxx 同样验证,防非成员探测群成员列表。
4. from 字段服务端强制写入:WebSocket readPump 处理任何消息时,from 字段全部覆盖为 client.username(已由 JWT 鉴权绑定),客户端无法伪造发送方。
九、SQLAlchemy 2.0 async:连接池与特殊字符
9.1 异步引擎配置
_engine = create_async_engine(
cfg.database.url,
pool_size=10, # 连接池大小
max_overflow=20, # 超出 pool_size 后最多再开 20 个连接
pool_recycle=3600, # 连接超过 1 小时自动回收(防 MySQL 8h timeout)
echo=(mode == "debug"),
)
_session_factory = async_sessionmaker(
bind=_engine,
class_=AsyncSession,
expire_on_commit=False, # 提交后不过期,避免 lazy load 问题
autoflush=False,
)
9.2 密码含特殊字符的 URL 编码
数据库密码含 @ 时,SQLAlchemy 的 URL 解析器会把 @ 当成 host 分隔符,导致连接到错误地址。解决:
@property
def url(self) -> str:
from urllib.parse import quote_plus
pwd = quote_plus(self.password) # "pass@word" → "pass%40word"
return f"mysql+aiomysql://{self.username}:{pwd}@{self.host}:{self.port}/{self.dbname}?charset=utf8mb4"
quote_plus 会将 @、#、% 等所有特殊字符全部转义为 URL 安全格式。
十、管理后台
/api/admin/* 共 14 个端点,复用 service 层,不新建 DB 连接:
| 功能 | 端点 |
|---|---|
| 管理员登录 | POST /api/admin/login |
| 用户列表/创建/编辑/删除/改密 | /api/admin/user/* |
| 消息列表/删除/统计 | /api/admin/message/* |
| 群组列表/解散/踢人 | /api/admin/group/* |
| 在线用户列表 | GET /api/admin/monitor/online |
| 系统状态统计 | GET /api/admin/monitor/stats |
/api/admin/monitor/stats 返回实时系统指标:
import psutil, sys, time
return {
"online_users": hub.online_count(),
"total_users": await admin_svc.count_users(),
"total_messages": await admin_svc.count_messages(),
"total_groups": await admin_svc.count_groups(),
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"uptime_seconds": int(time.time() - START_TIME),
"python_version": sys.version,
}
十一、踩坑记录:开发中遇到的真实 Bug
| # | 文件 | 问题 | 根因 / 修复 |
|---|---|---|---|
| 1 | utils/hash.py |
AttributeError: module 'bcrypt' has no attribute '__about__' |
passlib 与 bcrypt v5+ 不兼容,改用直接 import bcrypt |
| 2 | config.py |
Can't connect to MySQL server on '163.com@115.190.190.83' |
密码含 @ 导致 SQLAlchemy URL 解析错误,加 quote_plus |
| 3 | ai/service.py |
ImportError: cannot import name 'AgentExecutor' |
LangChain 1.x 已移除 AgentExecutor,改用 create_react_agent |
| 4 | ai/service.py |
SyntaxError: invalid syntax("灵犀") |
源文件中含中文弯引号 " ",改为 【灵犀】 |
| 5 | main.py |
RuntimeError: Event loop is closed(shutdown) |
Windows ProactorEventLoop + aiomysql del 竞态,改用 WindowsSelectorEventLoopPolicy + await engine.dispose() |
| 6 | database.py |
ValueError: the greenlet library is required |
SQLAlchemy async 依赖 greenlet,venv 中未安装,补 pip install greenlet |
| 7 | ws/hub.py |
旧连接断开后误删新连接的注册 | unregister 用 current is client 做身份检查而非仅比对 username |
| 8 | ws/hub.py |
群聊 @AI 触发阻塞 readPump |
AI 调用放在 asyncio.create_task 而非直接 await,非阻塞派生 |
| 9 | services/message_service.py |
会话列表混合单群聊后乱序 | append 后缺统一排序,加 sort(key=lambda x: x["last_time"], reverse=True) |
| 10 | venv |
PyCharm 环境用 LangChain 0.3.7,缺 langgraph | venv 版本与系统 Python 不一致,全量升级到 LangChain 1.x |
这些 bug 里最有代表性的两个:
-
#3(AgentExecutor 废弃):LangChain 1.x 是破坏性升级,网上大量教程仍用 0.3.x 的 API,选型时必须明确版本。
-
#5(Windows 事件循环关闭):Python asyncio 在 Windows 上的生命周期与 aiomysql 的析构时序存在竞态,这是 Windows 开发 Python 异步服务时必须处理的问题。
十二、API 兼容性验证
所有 47 个端点与 Go 版本完全一致,Vue3 前端无需改动:
POST /api/v1/user/register → 200 {"message":"register success"}
POST /api/v1/user/login → 200 {"token":"eyJ..."}
GET /api/v1/user/me → 200 {"username":"...","nickname":"..."}
GET /api/v1/user/search → 200 {"users":[...]}
POST /api/v1/group/create → 200 {"id":42,"name":"..."}
GET /api/v1/group/mygroups → 200 {"groups":[...]}
GET /api/v1/message/conversations → 200 {"conversations":[...]}
GET /api/admin/monitor/stats → 401(未携带 admin token,正确)
WS /api/v1/ws?token=eyJ... → 101 Switching Protocols
十三、总结与对比
Go vs Python 核心差异
| 维度 | Go 版本 | Python 版本 |
|---|---|---|
| 并发模型 | goroutine + channel(M:N 调度) | asyncio 协程(单线程事件循环) |
| AI 工具调用 | 手写多轮循环(~200 行) | create_react_agent 3 行 |
| 流式 AI 输出 | 自封装 HTTP SSE 解析 | ChatOpenAI.astream() 原生支持 |
| ORM | GORM(同步,自动建表) | SQLAlchemy 2.0 async(需 greenlet) |
| 构建/部署 | 单二进制,Docker 镜像 ~15MB | 需要 venv/pip,镜像 ~300MB |
| 性能 | 更高吞吐(真并发) | IO 密集型场景性能接近 |
| AI 生态 | 需要自封装 | LangChain/LangGraph 开箱即用 |
已实现功能
-
完整单聊/群聊 WebSocket,含心跳保活、ACK、离线消息
-
per-user 群消息读取位点,断线不丢群消息
-
@AI 指令(私聊/群聊)+ SSE 流式对话 + 上下文滚动摘要 + 持久化
-
LangGraph Agentic 工具调用(多轮、流式、13 个工具)
-
多 AI 提供商主备切换
-
JWT 鉴权 + 密码脱敏日志 + 令牌桶限流
-
管理后台(用户/消息/群组管理 + 系统监控)
待完善
-
消息可靠性:当前 ACK 无客户端重发机制,极端网络下可能丢消息
-
水平扩展:Hub 是内存状态,多实例部署需 Redis Pub/Sub 跨实例推送
-
类型安全:Python 的
dict大量使用,可进一步用 Pydantic 模型约束内部传递
整个项目把 Go 版本的架构设计(Hub/Client 模型、per-user 读取位点、主备 AI 切换)完整移植到了 Python,同时用 LangGraph 大幅简化了 AI Agent 的实现。两个版本放在一起对比,能很直观地看到 Go 和 Python 在并发模型上的哲学差异:Go 用 goroutine 做真并发,Python 用 asyncio 做协作式调度——对 IM 这类 IO 密集型场景,两者在实际吞吐上差距没有想象中那么大,但 Python 在 AI 生态上的优势是决定性的。
更多推荐




所有评论(0)