用 Python 重写 Go IM 系统:FastAPI + LangGraph + asyncio 全栈实践

关键词: Python / FastAPI / asyncio / SQLAlchemy / LangChain / LangGraph / WebSocket / SSE / JWT


一、项目背景:为什么要用 Python 重写

我之前用 Go(Gin + go-zero + GORM)实现过一套完整的 IM 系统,覆盖了 WebSocket 实时通信、AI 工具调用、管理后台等功能。这次想用 Python 把它完整重写一遍,原因有三:

  1. AI 生态:Python 的 LangChain/LangGraph 生态比 Go 成熟得多。Go 版本需要自己封装 HTTP 客户端做 Function Calling 的多轮循环,Python 版本可以直接用 create_react_agent 一行搞定。

  2. 验证等价性:同一套业务逻辑用两种语言实现,能看清楚 Go 的 goroutine/channel 和 Python 的 asyncio/Queue 在设计层面的对应关系。

  3. 前端完全复用:与 Go 版本保持完全相同的 API 路径和数据格式(端口 8080),Vue3 前端无需改动。

最终项目的完整功能链路:

Vue3 前端 (go-zero-im-ui)
   ↕ HTTP REST (FastAPI)    ← JWT 鉴权、限流、日志脱敏
   ↕ WebSocket              ← 实时消息、心跳、ACK、离线推送
   ↕ SSE 流式               ← AI 对话逐 token 输出

数据层:MySQL(SQLAlchemy 2.0 async)+ Redis(缓存)


二、技术栈总览

分层 技术选型 对应 Go 版本
Web 框架 FastAPI 0.115 Gin v1.12
实时通信 FastAPI WebSocket (Starlette) gorilla/websocket
ORM SQLAlchemy 2.0 async + aiomysql GORM v1.31
缓存 redis-py 5.x async go-redis/v9
认证 python-jose (JWT) golang-jwt/jwt v5
密码安全 bcrypt 5.x(直接使用) golang.org/x/crypto/bcrypt
AI 框架 LangChain 1.x + LangGraph 1.x 自封装 HTTP 客户端
日志 loguru go.uber.org/zap
限流 slowapi(基于 limits) golang.org/x/time/rate
配置 PyYAML + dataclass spf13/viper
并发 asyncio(单线程事件循环) goroutine + channel

Python 版本最大的技术亮点是 AI 层:用 LangGraph 的 create_react_agent 替代了 Go 版本手写的多轮 Function Calling 循环,代码量从 200 行降到 30 行,且支持原生流式输出。


三、项目架构:分层 + 依赖注入

3.1 目录结构

py-im/
├── main.py                  # 程序入口(Windows asyncio 修复)
├── config.yaml              # 配置文件
├── requirements.txt
└── app/
    ├── __init__.py          # FastAPI 工厂函数 + lifespan
    ├── config.py            # 配置加载(dataclass)
    ├── database.py          # SQLAlchemy 2.0 async engine
    ├── models/              # SQLAlchemy ORM 模型(6 张表)
    ├── schemas/             # Pydantic 请求/响应 schema
    ├── repositories/        # 数据访问层(SQL 封装)
    ├── services/            # 业务逻辑层
    ├── api/                 # FastAPI 路由(user/message/group/ai/admin)
    ├── ws/                  # WebSocket(Hub + Client + protocol)
    ├── ai/                  # AI 服务(LangGraph agent + 工具)
    ├── middleware/          # JWT、日志脱敏、限流
    └── utils/               # bcrypt、JWT 工具、响应封装

3.2 应用工厂与生命周期

FastAPI 的 lifespan 对应 Go 的 main.go 启动/关闭钩子:

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("py-im starting up...")
    await init_db()           # 自动建表(相当于 AutoMigrate)
    yield
    # 主动释放连接池,避免 aiomysql 在事件循环关闭后触发 RuntimeError
    engine = get_engine()
    await engine.dispose()
    logger.info("Database connections closed")

这里有个 Windows 特有的坑:aiomysql 的 __del__ 析构器会在 Python GC 时异步关闭连接,而此时 asyncio 事件循环可能已经关闭,导致 RuntimeError: Event loop is closed。解决方案是双管齐下:

