【大数据 & AI】Flink Agents 源码解读 --- (6) --- ActionTask
PythonActionTask 对应一个 Python 函数(更准确地说是一个 PythonFunction 对象),这个函数是在创建 Action 时定义的,存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装,而是使其能够在 Flink Agents 框架中正确执行,并支持框架所需的高级特性。复杂逻辑:PythonActionTask 不仅
【大数据 & AI】Flink Agents 源码解读 — (6) — ActionTask
0x00 概要
ActionTask 是 Action 执行的基本单位,代表一个可执行的任务块。一个完整的 Action 可能会被切分成多个 ActionTask 来执行。ActionTask 在整体流程的位置如下:
Action Code → Agent → AgentPlan → ActionExecutionOperator → ActionTask → Flink Runtime
0x01 基础知识
ActionTask 是 Action 执行过程中的一个片段,用于支持复杂的执行逻辑(如异步处理),对应关系如下:
- 一个 Action 对应一个函数
在 AgentPlan 中,每个 Action 包含一个执行函数 (exec),通常是 PythonFunction 或 JavaFunction,例如在 tool_call_action.py 中:
TOOL_CALL_ACTION = Action (
name="tool_call_action",
exec=PythonFunction.from_callable (process_tool_request), // 一个函数
listen_event_types=[...]
)
- 一个 Action 可能产生多个 ActionTask
- ActionTask 是 Action 的执行时表示,可以看作是 Action 的 “执行片段”
- 一个 Action 可能在执行过程中被拆分为多个 ActionTask,特别是在处理异步操作时
1.1 相关组件
ActionTask 概念的相关组件如下
| 组件 | 核心功能 |
|---|---|
| JavaActionTask | 执行 Java 函数 |
| PythonActionTask | 执行 Python 函数,支持异步 / 生成器模式,桥接 Java 与 Python 生态 |
| LocalRunnerContext | 本地执行上下文,模拟 Flink 分布式状态,管理事件队列、key 隔离状态、资源访问 |
| ActionTaskResult | 动作执行结果载体,包含是否完成、输出事件、下一个待执行任务(若有) |
| PythonGeneratorActionTask | 处理 Python 生成器的异步任务,持续执行直到完成所有异步操作 |
| Tool 相关机制 | 支持装饰器(@tool)、add_resource 等方式注册工具,通过 TOOL_CALL_ACTION 触发执行 |
在系统中的架构如下

1.2 ActionTask
我们接下来看看 ActionTask 的具体实现。
ActionTask 是基类。
/**
* This class represents a task related to the execution of an action in {@link
* ActionExecutionOperator}.
*
* <p>An action is split into multiple code blocks, and each code block is represented by an {@code
* ActionTask}. You can call {@link #invoke()} to execute a code block and obtain invoke result
* {@link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next
* {@code ActionTask} via {@link ActionTaskResult#getGeneratedActionTask()} and continue executing
* it.
*/
public abstract class ActionTask {
protected final Object key;
protected final Event event;
protected final Action action;
/**
* Since RunnerContextImpl contains references to the Operator and state, it should not be
* serialized and included in the state with ActionTask. Instead, we should check if a valid
* RunnerContext exists before each ActionTask invocation and create a new one if necessary.
*/
protected transient RunnerContextImpl runnerContext;
public ActionTask(Object key, Event event, Action action) {
this.key = key;
this.event = event;
this.action = action;
}
public RunnerContextImpl getRunnerContext() {
return runnerContext;
}
public void setRunnerContext(RunnerContextImpl runnerContext) {
this.runnerContext = runnerContext;
}
public Object getKey() {
return key;
}
/** Invokes the action task. */
public abstract ActionTaskResult invoke() throws Exception;
public class ActionTaskResult {
private final boolean finished;
private final List<Event> outputEvents;
private final Optional<ActionTask> generatedActionTaskOpt;
public ActionTaskResult(
boolean finished,
List<Event> outputEvents,
@Nullable ActionTask generatedActionTask) {
this.finished = finished;
this.outputEvents = outputEvents;
this.generatedActionTaskOpt = Optional.ofNullable(generatedActionTask);
}
public boolean isFinished() {
return finished;
}
public List<Event> getOutputEvents() {
return outputEvents;
}
public Optional<ActionTask> getGeneratedActionTask() {
return generatedActionTaskOpt;
}
}
}
1.3 PythonActionTask
PythonActionTask 是一个专门用于执行 Python 动作任务的特殊 ActionTask 实现。它的主要作用包括:
- 执行 Python 函数:调用 Python 函数来处理事件
- 处理异步操作:支持 Python 中的异步操作,通过生成器机制实现
- 桥接 Java 和 Python:作为 Java 端和 Python 端之间的桥梁,协调两者间的交互
1.3.1 定义
PythonActionTask 对应一个 Python 函数(更准确地说是一个 PythonFunction 对象),这个函数是在创建 Action 时定义的,存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装,而是使其能够在 Flink Agents 框架中正确执行,并支持框架所需的高级特性。它提供了以下附加价值:
- 复杂逻辑:PythonActionTask 不仅仅是执行函数,还负责处理复杂的交互逻辑
- 执行环境管理:为函数提供合适的执行上下文
- 异步支持:通过生成器机制支持长时间运行的操作
- 事件处理:管理和传递执行过程中产生的事件
- 状态维护:在整个执行过程中维护必要的状态信息
PythonActionTask 在系统架构中的位置和交互关系如下:

