LlamaIndex采用事件驱动工作流架构,通过定义事件连接处理节点。文章介绍了四种内置事件及使用方法,展示如何用@step装饰器创建工作流步骤,并通过Context实现流式输出、状态存储和事件协调。工作流可作为服务运行,提供可视化调试页面和API接口,使复杂任务处理变得直观高效,适合大模型应用开发。


01 前言

LlamaIndex的Workflow工作流采用了一种巧妙的事件驱动设计,只需通过简单的事件定义就能实现节点之间的连接:当一个节点输出特定事件时,系统会自动触发下一个以该事件为输入的节点。结合内置定义的开始与结束节点,整个流程被优雅地串联与编排起来,让复杂任务的处理变得直观而高效。

02 相关概念

Event事件

在LlamaIndex中,Workflow模块下events文件中,定义了4种基础的内置事件:StartEvent、StopEvent、InputRequiredEvent、HumanResponseEvent。

  • StartEvent:

    开始事件,即工作流开始执行时发送的默认事件。

  • StopEvent:

    结束事件,即标志着工作流执行结束。该事件有个result属性,可以获取工作流执行后的结果。

  • InputRequiredEvent:

    需要人类干预的事件,用于实现Human in the loop流程,对应的是HumanResponseEvent事件。

  • HumanResponseEvent:

    人类反馈事件,完成人类输入或确认后发送该事件。

上面的事件均继承至Event类,它是LlamaIndex定义的基类事件,扩展自定义事件可以继承Event或者已有的其它事件。

Workflow工作流

以事件驱动的Workflow工作流,每个工作流节点都有一个或多个输入和输出事件,由此构成了完整工作流。这种方式更加灵活和简单,同时可以借助上下文Context控制工作流中的事件,实现如跳转工作流到某步骤的效果。

03 Workflow实战

基础用法

# pip install llama-index-core llama-index
from llama_index.core.workflow import(
StartEvent,
StopEvent,
Workflow,
step,
Event,
)
# 自定义事件
class FirstEvent(Event):
first_output: str
class SecondEvent(Event):
second_output: str
"""
step_one takes a StartEvent and returns a FirstEvent
step_two takes a FirstEvent and returns a SecondEvent
step_three takes a SecondEvent and returns a StopEvent
"""
class MyWorkflow(Workflow):
@step
async def step_one(self, ev: StartEvent) -> FirstEvent:
# do something here
print(ev.first_input_plus)
returnFirstEvent(first_output="First step complete.")
@step
async def step_two(self, ev: FirstEvent) -> SecondEvent:
# do something here
print(ev.first_output)
returnSecondEvent(second_output="Second step complete.")
@step
async def step_three(self, ev: SecondEvent) -> StopEvent:
# do something here
print(ev.second_output)
returnStopEvent(result="Workflow complete.")
# 执行
async def main():
w = MyWorkflow(timeout=10, verbose=False)
result = await w.run(first_input_plus="Hello World!")
print(result)
if __name__ == "__main__":
import asyncio
asyncio.run(main())

示例首先定义了两个自定义事件FirstEvent和SecondEvent以及它们的属性,均继承至Event类;然后定义MyWorkflow类继承至Workflow,并定义多个@step标记的异步“函数”,这些函数都有一个事件作为输入,一个事件作为输出的“节点”,最终构成工作流

step_one -> step_two -> step_three

然后实例化MyWorkflow,并调用run方法,得到StopEvent事件定义的result内容。

执行后,输出如下:

Hello World!
First step complete.
Second step complete.
Workflow complete.

关于@step:

1、它是一个将普通Python函数标记为工作流中的一个步骤的装饰器

2、它有三个属性,workflow:指定独立函数所属工作流,在工作流类中定义时忽略;num_workers:指定步骤的并行线程数;retry_policy:指定重试策略。

3、修饰的函数的入参,除了Event子类外,还有可选的上下文Context(可选择带有类型化的状态模型)和 通过 typing.Annotated 进行的任何资源注入。

流式输出

对于耗时较久的任务,可以边执行边输出,提升用户体验。两种常见场景,一个是与模型的对话,一个是耗时任务如文件下载等。

在上面示例的基础上改造,首先多导入Context:

from llama_index.core.workflow import(
Context,
)

定义一个表示进度的事件:

class ProgressEvent(Event):
msg: str

重新定义工作流类:

"""
step_one -》 step_two -》 step_three
"""
class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
print("step_one run")
ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
returnFirstEvent(first_output="First step complete.")
@step
async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
print("step_two run")
llm = DeepSeek(model="deepseek-chat",
api_key="sk-...")
generator = await llm.astream_complete(
"Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
)
async for response in generator:
# Allow the workflow to stream this piece of response
ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
return SecondEvent(
second_output="Second step complete, full response attached"
)
@step
async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
print("step_three run")
ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
return StopEvent(result="Workflow complete.")
# 执行
async def main():
w = MyWorkflow(timeout=30, verbose=True)
handler = w.run(first_input="Hello World!")
async for ev in handler.stream_events():
if isinstance(ev, ProgressEvent):
print(ev.msg, end="", flush=True)
final_result = await handler
print("\n Final result", final_result)
if __name__ == "__main__":
import asyncio
asyncio.run(main())