# main.py:强制使用 SelectorEventLoop(Windows 默认是 ProactorEventLoop)
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# lifespan 关闭时主动 dispose,不留给 __del__ 处理
await engine.dispose()

四、WebSocket 核心:asyncio 重写 Hub/Client 模型

Go 版本的 Hub/Client 依赖 goroutine 和 channel,Python 版本用 asyncio 原语做了完整对应:

Go 并发原语 Python 等价物 用途
goroutine asyncio.Task / coroutine 并发执行
chan []byte asyncio.Queue(maxsize=256) 消息发送队列
sync.RWMutex asyncio.Lock 保护 clients 字典
go func() asyncio.create_task(...) 非阻塞派生子任务
select {} await asyncio.gather(...) 同时等待多个协程

4.1 Hub:在线连接注册表

class Hub:
    def __init__(self):
        self._clients: dict[str, Client] = {}
        self._lock = asyncio.Lock()   # 替代 Go 的 sync.RWMutex
​
    async def register(self, client: Client) -> None:
        async with self._lock:
            old = self._clients.get(client.username)
            if old:
                # 同一用户二次登录:踢掉旧连接(对应 Go 的 close(old.send))
                old.kicked = True
                await old.close()
            self._clients[client.username] = client
​
    async def unregister(self, client: Client) -> None:
        async with self._lock:
            current = self._clients.get(client.username)
            # 防止新连接上线后旧连接的 unregister 误删新连接
            if current is client:
                del self._clients[client.username]

注意 current is client 的身份检查——这是处理"旧连接断开时新连接已经上线"竞态的关键,Go 版本同样有这个逻辑。

4.2 Client:read/write 双任务模型

Go 版本每个连接启动两个 goroutine(readPump/writePump),Python 版本改为两个 coroutine,用 asyncio.gather 并发运行:

async def run(self, message_svc, group_svc, ai_svc):
    await self.hub.register(self)
    await self._push_offline(message_svc, group_svc)   # 上线推离线消息
    try:
        await asyncio.gather(
            self._read_pump(message_svc, group_svc, ai_svc),
            self._write_pump(),
        )
    finally:
        # 断线时更新 last_read_at(对应 Go 的 defer)
        group_ids = await group_svc.get_my_group_ids(self.username)
        if group_ids:
            await group_svc.update_last_read_at(self.username, group_ids)
        await self.hub.unregister(self)

心跳保活:writePump 用 asyncio.wait_for 实现定时 ping,超时无消息则发心跳帧:

async def _write_pump(self):
    while True:
        try:
            msg = await asyncio.wait_for(
                self.send_queue.get(), timeout=self.PING_INTERVAL  # 54秒
            )
            await self.ws.send_text(msg)
        except asyncio.TimeoutError:
            # 超时无消息 → 发心跳,对应 Go 的 ticker.C → WriteMessage(PingMessage)
            await self.ws.send_text(json.dumps({"type": 1, "msg_id": "ping"}))

4.3 消息协议

# protocol.py
MSG_TYPE_HEARTBEAT = 1
MSG_TYPE_PRIVATE   = 2
MSG_TYPE_ACK       = 3
MSG_TYPE_GROUP     = 4
MSG_TYPE_SYSTEM    = 5

ACK_DELIVERED = "delivered"   # 对方在线,已推送
ACK_STORED    = "stored"      # 对方离线,已存库
ACK_RECEIVED  = "received"    # 心跳确认
ACK_FAILED    = "failed"      # 非群成员,拒绝

from 字段由服务端在 readPump 中强制写入 client.username,客户端发来的 from 字段被完全忽略,无法伪造发送方。


五、消息存储与离线推送

5.1 写入流程

客户端发消息
    ↓
_read_pump 接收 JSON,强制设置 from = client.username
    ↓
message_svc.save_private_message()   # 先写 MySQL
    ↓
hub.send_to_user(to, out_msg)        # 尝试投递
    ├── 在线 → put 到 send_queue → ACK=delivered
    └── 离线 → 消息已在库 → ACK=stored
    ↓
给发送方回 ACK

单聊消息同时也推给发送方自己的 send_queue,这样多设备场景下发送方也能看到自己发的消息实时出现。

5.2 上线拉取离线消息

连接建立后立即调用 _push_offline,两类消息分开处理:

