源代码仓库agent-lightning 源代码仓库

论文链接Agent Lightning 论文

个人代码仓库agent-lightning 个人仓库

项目参考Write the First Algorithm - Agent-lightning

1、项目介绍

在之前的博文中,我们简单说了以下 Trainer ,并且用了demo中的APO算法来提升 Agent 的性能。这里,Trainer 处理了所有复杂的交互,所以我们只需要关心 Agent 的逻辑即可。

但是算法有很多种,我们也不能每次都是用 APO 来finetune,要引入其他的或者自己的算法才能更有效地适配自己的任务。这里展示一下如何用demo2的例子,从零编写自定义的算法。

我们将构建一个简单的算法,该算法系统地测试一系列提示模板,并找出奖励最高的模板。

最终,我们要理解算法(Algorithm)、运行器(Runner)以及一个新组件——“存储(Store)”——如何协同工作,从而构建出 Agent-loghtning 的核心强大训练循环。

2、训练的核心概念

讨论 LightningStore 之前,先定义一下 Agent Lightning 训练过程中两个重要概念:Resources、Tracer

2.1、Resources - 可调参数

Resources是算法试图改进的参数,可以是:

1、指导 LLM 的提示模板。

2、机器学习模型的权重。

3、Agent 所需的任何其他配置或数据。

算法的任务就是运行实验并迭代更新这些资源,从而找到性能最佳的版本。

2.2、Tracer - 数据收集器

算法如何判断一项更改是否带来了改进,是需要靠数据的。

而 Tracer 的作用就是自动的对 Agent 的代码进行修改。也就是说,它会监视重要事件,比如 LLM调用、Tools使用、奖励信号,并把这些东西记录在log里。每个日志就是一个 Span 。

来自单个任务执行的 Span 集合为算法提供了 Agent 行为的完整、逐步跟踪记录,这对于学习和改进至关重要。

这里,默认 Tracer 是基于 AgentOps SDK 构建的,以支持对使用各种 Agent/non-agent 框架编写的代码进行修改。

3、LightningStore - 中央枢纽

LightningStore 管理着 resources、tasks、spans。

它充当整个系统的中央数据库和消息队列。它是唯一的数据源,将算法与 runner 解耦。

  • 算法连接到 Store 以进行任务入队 (enqueue_rollout) 和资源更新(例如新的提示模板)。它还会查询 Store 以检索已完成的部署的生成跨度 (span) 和奖励。
  • Runner 连接到 Store 以进行任务出队 (dequeue_rollout)(轮询可用任务)。执行任务后,它们使用 Tracer 将生成的跨度和状态更新写回 Store。

这种架构是Agent-lightning可扩展性的关键。由于算法和 runner 只与Store通信,因此它们可以在不同的进程中运行,甚至可以在不同的机器上运行。

LightningStore 不仅仅是一个简单的数据库;它是一个用于管理整个训练生命周期的系统。它跟踪以下内容:

1、Task Queue

待处理的 Rollout 队列,等待 Runner 执行,可通过 `enqueue_rollout` 和 `dequeue_rollout` 进行交互。

2、Rollout

单个任务的记录。Rollout 包含任务的元数据,并跟踪所有尝试完成该任务的记录,可通过 `query_rollouts` 和 `wait_for_rollouts` 进行交互。

3、Attempts

Rollout 的单次执行。如果尝试失败(例如,由于网络错误),Store 可以自动安排重试(如果已配置)。每次尝试都链接到其父 Rollout,并包含状态和时间信息。Rollout 的状态与其子 Rollout 的状态同步。对于初学者,您可以假设每个 Rollout 只有一次尝试,除非您已显式配置重试。

4、Span

追踪器在每次尝试期间生成的详细结构化日志。每个 Span 都与其父级尝试和部署关联。

5、Resources

算法创建的资源(例如提示模板)的版本化集合。每个部署都与其应使用的特定资源版本关联。

4、构建自定义算法

逻辑如下:

1、首先,列出候选提示模板。

2、对于每个模板,在 Store 中创建一个“资源”包。

3、将一个 rollout(任务)加入队列,告诉 Runner 使用此特定资源。

4、等待 Runner 领取并完成该任务。

5、查询 Store 以获取 rollout 各个 span 的最终奖励。

6、测试所有模板后,比较奖励并确定最佳模板。

我们可以将其实现为一个简单的 Python 函数,该函数直接与 LightningStore 交互。

官方给的demo如下:

