前言

最近工作中使用到 Dify(Do It for You)有些频繁,不得不说,现在在 Dify 上快速搭建一个可以满足自己需求的机器人真的是非常方便、高效,哪怕只是一个简单的 聊天助手 应用。在开发工作中,在搭建完成一个 Agent 以后往往需要使用到 Dify 提供的 API 接口,这样就可以很方便的将 Dify 的能力封装到我们自己的应用中来。
Dify 提供的对话 APIhttp://host:port/v1/chat-messages 提供了两种响应模式:blockingstreaming 两种 请求模式blocking 模式是等待大模型生成完整的答案以后再一次性通过接口返回给客户端,而 streaming 模式则是将大模型生成的答案分成若干个 chunk ,大模型生成一部分数据返回一部分数据,关注的是数据生成的 实时性,从表现形式上看,如果是文本交互的形式,就好比我们常见的大模型应用中的 打字机形式。在 实际应用 中,尤其在 文字流、语音流 等方面,如果采用 blocking 响应模式,会给用户一种响应不及时、等待时间过长的感受,甚至客户端接口调用方会出现 请求超时 的现象,因此 streaming 响应模式( 流式响应模式 )设计在这一方面可谓是产品设计的刚需!因为这将实现更佳的用户体验。

图片来源于网络资源

由于之前没有过 流式服务 设计与开发的经验,浅浅研究了一下在 Python-Flask 框架下如何实现流式接口开发,再将 Dify 提供的流式响应服务应用于此,将这部分理论与实践的内容整理成 笔记 的形式与大家分享,如果有理解不到位的地方,欢迎大家批评指正! 共同学习、共同进步!

流式请求原理

流式输出

  • 表现形式:
    流式输出 形式上是将 一整块数据 切分成多个 chunk 内容,分多次输出的形式。
  • 流式输出底层是以生成器的形式实现的:
import jieba
# 先构造一个实例数据
text = "我爱北京天安门"
words = jieba.lcut(text, cut_all=False)
# 构造一个模拟的生成器
def generator(words: list) -> str:
    for word in words:
        # 省略内容:实际应用中这里会存在一部分较为复杂的生成逻辑
        ...
        yield word

if __name__ == "__main__":
    gen = generator(words=words)
    for word in gen:
        print(word)

上述示例中, 原始文本 -text 或者说整个 单词列表 -words是一个 完整的数据包 ,而实际应用中往往这个数据包的内容并不是一次性完整获取的(示例中是提前模拟生成好的),而是生成一点返回一点(这里的 “一点” 就可以理解成一个 chunk 块,示例中的一个 chunk 块就是一个 单词 ),这就是 流式输出 的基本实现形式。

  • 流式输出的优点:
    在实际应用中,一个完整数据包的产生可能需要花费很长时间,如果以生成器的形式生成一部分获取一部分,即拿即用,而不是等待完整的数据包全部生成完再获取,这样做可以达到数据获取的实时性,虽然总生成时长相同,但可以增强产品体验感,感受上感觉生成速度变快了。

流式请求

流式输出 的形式应用在 HTTP 请求中, 客户端 只向 服务端 发送一次请求, 服务端 每生成一个 chunk 块就返回 客户端 响应数据,直到 服务端 生成完成完整的数据包,通过响应报文告知 客户端 “数据包已发送完成”客户端 即断开请求连接。

  • 服务端实现形式:
from flask import Flask, request, Response
import jieba
import json

app = Flask(__name__)

@app.route("/demo", methods=["POST"])
def streaming():
    query = request.json.get("query")
    # 定义一个流生成器
    def generator():
        # 服务端接口中将客户端请求参数-query进行分词,并分块流式返回
        words = jieba.lcut(query, cut_all=False)
        for word in words:
            ret = {"event": "processing", "chunk": word}
            # 以JSON格式返回数据
            yield json.dumps(ret, ensure_ascii=False)
        # 告知客户端何时结束
        ret = {"event": "done"}
        yield json.dumps(ret, ensure_ascii=False)
    # 以Response体包裹生成器返回给Flask框架,由Flask框架迭代生成器向客户端推送数据
    return Response(generator(), mimetype="text/event-stream")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

注意:
Flask 中执行完成 return Response(generator(), mimetype="text/event-stream") 这句代码后,即向 Flask 框架返回了 Response 响应体,Flask 会立即 释放 当前请求的所有变量( 除内置生成器中使用到的闭包变量 ),因此如果生成器中需要用到当前请求的变量需要使用 Flask 内置的 stream_with_context 装饰器装饰生成器,否则 服务端报出异常 ,具体如何使用 stream_with_context 装饰器暂不叙述。

  • 客户端实现形式:
import requests
import json

# 定义请求参数
url = "http://host:port/demo"
data = {"query": "我爱自然语言处理"}  # 模拟数据

# 这里请求可以不显式指定headers参数
data_package = []
with requests.post(url=url, json=data, stream=True) as response:
    # 迭代请求生成器(从缓冲区读取chunk数据)
    for res in response.iter_lines(decode_unicode=True):
        current = json.loads(res)
        # 判断流数据状态
        if current.get("event") == "processing":
            data_package.append(current.get("chunk"))
        elif current.get("event") == "done":
            break