示例中工作流的节点函数入参均增加了ctx: Context,通过Context的write_event_to_stream方法以stream的方式发送自定义的事件ProgressEvent。

通过不加await的方式调用工作流的run方法,进而获得工作流执行过程中的stream_events。

执行后,输出如下(中间Of开始到Circumambulate开始的行由模型输出):

step_one run
Step one is happeningstep_two run
Of course. Here are the first three paragraphs of Herman Melville's *Moby-Dick*.
***
**Call me Ishmael.** Some years ago—never mind how long precisely—having little or no money in my purse, and nothing particular to interest me on shore, I thought I would sail about a little and see the watery part of the world. It is a way I have of driving off the spleen, and regulating the circulation. Whenever I find myself growing grim about the mouth; whenever it is a damp, drizzly November in my soul; whenever I find myself involuntarily pausing before coffin warehouses, and bringing up the rear of every funeral I meet; and especially whenever my hypos get such an upper hand of me, that it requires a strong moral principle to prevent me from deliberately stepping into the street, and methodically knocking people’s hats off—then, I account it high time to get to sea as soon as I can. This is my substitute for pistol and ball. With a philosophical flourish Cato throws himself upon his sword; I quietly take to the ship. There is nothing surprising in this. If they but knew it, almost all men in their degree, some time or other, cherish very nearly the same feelings towards the ocean with me.
There now is your insular city of the Manhattoes, belted round by wharves as Indian isles by coral reefs—commerce surrounds it with her surf. Right and left, the streets take you waterward. Its extreme downtown is the battery, where that noble mole is washed by waves, and cooled by breezes, which a few hours previous were out of sight of land. Look at the crowds of water-gazers there.
Circumambulate the city of a dreamy Sabbath afternoon. Go from Corlears Hook to Coenties Slip, and from thence, by Whitehall, northward. What do you see?—Posted like silent sentinels all around the town, stand thousands upon thousands of mortal men fixed in ocean reveries. Some leaning against the spiles; some seated upon the pier-heads; some looking over the bulwarks of ships from China; some high aloft in the rigging, as if striving to get a still better seaward peep. But these are all landsmen; of week days pent up in lath and plaster—tied to counters, nailed to benches, clinched to desks. How then is this? Are the green fields gone? What do they here?step_three run
Step three is happening
Final result Workflow complete.

上下文Context

Context就像工作流中的"大脑",在步骤之间协调事件传递,跟踪进行中的工作,暴露全局状态存储,并提供流式和同步工具。它由 Workflow 在运行时创建,并且可以持久化和恢复。

1、流式传出事件:见上面write_event_to_stream使用。

2、全局状态数据存储

# 设置数据
await ctx.store.set("some_database", [1, 2, 3])
"""
# 避免多agent场景的修改冲突,可以使用原子操作,使用with
async with ctx.store.edit_state() as edit_state:
edit_state["some_database"] = [1, 2, 3]
"""
# 读取
db = await ctx.store.get("some_database", default=None)

3、协调事件,下面示例用于并行执行场景:

# 并行触发流程
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
print("step_one run")
ctx.send_event(StepTwoEvent(query="query 1"))
ctx.send_event(StepTwoEvent(query="query 2"))
ctx.send_event(StepTwoEvent(query="query 3"))
# 等待所有事件到达
res = ctx.collect_events(ev, [StepThreeEvent] * 3)
if res is None:
return None

4、持久化并进行恢复上下文

# 存储序列化后的ctx_dict
ctx_dict = ctx.to_dict(serializer=JsonSerializer())
my_db.set("key", json.dumps(ctx_dict))
# 重新获取上下文,并加入工作流
ctx_dict = my_db.get("key")
restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict), serializer=JsonSerializer())
result = await my_workflow.run(..., ctx=restored_ctx)

这样,借助@step和Context可以实现常见工作流类型包括:条件判断、并行执行、多次循环执行、模型调用等任意任务。

运行工作流作为服务

LlamaIndex可以将工作流作为服务运行,服务同时提供了可视化调用页面和API接口,方便进行Debug。

