【Agentic AI】并行化模式学习笔记
并行化模式在Agentic AI中的应用 本文介绍了并行化模式在智能体系统中的关键作用,重点分析了其优化复杂Agent工作流性能的技术原理。文章从以下方面展开: 并行化原理:通过并发执行多个独立任务(LLM调用、工具使用等)来减少整体执行时间,而非顺序执行。 应用场景:列举了7个典型应用场景,包括信息收集、数据处理、多API交互、内容生成等,展示了并行化如何提升效率。 技术实现:提供了基于Lang
博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持!
博主链接
本人就职于国际知名终端厂商,负责modem芯片研发。
在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G技术研究。
博客内容主要围绕:
5G/6G协议讲解
高级C语言讲解
Rust语言讲解
文章目录
【Agentic AI】并行化模式学习笔记
一、并行化模式介绍
并行化模式对于提高智能体系统的效率和响应能力至关重要,特别是在处理涉及多个独立查找、计算或与外部服务交互的任务时。它是优化复杂Agent工作流性能的关键技术。并行化涉及并发执行多个组件,如LLM调用、工具使用,甚至整个子代理(如下图所示)。并行执行不再需要等待一个步骤完成再开始下一个步骤,而是允许独立的任务同时运行,大大减少了可分解为独立部分的任务的整体执行时间。

