玄同 765

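Large Language Model (LLM) Development Engineer | Communication University of China · Digital Media Technology (Intelligent Interaction and Game Design)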



About the Author

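  • Focus areas: LLM application development / RAG knowledge bases / AI Agent deployment / model fine-tuning
  • Tech stack: Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • Engineering: production deployment of models and building and optimizing knowledge bases, with experience delivering end-to-end solutions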

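"Making AI interaction smarter and putting technology into practice more efficiently."
Technical discussion and project collaboration are welcome. Let's explore what LLMs and intelligent interaction can do!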


nanobot Multi-Channel Communication: A Unified Message Bus

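By decoupling channels from the Agent through a message bus, nanobot provides a unified multi-channel communication architecture that supports 9+ platforms, including Telegram, Discord, WhatsApp, and Feishu.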

Overview

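nanobot's multi-channel communication system follows a "message bus + Channel abstraction" design that fully decouples the channels from the Agent core. Each channel (e.g., Telegram, Discord) only needs to implement the common BaseChannel interface to plug into the system. This article walks through the design and implementation of that system.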

Background

Core Challenges of Multi-Channel Communication

Building a multi-channel AI Agent means solving the following problems:

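| Challenge | Description | nanobot's solution |
| --- | --- | --- |
| Protocol differences | Platform APIs differ greatly | Unified BaseChannel abstraction |
| Message formats | Text, images, voice, etc. arrive in different formats | Unified InboundMessage/OutboundMessage |
| Identity mapping | Each platform has its own user ID scheme | sender_id + chat_id combination |
| Asynchronous handling | Message arrival times are unpredictable | asyncio.Queue async queues |
| Error isolation | A failure in one channel must not affect the others | Independent Channel instances |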

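Multi-Channel Design Goals

Figure: design goals and the mechanisms that realize them:
  • Unified abstraction: BaseChannel interface, unified message format
  • Full decoupling: message bus, independent queues
  • Easy extensibility: config-driven setup, plugin-style loading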

Core Architecture

Message Bus Design

MessageBus is the heart of the communication system. It uses async queues to decouple producers from consumers:

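from __future__ import annotations  # InboundMessage/OutboundMessage are defined in the next section

import asyncio
from collections.abc import Callable


class MessageBus: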
    """
    Async message bus that decouples chat channels from the agent core.
    
    Channels push messages to the inbound queue, and the agent processes
    them and pushes responses to the outbound queue.
    """
    
    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
        self._outbound_subscribers: dict[str, list[Callable]] = {}
        self._running = False
    
    async def publish_inbound(self, msg: InboundMessage) -> None:
        """Publish a message from a channel to the agent."""
        await self.inbound.put(msg)
    
    async def consume_inbound(self) -> InboundMessage:
        """Consume the next inbound message (blocks until available)."""
        return await self.inbound.get()
    
    async def publish_outbound(self, msg: OutboundMessage) -> None:
        """Publish a response from the agent to channels."""
        await self.outbound.put(msg)
    
    async def consume_outbound(self) -> OutboundMessage:
        """Consume the next outbound message (blocks until available)."""
        return await self.outbound.get()
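To see the decoupling in action, here is a minimal, self-contained sketch: a simplified bus holding only the two queues, a fake "channel" coroutine that publishes inbound messages, and a hypothetical echo agent that consumes them and publishes replies. The dict-based messages and the names `MiniBus`, `fake_channel` and `echo_agent` are illustrative assumptions, not nanobot code.

```python
import asyncio


class MiniBus:
    """Simplified stand-in for MessageBus: just the inbound and outbound queues."""

    def __init__(self) -> None:
        self.inbound: asyncio.Queue[dict] = asyncio.Queue()
        self.outbound: asyncio.Queue[dict] = asyncio.Queue()


async def fake_channel(bus: MiniBus, texts: list[str]) -> None:
    # Producer: a channel only knows how to push into the inbound queue.
    for text in texts:
        await bus.inbound.put({"chat_id": "42", "content": text})


async def echo_agent(bus: MiniBus, n: int) -> None:
    # Consumer: the agent never touches a platform API, only the two queues.
    for _ in range(n):
        msg = await bus.inbound.get()
        await bus.outbound.put({"chat_id": msg["chat_id"], "content": f"echo: {msg['content']}"})


async def main() -> list[str]:
    bus = MiniBus()
    texts = ["hi", "bye"]
    # Producer and consumer run concurrently and only share the bus.
    await asyncio.gather(fake_channel(bus, texts), echo_agent(bus, len(texts)))
    replies = []
    while not bus.outbound.empty():
        replies.append(bus.outbound.get_nowait()["content"])
    return replies


print(asyncio.run(main()))  # ['echo: hi', 'echo: bye']
```

Neither side holds a reference to the other; swapping the fake channel for a real platform adapter would not change the agent at all.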

Message Data Structures

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class InboundMessage:
    """Message received from a chat channel."""
    
    channel: str          # telegram, discord, slack, whatsapp
    sender_id: str        # User identifier
    chat_id: str          # Chat/channel identifier
    content: str          # Message text
    timestamp: datetime = field(default_factory=datetime.now)
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    
    @property
    def session_key(self) -> str:
        """Unique key for session identification."""
        return f"{self.channel}:{self.chat_id}"


@dataclass
class OutboundMessage:
    """Message to send to a chat channel."""
    
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
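Because session_key combines only channel and chat_id, every participant in the same group chat shares one session, while sender_id still records who spoke; the channel prefix keeps identical chat IDs on different platforms apart. A quick check with a trimmed copy of the dataclass (only the fields needed here; the IDs are made up):

```python
from dataclasses import dataclass


@dataclass
class InboundMessage:
    # Trimmed copy of the dataclass above: timestamp/media/metadata omitted.
    channel: str
    sender_id: str
    chat_id: str
    content: str

    @property
    def session_key(self) -> str:
        return f"{self.channel}:{self.chat_id}"


alice = InboundMessage("telegram", "111|alice", "-1001", "hello")
bob = InboundMessage("telegram", "222|bob", "-1001", "hi")
other = InboundMessage("discord", "111", "-1001", "hey")

print(alice.session_key)                       # telegram:-1001
print(alice.session_key == bob.session_key)    # True: same group chat, same session
print(alice.session_key == other.session_key)  # False: different channel prefix
```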

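Overall Architecture Diagram

Figure: overall architecture. The communication channels (Telegram, Discord, WhatsApp, Feishu, Slack, QQ, DingTalk, Email, Mochat) wrap incoming messages as InboundMessage and push them into the message bus's Inbound Queue; the Agent core's Agent Loop consumes them and publishes OutboundMessage replies to the Outbound Queue; the channel manager's message dispatcher then routes each reply back to its channel.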

Channel Abstract Base Class

BaseChannel Interface Definition

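from abc import ABC, abstractmethod
from typing import Any

# MessageBus and OutboundMessage come from the message bus section above


class BaseChannel(ABC):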
    """
    Abstract base class for chat channel implementations.
    
    Each channel (Telegram, Discord, etc.) should implement this interface
    to integrate with the nanobot message bus.
    """
    
    name: str = "base"
    
    def __init__(self, config: Any, bus: MessageBus):
        self.config = config
        self.bus = bus
        self._running = False
    
    @abstractmethod
    async def start(self) -> None:
        """
        Start the channel and begin listening for messages.
        
        This should be a long-running async task that:
        1. Connects to the chat platform
        2. Listens for incoming messages
        3. Forwards messages to the bus via _handle_message()
        """
        pass
    
    @abstractmethod
    async def stop(self) -> None:
        """Stop the channel and clean up resources."""
        pass
    
    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None:
        """Send a message through this channel."""
        pass
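Marking start, stop and send with @abstractmethod means Python refuses to instantiate a channel that forgets one of them, so an incomplete integration fails at construction time rather than at the first message. A small demonstration with a trimmed copy of the interface (the two subclasses are hypothetical):

```python
from abc import ABC, abstractmethod


class BaseChannel(ABC):
    # Trimmed copy of the interface above: only the abstract methods.
    @abstractmethod
    async def start(self) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...

    @abstractmethod
    async def send(self, msg) -> None: ...


class HalfDoneChannel(BaseChannel):
    async def start(self) -> None:
        pass
    # stop() and send() are missing, so the class stays abstract


class FullChannel(HalfDoneChannel):
    async def stop(self) -> None:
        pass

    async def send(self, msg) -> None:
        pass


def can_instantiate(cls: type) -> bool:
    try:
        cls()
    except TypeError:  # raised for classes with unimplemented abstract methods
        return False
    return True


print(can_instantiate(HalfDoneChannel))  # False
print(can_instantiate(FullChannel))      # True
```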

Access Control

def is_allowed(self, sender_id: str) -> bool:
    """Check if a sender is allowed to use this bot."""
    allow_list = getattr(self.config, "allow_from", [])
    
    # If no allow list, allow everyone
    if not allow_list:
        return True
    
    sender_str = str(sender_id)
    if sender_str in allow_list:
        return True
    # Support composite ID like "123|username"
    if "|" in sender_str:
        for part in sender_str.split("|"):
            if part and part in allow_list:
                return True
    return False
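The composite-ID rule means an allowlist entry can be either the numeric ID or the username, and an empty allowFrom opens the bot to everyone. The check below reproduces the same logic as a free function (taking the config explicitly, with SimpleNamespace standing in for the real config object) so the cases can be exercised directly:

```python
from types import SimpleNamespace


def is_allowed(config, sender_id: str) -> bool:
    # Same logic as BaseChannel.is_allowed, with config passed in explicitly.
    allow_list = getattr(config, "allow_from", [])
    if not allow_list:
        return True  # empty allowlist: everyone is allowed
    sender_str = str(sender_id)
    if sender_str in allow_list:
        return True
    if "|" in sender_str:  # composite "id|username": any part may match
        return any(part and part in allow_list for part in sender_str.split("|"))
    return False


cfg = SimpleNamespace(allow_from=["123", "alice"])
open_cfg = SimpleNamespace(allow_from=[])

print(is_allowed(cfg, "123|bob"))      # True: numeric ID matches
print(is_allowed(cfg, "999|alice"))    # True: username matches
print(is_allowed(cfg, "999|mallory"))  # False
print(is_allowed(open_cfg, "anyone"))  # True: empty list allows everyone
```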

Message Handling Helper

async def _handle_message(
    self,
    sender_id: str,
    chat_id: str,
    content: str,
    media: list[str] | None = None,
    metadata: dict[str, Any] | None = None
) -> None:
    """Handle an incoming message from the chat platform."""
    if not self.is_allowed(sender_id):
        logger.warning(f"Access denied for sender {sender_id} on channel {self.name}")
        return
    
    msg = InboundMessage(
        channel=self.name,
        sender_id=str(sender_id),
        chat_id=str(chat_id),
        content=content,
        media=media or [],
        metadata=metadata or {}
    )
    
    await self.bus.publish_inbound(msg)
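The important property of _handle_message is that the permission check runs before anything touches the bus, and that IDs are normalized to strings. The sketch below shows both with a hypothetical `StubChannel` whose inbound queue stands in for the bus; it condenses is_allowed into one expression but keeps the same rules.

```python
import asyncio
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any


@dataclass
class InboundMessage:
    channel: str
    sender_id: str
    chat_id: str
    content: str
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)


class StubChannel:
    """Hypothetical channel carrying only the is_allowed/_handle_message logic."""

    name = "stub"

    def __init__(self, allow_from: list[str], queue: asyncio.Queue) -> None:
        self.config = SimpleNamespace(allow_from=allow_from)
        self.queue = queue  # stands in for bus.inbound

    def is_allowed(self, sender_id: str) -> bool:
        allow = self.config.allow_from
        return not allow or sender_id in allow or any(p in allow for p in sender_id.split("|") if p)

    async def _handle_message(self, sender_id, chat_id, content, media=None, metadata=None) -> None:
        if not self.is_allowed(sender_id):
            return  # dropped before it ever reaches the bus
        await self.queue.put(
            InboundMessage(self.name, str(sender_id), str(chat_id), content, media or [], metadata or {})
        )


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    ch = StubChannel(["alice"], queue)
    await ch._handle_message("1|alice", 10, "allowed")    # int chat_id is normalized to "10"
    await ch._handle_message("2|mallory", 10, "denied")
    return [queue.get_nowait().content for _ in range(queue.qsize())]


print(asyncio.run(main()))  # ['allowed']
```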

ChannelManager Implementation

Channel Initialization

class ChannelManager:
    """Manages chat channels and coordinates message routing."""
    
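    def __init__(self, config: Config, bus: MessageBus):
        self.config = config
        self.bus = bus
        self.channels: dict[str, BaseChannel] = {}
        # Set by start_all(); initialized here so stop_all() is safe even if start_all() returned early
        self._dispatch_task: asyncio.Task | None = None
        self._init_channels()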
    
    def _init_channels(self) -> None:
        """Initialize channels based on config."""
        
        # Telegram channel
        if self.config.channels.telegram.enabled:
            try:
                from nanobot.channels.telegram import TelegramChannel
                self.channels["telegram"] = TelegramChannel(
                    self.config.channels.telegram, self.bus
                )
                logger.info("Telegram channel enabled")
            except ImportError as e:
                logger.warning(f"Telegram channel not available: {e}")
        
        # Discord channel
        if self.config.channels.discord.enabled:
            try:
                from nanobot.channels.discord import DiscordChannel
                self.channels["discord"] = DiscordChannel(
                    self.config.channels.discord, self.bus
                )
            except ImportError as e:
                logger.warning(f"Discord channel not available: {e}")
        
        # ... other channels
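Every channel repeats the same enabled-check / lazy-import / construct pattern, which is what makes "plugin-style loading" possible. One way to express it as data instead of repeated if blocks is sketched below; this is a hypothetical refactoring, not nanobot's code. Each registry entry maps a channel name to a zero-argument loader that performs the lazy import; an ImportError (for example, a missing optional SDK) disables only that channel.

```python
import logging
from types import SimpleNamespace
from typing import Callable

logger = logging.getLogger("channels")


def init_channels(channels_cfg, bus, registry: dict[str, Callable[[], type]]) -> dict[str, object]:
    channels = {}
    for name, load_cls in registry.items():
        cfg = getattr(channels_cfg, name, None)
        if cfg is None or not cfg.enabled:
            continue  # not configured, or disabled in the config
        try:
            cls = load_cls()  # the lazy import happens here
        except ImportError as e:
            logger.warning("%s channel not available: %s", name, e)
            continue
        channels[name] = cls(cfg, bus)
    return channels


class FakeTelegram:
    def __init__(self, cfg, bus):
        self.cfg, self.bus = cfg, bus


def load_telegram() -> type:
    return FakeTelegram


def load_discord() -> type:
    raise ImportError("discord SDK not installed")  # simulated missing dependency


cfg = SimpleNamespace(
    telegram=SimpleNamespace(enabled=True),
    discord=SimpleNamespace(enabled=True),
    slack=SimpleNamespace(enabled=False),
)
registry = {"telegram": load_telegram, "discord": load_discord, "slack": load_telegram}
print(sorted(init_channels(cfg, bus=None, registry=registry)))  # ['telegram']
```

In real code a loader could be something like `lambda: importlib.import_module("nanobot.channels.telegram").TelegramChannel`, keeping the import lazy exactly as the if blocks above do.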

Starting and Stopping

async def start_all(self) -> None:
    """Start all channels and the outbound dispatcher."""
    if not self.channels:
        logger.warning("No channels enabled")
        return
    
    # Start outbound dispatcher
    self._dispatch_task = asyncio.create_task(self._dispatch_outbound())
    
    # Start channels
    tasks = []
    for name, channel in self.channels.items():
        logger.info(f"Starting {name} channel...")
        tasks.append(asyncio.create_task(self._start_channel(name, channel)))
    
    await asyncio.gather(*tasks, return_exceptions=True)

async def stop_all(self) -> None:
    """Stop all channels and the dispatcher."""
    logger.info("Stopping all channels...")
    
    if self._dispatch_task:
        self._dispatch_task.cancel()
    
    for name, channel in self.channels.items():
        try:
            await channel.stop()
            logger.info(f"Stopped {name} channel")
        except Exception as e:
            logger.error(f"Error stopping {name}: {e}")
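start_all() delegates to a `_start_channel` helper that the article does not show. A plausible minimal version (hypothetical) wraps each channel's start() in try/except so a bad token or network error in one channel is logged instead of tearing down the others; `return_exceptions=True` on gather is a second safety net for anything that still escapes.

```python
import asyncio
import logging

logger = logging.getLogger("channels")


class GoodChannel:
    def __init__(self) -> None:
        self.started = False

    async def start(self) -> None:
        self.started = True


class BrokenChannel:
    async def start(self) -> None:
        raise ConnectionError("bad token")


async def _start_channel(name: str, channel) -> None:
    # Hypothetical helper: log a channel's startup failure instead of propagating it.
    try:
        await channel.start()
    except Exception as e:
        logger.error("Failed to start channel %s: %s", name, e)


async def start_all(channels: dict) -> list:
    tasks = [asyncio.create_task(_start_channel(n, c)) for n, c in channels.items()]
    return await asyncio.gather(*tasks, return_exceptions=True)


good = GoodChannel()
results = asyncio.run(start_all({"good": good, "broken": BrokenChannel()}))
print(good.started, results)  # True [None, None]
```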

Message Dispatch

async def _dispatch_outbound(self) -> None:
    """Dispatch outbound messages to the appropriate channel."""
    logger.info("Outbound dispatcher started")
    
    while True:
        try:
            msg = await asyncio.wait_for(
                self.bus.consume_outbound(),
                timeout=1.0
            )
            
            channel = self.channels.get(msg.channel)
            if channel:
                try:
                    await channel.send(msg)
                except Exception as e:
                    logger.error(f"Error sending to {msg.channel}: {e}")
            else:
                logger.warning(f"Unknown channel: {msg.channel}")
                
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            break
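The dispatcher routes purely by msg.channel, so the Agent never needs to know which platform object to call. The self-contained sketch below reproduces that loop with recording channels (names are illustrative) and shows both a delivered reply and one addressed to an unknown channel. Note that the timeout is not what makes shutdown work: cancel() also interrupts a pending queue.get(); the timeout mainly gives the loop a periodic wake-up.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class OutboundMessage:
    channel: str
    chat_id: str
    content: str


class RecordingChannel:
    def __init__(self) -> None:
        self.sent: list[str] = []

    async def send(self, msg: OutboundMessage) -> None:
        self.sent.append(msg.content)


async def dispatch_outbound(queue: asyncio.Queue, channels: dict, unknown: list) -> None:
    # Same shape as _dispatch_outbound: poll with a timeout, route by msg.channel.
    while True:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            break
        channel = channels.get(msg.channel)
        if channel:
            await channel.send(msg)
        else:
            unknown.append(msg.channel)  # the real code logs a warning here


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    tg = RecordingChannel()
    unknown: list[str] = []
    task = asyncio.create_task(dispatch_outbound(queue, {"telegram": tg}, unknown))
    await queue.put(OutboundMessage("telegram", "1", "hello"))
    await queue.put(OutboundMessage("irc", "1", "lost"))
    await asyncio.sleep(0.2)  # give the dispatcher time to drain the queue
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return tg.sent, unknown


print(asyncio.run(main()))  # (['hello'], ['irc'])
```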

Telegram Channel Implementation

Initialization and Startup

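import asyncio

# python-telegram-bot v20+ (asyncio-based API)
from telegram import BotCommand, Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters


class TelegramChannel(BaseChannel):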
    """Telegram channel using long polling."""
    
    name = "telegram"
    
    BOT_COMMANDS = [
        BotCommand("start", "Start the bot"),
        BotCommand("reset", "Reset conversation history"),
        BotCommand("help", "Show available commands"),
    ]
    
    async def start(self) -> None:
        """Start the Telegram bot with long polling."""
        if not self.config.token:
            logger.error("Telegram bot token not configured")
            return
        
        self._running = True
        
        # Build the application
        self._app = Application.builder().token(self.config.token).build()
        
        # Add handlers
        self._app.add_handler(CommandHandler("start", self._on_start))
        self._app.add_handler(CommandHandler("reset", self._on_reset))
        self._app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_message))
        
        # Start polling
        await self._app.initialize()
        await self._app.start()
        await self._app.updater.start_polling()
        
        # Keep running
        while self._running:
            await asyncio.sleep(1)
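The `while self._running: await asyncio.sleep(1)` loop keeps start() alive but can take up to a second to notice a stop. An alternative design, sketched here with a hypothetical channel and not taken from nanobot, parks start() on an asyncio.Event that stop() sets, so shutdown is immediate. For Telegram, the cleanup step would presumably call python-telegram-bot's `updater.stop()`, `stop()` and `shutdown()` coroutines; the article does not show TelegramChannel.stop, so that part is an assumption.

```python
import asyncio


class EventDrivenChannel:
    """Hypothetical sketch: replace the sleep(1) polling loop with an asyncio.Event."""

    def __init__(self) -> None:
        self._stopped = asyncio.Event()
        self.cleaned_up = False

    async def start(self) -> None:
        # ... connect to the platform and start polling here ...
        await self._stopped.wait()  # park until stop() is called
        # ... shut down the platform client here ...
        self.cleaned_up = True

    async def stop(self) -> None:
        self._stopped.set()  # wakes start() immediately


async def main() -> bool:
    ch = EventDrivenChannel()
    task = asyncio.create_task(ch.start())
    await asyncio.sleep(0)  # let start() reach the wait
    await ch.stop()
    await asyncio.wait_for(task, timeout=1)
    return ch.cleaned_up


print(asyncio.run(main()))  # True
```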

Message Handling

async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle incoming messages."""
    if not update.message or not update.effective_user:
        return
    
    message = update.message
    user = update.effective_user
    chat_id = message.chat_id
    
    # Build sender_id (support both numeric ID and username)
    sender_id = str(user.id)
    if user.username:
        sender_id = f"{sender_id}|{user.username}"
    
    # Start typing indicator
    self._start_typing(str(chat_id))
    
    # Forward to message bus
    await self._handle_message(
        sender_id=sender_id,
        chat_id=str(chat_id),
        content=message.text or "",
        metadata={
            "message_id": message.message_id,
            "user_id": user.id,
            "username": user.username,
        }
    )

Sending Messages

async def send(self, msg: OutboundMessage) -> None:
    """Send a message through Telegram."""
    if not self._app:
        logger.warning("Telegram bot not running")
        return
    
    # Stop typing indicator
    self._stop_typing(msg.chat_id)
    
    try:
        chat_id = int(msg.chat_id)
        # Convert markdown to Telegram HTML
        html_content = _markdown_to_telegram_html(msg.content)
        await self._app.bot.send_message(
            chat_id=chat_id,
            text=html_content,
            parse_mode="HTML"
        )
    except Exception as e:
        logger.error(f"Error sending Telegram message: {e}")
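The _start_typing and _stop_typing helpers are not shown in the article. Telegram displays a chat action for about 5 seconds, so a typing indicator usually has to be refreshed until the reply is sent. A hypothetical sketch: one background task per chat that repeatedly calls an injected `send_action` coroutine (in the real channel this would wrap something like `bot.send_chat_action(chat_id=..., action="typing")`), cancelled when send() runs.

```python
import asyncio
from typing import Awaitable, Callable


class TypingIndicator:
    """Hypothetical helper: keep a per-chat "typing" status alive until stopped."""

    def __init__(self, send_action: Callable[[str], Awaitable[None]], interval: float = 4.0) -> None:
        self._send_action = send_action  # e.g. wraps bot.send_chat_action(...)
        self._interval = interval        # refresh a bit before the ~5 s status expires
        self._tasks: dict[str, asyncio.Task] = {}

    def start(self, chat_id: str) -> None:
        self.stop(chat_id)  # restart cleanly if this chat is already "typing"
        self._tasks[chat_id] = asyncio.create_task(self._loop(chat_id))

    def stop(self, chat_id: str) -> None:
        task = self._tasks.pop(chat_id, None)
        if task:
            task.cancel()

    async def _loop(self, chat_id: str) -> None:
        while True:
            await self._send_action(chat_id)
            await asyncio.sleep(self._interval)


async def main() -> tuple[int, int]:
    calls: list[str] = []

    async def fake_send_action(chat_id: str) -> None:
        calls.append(chat_id)

    indicator = TypingIndicator(fake_send_action, interval=0.01)
    indicator.start("42")
    await asyncio.sleep(0.035)
    indicator.stop("42")
    at_stop = len(calls)
    await asyncio.sleep(0.03)  # no further actions should be sent after stop()
    return at_stop, len(calls)


print(asyncio.run(main()))
```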

Markdown Conversion

import re


def _markdown_to_telegram_html(text: str) -> str:
    """Convert markdown to Telegram-safe HTML."""
    if not text:
        return ""
    
    # Extract and protect code blocks
    code_blocks: list[str] = []
    def save_code_block(m: re.Match) -> str:
        code_blocks.append(m.group(1))
        return f"\x00CB{len(code_blocks) - 1}\x00"
    
    text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
    
    # Escape HTML special characters
    text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
    
    # Convert markdown to HTML
    text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)  # Bold
    text = re.sub(r'`([^`]+)`', r'<code>\1</code>', text)  # Inline code
    
    # Restore code blocks
    for i, code in enumerate(code_blocks):
        escaped = code.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
        text = text.replace(f"\x00CB{i}\x00", f"<pre><code>{escaped}</code></pre>")
    
    return text
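One edge case: because bold is converted before inline code, a span like `` `**x**` `` becomes `<code><b>x</b></code>` and the literal asterisks are lost. A possible refinement (a variant of the function above, not nanobot's code) protects inline code spans with placeholders exactly as it already does for fenced blocks:

```python
import re


def _escape(s: str) -> str:
    return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")


def markdown_to_telegram_html(text: str) -> str:
    if not text:
        return ""
    blocks: list[str] = []
    inline: list[str] = []

    def save_block(m: re.Match) -> str:
        blocks.append(m.group(1))
        return f"\x00CB{len(blocks) - 1}\x00"

    def save_inline(m: re.Match) -> str:
        inline.append(m.group(1))
        return f"\x00IC{len(inline) - 1}\x00"

    # Protect fenced blocks first, then inline code, so neither sees bold conversion.
    text = re.sub(r"```[\w]*\n?([\s\S]*?)```", save_block, text)
    text = re.sub(r"`([^`]+)`", save_inline, text)
    text = _escape(text)
    text = re.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", text)
    for i, code in enumerate(inline):
        text = text.replace(f"\x00IC{i}\x00", f"<code>{_escape(code)}</code>")
    for i, code in enumerate(blocks):
        text = text.replace(f"\x00CB{i}\x00", f"<pre><code>{_escape(code)}</code></pre>")
    return text


print(markdown_to_telegram_html("`**x**`"))             # <code>**x**</code>
print(markdown_to_telegram_html("**Hi** <x> `a<b`"))    # <b>Hi</b> &lt;x&gt; <code>a&lt;b</code>
```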

Supported Channels

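| Channel | Connection method | Notes |
| --- | --- | --- |
| Telegram | Long Polling | Simple and reliable, no public IP required |
| Discord | Gateway WebSocket | Good real-time behavior, supports servers |
| WhatsApp | Socket.IO | Requires QR-code login |
| Feishu | WebSocket long connection | No public IP required |
| Slack | Socket Mode | No public URL required |
| QQ | botpy SDK WebSocket | Private chats only |
| DingTalk | Stream Mode | No public IP required |
| Email | IMAP/SMTP | Mailbox polling |
| Mochat | Socket.IO | Claw IM integration |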

Message Flow

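Figure: sequence diagram with participants User, TelegramChannel, MessageBus, ChannelManager, and AgentLoop:
  1. User → TelegramChannel: send a message
  2. TelegramChannel: check permissions
  3. TelegramChannel → MessageBus: publish_inbound(InboundMessage)
  4. AgentLoop ← MessageBus: consume_inbound()
  5. AgentLoop: process the message
  6. AgentLoop → MessageBus: publish_outbound(OutboundMessage)
  7. ChannelManager ← MessageBus: consume_outbound()
  8. ChannelManager → TelegramChannel: channel.send(msg)
  9. TelegramChannel → User: send the response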

Configuration Example

{
  "channels": {
    "telegram": {
      "enabled": true,
      "token": "YOUR_BOT_TOKEN",
      "allowFrom": ["YOUR_USER_ID"]
    },
    "discord": {
      "enabled": true,
      "token": "YOUR_BOT_TOKEN",
      "allowFrom": ["YOUR_USER_ID"]
    },
    "feishu": {
      "enabled": true,
      "appId": "cli_xxx",
      "appSecret": "xxx"
    },
    "slack": {
      "enabled": true,
      "botToken": "xoxb-...",
      "appToken": "xapp-..."
    }
  }
}
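The JSON uses camelCase keys (allowFrom, appId, botToken) while the Python config classes use snake_case (allow_from). The article does not show how nanobot maps them (pydantic field aliases are one common way). The stdlib-only sketch below illustrates the mapping; `ChannelConfig`, `load_channels` and the `extra` bucket for channel-specific keys are hypothetical.

```python
import json
import re
from dataclasses import dataclass, field, fields
from typing import Any


def camel_to_snake(name: str) -> str:
    # "allowFrom" -> "allow_from", "appSecret" -> "app_secret"
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


@dataclass
class ChannelConfig:
    enabled: bool = False
    allow_from: list[str] = field(default_factory=list)
    extra: dict[str, Any] = field(default_factory=dict)  # token, app_id, bot_token, ...


def load_channels(raw: str) -> dict[str, ChannelConfig]:
    known = {f.name for f in fields(ChannelConfig)} - {"extra"}
    result = {}
    for name, opts in json.loads(raw)["channels"].items():
        snake = {camel_to_snake(k): v for k, v in opts.items()}
        result[name] = ChannelConfig(
            **{k: v for k, v in snake.items() if k in known},
            extra={k: v for k, v in snake.items() if k not in known},
        )
    return result


raw = """{"channels": {
  "telegram": {"enabled": true, "token": "T", "allowFrom": ["42"]},
  "feishu": {"enabled": false, "appId": "cli_xxx", "appSecret": "xxx"}
}}"""
cfg = load_channels(raw)
print(cfg["telegram"].allow_from)                 # ['42']
print(cfg["feishu"].extra)                        # {'app_id': 'cli_xxx', 'app_secret': 'xxx'}
print([n for n, c in cfg.items() if c.enabled])   # ['telegram']
```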

Adding a New Channel

Implementation Steps

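  1. Create a config class: add it to config/schema.py, and expose it as a field of the channels config so that config.channels.my_channel resolves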
from pydantic import BaseModel


class MyChannelConfig(BaseModel):
    enabled: bool = False
    api_key: str = ""
    allow_from: list[str] = []
  2. Implement the Channel class by subclassing BaseChannel
class MyChannel(BaseChannel):
    name = "my_channel"
    
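    async def start(self) -> None:
        # Connect to the platform; forward incoming messages via self._handle_message()
        pass
    
    async def stop(self) -> None:
        # Clean up resources
        pass
    
    async def send(self, msg: OutboundMessage) -> None:
        # Send the message to the platform
        pass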
  3. Register it in ChannelManager._init_channels()
if self.config.channels.my_channel.enabled:
    self.channels["my_channel"] = MyChannel(
        self.config.channels.my_channel, self.bus
    )
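Putting the three steps together, the sketch below is a complete toy channel that runs without any external service. It is a hypothetical `MemoryChannel` whose "platform" is an in-process queue; BaseChannel, MessageBus and the message classes are trimmed copies of the ones above. It forwards platform messages to the bus from a long-running start(), records what send() delivers, and exits start() when stop() is called. The direct send() call in main() stands in for ChannelManager's dispatcher.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class InboundMessage:
    channel: str
    sender_id: str
    chat_id: str
    content: str


@dataclass
class OutboundMessage:
    channel: str
    chat_id: str
    content: str


class MessageBus:
    def __init__(self) -> None:
        self.inbound: asyncio.Queue = asyncio.Queue()
        self.outbound: asyncio.Queue = asyncio.Queue()


class BaseChannel(ABC):
    name = "base"

    def __init__(self, config, bus: MessageBus) -> None:
        self.config, self.bus = config, bus

    @abstractmethod
    async def start(self) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...

    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None: ...

    async def _handle_message(self, sender_id: str, chat_id: str, content: str) -> None:
        await self.bus.inbound.put(InboundMessage(self.name, sender_id, chat_id, content))


class MemoryChannel(BaseChannel):
    """Hypothetical channel whose 'platform' is an in-process queue."""

    name = "memory"

    def __init__(self, config, bus: MessageBus) -> None:
        super().__init__(config, bus)
        self.platform_inbox: asyncio.Queue = asyncio.Queue()  # messages "from users"
        self.sent: list[str] = []                             # messages "to users"
        self._stopped = asyncio.Event()

    async def start(self) -> None:
        # Long-running: forward platform messages to the bus until stop() is called.
        while not self._stopped.is_set():
            try:
                sender, chat, text = await asyncio.wait_for(self.platform_inbox.get(), 0.05)
            except asyncio.TimeoutError:
                continue
            await self._handle_message(sender, chat, text)

    async def stop(self) -> None:
        self._stopped.set()

    async def send(self, msg: OutboundMessage) -> None:
        self.sent.append(f"{msg.chat_id}: {msg.content}")


async def main() -> list[str]:
    bus = MessageBus()
    config = SimpleNamespace(memory=SimpleNamespace(enabled=True))
    channels = {}
    if config.memory.enabled:  # step 3: register by name
        channels["memory"] = MemoryChannel(config.memory, bus)
    ch = channels["memory"]
    runner = asyncio.create_task(ch.start())
    await ch.platform_inbox.put(("u1", "c1", "ping"))
    msg = await bus.inbound.get()  # what the agent would receive
    await ch.send(OutboundMessage(msg.channel, msg.chat_id, "pong"))
    await ch.stop()
    await runner
    return ch.sent


print(asyncio.run(main()))  # ['c1: pong']
```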

Summary

nanobot's multi-channel communication system achieves full decoupling through the message bus pattern:

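| Design principle | Implementation |
| --- | --- |
| Unified abstraction | BaseChannel defines the start, stop, and send interface |
| Message decoupling | MessageBus uses async queues to decouple producers and consumers |
| Config-driven | Channels are enabled or disabled in the config file |
| Access control | allowFrom allowlist |
| Error isolation | Each channel runs independently, so one failure does not affect the others |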

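With this design, adding a new channel is straightforward: implement the BaseChannel interface, add a config class, and register the channel in ChannelManager, and it plugs into the rest of the system without changes to the Agent core.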