# 此处跳出with块,客户端与服务端终止连接
# 构造完整的数据包
result = "".join(data_package)
print(result)  # "我爱自然语言处理"字符串

细节点:

  • 如何理解 客户端 迭代 post 流式请求 响应体这一过程?
    通过上述对 服务端 原理描述可知: 服务端 Flask 框架会执行定义的流式生成器,将 yieldchunk 数据不断向 客户端 服务器推送,这些 chunk 块会暂存在 客户端 服务器的 缓冲区 内。那么 客户端 获取到的 requests 响应体中内置的 response.iter_lines(decode_unicode=True) 生成器就是通过 socket 在读取本地 缓冲区 中暂存的 chunk 数据,如果在连接未断开但 缓冲区 内暂无数据( 服务端 生成器仍在生成数据,但 客户端 缓冲区 中的暂存数据已被全部读取),那么 客户端 迭代生成器的代码将会阻塞, socket 会持续监听 缓冲区 ,直到 服务端 生成器 return 或抛出 StopIteration 异常 ,此时 服务端 Flask 会向 客户端 发送结束连接的报文。
  • 客户端服务端 之间 终止连接 的时机?
    • 服务端 主动终止的情况
      在上一点中已经描述了这一情形,即当 服务端 已经执行完成所有的生成器内容会向客户端发送 结束连接报文数据 ,当 客户端 response.iter_lines(deocde_unicode=True) 生成器监听到这一数据将在生成器内部 return 或抛出 StopIteration 异常客户端 生成器迭代终止,此时 客户端服务端 之间已经断开连接, 客户端 会继续正常执行后续代码逻辑,直到退出 with 块, 隐式执行 response.close() 释放 客户端 socket 对象。
    • 客户端 主动终止的情况
      假设在 服务端 生成器还未执行完成的情况下, 客户端 代码中提前跳出了 with 块(即主动与 服务端 断开连接 ),此时 客户端 会向 服务端 发送终止连接的报文数据,但是 服务端 生成器代码会照样持续执行,直到执行到 yield (即向 客户端 发送 chunk 块数据)时会出现 异常 ,这时 Flask 发现 客户端 已经与 服务端 本地断开了连接,将不再继续执行后续代码逻辑。

注意:
需要 关注 的是,如果在 客户端 提前终止连接的情况下,而 服务端 生成器代码已经执行完成所有的 chunk 块数据推送任务(即所有的 yield 都已经执行完毕),但是仍在继续执行后续逻辑,因为此时 服务端 不再需要向 客户端 推送数据,因此此处的后续逻辑将会正常被执行直到 服务端 生成器抛出 StopIteration 异常 为止。
所以如果一般在 服务端 流式输出 任务结束后,仍然有后续的任务需要处理,可以将这个任务逻辑写在生成器最后一个需要执行的 yield 后面即可。

接入 Dify 问答的流式请求(SSE)设计

背景介绍

  • Dify 的对话接口模式
    基于本地搭建的 Dify 问答应用 ,采用提供的问答 API 响应模式(response_mode):streamingDify 的问答 流式接口 本质也是一个 流式(SSE)响应服务
  • 目标
    使用 Dify 接入的 大模型理解、对话能力 ,在此基础上封装一个 Flask 服务,接口内部调用 Dify,需要 Flask 服务具有 ___流式响应___的能力。

实现

from flask import Flask, request, Response
import requests
import os
import json

app = Flask(__name__)

@app.route("/streaming_dify_response", methods=["POST"])
def streaming_dify_response():
    query = request.json.get("query")
    # 定义Dify请求参数
    dify_api_key = os.getenv("DIFY_API_KEY")
    dify_url = "http://host:port/v1/chat-messages"  # Dify对话接口
    headers = {
        "Authorization": f"Bearer {dify_api_key}", 
        "Content-Type": "application/json"
    }
    data = {
        "inputs": {}, 
        "query": query, 
        "response_mode": "streaming", 
        "user": "user"  # 自定义
    }  # 需要实现上下文记忆-多轮对话能力可追加`conversation_id`参数
    # 定义流式生成器
    def streaming_generator() -> None:
        answer = []  # 用于流式任务结束的后续逻辑,获取完整答案
        # 向Dify服务发送流式请求
        with requests.post(url=dify_url, json=data, headers=headers, stream=True) as response:
            # 读取Dify流式响应数据
            for chunk in response.iter_lines(decode_unicode=True):
                if chunk.startswith("data:"):
                    chunk = json.loads(chunk[5:])  # 解析JSON数据
                    if chunk.get("event") == "message" and chunk.get("answer"):
                        answer.append(chunk.get("answer"))
                        # 构造chunk块,并向客户端推送
                        ret = {
                            "event": "generating", 
                            "answer": chunk.get("answer")
                        }
                        yield json.dumps(ret, ensure_ascii=False)
                    elif chunk.get("event") == "message_end":  # 代表Dify已经输出完成
                        ret = {
                            "event": "done"
                        }
                        yield json.dumps(ret, ensure_ascii=False)
                        break
        # 获取完整答案内容
        ans = "".join(answer)
        print(ans)
    return Response(streaming_generator(), mimetype="text/event-stream")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

