第七篇:LangChain 1.0 多模态实战:Content Blocks 与批处理完全指南
本文详细讲解了 标准化内容块 Content Blocks 和批处理流程,包括多模态数据处理、批量调用、异步并发等核心概念。通过本文的学习,你将掌握如何高效处理图片、音频等多模态数据,以及如何批量处理大量请求。
📌 所属章节:第三阶段 - Content Blocks 与批处理
← 上一篇:Messages 与 Prompt 提示词模板 | 📚 系列目录 | 下一篇:流式传输与结构化输出 →
📋 摘要
本文详细讲解了 标准化内容块 Content Blocks 和批处理流程,包括多模态数据处理、批量调用、异步并发等核心概念。通过本文的学习,你将掌握如何高效处理图片、音频等多模态数据,以及如何批量处理大量请求。
适合人群:
- 需要处理多模态数据的 AI 应用开发者
- 需要批量处理请求的工程师
- 想深入理解 LangChain 数据处理机制的开发者
3.4 标准化内容块Content Blocks
统一所有模型厂商的输出格式,解决"换模型就要重写解析代码"的痛点:
LangChain 1.0 引入了 provider-agnostic与厂商无关 的 standard content blocks标准化内容块,使得消息中的多模态数据(图片、音频、PDF、视频等)能以统一、类型化的方式被构造与阅读。通过 content blocks 属性实现多模态统一处理,封装 TextBlock、ToolCallBlock、ImageBlock 等结构,扩展 LLM 应用至图片理解、语音交互等场景。其核心优势在于标准化内容流转与跨平台兼容性,可通过 langchain-openai 等厂商包直接初始化多模态模型,支持图片输入生成描述、语音转文本问答等功能,依赖 Model I/O 模块完成格式化与解析
支持类型:text 、 tool_call 、 image 、 audio 、 video
| 场景 | 内容块作用 |
|---|---|
| 📄 文档解析(PDF / 图片 / 表格) | 用 imageblock 把 Document OCR 图像传给模型 |
| 🔊 语音问答(ASR) | 用 audioblock 发送语音样本 |
| 🎞 多模态 RAG | 将检索到的图片、图表、视频帧作为 input blocks 传给模型 |
| 🤖 多工具 Agent | 工具返回的媒体统一包装成 block 再传回模型 |
| 🧪 模型评估(LangSmith / LangChain Playground) | 进行 multimodal prompt 测试与 A/B,对 content blocks 标注与评估。 |
输出提取content_blocks
# 1. 定义带速率限制的load_chat_model函数
from langchain.chat_models import init_chat_model
from langchain_core.rate_limiters import InMemoryRateLimiter
# 2. 配置速率限制器
rate_limiter = InMemoryRateLimiter(
requests_per_second=5, # 每秒最多5个请求
check_every_n_seconds=1.0 # 每1秒检查一次是否超过速率限制
)
# 3. 对模型调用进行封装,后续直接调用传参数就行
def load_chat_model(
model: str,
provider: str,
temperature: float = 0.7,
max_tokens: int | None = None,
base_url: str | None = None,
):
return init_chat_model(
model=model, # 模型名称
model_provider=provider, # 模型供应商
temperature=temperature, # 温度参数,用于控制模型的随机性,值越小则随机性越小
max_tokens=max_tokens, # 最大生成token数
base_url=base_url, # 专用于自定义 API Server 或代理
rate_limiter=rate_limiter # 自动限速
)
# 加载 DeepSeek 提供的推理模型 deepseek-reasoner
deepseek_model = load_chat_model(
model="deepseek-reasoner", # 指定模型名称
provider="deepseek", # 指定模型提供商
)
# 调用模型
res = deepseek_model.invoke("请介绍一下你自己")
# 输出模型返回的结果
res
# 从模型返回的结果中提取内容块
res.content_blocks
多模态输入content_blocks
# 如果需要把本地文件以 base64 形式发送,建议安装 pillow/ffmpeg 等按需工具
#!pip install pillow
base64 形式触及了网络传输与数据序列化的底层原理。
简单来说,将图片或音频转换为 Base64,主要是为了解决 “在纯文本协议(HTTP/JSON)中传输二进制数据(Binary Data)” 的兼容性问题。
绝大多数大模型 API(OpenAI, Anthropic, Google Gemini 等)都是基于 RESTful API,数据载荷(Payload)格式通常是 JSON。
-
JSON 的本质:JSON 是一种纯文本格式。它只能理解字符串(String)、数字、布尔值等文本数据。
-
图片/音频的本质:它们是二进制数据(Binary Bytes)。如果你直接用文本编辑器打开一张 JPG 图片,你会看到乱码,其中包含了大量的不可见字符、控制字符(如换行符、空字符 \0 等)。
-
冲突点:如果你直接把这些二进制乱码塞进 JSON 的字符串字段里(例如 {“image”: “ÿØÿà…”}),这些特殊字符会破坏 JSON 的语法结构,导致服务端无法解析,或者被 HTTP 协议拦截。
-
解决方案:Base64 编码可以将任意二进制数据,映射为 64 个标准的、可打印的 ASCII 字符(A-Z, a-z, 0-9, +, /)。这样,原本的二进制图片就变成了普通的字符串,可以完美地嵌入到 JSON 中。
content_blocks 是 LangChain v1 的标准化多模态消息单元,你可以用 dict 结构把图片与音频纳入消息里,框架会把它们转换为各 provider 可识别的格式;在实际使用时务必确认目标模型/provider 对 multimodal 的支持和所需的 mime_type / metadata 字段。
from langchain_core.messages import HumanMessage, SystemMessage
# 创建系统提示
system_msg = SystemMessage("你是一个专业的问答专家。")
# 构造用户消息:文本+图像
human_msg = HumanMessage(content=[
{"type": "text", "text": "请描述图像:"},
{"type": "image_url",
"image_url": {"url": "https://i-blog.csdnimg.cn/img_convert/4c7d96f7c9c9762f34d1befc7250fe72.png",
"mime_type": "image/jpeg",
"metadata": "RAG基础流程图"}
},
])
# 形成消息列表
messages = [system_msg, human_msg]
# 框架会懒解析 content -> content_blocks
for cb in human_msg.content_blocks:
print(cb) # content block 对象视图
输出结果:
{'type': 'text', 'text': '请描述图像:'}
{'type': 'image', 'id': 'lc_ee96e0ce-c5a9-4dec-9f82-29180fa7f85f', 'url': 'https://i-blog.csdnimg.cn/img_convert/4c7d96f7c9c9762f34d1befc7250fe72.png', 'extras': {'image_url_mime_type': 'image/jpeg', 'image_url_metadata': 'RAG基础流程图'}}
import os
from openai import OpenAI
client = OpenAI(
# 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:api_key = "********************",
# 各地域的API Key不同。获取API Key:https://help.aliyun.com/zh/model-studio/get-api-key
api_key = "********************",
# 以下为北京地域的 base_url,若使用弗吉尼亚地域模型,需要将base_url换成https://dashscope-us.aliyuncs.com/compatible-mode/v1
# 若使用新加坡地域的模型,需将base_url替换为:https://dashscope-intl.aliyuncs.com/compatible-mode/v1
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
completion = client.chat.completions.create(
model="qwen3-vl-plus", # 此处以qwen3-vl-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/models
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"
},
},
{"type": "text", "text": "图中描绘的是什么景象?"},
],
},
],
)
print(completion.choices[0].message.content)
输出结果:
图中描绘的是一幅温馨宁静的海边黄昏景象:一位年轻女子与一只金毛寻回犬(或拉布拉多)坐在柔软的沙滩上,正愉快地互动——狗狗抬起前爪与女子击掌(“high-five”),女子面带灿烂笑容,眼神温柔地注视着它。
背景是平静的海面与柔和的落日余晖,阳光从右侧低角度洒下,为人物和狗镀上一层温暖的金色光晕,营造出浪漫、治愈的氛围。女子身穿格子衬衫和深色裤子,赤脚坐在沙中;狗狗佩戴着彩色图案的胸背带和牵引绳,姿态温顺而活泼。
整幅画面传递出人与宠物之间深厚的信任、陪伴与快乐,象征着自由、纯真与生活中的小确幸,是一幅充满情感温度的自然人文写照。
import requests
import json
# 请求配置
url = "https://perception-openai-japan.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2024-02-01"
headers = {
"Content-Type": "application/json",
"api-key": "********************"
}
# 请求体
payload = {
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "讲个笑话"}
],
"temperature": 0.7,
"max_tokens": 800
}
# 发送请求
response = requests.post(url, headers=headers, json=payload)
result = response.json()
# 获取回复内容
print(result["choices"][0]["message"]["content"])
输出结果:
当然!这是一个简单的笑话:
有一天,一只鸭子走进一家超市,问店员:“你们有葡萄吗?”
店员回答:“没有,我们不卖葡萄。”
第二天,鸭子又来了,问:“你们有葡萄吗?”
店员有点生气:“我不是告诉过你了吗?我们没有葡萄!”
第三天,鸭子又来了,问:“你们有葡萄吗?”
店员怒了:“我警告你!再问一次,我就用锤子把你的嘴钉住!”
第四天,鸭子来了,问:“你们有锤子吗?”
店员说:“没有。”
鸭子笑了:“那你们有葡萄吗?”
😄
from langchain_openai import AzureChatOpenAI
model = AzureChatOpenAI(
azure_endpoint="https://perception-openai-japan.openai.azure.com", # 注意:azure_endpoint,不是 base_url
api_key = "********************",
azure_deployment="gpt-4o", # 部署名称
api_version="2024-02-01",
temperature=0.7,
)
# 使用方式相同
res = model.invoke([{"role": "user", "content": "讲个笑话"}])
print(res.content)
输出结果:
好的!这儿有一个:
有一天,老师问小明:“如果地球是个正方形,会怎么样?”
小明想了想,说:“那就得改叫地方了!”
# # 使用具有多模态能力的模型
# model = load_chat_model(
# model="gpt-4o-mini",
# provider="openai",
# )
# res = model.invoke(messages)
# print(res.content)
# # 使用具有多模态能力的模型
# model = load_chat_model(
# model="gpt-4o-mini",
# provider="openai",
# )
# res = model.invoke(messages)
# print(res.content)
from langchain_openai import AzureChatOpenAI
model = AzureChatOpenAI(
azure_endpoint="https://perception-openai-japan.openai.azure.com", # 注意:azure_endpoint,不是 base_url
api_key = "********************",
azure_deployment="gpt-4o", # 部署名称
api_version="2024-02-01",
temperature=0.7,
)
# from langchain_openai import ChatOpenAI
# model = ChatOpenAI(
# model="qwen3-vl-plus",
# base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
# api_key = "********************"
# )
res = model.invoke(messages)
print(res.content)
输出结果:
这幅图展示了一个基于大语言模型(LLM)的问答流程,主要包括以下步骤:
1. **query(查询)**:用户输入问题或查询。
2. **知识库**:系统从知识库中检索相关信息,知识库包含不同类型的文件(如PDF、TXT、DOC、Excel等)。
3. **检索结果**:从知识库中返回与查询相关的信息(标注为“xxx”),作为上下文内容。
4. **上下文与问题组合**:将检索到的上下文信息与用户的问题结合,生成一个新的“prompt”(提示)。
5. **LLM处理**:将生成的提示输入到大语言模型(LLM)中进行处理。
6. **最终回复**:LLM根据提示内容生成最终的回答并返回给用户。
图中用箭头清晰地表示了各个步骤之间的流程关系。
内容块创建标准格式
comparison = """
┌─────────────┬──────────────────────────────────────────────────────┐
│ 内容块类型 │ 标准格式(LangChain 1.0) │
├─────────────┼──────────────────────────────────────────────────────┤
│ 文本 │ {"type": "text", "text": "..."} │
│ 图像 │ {"type": "image", "url": "...", "mime_type": "..."} │
│ 音频 │ {"type": "audio", "url": "...", "mime_type": "..."} │
│ 视频 │ {"type": "video", "url": "...", "mime_type": "..."} │
│ 文件 │ {"type": "file", "url": "...", "mime_type": "..."} │
│ Base64 图像 │ {"type": "image", "base64": "...", "mime_type": "..."} │
│ Base64 音频 │ {"type": "audio", "base64": "...", "mime_type": "..."} │
│ OpenAI 图像 │ {"type": "image_url", "image_url": {"url": "..."}} │
└─────────────┴──────────────────────────────────────────────────────┘
"""
#OpenAI 内容块支持对比表:
support_table = """
┌─────────────┬──────────┬─────────────────────────────────────┐
│ 内容块类型 │ 支持情况 │ 说明 │
├─────────────┼──────────┼─────────────────────────────────────┤
│ text │ ✅ 支持 │ 纯文本内容 │
│ image_url │ ✅ 支持 │ 图像 URL(支持 jpg, png, gif, webp)│
│ audio │ ❌ 不支持│ 需要先用 Whisper 转录为文本 │
│ video │ ❌ 不支持│ 需要提取关键帧或转录音频 │
│ file │ ❌ 不支持│ 需要提取文本内容 │
└─────────────┴──────────┴─────────────────────────────────────┘
"""
3.5 批处理流程
在使用大模型时,如果需要同时处理多条独立请求(例如多个问题或多段文本),则可以使用 批量调用(Batch) 方法一次性提交这些请求。LangChain 中的 batch() 方法允许你同时发送一组请求,模型会在后台并行处理,然后返回所有结果:
import time
from datetime import datetime
# 记录开始时间
start_time = time.time()
print(f"⏱️ 开始时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
# 批量提问
responses = model.batch([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
])
# 记录结束时间
end_time = time.time()
total_duration = end_time - start_time
print(f"⏱️ 结束时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
print(f"📊 总耗时: {total_duration:.2f}s")
for response in responses:
print(response)
| 特性 | 说明 |
|---|---|
| 执行位置 | batch() 在客户端(Client-side)并行调用模型,而非调用模型提供商的批量API(如OpenAI或Anthropic自带的batch API)。 |
| 返回结果 | 默认会在所有任务完成后,统一返回完整结果列表。 |
| 并行优势 | 多条独立请求可同时执行,无需等待彼此完成。 |
| 适用场景 | 文档摘要、批量问答、数据预处理、多样本分类等。 |
当然,我们也可以进行流式批处理,也就是每个任务完成后就立即获取结果(而不是等待全部完成),可以使用 batch_as_completed() 方法。
# 使用 model.batch_as_completed 批量提交多个问题,并逐个获取回答
for response in model.batch_as_completed([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
]):
print(response)
异步并发处理RunnableConfig
而为了更好的控制并发,我们还可以在config参数中设置批处理的并发数,例如
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="qwen-plus",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key = "********************"
)
import time
from datetime import datetime
#import asyncio
from langchain_core.runnables import RunnableConfig
#设置并发数为3
config = RunnableConfig(max_concurrency=3)
# 记录开始时间
start_time = time.time()
print(f"⏱️ 开始时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
# 并发调用模型,批量处理三个问题
# Jupyter 已经支持顶级 await,无需 asyncio.run()
responses = await model.abatch([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
],config=config)
# 记录结束时间
end_time = time.time()
total_duration = end_time - start_time
print(f"⏱️ 结束时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
print(f"📊 总耗时: {total_duration:.2f}s")
# for response in responses:
# print(response)
输出结果:
⏱️ 开始时间: 08:33:36.469
⏱️ 结束时间: 08:33:53.540
📊 总耗时: 17.07s
特别注意
-
RunnableConfig(max_concurrency=N) 只是告诉 LangChain 在执行 abatch/batch 时最多并发 N 个子任务。
-
是否能提速,取决于整个 pipeline 是否为 I/O-bound(等待网络/模型服务)或 CPU/GPU-bound(单次推理占满资源)。
-
如果单次推理把 GPU/CPU 占满(例如单卡的 vLLM 同步推理),增加并发不会变快,甚至更慢(资源竞争)。
-
如果调用的是远端云 API(有网络延迟)或能并行处理多请求的模型服务,且客户端/服务端都允许并发,则会明显提速。
-
框架内部可能会在某些组件对并发做序列化(例如某些 LLM 客户端在后端使用同步 HTTP 会阻塞),这也会导致看起来并发无效。
-
确认你使用的是 abatch(异步)而不是 batch(同步)
from langchain_core.runnables import RunnableConfig
# 配置:最多 2 个并发任务
config = RunnableConfig(
max_concurrency=2, # 最大并发数:限制同时运行的任务数量,防止资源耗尽
abstimeout=8.0, # 单个任务超时时间(秒):超过此时间未完成的任务将被强制终止
metadata={"request_id": "abc123", "task": "query"}, # 元数据:记录请求ID和任务类型,便于追踪和日志分析
)
# 创建一个带有{product}占位符变量的模板
prompt_template = [PromptTemplate](06_messages_prompt.md).from_template(
"为生产{product}的公司起一个好名字?"
)
# 准备一个输入列表
inputs = ["彩色袜子", "环保咖啡杯", "智能水杯"]
formatted_prompts = [prompt_template.format(product=product) for product in inputs]
# Jupyter 已经支持顶级 await,无需 asyncio.run()
results = await model.abatch(formatted_prompts, config=config)
for i, r in enumerate(results):
print(f"=== Query {i+1} ===")
print(r.content)
print(r.model_config)
# 可能输出: ['Fun Socks Co.', 'Green Cup Co.', 'HydraSmart']
更多config参数解释如下:
| 属性名 | 类型 | 说明 |
|---|---|---|
max_concurrency |
int |
最大并行执行数 |
timeout |
float |
每个请求的最大超时时间(秒) |
callbacks |
list |
触发事件回调,用于日志或监控 |
metadata |
dict |
额外的上下文信息,可用于追踪 |
🎯 总结
本文详细介绍了 Content Blocks 与批处理 的核心概念和实战技巧。希望这些内容能帮助你更好地理解和使用 LangChain 1.0!
如果你有任何问题或建议,欢迎在评论区留言交流!💬
🏷️ 标签:LangChain 多模态 ContentBlocks
💝 感谢阅读!如果觉得有帮助,记得点赞收藏关注哦!
本文为原创内容,版权归作者所有,转载需注明出处。
更多推荐



所有评论(0)