nanobot的多渠道通信:统一的消息总线
nanobot 的多渠道通信系统采用"消息总线 + Channel 抽象"的设计模式,实现了渠道与 Agent 核心的完全解耦。每个通信渠道(如 Telegram、Discord)只需实现统一的 BaseChannel 接口,就能无缝接入系统。本文将深入剖析多渠道通信系统的设计与实现。
·
关于作者
- 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
- 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
- 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案
「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!
nanobot的多渠道通信:统一的消息总线
通过消息总线模式解耦渠道与 Agent,nanobot 实现了统一的多渠道通信架构,支持 Telegram、Discord、WhatsApp、飞书等 9+ 平台。
概述
nanobot 的多渠道通信系统采用"消息总线 + Channel 抽象"的设计模式,实现了渠道与 Agent 核心的完全解耦。每个通信渠道(如 Telegram、Discord)只需实现统一的 BaseChannel 接口,就能无缝接入系统。本文将深入剖析多渠道通信系统的设计与实现。
问题背景
多渠道通信的核心挑战
构建一个多渠道 AI Agent 需要解决以下问题:
| 挑战 | 描述 | nanobot 的解决方案 |
|---|---|---|
| 协议差异 | 不同平台 API 差异巨大 | BaseChannel 统一抽象 |
| 消息格式 | 文本、图片、语音等格式不一 | InboundMessage/OutboundMessage 统一 |
| 身份映射 | 用户 ID 体系不同 | sender_id + chat_id 组合 |
| 异步处理 | 消息到达时间不确定 | asyncio.Queue 异步队列 |
| 错误隔离 | 单渠道故障不应影响其他 | 独立 Channel 实例 |
多渠道设计目标
核心架构
消息总线设计
MessageBus 是整个通信系统的核心,使用异步队列解耦生产者和消费者:
class MessageBus:
"""
Async message bus that decouples chat channels from the agent core.
Channels push messages to the inbound queue, and the agent processes
them and pushes responses to the outbound queue.
"""
def __init__(self):
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
self._outbound_subscribers: dict[str, list[Callable]] = {}
self._running = False
async def publish_inbound(self, msg: InboundMessage) -> None:
"""Publish a message from a channel to the agent."""
await self.inbound.put(msg)
async def consume_inbound(self) -> InboundMessage:
"""Consume the next inbound message (blocks until available)."""
return await self.inbound.get()
async def publish_outbound(self, msg: OutboundMessage) -> None:
"""Publish a response from the agent to channels."""
await self.outbound.put(msg)
async def consume_outbound(self) -> OutboundMessage:
"""Consume the next outbound message (blocks until available)."""
return await self.outbound.get()
消息数据结构
@dataclass
class InboundMessage:
"""Message received from a chat channel."""
channel: str # telegram, discord, slack, whatsapp
sender_id: str # User identifier
chat_id: str # Chat/channel identifier
content: str # Message text
timestamp: datetime = field(default_factory=datetime.now)
media: list[str] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
@property
def session_key(self) -> str:
"""Unique key for session identification."""
return f"{self.channel}:{self.chat_id}"
@dataclass
class OutboundMessage:
"""Message to send to a chat channel."""
channel: str
chat_id: str
content: str
reply_to: str | None = None
media: list[str] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
整体架构图
Channel 抽象基类
BaseChannel 接口定义
class BaseChannel(ABC):
"""
Abstract base class for chat channel implementations.
Each channel (Telegram, Discord, etc.) should implement this interface
to integrate with the nanobot message bus.
"""
name: str = "base"
def __init__(self, config: Any, bus: MessageBus):
self.config = config
self.bus = bus
self._running = False
@abstractmethod
async def start(self) -> None:
"""
Start the channel and begin listening for messages.
This should be a long-running async task that:
1. Connects to the chat platform
2. Listens for incoming messages
3. Forwards messages to the bus via _handle_message()
"""
pass
@abstractmethod
async def stop(self) -> None:
"""Stop the channel and clean up resources."""
pass
@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through this channel."""
pass
权限控制
def is_allowed(self, sender_id: str) -> bool:
"""Check if a sender is allowed to use this bot."""
allow_list = getattr(self.config, "allow_from", [])
# If no allow list, allow everyone
if not allow_list:
return True
sender_str = str(sender_id)
if sender_str in allow_list:
return True
# Support composite ID like "123|username"
if "|" in sender_str:
for part in sender_str.split("|"):
if part and part in allow_list:
return True
return False
消息处理辅助方法
async def _handle_message(
self,
sender_id: str,
chat_id: str,
content: str,
media: list[str] | None = None,
metadata: dict[str, Any] | None = None
) -> None:
"""Handle an incoming message from the chat platform."""
if not self.is_allowed(sender_id):
logger.warning(f"Access denied for sender {sender_id} on channel {self.name}")
return
msg = InboundMessage(
channel=self.name,
sender_id=str(sender_id),
chat_id=str(chat_id),
content=content,
media=media or [],
metadata=metadata or {}
)
await self.bus.publish_inbound(msg)
ChannelManager 实现
渠道初始化
class ChannelManager:
"""Manages chat channels and coordinates message routing."""
def __init__(self, config: Config, bus: MessageBus):
self.config = config
self.bus = bus
self.channels: dict[str, BaseChannel] = {}
self._init_channels()
def _init_channels(self) -> None:
"""Initialize channels based on config."""
# Telegram channel
if self.config.channels.telegram.enabled:
try:
from nanobot.channels.telegram import TelegramChannel
self.channels["telegram"] = TelegramChannel(
self.config.channels.telegram, self.bus
)
logger.info("Telegram channel enabled")
except ImportError as e:
logger.warning(f"Telegram channel not available: {e}")
# Discord channel
if self.config.channels.discord.enabled:
try:
from nanobot.channels.discord import DiscordChannel
self.channels["discord"] = DiscordChannel(
self.config.channels.discord, self.bus
)
except ImportError as e:
logger.warning(f"Discord channel not available: {e}")
# ... 其他渠道
启动与停止
async def start_all(self) -> None:
"""Start all channels and the outbound dispatcher."""
if not self.channels:
logger.warning("No channels enabled")
return
# Start outbound dispatcher
self._dispatch_task = asyncio.create_task(self._dispatch_outbound())
# Start channels
tasks = []
for name, channel in self.channels.items():
logger.info(f"Starting {name} channel...")
tasks.append(asyncio.create_task(self._start_channel(name, channel)))
await asyncio.gather(*tasks, return_exceptions=True)
async def stop_all(self) -> None:
"""Stop all channels and the dispatcher."""
logger.info("Stopping all channels...")
if self._dispatch_task:
self._dispatch_task.cancel()
for name, channel in self.channels.items():
try:
await channel.stop()
logger.info(f"Stopped {name} channel")
except Exception as e:
logger.error(f"Error stopping {name}: {e}")
消息分发
async def _dispatch_outbound(self) -> None:
"""Dispatch outbound messages to the appropriate channel."""
logger.info("Outbound dispatcher started")
while True:
try:
msg = await asyncio.wait_for(
self.bus.consume_outbound(),
timeout=1.0
)
channel = self.channels.get(msg.channel)
if channel:
try:
await channel.send(msg)
except Exception as e:
logger.error(f"Error sending to {msg.channel}: {e}")
else:
logger.warning(f"Unknown channel: {msg.channel}")
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
Telegram 渠道实现
初始化与启动
class TelegramChannel(BaseChannel):
"""Telegram channel using long polling."""
name = "telegram"
BOT_COMMANDS = [
BotCommand("start", "Start the bot"),
BotCommand("reset", "Reset conversation history"),
BotCommand("help", "Show available commands"),
]
async def start(self) -> None:
"""Start the Telegram bot with long polling."""
if not self.config.token:
logger.error("Telegram bot token not configured")
return
self._running = True
# Build the application
self._app = Application.builder().token(self.config.token).build()
# Add handlers
self._app.add_handler(CommandHandler("start", self._on_start))
self._app.add_handler(CommandHandler("reset", self._on_reset))
self._app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_message))
# Start polling
await self._app.initialize()
await self._app.start()
await self._app.updater.start_polling()
# Keep running
while self._running:
await asyncio.sleep(1)
消息处理
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming messages."""
if not update.message or not update.effective_user:
return
message = update.message
user = update.effective_user
chat_id = message.chat_id
# Build sender_id (support both numeric ID and username)
sender_id = str(user.id)
if user.username:
sender_id = f"{sender_id}|{user.username}"
# Start typing indicator
self._start_typing(str(chat_id))
# Forward to message bus
await self._handle_message(
sender_id=sender_id,
chat_id=str(chat_id),
content=message.text or "",
metadata={
"message_id": message.message_id,
"user_id": user.id,
"username": user.username,
}
)
发送消息
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Telegram."""
if not self._app:
logger.warning("Telegram bot not running")
return
# Stop typing indicator
self._stop_typing(msg.chat_id)
try:
chat_id = int(msg.chat_id)
# Convert markdown to Telegram HTML
html_content = _markdown_to_telegram_html(msg.content)
await self._app.bot.send_message(
chat_id=chat_id,
text=html_content,
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Error sending Telegram message: {e}")
Markdown 转换
def _markdown_to_telegram_html(text: str) -> str:
"""Convert markdown to Telegram-safe HTML."""
if not text:
return ""
# Extract and protect code blocks
code_blocks: list[str] = []
def save_code_block(m: re.Match) -> str:
code_blocks.append(m.group(1))
return f"\x00CB{len(code_blocks) - 1}\x00"
text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
# Escape HTML special characters
text = text.replace("&", "&").replace("<", "<").replace(">", ">")
# Convert markdown to HTML
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text) # Bold
text = re.sub(r'`([^`]+)`', r'<code>\1</code>', text) # Inline code
# Restore code blocks
for i, code in enumerate(code_blocks):
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
text = text.replace(f"\x00CB{i}\x00", f"<pre><code>{escaped}</code></pre>")
return text
支持的渠道
| 渠道 | 连接方式 | 特点 |
|---|---|---|
| Telegram | Long Polling | 简单可靠,无需公网 IP |
| Discord | Gateway WebSocket | 实时性好,支持服务器 |
| Socket.IO | 需扫码登录 | |
| Feishu | WebSocket 长连接 | 无需公网 IP |
| Slack | Socket Mode | 无需公网 URL |
| botpy SDK WebSocket | 仅支持私聊 | |
| DingTalk | Stream Mode | 无需公网 IP |
| IMAP/SMTP | 邮件轮询 | |
| Mochat | Socket.IO | Claw IM 集成 |
消息流转流程
配置示例
{
"channels": {
"telegram": {
"enabled": true,
"token": "YOUR_BOT_TOKEN",
"allowFrom": ["YOUR_USER_ID"]
},
"discord": {
"enabled": true,
"token": "YOUR_BOT_TOKEN",
"allowFrom": ["YOUR_USER_ID"]
},
"feishu": {
"enabled": true,
"appId": "cli_xxx",
"appSecret": "xxx"
},
"slack": {
"enabled": true,
"botToken": "xoxb-...",
"appToken": "xapp-..."
}
}
}
扩展新渠道
实现步骤
- 创建配置类:在
config/schema.py中添加配置
class MyChannelConfig(BaseModel):
enabled: bool = False
api_key: str = ""
allow_from: list[str] = []
- 实现 Channel 类:继承
BaseChannel
class MyChannel(BaseChannel):
name = "my_channel"
async def start(self) -> None:
# 连接到平台
pass
async def stop(self) -> None:
# 清理资源
pass
async def send(self, msg: OutboundMessage) -> None:
# 发送消息
pass
- 注册到 ChannelManager
if self.config.channels.my_channel.enabled:
self.channels["my_channel"] = MyChannel(
self.config.channels.my_channel, self.bus
)
总结
nanobot 的多渠道通信系统通过消息总线模式实现了完全解耦:
| 设计原则 | 实现方式 |
|---|---|
| 统一抽象 | BaseChannel 定义 start、stop、send 接口 |
| 消息解耦 | MessageBus 使用异步队列解耦生产者和消费者 |
| 配置驱动 | 通过配置文件启用/禁用渠道 |
| 权限控制 | allowFrom 白名单机制 |
| 错误隔离 | 每个渠道独立运行,互不影响 |
这种设计使得添加新渠道变得非常简单,只需实现 BaseChannel 接口即可无缝接入系统。
更多推荐




所有评论(0)