单聊离线消息:查 status='stored' AND to_user=?,推送后批量标记 delivered

群聊离线消息:采用 per-user 读取位点 方案,避免共享 status 字段导致的漏推/重推问题。每个群成员的 group_members 表维护 last_read_at 时间戳:

-- 查询某用户在所有群的离线消息(JOIN group_members 按 last_read_at 过滤)
SELECT m.* FROM messages m
JOIN group_members gm ON m.to_user = CONCAT('group_', gm.group_id)
    AND gm.username = :username
    AND m.created_at > COALESCE(gm.last_read_at, gm.joined_at)
WHERE m.chat_type = 'group'
ORDER BY m.created_at ASC

断线时 finally 块更新位点:

finally:
    group_ids = await group_svc.get_my_group_ids(self.username)
    if group_ids:
        await group_svc.update_last_read_at(self.username, group_ids)

5.3 会话列表统一排序

会话列表混合单聊和群聊,查询后统一按最后消息时间倒序:

# message_service.py
conversations = private_convs + group_convs
conversations.sort(key=lambda x: x["last_time"], reverse=True)

六、AI 集成:LangGraph 替代手写 Agentic 循环

这是整个项目里 Python 版本相比 Go 版本提升最大的部分。

6.1 从 AgentExecutor 到 create_react_agent

Go 版本需要手写多轮 Function Calling 循环(约 200 行),Python 生态经历了几次迭代:

  • 旧 LangChain (0.3.x)AgentExecutor + initialize_agent,使用 Tool 对象

  • LangChain 1.x(当前)AgentExecutor 已废弃,推荐 langgraph.prebuilt.create_react_agent

正确使用姿势:

from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    model=llm,
    tools=tools,              # @tool 装饰的异步函数列表
    prompt=SystemMessage(content=system_content),
)

# stream_mode="messages" 逐步输出每条消息(AIMessage / ToolMessage)
async for event in agent.astream(
    {"messages": history_msgs},
    stream_mode="messages",
):
    msg, meta = event

LangGraph 内部自动处理工具调用的多轮循环(ReAct 模式),不需要手写 for i in range(max_turns) 的循环。

6.2 SSE 事件流拆解

前端能实时看到每一步的执行过程:

async for event in agent.astream(..., stream_mode="messages"):
    msg, meta = event

    if isinstance(msg, AIMessage):
        if msg.tool_calls:
            # AI 决定调用工具
            for tc in msg.tool_calls:
                yield {"type": "tool_call", "name": tc["name"], "args": ...}
        elif msg.content:
            # AI 生成文字 token
            yield {"type": "chunk", "content": msg.content}

    elif isinstance(msg, ToolMessage):
        # 工具执行结果
        yield {"type": "tool_result", "name": msg.name, "result": msg.content}

yield {"type": "done"}

前端 SSE 事件流示例(帮用户建群并邀请成员的完整过程):

{"type":"tool_call","name":"create_group","args":"{\"name\":\"Go学习小组\"}"}
{"type":"tool_result","name":"create_group","result":"群聊创建成功,ID=42","success":true}
{"type":"tool_call","name":"invite_member","args":"{\"group_id\":42,\"target_username\":\"alice\"}"}
{"type":"tool_result","name":"invite_member","result":"邀请成功","success":true}
{"type":"chunk","content":"好的,我已经帮你创建了「Go学习小组」"}
{"type":"chunk","content":",并成功邀请了 alice 加入。"}
{"type":"done","content":""}

6.3 工具定义:闭包注入模式

Python 的 @tool + 闭包完美对应 Go 的 buildUserToolExecutor 函数:

def build_user_tools(username: str, group_svc, user_svc) -> list:
    """工厂函数:把 username + services 注入到每个工具的闭包中。"""

    @tool
    async def create_group(name: str, max_count: int = 200) -> str:
        """创建一个新群聊。"""
        group = await group_svc.create_group(name, username, max_count)
        return f"群聊创建成功,ID={group['id']}"

    @tool
    async def get_weather(city: str) -> str:
        """查询城市天气信息。"""
        return await fetch_weather(city)

    # ... 共 13 个工具
    return [create_group, get_weather, ...]