async def find_best_prompt(store, prompts_to_test, task_input):
    """A simple algorithm to find the best prompt from a list."""
    results = []

    # Iterate through each prompt to test it
    for prompt in prompts_to_test:
        print(f"[Algo] Updating prompt template to: '{prompt}'")

        # 1. Update the resources in the store with the new prompt
        resources_update = await store.add_resources(
            resources={"prompt_template": prompt}
        )

        # 2. Enqueue a rollout task for a runner to execute
        print("[Algo] Queuing task for clients...")
        rollout = await store.enqueue_rollout(
            input=task_input,
            resources_id=resources_update.resources_id,
        )
        print(f"[Algo] Task '{rollout.rollout_id}' is now available for clients.")

        # 3. Wait for the rollout to be completed by a runner
        await store.wait_for_rollouts([rollout.rollout_id])

        # 4. Query the completed rollout and its spans
        completed_rollout = await store.get_rollout_by_id(rollout.rollout_id)
        print(f"[Algo] Received Result: {completed_rollout.model_dump_json(indent=None)}")

        spans = await store.query_spans(rollout.rollout_id)
        # We expect at least two spans: one for the LLM call and one for the final reward
        print(f"[Algo] Queried Spans:\n  - " + "\n  - ".join(str(span) for span in spans))
        # find_final_reward is a helper function to extract the reward span
        final_reward = find_final_reward(spans)
        print(f"[Algo] Final reward: {final_reward}\n")

        results.append((prompt, final_reward))

    # 5. Find and print the best prompt based on the collected rewards
    print(f"[Algo] All prompts and their rewards: {results}")
    best_prompt, best_reward = max(results, key=lambda item: item[1])
    print(f"[Algo] Best prompt found: '{best_prompt}' with reward {best_reward}")

这里我做一下详细的解释:

4.1、函数目的

算法的目的是从多个候选系统提示词中找到效果最好的一个。

4.2、函数签名

async def find_best_prompt(store, prompts_to_test, task_input):

参数说明:

  • store:LightningStore 实例(数据存储和任务管理中心);
  • prompts_to_test:待测试的提示词列表;
  • task_input:测试任务的输入数据;

4.3、核心流程(5个步骤)

4.3.1、step 1:更新资源配置

resources_update = await store.add_resources(
    resources={"prompt_template": prompt}
)

作用:

  • 将新的prompt上传到store;
  • 创建一个资源包,包含配置信息;
  • 返回一个resource id,用于后续引用这个配置。

有点像在游戏中创建一个新的装备配置方案。

4.3.2、step 2:入队任务

rollout = await store.enqueue_rollout(
    input=task_input,
    resources_id=resources_update.resources_id,
)

作用:

  • 创建一个新的“rollout”(执行任务实例);
  • 将任务放入队列,等待 Runner 领取;
  • 指定使用刚才创建的资源配置;

有点像餐厅点单,厨师会按顺序接单制作。

4.3.3、step 3:等待任务完成

await store.wait_for_rollouts([rollout.rollout_id])

作用:

  • 阻塞等待,直到指定的rollout完成。
  • 可以同时等待多个rollout(传入ID列表)。

4.3.4、step 4:获取结果和详细数据

completed_rollout = await store.get_rollout_by_id(rollout.rollout_id)
spans = await store.query_spans(rollout.rollout_id)

主要是获取rollout(包含执行状态、元数据等)和查询span(追踪数据)。

最后提取最终的奖励。

5、Agent 和 Runner

算法需要一个 Agent 来执行任务,以及一个 runner 来管理整个流程。

Runner 是一个长期运行的工作进程。它的任务很简单:

1、通过 LightningStoreClient 连接到 LightningStore。

2、进入一个循环,不断地向 LightningStore 请求新任务(dequeue_rollout)。

3、当它收到任务时,它会运行 simple_agent 函数。

4、关键在于,运行器会使用 Tracer 来封装代理的执行过程。Tracer 会自动捕获所有重要事件(例如 LLM 调用和最终奖励),并将其作为 span 发送回 LightningStore。

# Connecting to Store
store = agl.LightningStoreClient("http://localhost:4747")  # or some other address
runner = LitAgentRunner[str](tracer=AgentOpsTracer())
with runner.run_context(agent=simple_agent, store=store):  # <-- where the wrapping and instrumentation happens
    await runner.iter()  # polling for new tasks forever

在这个例子中,代理的任务是从资源中获取提示,用它来向 LLM 提出问题,并返回一个分数。

def simple_agent(task: str, prompt_template: PromptTemplate) -> float:
    """An agent that answers a question and gets judged by an LLM."""
    client = OpenAI()

    # Generate a response using the provided prompt template
    prompt = prompt_template.format(any_question=task)
    response = client.chat.completions.create(
        model="gpt-4.1-nano", messages=[{"role": "user", "content": prompt}]
    )
    llm_output = response.choices[0].message.content
    print(f"[Rollout] LLM returned: {llm_output}")

    # This llm_output and the final score are automatically logged as spans by the Tracer
    score = random.uniform(0, 1)  # Replace with actual scoring logic if needed
    return score

6、运行讲解

可以参考下:apo_custom_algorithm.py

这里我们需要开启3个终端。

6.1、第一个终端

这个终端我们启动 LightningStore (Store),该组件将等待来自算法和运行器的连接。默认情况下,Store 将监听 4747 端口。

agl store

6.2、启动runner

在第二个终端中,启动 Runner 进程。它将连接到store并等待任务。

python apo_custom_algorithm.py runner

可以看到程序已经启动,并且正在等待 rollouts 的更新。

这里我没有使用openai,换用了sonnet-4-5。

6.3、启动算法

第三个终端中运行算法。这将启动整个过程。

python apo_custom_algorithm.py algo

6.4、总结

可以看下这三个终端的输出。