代码如下:
public class PythonActionTask extends ActionTask {
public ActionTaskResult invoke() throws Exception {
PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();
// 这里执行实际的 Python 函数
String pythonGeneratorRef =
pythonActionExecutor.executePythonFunction(
(PythonFunction) action.getExec(), // <-- 这就是对应的函数
(PythonEvent) event,
runnerContext);
// 处理异步情况
if (pythonGeneratorRef != null) {
// 如果函数返回了生成器,则创建新的任务继续执行
ActionTask tempGeneratedActionTask =
new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
tempGeneratedActionTask.setRunnerContext(runnerContext);
if (pythonGeneratorRef != null) {
// 如果函数返回了生成器,则创建新的任务继续执行
ActionTask tempGeneratedActionTask =
new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
tempGeneratedActionTask.setRunnerContext(runnerContext);
return tempGeneratedActionTask.invoke();
}
// 否则表示函数已执行完毕
return new ActionTaskResult(
true,
runnerContext.drainEvents(event.getSourceTimestamp()),
null);
1.3.2 PythonActionTask 与 Function 的关系
PythonActionTask 与 Function 的关系的如下

1.3.3 与其他组件的关系
PythonActionTask 与其他组件的关系如下图所示。

1.3.4 调用流程
PythonActionTask.invoke() 流程如下图所示,其关键特点为:
- 异步支持:通过 Python 生成器机制支持长时间运行的操作
- 状态管理:与 ActionExecutionOperator 协作管理执行状态
- 错误处理:适当地处理 Python 执行过程中可能出现的异常
- 内存管理:与 RunnerContext 集成,管理短期记忆和其他状态

PythonActionTask 在整个系统中起到了至关重要的作用,它使得 Flink Agents 能够无缝集成 Python 生态系统中的各种 AI 工具和库,同时保持与 Flink 流处理引擎的良好集成。
1.4 PythonGeneratorActionTask
PythonGeneratorActionTask 是 PythonActionTask 的派生类。
- 当 Python 函数中使用了 yield 或异步操作时,Python 执行器会检测到这种情况并返回一个 Generator 引用
- 系统会创建 PythonGeneratorActionTask 来继续执行
/** An {@link ActionTask} wrapper a Python Generator to represent a code block in Python action. */
public class PythonGeneratorActionTask extends PythonActionTask {
private final String pythonGeneratorRef;
public PythonGeneratorActionTask(
Object key, Event event, Action action, String pythonGeneratorRef) {
super(key, event, action);
this.pythonGeneratorRef = pythonGeneratorRef;
}
@Override
public ActionTaskResult invoke() {
boolean finished = getPythonActionExecutor().callPythonGenerator(pythonGeneratorRef);
ActionTask generatedActionTask = finished ? null : this;
return new ActionTaskResult(
finished,
runnerContext.drainEvents(event.getSourceTimestamp()),
generatedActionTask);
}
}
1.5 JavaActionTask
JavaActionTask 执行 Java ActionTask。
/**
* A special {@link ActionTask} designed to execute a Java action task.
*
* <p>Note that Java action currently do not support asynchronous execution. As a result, a Java
* action task will be invoked only once.
*/
public class JavaActionTask extends ActionTask {
private final ClassLoader userCodeClassLoader;
public JavaActionTask(Object key, Event event, Action action, ClassLoader userCodeClassLoader) {
super(key, event, action);
checkState(action.getExec() instanceof JavaFunction);
this.userCodeClassLoader = userCodeClassLoader;
}
@Override
public ActionTaskResult invoke() throws Exception {
runnerContext.checkNoPendingEvents();
ClassLoader cl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
action.getExec().call(event, runnerContext);
} finally {
Thread.currentThread().setContextClassLoader(cl);
}
return new ActionTaskResult(
true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
}
}
1.6 ActionTaskResult 结构
每次执行 ActionTask 后会返回一个 ActionTaskResult 对象:
public class ActionTaskResult {
private final boolean finished; // 是否已完成
private final List<Event> outputEvents; // 输出事件
private final Optional<ActionTask> generatedActionTaskOpt; // 下一个 ActionTask(如果有)
// ...
}
0x02 ActionTask 切分机制
在 Flink Agents 框架中,ActionTask 的切分是为了支持长时间运行的操作和异步执行。切分的好处如下:
- 避免阻塞:长时间运行的操作不会阻塞整个操作符
- 提高并发性:允许其他 key 的任务同时执行
- 容错能力:每个 ActionTask 可以单独失败和恢复
- 资源管理:更好地管理内存和其他资源
这种切分机制使得 Flink Agents 能够高效地处理复杂的,长时间运行的 AI Agent 任务,同时保持系统的响应的稳定性。
2.1 切分方式
主要切分方式如下:
概念上的拆分
- 一个 Action 在概念上可被拆成多个顺序执行的代码块,每个代码块成为一个 ActionTask 实例。
- 设计目的在于细粒度控制,尤其适用于异步操作。
创建与流程
- 初始触发:ActionExecutionOperator 通过 createActionTask() 为每个动作生成首个 ActionTask。
- 执行过程:若动作产生生成器(异步),可再实例化新的 ActionTask 继续后续代码块。
/**
* Processes an incoming event for the given key and may submit a new mail
* `tryProcessActionTaskForKey` to continue processing.
*/
private void processEvent(Object key, Event event) throws Exception {
notifyEventProcessed(event);
boolean isInputEvent = EventUtil.isInputEvent(event);
if (EventUtil.isOutputEvent(event)) {
} else {
// We then obtain the triggered action and add ActionTasks to the waiting processing
// queue.
List<Action> triggerActions = getActionsTriggeredBy(event);
if (triggerActions != null && !triggerActions.isEmpty()) {
for (Action triggerAction : triggerActions) {
actionTasksKState.add(createActionTask(key, triggerAction, event));
}
}
}
}
createActionTask 代码如下。
private ActionTask createActionTask(Object key, Action action, Event event) {
if (action.getExec() instanceof JavaFunction) {
return new JavaActionTask(
key, event, action, getRuntimeContext().getUserCodeClassLoader());
} else if (action.getExec() instanceof PythonFunction) {
return new PythonActionTask(key, event, action);
} else {
throw new IllegalStateException(
"Unsupported action type: " + action.getExec().getClass());
}
}
2.2 实现细节
ActionTask.java 中有如下,这意味着:
- 一个 Action 可能被拆分成多个代码块
- 每个代码块由一个 ActionTask 表示
- 通过调用 invoke () 执行代码块并获得结果
- 如果 Action 包含更多代码块,可以从 ActionTaskResult 中获取下一个 ActionTask
/**
* 此类表示在 ActionExecutionOperator 中执行的动作任务。
*
* <p>一个动作会被拆分为多个代码块,每个代码块都由一个 ActionTask 表示。
* 可以调用 #invoke() 来执行一个代码块并获得执行结果(ActionTaskResult)。
* 如果动作包含额外的代码块,可通过 ActionTaskResult#getGeneratedActionTask()
* 获取下一个 ActionTask 并继续执行它。
*/
Python 集成(PythonActionTask.java)
- 当 Python 函数中使用了 yield 或异步操作时,Python 执行器会检测到这种情况并返回一个 Generator 引用
- 系统会创建 PythonGeneratorActionTask 来继续执行
这说明在处理异步 Python 函数时,一个 Action 可能会产生多个 ActionTask:
- 初始的 PythonActionTask
- 后续的 PythonGeneratorActionTask(如果需要)
/**
* 专门用于执行 Python 动作任务的特殊 ActionTask。
*
* <p>在 Python 中进行异步执行期间,PythonActionTask 可以生成一个
* PythonGeneratorActionTask 来代表后续需要的代码块。
*/
public class PythonActionTask extends ActionTask {
public ActionTaskResult invoke() throws Exception {
...
String pythonGeneratorRef =
pythonActionExecutor.executePythonFunction(
(PythonFunction)action.getExec(),
(PythonEvent)event,
runnerContext);
// 如果用户定义的动作使用接口提交了异步任务,
// 它将在第一次执行后返回一个 Python 生成器对象实例。
// 否则意味着没有提交异步任务且动作已经完成。
if (pythonGeneratorRef != null) {
// Python 动作生成了一个生成器。我们需要执行一次它,
// 这将提交一个异步任务并返回动作是否已完成。
ActionTask tempGeneratedActionTask =
new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
tempGeneratedActionTask.setRunnerContext(runnerContext);
return tempGeneratedActionTask.invoke();
}
return new ActionTaskResult(
true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
}
}
2.3 关键点
ActionTask 拆分的关键点如下:
- 顺序执行:对于给定的键,任务按顺序执行,以保持顺序性和一致性
- 异步处理:当动作涉及异步操作时:
- 初始任务执行到异步操作为止;
- 返回一个新的 ActionTask 来表示后续的操作;
- 后续任务处理异步结果并继续执行
- 状态管理:每个 ActionTask 通过 RunnerContext 维护其状态,确保拆分之间的连续性
- 基于邮箱的处理:使用 Flink 的邮箱执行器来调度后续任务的处理:
// 提交新邮件以继续处理
mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
这种设计允许将带有异步操作的复杂动作分解为可管理的单元,同时保持执行语义和状态一致性。
0xEE 个人信息
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

更多推荐



所有评论(0)