假设一个智能体被设计用于研究一个主题并总结研究结果。顺序执行的方法可能:
- 查找源A;
- 总结源A;
- 查找源B;
- 总结源B;
- 从A和B的总结中合成最终答案;
如果是并行方法可以是:
- 同时搜索源A和源B;
- 一旦两个搜索都完成了,就同时总结源A和源B;
- 从A和B的总结中合成最终答案(此步骤通常是顺序的,等待并行步骤完成);
其核心思想是识别工作流中不依赖于其他部分输出的部分,并并行执行它们。这在处理具有延迟的外部服务(如api或数据库)时特别有效,因为您可以并发发出多个请求。实现并行化通常需要支持异步执行或多线程/多进程的框架。现代智能体框架在设计时考虑了异步操作,允许您轻松定义可以并行运行的步骤。
二、应用场景
2.1 信息收集和研究
同时从多个来源收集信息是一个经典的用例。例如:
- Agent调查一家公司
- 并行任务:同时搜索新闻文章、拉股票数据、检查社交媒体提及率和查询公司数据库。
- 优点:收集全面视图的速度比顺序查找快得多。
2.2 数据处理和分析
应用不同的分析技术或同时处理不同的数据段。例如:
- Agent分析客户反馈
- 并行任务:在一批反馈条目中同时运行情感分析,提取关键词,对反馈进行分类并识别紧急问题。
- 优点:快速提供多方面的分析。
2.3 多API或工具交互
调用多个独立的API或工具来收集不同类型的信息或执行不同的操作。例如:
- 旅行计划代理
- 并行任务:检查机票价格,搜索酒店可用性,查找当地活动,同时查找餐厅推荐。
- 优点:更快地呈现完整的旅行计划。
2.4 使用多个组件生成内容
并行生成复杂内容的不同部分。例如:
- 代理创建营销邮件
- 并行任务:生成主题行,起草电子邮件正文,查找相关图像,并同时创建一个call-to-action按钮文本。
- 优点:更高效地组装最终邮件。
2.5 检查和验证
同时执行多个独立的检查或验证。例如:
- 代理验证用户输入
- 并行任务:同时检查电子邮件格式、验证电话号码、验证地址是否与数据库相符、检查是否亵渎。
- 优点:提供更快的输入有效性反馈。
2.6 多模态处理
同时处理相同输入的不同模态(文本、图像、音频)。例如:
- 一个代理分析带有文本和图像的社交媒体帖子
- 并行任务:分析文本中的情感和关键词,同时分析图像中的物体和场景描述。
- 优点:更快地整合来自不同模态的见解。
2.7 A/B测试或多个选项生成
并行生成响应或输出的多个变体,以选择最佳的一个。例如:
- 生成不同创造性文本选项的代理
- 并行任务:使用略有不同的提示或模型同时为一篇文章生成三个不同的标题
- 优点:允许快速比较和选择最佳选项。
并行化是智能体设计中的一种基本优化技术,它允许开发人员通过利用独立任务的并发执行来构建性能更高、响应更快的应用程序。
三、并行化演示代码
LangChain框架中的并行执行由LangChain表达式语言(LangChain Expression Language, LCEL)实现。主要方法是在字典或列表结构中构造多个可运行组件。当此集合作为输入传递给链中的后续组件时,LCEL运行时将并发执行包含的runnables。在LangGraph中,这一原则应用于图的拓扑结构。并行工作流是通过构建图来定义的,这样就可以从单个公共节点发起多个节点,而没有直接的顺序依赖关系。这些并行路径在其结果可以在图中的后续收敛点聚合之前独立执行。下面的实现演示了一个使用LangChain框架构建的并行处理工作流。该工作流设计为并发执行两个4个独立操作以响应单个用户查询。这些并行过程被实例化为不同的链或函数,它们各自的输出随后聚合为一个统一的结果。
import asyncio
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
try:
llm = ChatOllama(
model="aaa",
validate_model_on_init=True,
temperature=0.8,
num_predict=2000,
base_url="xxx"
# other params ...
)
except Exception as e:
print(f"Error initializing language model: {e}")
llm = None
# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.
summarize_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "Summarize the following topic concisely:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
questions_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "Generate three interesting questions about the following topic:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
terms_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "Identify 5-10 key terms from the following topic, separated by commas:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
# --- Build the Parallel + Synthesis Chain ---
# 1. Define the block of tasks to run in parallel. The results of these,
# along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
{
"summary": summarize_chain,
"questions": questions_chain,
"key_terms": terms_chain,
"topic": RunnablePassthrough(), # Pass the original topic through
}
)
# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
("system", """Based on the following information:
Summary: {summary}
Related Questions: {questions}
Key Terms: {key_terms}
Synthesize a comprehensive answer."""),
("user", "Original topic: {topic}")
])
# 3. Construct the full chain by piping the parallel results directly
# into the synthesis prompt, followed by the LLM and output parser.
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()
# --- Run the Chain ---
async def run_parallel_example(topic: str) -> None:
"""
Asynchronously invokes the parallel processing chain with a
specific topic
and prints the synthesized result.
Args:
topic: The input topic to be processed by the LangChain
chains.
"""
if not llm:
print("LLM not initialized. Cannot run example.")
return
print(f"\n--- Running Parallel LangChain Example for Topic:'{topic}' ---")
try:
# The input to `ainvoke` is the single 'topic' string,
# then passed to each runnable in the `map_chain`.
response = await full_parallel_chain.ainvoke(topic)
print("\n--- Final Response ---")
print(response)
except Exception as e:
print(f"\nAn error occurred during chain execution: {e}")
if __name__ == "__main__":
test_topic = "The history of space exploration"
# In Python 3.7+, asyncio.run is the standard way to run an async function.
asyncio.run(run_parallel_example(test_topic))
所提供的Python代码实现了一个LangChain应用程序,该应用程序旨在通过利用并行执行来高效地处理给定的主题。注意,asyncio提供的是并发,而不是并行。它在单线程上通过使用事件循环来实现这一点,当任务空闲时(例如,等待网络请求),该事件循环智能地在任务之间切换。这创建了同时执行多个任务的效果,但代码本身仍然由一个线程执行,这受到Python的全局解释器锁(GIL)的限制。
代码定义了三个独立的LangChain “链”,每个链对输入主题执行不同的任务。第一个链用于简洁地总结主题,使用包含主题占位符的System消息和User消息。第二个链被配置为生成三个与主题相关的有趣问题。第三条链用于从输入主题中识别5到10个关键词,这些关键词之间用逗号分隔。每个独立的链都由一个为其特定任务量身定制的ChatPromptTemplate组成,后面是初始化的语言模型和一个StrOutputParser,用于将输出格式化为字符串。
然后构建一个RunnableParallel块来捆绑这三个链,允许它们同时执行。这个并行runnable还包括一个RunnablePassthrough,以确保原始输入topic可用于后续步骤。为最后的合成步骤定义了一个单独的ChatPromptTemplate,将“summary”、“questions”、“key_terms”和“topic”作为输入来生成一个全面的答案。通过将map_chain(并行块)排序到合成提示中,然后是语言模型和输出解析器,创建完整的端到端处理链,称为full_parallel_chain。我们提供了一个异步函数run_parallel_example来演示如何调用这个full_parallel_chain。该函数将主题作为输入,并使用invoke运行异步链。
最后,使用asyncio.run来管理异步执行。本质上,此代码设置了一个工作流,其中针对给定主题同时发生多个LLM调用(用于总结、问题和术语),然后通过最终的LLM调用组合它们的结果。这展示了在使用LangChain的智能体工作流中并行化的核心思想。
四、总结
许多智能体工作流涉及多个必须完成的子任务,以实现最终目标。纯粹的顺序执行,即每个任务都要等待前一个任务完成,通常低效且缓慢。当任务依赖于外部I/O操作(如调用不同的api或查询多个数据库)时,这种延迟成为一个重要的瓶颈。如果没有并发执行的机制,总的处理时间就是所有单独任务持续时间的总和,这阻碍了系统的整体性能和响应能力。
- 并行化是一种并行执行独立任务以提高效率的模式;
- 当任务涉及等待外部资源(如API调用)时,它特别有用;
- 采用并发或并行架构会引入大量的复杂性和成本,影响关键的开发阶段,如设计、调试和系统日志记录;
- 并行化有助于减少整体延迟,并使智能体系统对复杂任务的响应性更强;


更多推荐


所有评论(0)