LangChain 1.x 的 @tool 原生支持 async 函数,直接 await 调用业务 service,无需任何 run_sync 包装。工具函数的 docstring 就是传给 LLM 的工具描述。

内置工具清单(13个):

工具名 功能
search_user 按关键词搜索用户
get_my_groups 查询已加入的群聊
search_group 按名称搜索群聊
create_group 创建群聊
join_group 加入群聊
leave_group 退出群聊
invite_member 邀请成员入群
get_contacts 获取联系人列表
add_contact 添加联系人
delete_contact 删除联系人
get_weather 查询城市天气
get_news 获取最新新闻(按分类)
get_route 查询两地路线

6.4 上下文管理:滚动摘要压缩

每个用户独立维护两层存储,避免 token 无限增长:

@dataclass
class UserContext:
    summary: str = ""          # 已压缩的历史摘要
    recent: List[dict] = ...   # 最近未压缩的对话

发给 LLM 的消息列表构建逻辑:

system_content = CHAT_SYSTEM
if ctx.summary:
    system_content += f"\n\n历史对话摘要:{ctx.summary}"  # 摘要并入 system 消息
messages = [SystemMessage(content=system_content)]
messages += [HumanMessage/AIMessage 交替...]              # 近期原始对话
messages.append(HumanMessage(content=question))           # 当前问题

超过阈值时异步压缩(不阻塞当前回复),对应 Go 的 go func()

async def _maybe_compress(self, username, ctx, ctx_type):
    if len(ctx.recent) <= threshold:
        return

    async def _do_compress():
        # 调 LLM 生成新摘要
        new_summary = await llm.ainvoke([...])
        async with self._lock:
            ctx.summary = new_summary
            ctx.recent = ctx.recent[removed_count:]   # 只删已压缩部分
        await self._persist_context(...)

    asyncio.create_task(_do_compress())   # 非阻塞 fire-and-forget

上下文持久化到 ai_context 表,进程重启后首次访问时从 DB 恢复,对话历史不会因重启而丢失。

6.5 主备 AI 切换

try:
    async for chunk in llm.astream(messages):
        yield chunk.content
except Exception:
    backup = self._make_backup_llm(streaming=True)
    if backup:
        async for chunk in backup.astream(messages):
            yield chunk.content
    else:
        raise

config.yaml 分别配置主力(如 DeepSeek)和备用(如 Qwen)模型,主力失败时无缝切换,上下文不丢失。


七、中间件:安全与可观测性

7.1 JWT 鉴权

app/middleware/auth.py 封装 FastAPI 的 Depends 依赖注入:

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> str:
    try:
        payload = jwt.decode(
            credentials.credentials,
            cfg.jwt.secret,
            algorithms=["HS256"],
        )
        username = payload.get("username")
        if not username:
            raise HTTPException(status_code=401, detail="unauthorized")
        return username
    except JWTError:
        raise HTTPException(status_code=401, detail="unauthorized")

WebSocket 升级时同样校验 JWT:

@router.websocket("/api/v1/ws")
async def ws_endpoint(websocket: WebSocket, token: str = Query(...)):
    username = verify_ws_token(token)  # 失败则 websocket.close(4001)
    client = Client(websocket, username, hub)
    await client.run(message_svc, group_svc, ai_svc)

7.2 请求日志 + 敏感字段脱敏

LoggingMiddleware 继承 Starlette 的 BaseHTTPMiddleware,在打印 request body 前先脱敏:

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        body_bytes = await request.body()
        body_str = mask_sensitive(body_bytes.decode("utf-8"))

        # 关键:重建 request body stream 让后续 handler 还能读到
        async def receive():
            return {"type": "http.request", "body": body_bytes}
        request._receive = receive

        response = await call_next(request)
        # 按状态码分级:5xx→ERROR,4xx→WARNING,2xx→INFO
        logger.log(level, f"{method} {path} {status} {elapsed:.1f}ms | body={body_str[:200]}")

脱敏正则覆盖 password、secret、api_key、token、old_password、new_password:

_PATTERNS = [
    re.compile(r'("password"\s*:\s*)"[^"]*"'),
    re.compile(r'("api_key"\s*:\s*)"[^"]*"'),
    # ...
]

def mask_sensitive(body: str) -> str:
    for p in _PATTERNS:
        body = p.sub(r'\1"***"', body)
    return body

