引言

随着AIGC(生成式人工智能)的普及,大模型如GPT、通义千问等已广泛应用于对话、代码生成、数据分析等场景。这些模型在运行时需要与客户端进行高效通信,SSE(Server-Sent Events) 和 Stream HTTP(流式HTTP) 是两种核心通信协议。本文将结合理论和Java代码示例,帮助你理解这两种协议的原理、区别及在AIGC中的实际应用。

一、通信协议基础

1.1 什么是SSE?

SSE(Server-Sent Events)是一种基于HTTP的单向通信协议,允许服务器主动向客户端推送实时事件。其特点包括:
● 事件格式标准化:通过data:字段传递结构化数据(如JSON)。
● 自动重连:网络中断后客户端会自动重新连接。
● 长连接保持:服务器持续发送数据,客户端逐步接收。
典型应用场景:实时聊天、股票行情推送、大模型流式输出。

1.2 什么是Stream HTTP?

Stream HTTP 是基于 HTTP 的分块传输编码(Chunked Transfer Encoding) 的流式传输方式。服务器可以持续向客户端发送数据块,客户端逐步接收并处理。
典型应用场景:视频流、大文件下载、自定义数据流。

二、SSE与Stream HTTP的区别

在这里插入图片描述

三、Java代码示例

3.1 使用SSE实现大模型流式输出

服务器端(Spring Boot)

package com.lzc.ai;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@RestController
public class SseController {

    private final ExecutorService executor = new ThreadPoolExecutor(5, 5,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 使用线程池管理发射器

    @GetMapping("/sse")
    public SseEmitter streamData() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 创建一个SseEmitter实例,不设置超时时间
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) { // 发送10条消息
                    String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

                    emitter.send(SseEmitter.event()
                            .id(Thread.currentThread().getName() + "=" + i)
                            .name("msg")
                            .data("Message " + i + " " + currentTime)); // 发送事件和数据
                    Thread.sleep(1000); // 等待1秒
                }
                emitter.complete(); // 完成SSE连接
            } catch (Exception e) {
                emitter.completeWithError(e); // 出现错误时,关闭连接并发送错误信息
            }
        });
        return emitter;
    }
}

客户端(chrome浏览器)

http://localhost:8000/sse
在这里插入图片描述

3.2 使用Stream HTTP分块传输

服务器端(Spring Boot)

package com.lzc.ai;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@RestController
public class StreamController {

    @GetMapping("/stream")
    public void streamData(HttpServletResponse response) throws IOException, InterruptedException {
        //默认即是chunked,所以不需要设置
        //response.setHeader("Transfer-Encoding", "chunked");
        //text/plain模式下,浏览器会缓存数据直到连接关闭,再一次性渲染,如果想观察数据分块传输效果,可以使用curl命令行或者其它手写java client工具
        response.setContentType("text/plain");

        PrintWriter writer = response.getWriter();
        for (int i = 0; i < 10; i++) {
            //得到当前时间 yyyy-MM-dd HH:mm:ss
            String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            writer.write("Data chunk " + i + " " + currentTime + "\n");
            writer.flush(); // 必须调用 flush 以触发分块传输
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

客户端(chrome浏览器)

在这里插入图片描述
说明:虽然从浏览器中来看,页面数据是在10秒后统一返回的,但是由于服务端设置了writer.flush(); 故每秒钟其实是有返回的,只不过浏览器进行了缓存,待全部数据拿回来之后才显示出来,如果想看过程数据,可以使用curl命令查看

四、在AIGC中的实际应用

4.1 大模型流式输出(SSE)

● OpenAI API:当调用 POST /chat/completions 并设置 stream: true 时,返回的是SSE格式的数据流。
● 代码示例(Python):

import openai

openai.api_key = "YOUR_API_KEY"
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)

for chunk in response:
    print(chunk.choices[0].delta.get("content", ""), end="", flush=True)

4.2 大文件下载(Stream HTTP)

● 适用场景:模型权重文件、训练数据集的分块下载。
● 优势:支持断点续传,节省带宽。

五、总结

● SSE 是 AIGC 中流式输出的首选协议,因其标准化格式、自动重连和实时性优势。
● Stream HTTP 更适合大文件传输或自定义协议场景,但需手动处理数据解析和重连。
● 选择建议:

  • AI对话/代码生成:使用 SSE。
  • 大文件下载/视频流:使用 Stream HTTP。

通过本文的理论讲解和Java代码示例,你可以快速入门AIGC中的通信协议设计,并在实际项目中灵活应用。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