写在最后

补充一下 Dify 流式服务返回的数据格式:

{
    "event": "message", 
    "conversation_id": "135ef918-c781-437c-b6d3-758186c0416e", 
    "message_id": "33732033-1ace-4eb7-92a0-dff013357b95", 
    "created_at": 1768530499, 
    "task_id": "db5ca1e3-61dd-408d-bf3b-b7e9381b7d91", 
    "id": "33732033-1ace-4eb7-92a0-dff013357b95", 
    "answer": "", 
    "from_variable_selector": null
}
{
    "event": "message_end", 
    "conversation_id": "135ef918-c781-437c-b6d3-758186c0416e", 
    "message_id": "33732033-1ace-4eb7-92a0-dff013357b95", 
    "created_at": 1768530499, 
    "task_id": "db5ca1e3-61dd-408d-bf3b-b7e9381b7d91", 
    "id": "33732033-1ace-4eb7-92a0-dff013357b95", 
    "metadata": {
        "annotation_reply": null, 
        "retriever_resources": [
            {
                "position": 1, 
                "dataset_id": "15ecd83b-e6f7-455b-a2de-0f808f2d74cb", 
                "dataset_name": "md\\u683c\\u5f0f\\u793a\\u4f8b-\\u4fdd\\u9669\\u4ea7\\u54c1\\u77e5\\u8bc6\\u5e93.txt...", 
                "document_id": "14779633-ea03-4aaf-8c2f-7cbd5a97c10e", 
                "document_name": "md\\u683c\\u5f0f-\\u6cf0\\u65e0\\u5fe7-\\u610f\\u5916\\u9669\\uff08\\u5bb6\\u5ead\\u7248\\uff09.txt", 
                "data_source_type": "upload_file", 
                "segment_id": "1366d3ab-a841-454e-b286-aad4922e4987", 
                "retriever_from": "api", 
                "score": 0.15491657954399923, 
                "hit_count": null, 
                "word_count": null, 
                "segment_position": null, 
                "index_node_hash": null, 
                "content": "---", 
                "page": null, 
                "doc_metadata": null, 
                "title": null
            }, 
            {
                "position": 2, 
                "dataset_id": "15ecd83b-e6f7-455b-a2de-0f808f2d74cb", 
                "dataset_name": "md\\u683c\\u5f0f\\u793a\\u4f8b-\\u4fdd\\u9669\\u4ea7\\u54c1\\u77e5\\u8bc6\\u5e93.txt...", 
                "document_id": "b69966a4-c9c4-46d4-a34a-90c1f222283d", 
                "document_name": "md\\u683c\\u5f0f-\\u6cf0\\u5eb7\\u5168\\u80fd\\u4fdd\\u767e\\u4e07\\u533b\\u7597\\u9669.txt", 
                "data_source_type": "upload_file", 
                "segment_id": "ecb7cefd-9b3b-4b90-ae7c-8020df64e9bd", 
                "retriever_from": "api", 
                "score": 0.15491657954399923, 
                "hit_count": null, 
                "word_count": null, 
                "segment_position": null, 
                "index_node_hash": null, 
                "content": "---", 
                "page": null, 
                "doc_metadata": null, 
                "title": null
            }, 
            {
                "position": 3, 
                "dataset_id": "15ecd83b-e6f7-455b-a2de-0f808f2d74cb", 
                "dataset_name": "md\\u683c\\u5f0f\\u793a\\u4f8b-\\u4fdd\\u9669\\u4ea7\\u54c1\\u77e5\\u8bc6\\u5e93.txt...", 
                "document_id": "14779633-ea03-4aaf-8c2f-7cbd5a97c10e", 
                "document_name": "md\\u683c\\u5f0f-\\u6cf0\\u65e0\\u5fe7-\\u610f\\u5916\\u9669\\uff08\\u5bb6\\u5ead\\u7248\\uff09.txt", 
                "data_source_type": "upload_file", 
                "segment_id": "e94589b5-30b3-4b3e-8f39-0ecbc2a16016", 
                "retriever_from": "api", 
                "score": 0.15491657954399923, 
                "hit_count": null, 
                "word_count": null, 
                "segment_position": null, 
                "index_node_hash": null, 
                "content": "---", 
                "page": null, 
                "doc_metadata": null, 
                "title": null
            }
        ], 
        "usage": {
            "prompt_tokens": 229, 
            "prompt_unit_price": "0.0008", 
            "prompt_price_unit": "0.001", 
            "prompt_price": "0.0001832", 
            "completion_tokens": 8, 
            "completion_unit_price": "0.002", 
            "completion_price_unit": "0.001", 
            "completion_price": "0.000016", 
            "total_tokens": 237, 
            "total_price": "0.0001992", 
            "currency": "RMB", 
            "latency": 1.5809249989688396
        }
    }, 
    "files": null
}
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