封装一个与 AIGC 相关的 Socket 包(通常基于 WebSocket,因 AIGC 场景多需要长连接 + 流式响应),需结合 AIGC 的业务特性(如流式生成、上下文对话、多模态消息)和 Socket 的技术特性(长连接管理、重连、协议解析),设计出高可用、易扩展、低耦合的方案。以下从设计思路、核心模块、注意事项三个维度展开。

一、先明确 AIGC Socket 的核心业务特性

AIGC 场景的 Socket 通信和普通 Socket 有显著差异,需优先考虑:

  • 流式响应:AIGC 生成(如文本、图像)多为流式返回(如 ChatGPT 的 “打字效果”),需逐块处理消息。
  • 上下文依赖:对话场景需要维护会话 ID、历史消息等上下文,重连后需恢复会话。
  • 多消息类型:可能包含文本(text)、图像(image)、系统通知(system)、错误(error)、中断(abort)等类型。
  • 协议多样性:不同 AIGC 服务可能用自定义协议(如 Dify 的data: 前缀流、OpenAI 的 SSE 协议、二进制图像流)。
  • 高稳定性要求:生成过程可能持续数十秒,需处理断网、超时、服务端主动断开等问题。

二、优秀方案的设计架构(分层设计)

采用 “分层解耦” 思想,从底层到上层依次为:核心连接层 → 协议解析层 → 业务服务层 → API 层,每层职责单一,便于维护和扩展。

1. 核心连接层(Core Client)

职责:处理底层 Socket 连接(建立、断开、重连、心跳),与具体 AIGC 业务无关。核心能力

  • 支持 WebSocket(主流)和 SSE(备用,部分 AIGC 服务用 SSE),可通过配置切换。
  • 连接状态管理:CONNECTING/CONNECTED/DISCONNECTED/RECONNECTING 状态机。
  • 自动重连:基于 “指数退避算法”(首次 1s,二次 2s,最大 10s),避免频繁重试。
  • 心跳机制:定期发送ping帧(如每 30s),检测连接活性;超时未收到pong则触发重连。
  • 底层错误捕获:网络错误、连接超时、握手失败等,统一转化为可识别的错误码。

伪代码示例

class CoreSocketClient {
  private socket: WebSocket | null = null;
  private status: 'CONNECTING' | 'CONNECTED' | 'DISCONNECTED' = 'DISCONNECTED';
  private reconnectDelay = 1000; // 初始重连延迟
  private heartbeatTimer: NodeJS.Timeout | null = null;

  // 建立连接
  connect(config: { url: string; headers?: Record<string, string> }) {
    this.status = 'CONNECTING';
    this.socket = new WebSocket(config.url, { headers: config.headers });
    this.socket.onopen = () => this.handleOpen();
    this.socket.onmessage = (e) => this.emit('rawMessage', e.data); // 抛给上层解析
    this.socket.onerror = (err) => this.handleError(err);
    this.socket.onclose = () => this.handleClose();
  }

  // 处理连接成功:启动心跳
  private handleOpen() {
    this.status = 'CONNECTED';
    this.reconnectDelay = 1000; // 重置重连延迟
    this.startHeartbeat();
    this.emit('connected');
  }

  // 心跳逻辑
  private startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.socket?.readyState === WebSocket.OPEN) {
        this.socket.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }

  // 处理断开:自动重连
  private handleClose() {
    this.status = 'DISCONNECTED';
    clearInterval(this.heartbeatTimer!);
    this.emit('disconnected');
    // 非主动断开则重连
    if (!this.isManualClose) {
      setTimeout(() => this.connect(this.config), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000); // 指数退避
    }
  }

  // 发送原始数据
  send(data: string | ArrayBuffer) {
    if (this.status !== 'CONNECTED') throw new Error('未连接');
    this.socket?.send(data);
  }
}
2. 协议解析层(Protocol Parser)

