博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持!
博主链接

本人就职于国际知名终端厂商,负责modem芯片研发。
在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G技术研究。


博客内容主要围绕:
       5G/6G协议讲解
       高级C语言讲解
       Rust语言讲解



【Agentic AI】并行化模式学习笔记

一、并行化模式介绍

       并行化模式对于提高智能体系统的效率和响应能力至关重要,特别是在处理涉及多个独立查找、计算或与外部服务交互的任务时。它是优化复杂Agent工作流性能的关键技术。并行化涉及并发执行多个组件,如LLM调用、工具使用,甚至整个子代理(如下图所示)。并行执行不再需要等待一个步骤完成再开始下一个步骤,而是允许独立的任务同时运行,大大减少了可分解为独立部分的任务的整体执行时间。

在这里插入图片描述

假设一个智能体被设计用于研究一个主题并总结研究结果。顺序执行的方法可能:

  1. 查找源A;
  2. 总结源A;
  3. 查找源B;
  4. 总结源B;
  5. 从A和B的总结中合成最终答案;

如果是并行方法可以是:

  1. 同时搜索源A和源B;
  2. 一旦两个搜索都完成了,就同时总结源A和源B;
  3. 从A和B的总结中合成最终答案(此步骤通常是顺序的,等待并行步骤完成);

       其核心思想是识别工作流中不依赖于其他部分输出的部分,并并行执行它们。这在处理具有延迟的外部服务(如api或数据库)时特别有效,因为您可以并发发出多个请求。实现并行化通常需要支持异步执行或多线程/多进程的框架。现代智能体框架在设计时考虑了异步操作,允许您轻松定义可以并行运行的步骤。

二、应用场景

2.1 信息收集和研究

同时从多个来源收集信息是一个经典的用例。例如:

  • Agent调查一家公司
    • 并行任务:同时搜索新闻文章、拉股票数据、检查社交媒体提及率和查询公司数据库。
    • 优点:收集全面视图的速度比顺序查找快得多。

2.2 数据处理和分析

应用不同的分析技术或同时处理不同的数据段。例如:

  • Agent分析客户反馈
    • 并行任务:在一批反馈条目中同时运行情感分析,提取关键词,对反馈进行分类并识别紧急问题。
    • 优点:快速提供多方面的分析。

2.3 多API或工具交互

调用多个独立的API或工具来收集不同类型的信息或执行不同的操作。例如:

  • 旅行计划代理
    • 并行任务:检查机票价格,搜索酒店可用性,查找当地活动,同时查找餐厅推荐。
    • 优点:更快地呈现完整的旅行计划。

2.4 使用多个组件生成内容

并行生成复杂内容的不同部分。例如:

  • 代理创建营销邮件
    • 并行任务:生成主题行,起草电子邮件正文,查找相关图像,并同时创建一个call-to-action按钮文本。
    • 优点:更高效地组装最终邮件。

2.5 检查和验证

同时执行多个独立的检查或验证。例如:

  • 代理验证用户输入
    • 并行任务:同时检查电子邮件格式、验证电话号码、验证地址是否与数据库相符、检查是否亵渎。
    • 优点:提供更快的输入有效性反馈。

2.6 多模态处理

同时处理相同输入的不同模态(文本、图像、音频)。例如:

  • 一个代理分析带有文本和图像的社交媒体帖子
    • 并行任务:分析文本中的情感和关键词,同时分析图像中的物体和场景描述。
    • 优点:更快地整合来自不同模态的见解。

2.7 A/B测试或多个选项生成

并行生成响应或输出的多个变体,以选择最佳的一个。例如:

  • 生成不同创造性文本选项的代理
    • 并行任务:使用略有不同的提示或模型同时为一篇文章生成三个不同的标题
    • 优点:允许快速比较和选择最佳选项。

并行化是智能体设计中的一种基本优化技术,它允许开发人员通过利用独立任务的并发执行来构建性能更高、响应更快的应用程序。

三、并行化演示代码

       LangChain框架中的并行执行由LangChain表达式语言(LangChain Expression Language, LCEL)实现。主要方法是在字典或列表结构中构造多个可运行组件。当此集合作为输入传递给链中的后续组件时,LCEL运行时将并发执行包含的runnables。在LangGraph中,这一原则应用于图的拓扑结构。并行工作流是通过构建图来定义的,这样就可以从单个公共节点发起多个节点,而没有直接的顺序依赖关系。这些并行路径在其结果可以在图中的后续收敛点聚合之前独立执行。下面的实现演示了一个使用LangChain框架构建的并行处理工作流。该工作流设计为并发执行两个4个独立操作以响应单个用户查询。这些并行过程被实例化为不同的链或函数,它们各自的输出随后聚合为一个统一的结果。

import asyncio
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough


try:
    llm = ChatOllama(
        model="aaa",
        validate_model_on_init=True,
        temperature=0.8,
        num_predict=2000,
        base_url="xxx"
        # other params ...
    )
except Exception as e:
    print(f"Error initializing language model: {e}")
    llm = None

# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.
summarize_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Summarize the following topic concisely:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

questions_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Generate three interesting questions about the following topic:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

terms_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Identify 5-10 key terms from the following topic, separated by commas:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)
# --- Build the Parallel + Synthesis Chain ---
# 1. Define the block of tasks to run in parallel. The results of these,
# along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
    {
        "summary": summarize_chain,
        "questions": questions_chain,
        "key_terms": terms_chain,
        "topic": RunnablePassthrough(), # Pass the original topic through
    }
)


# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", """Based on the following information:
    Summary: {summary}
    Related Questions: {questions}
    Key Terms: {key_terms}
    Synthesize a comprehensive answer."""),
    ("user", "Original topic: {topic}")
])

# 3. Construct the full chain by piping the parallel results directly
# into the synthesis prompt, followed by the LLM and output parser.
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()


# --- Run the Chain ---
async def run_parallel_example(topic: str) -> None:
    """
    Asynchronously invokes the parallel processing chain with a
    specific topic
    and prints the synthesized result.
    Args:
    topic: The input topic to be processed by the LangChain
    chains.
    """
    if not llm:
        print("LLM not initialized. Cannot run example.")
        return

    print(f"\n--- Running Parallel LangChain Example for Topic:'{topic}' ---")

    try:
        # The input to `ainvoke` is the single 'topic' string,
        # then passed to each runnable in the `map_chain`.
        response = await full_parallel_chain.ainvoke(topic)
        print("\n--- Final Response ---")
        print(response)
    except Exception as e:
        print(f"\nAn error occurred during chain execution: {e}")


if __name__ == "__main__":
    test_topic = "The history of space exploration"
    # In Python 3.7+, asyncio.run is the standard way to run an async function.
    asyncio.run(run_parallel_example(test_topic))

       所提供的Python代码实现了一个LangChain应用程序,该应用程序旨在通过利用并行执行来高效地处理给定的主题。注意,asyncio提供的是并发,而不是并行。它在单线程上通过使用事件循环来实现这一点,当任务空闲时(例如,等待网络请求),该事件循环智能地在任务之间切换。这创建了同时执行多个任务的效果,但代码本身仍然由一个线程执行,这受到Python的全局解释器锁(GIL)的限制。

       代码定义了三个独立的LangChain “链”,每个链对输入主题执行不同的任务。第一个链用于简洁地总结主题,使用包含主题占位符的System消息和User消息。第二个链被配置为生成三个与主题相关的有趣问题。第三条链用于从输入主题中识别5到10个关键词,这些关键词之间用逗号分隔。每个独立的链都由一个为其特定任务量身定制的ChatPromptTemplate组成,后面是初始化的语言模型和一个StrOutputParser,用于将输出格式化为字符串。

       然后构建一个RunnableParallel块来捆绑这三个链,允许它们同时执行。这个并行runnable还包括一个RunnablePassthrough,以确保原始输入topic可用于后续步骤。为最后的合成步骤定义了一个单独的ChatPromptTemplate,将“summary”、“questions”、“key_terms”和“topic”作为输入来生成一个全面的答案。通过将map_chain(并行块)排序到合成提示中,然后是语言模型和输出解析器,创建完整的端到端处理链,称为full_parallel_chain。我们提供了一个异步函数run_parallel_example来演示如何调用这个full_parallel_chain。该函数将主题作为输入,并使用invoke运行异步链。

       最后,使用asyncio.run来管理异步执行。本质上,此代码设置了一个工作流,其中针对给定主题同时发生多个LLM调用(用于总结、问题和术语),然后通过最终的LLM调用组合它们的结果。这展示了在使用LangChain的智能体工作流中并行化的核心思想。

四、总结

       许多智能体工作流涉及多个必须完成的子任务,以实现最终目标。纯粹的顺序执行,即每个任务都要等待前一个任务完成,通常低效且缓慢。当任务依赖于外部I/O操作(如调用不同的api或查询多个数据库)时,这种延迟成为一个重要的瓶颈。如果没有并发执行的机制,总的处理时间就是所有单独任务持续时间的总和,这阻碍了系统的整体性能和响应能力。

  • 并行化是一种并行执行独立任务以提高效率的模式;
  • 当任务涉及等待外部资源(如API调用)时,它特别有用;
  • 采用并发或并行架构会引入大量的复杂性和成本,影响关键的开发阶段,如设计、调试和系统日志记录;
  • 并行化有助于减少整体延迟,并使智能体系统对复杂任务的响应性更强;

《图片》



在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