from workflows import Workflow, step
from workflows.context import Context
from workflows.events import Event, StartEvent, StopEvent
from workflows.server import WorkflowServer
import asyncio
class StreamEvent(Event):
sequence: int
# Define a simple workflow
class GreetingWorkflow(Workflow):
@step
async def greet(self, ctx: Context, ev: StartEvent) -> StopEvent:
for i in range(3):
ctx.write_event_to_stream(StreamEvent(sequence=i))
await asyncio.sleep(0.3)
name = getattr(ev, "name", "World")
return StopEvent(result=f"Hello, {name}!")
greet_wf = GreetingWorkflow()
# Create a server instance
server = WorkflowServer()
# Add the workflow to the server
server.add_workflow("greet", greet_wf)
async def main():
await server.start()
# 异步启动方式,访问:http://127.0.0.1:8000/
if __name__ == "__main__":
import uvicorn
uvicorn.run(server.app, host="127.0.0.1", port=8000)

示例启动后,访问http://127.0.0.1:8000/,即可打开可视化调试页面,页面左侧是历史对话和输入,中间是工作流的可视化,右侧是工作流输出:

如图,输入:{“name”:“张三”}并运行,参数会传入StartEvent事件属性中,通过获取name属性值可得到“张三”;工作流通过Context发送三次StreamEvent事件并设置sequence属性,最终在有右侧展示出结果。

04 总结

LlamaIndex 采用事件驱动模型来构建工作流,通过简洁的方式将各个处理步骤连接起来。每个步骤都是一个独立的 Python 函数,专注于实现特定任务,最终组合成完整工作流。配合全局上下文(Context)类对工作流生命周期进行统一管理,进一步增强了流程的灵活性与可控性。

关于Human in the loop放在下篇Agent中介绍。LamaIndex中的Agent实现也是基于Workflow的事件驱动的。

​最后

我在一线科技企业深耕十二载,见证过太多因技术卡位而跃迁的案例。那些率先拥抱 AI 的同事,早已在效率与薪资上形成代际优势,我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在大模型的学习中的很多困惑。

我整理出这套 AI 大模型突围资料包:

  • ✅AI大模型学习路线图
  • ✅Agent行业报告
  • ✅100集大模型视频教程
  • ✅大模型书籍PDF
  • ✅DeepSeek教程
  • ✅AI产品经理入门资料

完整的大模型学习和面试资料已经上传带到CSDN的官方了,有需要的朋友可以扫描下方二维码免费领取【保证100%免费】👇👇
​​
在这里插入图片描述

为什么说现在普通人就业/升职加薪的首选是AI大模型?

人工智能技术的爆发式增长,正以不可逆转之势重塑就业市场版图。从DeepSeek等国产大模型引发的科技圈热议,到全国两会关于AI产业发展的政策聚焦,再到招聘会上排起的长队,AI的热度已从技术领域渗透到就业市场的每一个角落。

img
智联招聘的最新数据给出了最直观的印证:2025年2月,AI领域求职人数同比增幅突破200% ,远超其他行业平均水平;整个人工智能行业的求职增速达到33.4%,位居各行业榜首,其中人工智能工程师岗位的求职热度更是飙升69.6%。

AI产业的快速扩张,也让人才供需矛盾愈发突出。麦肯锡报告明确预测,到2030年中国AI专业人才需求将达600万人,人才缺口可能高达400万人,这一缺口不仅存在于核心技术领域,更蔓延至产业应用的各个环节。

在这里插入图片描述

​​
在这里插入图片描述

资料包有什么?

①从入门到精通的全套视频教程⑤⑥

包含提示词工程、RAG、Agent等技术点
在这里插入图片描述

② AI大模型学习路线图(还有视频解说)

全过程AI大模型学习路线

在这里插入图片描述

③学习电子书籍和技术文档

市面上的大模型书籍确实太多了,这些是我精选出来的

在这里插入图片描述

④各大厂大模型面试题目详解

在这里插入图片描述

⑤ 这些资料真的有用吗?

这份资料由我和鲁为民博士共同整理,鲁为民博士先后获得了北京清华大学学士和美国加州理工学院博士学位,在包括IEEE Transactions等学术期刊和诸多国际会议上发表了超过50篇学术论文、取得了多项美国和中国发明专利,同时还斩获了吴文俊人工智能科学技术奖。目前我正在和鲁博士共同进行人工智能的研究。

所有的视频教程由智泊AI老师录制,且资料与智泊AI共享,相互补充。这份学习大礼包应该算是现在最全面的大模型学习资料了。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。

在这里插入图片描述
在这里插入图片描述

智泊AI始终秉持着“让每个人平等享受到优质教育资源”的育人理念‌,通过动态追踪大模型开发、数据标注伦理等前沿技术趋势‌,构建起"前沿课程+智能实训+精准就业"的高效培养体系。

课堂上不光教理论,还带着学员做了十多个真实项目。学员要亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事‌!

​​​​在这里插入图片描述
在这里插入图片描述

如果说你是以下人群中的其中一类,都可以来智泊AI学习人工智能,找到高薪工作,一次小小的“投资”换来的是终身受益!

应届毕业生‌:无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。

零基础转型‌:非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界‌。

业务赋能 ‌突破瓶颈:传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型‌。

👉获取方式:

😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓**

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