职责:将底层的原始消息(字符串 / 二进制)解析为 AIGC 业务可识别的结构化数据,屏蔽不同服务的协议差异。核心能力

  • 多协议适配:支持主流 AIGC 协议(如 Dify 流、OpenAI SSE、自定义 JSON),可通过配置指定解析器。
    • 例:Dify 流消息以data: 为前缀,需剥离前缀后解析 JSON:data: {"type":"text","content":"hello"} → 解析为{ type: "text", content: "hello" }
    • 例:二进制图像流(如 base64 或 Blob),需转换为可直接使用的图像数据(如{ type: "image", url: "data:image/png;base64,..." })。
  • 流式消息拼接:AIGC 文本生成可能分多块返回(如 “你好” 分 “你”“好” 两次发送),需按会话 ID 拼接完整内容。
  • 错误消息识别:从原始消息中提取错误信息(如模型超时、token 不足),转化为标准化错误对象({ code: 'MODEL_TIMEOUT', message: '模型生成超时' })。

伪代码示例

class AIGCProtocolParser {
  private parsers = {
    dify: (rawData: string) => this.parseDify(rawData),
    openai: (rawData: string) => this.parseOpenAI(rawData),
    json: (rawData: string) => JSON.parse(rawData),
  };

  // 解析入口:根据配置的协议类型选择解析器
  parse(rawData: string | Blob, protocol: 'dify' | 'openai' | 'json') {
    if (typeof rawData === 'string') {
      return this.parsers[protocol](rawData);
    } else {
      // 处理二进制(如图像)
      return this.parseBinary(rawData);
    }
  }

  // 解析Dify流式消息
  private parseDify(rawData: string) {
    const data = rawData.replace(/^data: /, ''); // 剥离前缀
    if (data === '[DONE]') { // Dify流结束标识
      return { type: 'stream_end' };
    }
    return JSON.parse(data);
  }

  // 解析二进制(如图像)
  private parseBinary(blob: Blob) {
    return new Promise<{ type: 'image'; url: string }>((resolve) => {
      const reader = new FileReader();
      reader.onload = () => resolve({ type: 'image', url: reader.result as string });
      reader.readAsDataURL(blob);
    });
  }
}
3. 业务服务层(AIGC Service)

职责:封装 AIGC 核心业务逻辑(会话管理、消息分发、上下文维护),是连接层与 API 层的桥梁。核心能力

  • 会话管理:维护sessionId与上下文(历史消息、模型参数)的映射,支持多会话并行(如同时和多个 AI 对话)。
  • 消息类型分发:将解析后的消息按类型(text/image/system/error)分发到对应的处理器(如文本消息更新 UI,图像消息渲染图片)。
  • 发送前校验:发送消息前检查连接状态、会话有效性、参数合法性(如模型是否支持图像输入)。
  • 上下文控制:支持设置上下文长度上限(避免 token 超限),自动截断历史消息。
  • 中断生成:提供abort(sessionId)方法,向服务端发送中断指令,停止当前生成流程。

伪代码示例

class AIGCService {
  private client: CoreSocketClient;
  private parser: AIGCProtocolParser;
  private sessions: Map<string, { context: Message[]; model: string }> = new Map(); // 会话上下文

  constructor(client: CoreSocketClient, parser: AIGCProtocolParser) {
    this.client = client;
    this.parser = parser;
    // 监听底层原始消息,解析后分发
    this.client.on('rawMessage', async (rawData) => {
      const message = await this.parser.parse(rawData, this.config.protocol);
      this.dispatchMessage(message);
    });
  }

  // 新建会话
  createSession(model: string) {
    const sessionId = uuidv4();
    this.sessions.set(sessionId, { context: [], model });
    return sessionId;
  }