6.4.1、算法输出

[Algo] Updating prompt template to: 'You are a friendly chatbot. {any_question}'
[Algo] Queuing task for clients...
[Algo] Task 'ro-feb59feab106' is now available for clients.
[Algo] Received Result: rollout_id='ro-feb59feab106' input='Explain why the sky appears blue using principles of light scattering in 100 words.' start_time=1768292816.7146344 end_time=1768292829.8468764 mode='train' resources_id='rs-ce4816ace922' status='succeeded' 
config=RolloutConfig(timeout_seconds=None, unresponsive_seconds=None, max_attempts=1, retry_condition=[]) metadata={}
[LLM] Span 05bf6f30110aaf9b (openai.chat.completion): {'gen_ai.request.type': 'chat', 'gen_ai.system': 'OpenAI', 'gen_ai.request.model': 'sonnet-4-5', 'gen_ai.request.streaming': False, 'gen_ai.prompt.0.role': 'user', 'gen_ai.prompt.0.content': 'You are a friendly chatbot. 
Explain why the sky appears blue using principles of light scattering in 100 words.', 'gen_ai.response.id': 'chatcmpl-efbc5d8a-e52f-4a71-83d7-326201bf25f4', 'gen_ai.response.model': 'us.anthropic.claude-sonnet-4-5-20250929-v1:0', 'gen_ai.usage.total_tokens': 179, 
'gen_ai.usage.prompt_tokens': 34, 'gen_ai.usage.completion_tokens': 145, 'gen_ai.completion.0.finish_reason': 'stop', 'gen_ai.completion.0.role': 'assistant', 'gen_ai.completion.0.content': "# Why the Sky Appears Blue ☀️\n\nThe sky looks blue because of **Rayleigh scattering**!
When sunlight enters Earth's atmosphere, it contains all colors of the rainbow. The air molecules and tiny particles scatter this light in different directions.\n\nHere's the key: **shorter wavelengths scatter more** than longer ones. Blue and violet light have short 
wavelengths, so they scatter much more than red, orange, or yellow light.\n\nAlthough violet scatters even more than blue, our eyes are more sensitive to blue light, and some violet gets absorbed higher up. That's why we see a beautiful blue sky! 🌤️"}
[LLM] Span 0b2d4e26f0debc75 (openai.chat.completion): {'gen_ai.request.type': 'chat', 'gen_ai.system': 'OpenAI', 'gen_ai.request.model': 'sonnet-4-5', 'gen_ai.request.temperature': 0.0, 'gen_ai.request.streaming': False, 'gen_ai.prompt.0.role': 'user', 
'gen_ai.prompt.0.content': "Evaluate how well the output fulfills the task.\nTask: Explain why the sky appears blue using principles of light scattering in 100 words.\nOutput: # Why the Sky Appears Blue ☀️\n\nThe sky looks blue because of **Rayleigh scattering**! When sunlight
enters Earth's atmosphere, it contains all colors of the rainbow. The air molecules and tiny particles scatter this light in different directions.\n\nHere's the key: **shorter wavelengths scatter more** than longer ones. Blue and violet light have short wavelengths, so they 
scatter much more than red, orange, or yellow light.\n\nAlthough violet scatters even more than blue, our eyes are more sensitive to blue light, and some violet gets absorbed higher up. That's why we see a beautiful blue sky! 🌤️\nYou must be very critical and strict in your 
evaluation.\nReturn only a number between 0 and 1. No text, punctuation, or explanation.", 'gen_ai.response.id': 'chatcmpl-8c05a27d-025d-4514-b8f7-3c6354423ed8', 'gen_ai.response.model': 'us.anthropic.claude-sonnet-4-5-20250929-v1:0', 'gen_ai.usage.total_tokens': 227, 
'gen_ai.usage.prompt_tokens': 220, 'gen_ai.usage.completion_tokens': 7, 'gen_ai.completion.0.finish_reason': 'stop', 'gen_ai.completion.0.role': 'assistant', 'gen_ai.completion.0.content': '0.65'}
[Algo] Final reward: 0.65

算法终端显示主要控制流程:更新提示、任务排队以及接收最终结果。您还可以看到它从存储中检索到的原始跨度数据。

6.4.2、Runner输出

[Rollout] LLM returned: The sky appears blue due to **Rayleigh scattering**, where sunlight interacts with Earth's atmosphere. Sunlight contains all colors of the 
visible spectrum, but shorter wavelengths (blue and violet) scatter more efficiently than longer wavelengths (red and orange) when they collide with air molecules. 
Blue light scatters approximately 10 times more than red light. Although violet scatters even more than blue, our eyes are more sensitive to blue, and the upper 
atmosphere absorbs some violet light. This scattered blue light reaches our eyes from all directions across the sky, creating the characteristic blue color we 
observe during daytime.
[Judge] Judge returned score: 0.92

runner 终端显示它正在获取每个任务、执行代理逻辑并报告完成情况。

6.4.3、Store Server输出

会显示每次交互的详细日志,证实了其作为中央枢纽的作用。您可以查看入队和出队部署、添加跨度以及更新状态的请求。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