7.3 密码哈希:直接用 bcrypt,不用 passlib

这里有个容易踩的坑:passlib 不兼容 bcrypt v5+。

# 错误方式(passlib 在 bcrypt v5+ 会报 AttributeError)
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"])

# 正确方式:直接用 bcrypt
import bcrypt

def hash_password(plain: str) -> str:
    return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")

def verify_password(plain: str, hashed: str) -> bool:
    return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))

八、群聊的安全设计

群聊部分有几处容易忽略的安全校验:

1. 发消息前验证群成员身份_handle_group 收到消息后先查库验证 username 是否在群里,非成员返回 ACK_FAILED 并拒绝广播:

if not await group_svc.is_member(group_id, self.username):
    await self.send_queue.put(json.dumps(make_ack(msg_id, ACK_FAILED)))
    return

2. 查看群历史需校验成员身份GET /api/v1/message/history?group_id=xxx 先确认当前用户是该群成员,防止任意用户查任意群历史。

3. 查看群成员需校验GET /api/v1/group/members?group_id=xxx 同样验证,防非成员探测群成员列表。

4. from 字段服务端强制写入:WebSocket readPump 处理任何消息时,from 字段全部覆盖为 client.username(已由 JWT 鉴权绑定),客户端无法伪造发送方。


九、SQLAlchemy 2.0 async:连接池与特殊字符

9.1 异步引擎配置

_engine = create_async_engine(
    cfg.database.url,
    pool_size=10,          # 连接池大小
    max_overflow=20,       # 超出 pool_size 后最多再开 20 个连接
    pool_recycle=3600,     # 连接超过 1 小时自动回收(防 MySQL 8h timeout)
    echo=(mode == "debug"),
)

_session_factory = async_sessionmaker(
    bind=_engine,
    class_=AsyncSession,
    expire_on_commit=False,   # 提交后不过期,避免 lazy load 问题
    autoflush=False,
)

9.2 密码含特殊字符的 URL 编码

数据库密码含 @ 时,SQLAlchemy 的 URL 解析器会把 @ 当成 host 分隔符,导致连接到错误地址。解决:

@property
def url(self) -> str:
    from urllib.parse import quote_plus
    pwd = quote_plus(self.password)   # "pass@word" → "pass%40word"
    return f"mysql+aiomysql://{self.username}:{pwd}@{self.host}:{self.port}/{self.dbname}?charset=utf8mb4"

quote_plus 会将 @#% 等所有特殊字符全部转义为 URL 安全格式。


十、管理后台