  // 发送消息(带上下文)
  async sendMessage(sessionId: string, content: string, type: 'text' | 'image') {
    const session = this.sessions.get(sessionId);
    if (!session) throw new Error('会话不存在');
    if (this.client.status !== 'CONNECTED') throw new Error('连接未就绪');

    // 构建消息(包含上下文和模型参数)
    const message = {
      sessionId,
      type: 'user_message',
      content,
      model: session.model,
      context: session.context.slice(-5), // 只带最近5条上下文
    };

    // 发送并记录到上下文
    this.client.send(JSON.stringify(message));
    session.context.push({ role: 'user', content, type });
  }

  // 分发消息到对应处理器
  private dispatchMessage(message: ParsedMessage) {
    switch (message.type) {
      case 'text':
        this.handleTextMessage(message); // 更新会话上下文+触发事件
        break;
      case 'image':
        this.handleImageMessage(message);
        break;
      case 'error':
        this.handleError(message);
        break;
      // ...其他类型
    }
  }

  // 中断生成
  abort(sessionId: string) {
    this.client.send(JSON.stringify({ type: 'abort', sessionId }));
  }
}
4. API 层(Public API)

职责:向上层(如 UI 组件、业务逻辑)提供简洁、易用的接口,隐藏底层复杂度。核心设计原则

  • 接口语义化:用贴近业务的命名(init/createSession/sendMessage/onMessage)。
  • 事件驱动:通过发布订阅模式暴露事件(message/stream/error/reconnect),避免回调嵌套。
  • 类型安全:用 TypeScript 定义消息结构、事件类型,提供完整类型提示。

API 示例

// 对外暴露的入口类
export class AIGCSocket {
  private service: AIGCService;

  constructor(config: AIGCSocketConfig) {
    const client = new CoreSocketClient(config);
    const parser = new AIGCProtocolParser();
    this.service = new AIGCService(client, parser);
  }

  // 初始化连接
  init() {
    this.service.connect();
  }

  // 创建会话
  createSession(model: string) {
    return this.service.createSession(model);
  }

  // 发送消息
  sendMessage(sessionId: string, content: string, type: 'text' | 'image') {
    return this.service.sendMessage(sessionId, content, type);
  }

  // 订阅事件(如接收消息、错误)
  on(event: 'message', callback: (msg: TextMessage) => void);
  on(event: 'image', callback: (msg: ImageMessage) => void);
  on(event: 'error', callback: (err: AIGCError) => void);
  on(event: string, callback: (...args: any[]) => void) {
    this.service.on(event, callback);
  }

  // 中断生成
  abort(sessionId: string) {
    this.service.abort(sessionId);
  }

  // 销毁连接
  destroy() {
    this.service.disconnect();
  }
}

三、关键注意事项(避坑指南)

  1. 流式处理的性能优化

    • AIGC 文本流可能高频触发(每秒多次),需限制 UI 更新频率(如用requestAnimationFrame节流),避免卡顿。
    • 大文件(如图像二进制流)传输时,优先用分片 + 进度反馈,避免内存暴涨。
  2. 上下文一致性保障

    • 重连后需自动恢复会话:通过localStorage缓存sessionId和上下文,重连成功后发送resume(sessionId)指令。
    • 处理 “消息乱序”:AIGC 服务可能因负载乱序返回,需在消息中携带sequenceId,接收后按序号排序。
  3. 错误处理的颗粒度

    • 区分错误类型:连接错误(网络问题)、业务错误(模型超时)、权限错误(token 失效),并提供针对性重试策略(如权限错误提示重新登录,网络错误自动重连)。
    • 错误透传:避免底层错误被吞噬,确保上层能捕获并展示给用户(如 “模型加载失败,请稍后再试”)。
  4. 可扩展性设计

    • 支持多模型切换:通过配置model参数(如gpt-4/claude),底层无需修改,只需在sendMessage中携带模型信息。
    • 插件化处理器:消息类型处理器(文本 / 图像 / 错误)设计为插件,可动态注册(如registerProcessor('video', videoProcessor)支持视频生成)。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