封装一个 和AIGC 相关的 socket包
本文提出了一种面向AIGC场景的高可用Socket封装方案,采用分层架构设计:1)核心连接层处理底层Socket连接与重连机制;2)协议解析层适配多种AIGC协议格式;3)业务服务层管理会话上下文和消息分发;4)API层提供简洁的开发者接口。方案重点解决了流式响应处理、多模态消息支持、会话状态保持等AIGC特有需求,通过自动重连、心跳检测、错误分类等机制保障稳定性,并采用插件化设计支持扩展。该架构
封装一个与 AIGC 相关的 Socket 包(通常基于 WebSocket,因 AIGC 场景多需要长连接 + 流式响应),需结合 AIGC 的业务特性(如流式生成、上下文对话、多模态消息)和 Socket 的技术特性(长连接管理、重连、协议解析),设计出高可用、易扩展、低耦合的方案。以下从设计思路、核心模块、注意事项三个维度展开。
一、先明确 AIGC Socket 的核心业务特性
AIGC 场景的 Socket 通信和普通 Socket 有显著差异,需优先考虑:
- 流式响应:AIGC 生成(如文本、图像)多为流式返回(如 ChatGPT 的 “打字效果”),需逐块处理消息。
- 上下文依赖:对话场景需要维护会话 ID、历史消息等上下文,重连后需恢复会话。
- 多消息类型:可能包含文本(text)、图像(image)、系统通知(system)、错误(error)、中断(abort)等类型。
- 协议多样性:不同 AIGC 服务可能用自定义协议(如 Dify 的
data:前缀流、OpenAI 的 SSE 协议、二进制图像流)。 - 高稳定性要求:生成过程可能持续数十秒,需处理断网、超时、服务端主动断开等问题。
二、优秀方案的设计架构(分层设计)
采用 “分层解耦” 思想,从底层到上层依次为:核心连接层 → 协议解析层 → 业务服务层 → API 层,每层职责单一,便于维护和扩展。
1. 核心连接层(Core Client)
职责:处理底层 Socket 连接(建立、断开、重连、心跳),与具体 AIGC 业务无关。核心能力:
- 支持 WebSocket(主流)和 SSE(备用,部分 AIGC 服务用 SSE),可通过配置切换。
- 连接状态管理:
CONNECTING/CONNECTED/DISCONNECTED/RECONNECTING状态机。 - 自动重连:基于 “指数退避算法”(首次 1s,二次 2s,最大 10s),避免频繁重试。
- 心跳机制:定期发送
ping帧(如每 30s),检测连接活性;超时未收到pong则触发重连。 - 底层错误捕获:网络错误、连接超时、握手失败等,统一转化为可识别的错误码。
伪代码示例:
class CoreSocketClient {
private socket: WebSocket | null = null;
private status: 'CONNECTING' | 'CONNECTED' | 'DISCONNECTED' = 'DISCONNECTED';
private reconnectDelay = 1000; // 初始重连延迟
private heartbeatTimer: NodeJS.Timeout | null = null;
// 建立连接
connect(config: { url: string; headers?: Record<string, string> }) {
this.status = 'CONNECTING';
this.socket = new WebSocket(config.url, { headers: config.headers });
this.socket.onopen = () => this.handleOpen();
this.socket.onmessage = (e) => this.emit('rawMessage', e.data); // 抛给上层解析
this.socket.onerror = (err) => this.handleError(err);
this.socket.onclose = () => this.handleClose();
}
// 处理连接成功:启动心跳
private handleOpen() {
this.status = 'CONNECTED';
this.reconnectDelay = 1000; // 重置重连延迟
this.startHeartbeat();
this.emit('connected');
}
// 心跳逻辑
private startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
}
// 处理断开:自动重连
private handleClose() {
this.status = 'DISCONNECTED';
clearInterval(this.heartbeatTimer!);
this.emit('disconnected');
// 非主动断开则重连
if (!this.isManualClose) {
setTimeout(() => this.connect(this.config), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000); // 指数退避
}
}
// 发送原始数据
send(data: string | ArrayBuffer) {
if (this.status !== 'CONNECTED') throw new Error('未连接');
this.socket?.send(data);
}
}
2. 协议解析层(Protocol Parser)
职责:将底层的原始消息(字符串 / 二进制)解析为 AIGC 业务可识别的结构化数据,屏蔽不同服务的协议差异。核心能力:
- 多协议适配:支持主流 AIGC 协议(如 Dify 流、OpenAI SSE、自定义 JSON),可通过配置指定解析器。
- 例:Dify 流消息以
data:为前缀,需剥离前缀后解析 JSON:data: {"type":"text","content":"hello"}→ 解析为{ type: "text", content: "hello" }。 - 例:二进制图像流(如 base64 或 Blob),需转换为可直接使用的图像数据(如
{ type: "image", url: "data:image/png;base64,..." })。
- 例:Dify 流消息以
- 流式消息拼接:AIGC 文本生成可能分多块返回(如 “你好” 分 “你”“好” 两次发送),需按会话 ID 拼接完整内容。
- 错误消息识别:从原始消息中提取错误信息(如模型超时、token 不足),转化为标准化错误对象(
{ code: 'MODEL_TIMEOUT', message: '模型生成超时' })。
伪代码示例:
class AIGCProtocolParser {
private parsers = {
dify: (rawData: string) => this.parseDify(rawData),
openai: (rawData: string) => this.parseOpenAI(rawData),
json: (rawData: string) => JSON.parse(rawData),
};
// 解析入口:根据配置的协议类型选择解析器
parse(rawData: string | Blob, protocol: 'dify' | 'openai' | 'json') {
if (typeof rawData === 'string') {
return this.parsers[protocol](rawData);
} else {
// 处理二进制(如图像)
return this.parseBinary(rawData);
}
}
// 解析Dify流式消息
private parseDify(rawData: string) {
const data = rawData.replace(/^data: /, ''); // 剥离前缀
if (data === '[DONE]') { // Dify流结束标识
return { type: 'stream_end' };
}
return JSON.parse(data);
}
// 解析二进制(如图像)
private parseBinary(blob: Blob) {
return new Promise<{ type: 'image'; url: string }>((resolve) => {
const reader = new FileReader();
reader.onload = () => resolve({ type: 'image', url: reader.result as string });
reader.readAsDataURL(blob);
});
}
}
3. 业务服务层(AIGC Service)
职责:封装 AIGC 核心业务逻辑(会话管理、消息分发、上下文维护),是连接层与 API 层的桥梁。核心能力:
- 会话管理:维护
sessionId与上下文(历史消息、模型参数)的映射,支持多会话并行(如同时和多个 AI 对话)。 - 消息类型分发:将解析后的消息按类型(text/image/system/error)分发到对应的处理器(如文本消息更新 UI,图像消息渲染图片)。
- 发送前校验:发送消息前检查连接状态、会话有效性、参数合法性(如模型是否支持图像输入)。
- 上下文控制:支持设置上下文长度上限(避免 token 超限),自动截断历史消息。
- 中断生成:提供
abort(sessionId)方法,向服务端发送中断指令,停止当前生成流程。
伪代码示例
class AIGCService {
private client: CoreSocketClient;
private parser: AIGCProtocolParser;
private sessions: Map<string, { context: Message[]; model: string }> = new Map(); // 会话上下文
constructor(client: CoreSocketClient, parser: AIGCProtocolParser) {
this.client = client;
this.parser = parser;
// 监听底层原始消息,解析后分发
this.client.on('rawMessage', async (rawData) => {
const message = await this.parser.parse(rawData, this.config.protocol);
this.dispatchMessage(message);
});
}
// 新建会话
createSession(model: string) {
const sessionId = uuidv4();
this.sessions.set(sessionId, { context: [], model });
return sessionId;
}
// 发送消息(带上下文)
async sendMessage(sessionId: string, content: string, type: 'text' | 'image') {
const session = this.sessions.get(sessionId);
if (!session) throw new Error('会话不存在');
if (this.client.status !== 'CONNECTED') throw new Error('连接未就绪');
// 构建消息(包含上下文和模型参数)
const message = {
sessionId,
type: 'user_message',
content,
model: session.model,
context: session.context.slice(-5), // 只带最近5条上下文
};
// 发送并记录到上下文
this.client.send(JSON.stringify(message));
session.context.push({ role: 'user', content, type });
}
// 分发消息到对应处理器
private dispatchMessage(message: ParsedMessage) {
switch (message.type) {
case 'text':
this.handleTextMessage(message); // 更新会话上下文+触发事件
break;
case 'image':
this.handleImageMessage(message);
break;
case 'error':
this.handleError(message);
break;
// ...其他类型
}
}
// 中断生成
abort(sessionId: string) {
this.client.send(JSON.stringify({ type: 'abort', sessionId }));
}
}
4. API 层(Public API)
职责:向上层(如 UI 组件、业务逻辑)提供简洁、易用的接口,隐藏底层复杂度。核心设计原则:
- 接口语义化:用贴近业务的命名(
init/createSession/sendMessage/onMessage)。 - 事件驱动:通过发布订阅模式暴露事件(
message/stream/error/reconnect),避免回调嵌套。 - 类型安全:用 TypeScript 定义消息结构、事件类型,提供完整类型提示。
API 示例:
// 对外暴露的入口类
export class AIGCSocket {
private service: AIGCService;
constructor(config: AIGCSocketConfig) {
const client = new CoreSocketClient(config);
const parser = new AIGCProtocolParser();
this.service = new AIGCService(client, parser);
}
// 初始化连接
init() {
this.service.connect();
}
// 创建会话
createSession(model: string) {
return this.service.createSession(model);
}
// 发送消息
sendMessage(sessionId: string, content: string, type: 'text' | 'image') {
return this.service.sendMessage(sessionId, content, type);
}
// 订阅事件(如接收消息、错误)
on(event: 'message', callback: (msg: TextMessage) => void);
on(event: 'image', callback: (msg: ImageMessage) => void);
on(event: 'error', callback: (err: AIGCError) => void);
on(event: string, callback: (...args: any[]) => void) {
this.service.on(event, callback);
}
// 中断生成
abort(sessionId: string) {
this.service.abort(sessionId);
}
// 销毁连接
destroy() {
this.service.disconnect();
}
}
三、关键注意事项(避坑指南)
-
流式处理的性能优化
- AIGC 文本流可能高频触发(每秒多次),需限制 UI 更新频率(如用
requestAnimationFrame节流),避免卡顿。 - 大文件(如图像二进制流)传输时,优先用分片 + 进度反馈,避免内存暴涨。
- AIGC 文本流可能高频触发(每秒多次),需限制 UI 更新频率(如用
-
上下文一致性保障
- 重连后需自动恢复会话:通过
localStorage缓存sessionId和上下文,重连成功后发送resume(sessionId)指令。 - 处理 “消息乱序”:AIGC 服务可能因负载乱序返回,需在消息中携带
sequenceId,接收后按序号排序。
- 重连后需自动恢复会话:通过
-
错误处理的颗粒度
- 区分错误类型:连接错误(网络问题)、业务错误(模型超时)、权限错误(token 失效),并提供针对性重试策略(如权限错误提示重新登录,网络错误自动重连)。
- 错误透传:避免底层错误被吞噬,确保上层能捕获并展示给用户(如 “模型加载失败,请稍后再试”)。
-
可扩展性设计
- 支持多模型切换:通过配置
model参数(如gpt-4/claude),底层无需修改,只需在sendMessage中携带模型信息。 - 插件化处理器:消息类型处理器(文本 / 图像 / 错误)设计为插件,可动态注册(如
registerProcessor('video', videoProcessor)支持视频生成)。
- 支持多模型切换:通过配置
更多推荐

所有评论(0)