/api/admin/* 共 14 个端点,复用 service 层,不新建 DB 连接:

功能 端点
管理员登录 POST /api/admin/login
用户列表/创建/编辑/删除/改密 /api/admin/user/*
消息列表/删除/统计 /api/admin/message/*
群组列表/解散/踢人 /api/admin/group/*
在线用户列表 GET /api/admin/monitor/online
系统状态统计 GET /api/admin/monitor/stats

/api/admin/monitor/stats 返回实时系统指标:

import psutil, sys, time

return {
    "online_users": hub.online_count(),
    "total_users": await admin_svc.count_users(),
    "total_messages": await admin_svc.count_messages(),
    "total_groups": await admin_svc.count_groups(),
    "cpu_percent": psutil.cpu_percent(),
    "memory_percent": psutil.virtual_memory().percent,
    "uptime_seconds": int(time.time() - START_TIME),
    "python_version": sys.version,
}

十一、踩坑记录:开发中遇到的真实 Bug

# 文件 问题 根因 / 修复
1 utils/hash.py AttributeError: module 'bcrypt' has no attribute '__about__' passlib 与 bcrypt v5+ 不兼容,改用直接 import bcrypt
2 config.py Can't connect to MySQL server on '163.com@115.190.190.83' 密码含 @ 导致 SQLAlchemy URL 解析错误,加 quote_plus
3 ai/service.py ImportError: cannot import name 'AgentExecutor' LangChain 1.x 已移除 AgentExecutor,改用 create_react_agent
4 ai/service.py SyntaxError: invalid syntax"灵犀" 源文件中含中文弯引号 " ",改为 【灵犀】
5 main.py RuntimeError: Event loop is closed(shutdown) Windows ProactorEventLoop + aiomysql del 竞态,改用 WindowsSelectorEventLoopPolicy + await engine.dispose()
6 database.py ValueError: the greenlet library is required SQLAlchemy async 依赖 greenlet,venv 中未安装,补 pip install greenlet
7 ws/hub.py 旧连接断开后误删新连接的注册 unregistercurrent is client 做身份检查而非仅比对 username
8 ws/hub.py 群聊 @AI 触发阻塞 readPump AI 调用放在 asyncio.create_task 而非直接 await,非阻塞派生
9 services/message_service.py 会话列表混合单群聊后乱序 append 后缺统一排序,加 sort(key=lambda x: x["last_time"], reverse=True)
10 venv PyCharm 环境用 LangChain 0.3.7,缺 langgraph venv 版本与系统 Python 不一致,全量升级到 LangChain 1.x

这些 bug 里最有代表性的两个:

  • #3(AgentExecutor 废弃):LangChain 1.x 是破坏性升级,网上大量教程仍用 0.3.x 的 API,选型时必须明确版本。

  • #5(Windows 事件循环关闭):Python asyncio 在 Windows 上的生命周期与 aiomysql 的析构时序存在竞态,这是 Windows 开发 Python 异步服务时必须处理的问题。


十二、API 兼容性验证

所有 47 个端点与 Go 版本完全一致,Vue3 前端无需改动:

POST   /api/v1/user/register          → 200 {"message":"register success"}
POST   /api/v1/user/login             → 200 {"token":"eyJ..."}
GET    /api/v1/user/me                → 200 {"username":"...","nickname":"..."}
GET    /api/v1/user/search            → 200 {"users":[...]}
POST   /api/v1/group/create           → 200 {"id":42,"name":"..."}
GET    /api/v1/group/mygroups         → 200 {"groups":[...]}
GET    /api/v1/message/conversations  → 200 {"conversations":[...]}
GET    /api/admin/monitor/stats       → 401(未携带 admin token,正确)
WS     /api/v1/ws?token=eyJ...        → 101 Switching Protocols

十三、总结与对比

Go vs Python 核心差异

维度 Go 版本 Python 版本
并发模型 goroutine + channel(M:N 调度) asyncio 协程(单线程事件循环)
AI 工具调用 手写多轮循环(~200 行) create_react_agent 3 行
流式 AI 输出 自封装 HTTP SSE 解析 ChatOpenAI.astream() 原生支持
ORM GORM(同步,自动建表) SQLAlchemy 2.0 async(需 greenlet)
构建/部署 单二进制,Docker 镜像 ~15MB 需要 venv/pip,镜像 ~300MB
性能 更高吞吐(真并发) IO 密集型场景性能接近
AI 生态 需要自封装 LangChain/LangGraph 开箱即用

已实现功能

  • 完整单聊/群聊 WebSocket,含心跳保活、ACK、离线消息

  • per-user 群消息读取位点,断线不丢群消息

  • @AI 指令(私聊/群聊)+ SSE 流式对话 + 上下文滚动摘要 + 持久化

  • LangGraph Agentic 工具调用(多轮、流式、13 个工具)

  • 多 AI 提供商主备切换

  • JWT 鉴权 + 密码脱敏日志 + 令牌桶限流

  • 管理后台(用户/消息/群组管理 + 系统监控)

待完善

  • 消息可靠性:当前 ACK 无客户端重发机制,极端网络下可能丢消息

  • 水平扩展:Hub 是内存状态,多实例部署需 Redis Pub/Sub 跨实例推送

  • 类型安全:Python 的 dict 大量使用,可进一步用 Pydantic 模型约束内部传递


整个项目把 Go 版本的架构设计(Hub/Client 模型、per-user 读取位点、主备 AI 切换)完整移植到了 Python,同时用 LangGraph 大幅简化了 AI Agent 的实现。两个版本放在一起对比,能很直观地看到 Go 和 Python 在并发模型上的哲学差异:Go 用 goroutine 做真并发,Python 用 asyncio 做协作式调度——对 IM 这类 IO 密集型场景,两者在实际吞吐上差距没有想象中那么大,但 Python 在 AI 生态上的优势是决定性的。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