【大数据 & AI】Flink Agents 源码解读 — (6) — ActionTask

0x00 概要

ActionTask 是 Action 执行的基本单位,代表一个可执行的任务块。一个完整的 Action 可能会被切分成多个 ActionTask 来执行。ActionTask 在整体流程的位置如下:

Action Code → Agent → AgentPlan → ActionExecutionOperator → ActionTask → Flink Runtime

0x01 基础知识

ActionTask 是 Action 执行过程中的一个片段,用于支持复杂的执行逻辑(如异步处理),对应关系如下:

  • 一个 Action 对应一个函数

在 AgentPlan 中,每个 Action 包含一个执行函数 (exec),通常是 PythonFunction 或 JavaFunction,例如在 tool_call_action.py 中:

TOOL_CALL_ACTION = Action (
    name="tool_call_action",
    exec=PythonFunction.from_callable (process_tool_request), // 一个函数
    listen_event_types=[...]
)
  • 一个 Action 可能产生多个 ActionTask
    • ActionTask 是 Action 的执行时表示,可以看作是 Action 的 “执行片段”
    • 一个 Action 可能在执行过程中被拆分为多个 ActionTask,特别是在处理异步操作时

1.1 相关组件

ActionTask 概念的相关组件如下

组件 核心功能
JavaActionTask 执行 Java 函数
PythonActionTask 执行 Python 函数,支持异步 / 生成器模式,桥接 Java 与 Python 生态
LocalRunnerContext 本地执行上下文,模拟 Flink 分布式状态,管理事件队列、key 隔离状态、资源访问
ActionTaskResult 动作执行结果载体,包含是否完成、输出事件、下一个待执行任务(若有)
PythonGeneratorActionTask 处理 Python 生成器的异步任务,持续执行直到完成所有异步操作
Tool 相关机制 支持装饰器(@tool)、add_resource 等方式注册工具,通过 TOOL_CALL_ACTION 触发执行

在系统中的架构如下

flink-6-1

1.2 ActionTask

我们接下来看看 ActionTask 的具体实现。

ActionTask 是基类。

/**
 * This class represents a task related to the execution of an action in {@link
 * ActionExecutionOperator}.
 *
 * <p>An action is split into multiple code blocks, and each code block is represented by an {@code
 * ActionTask}. You can call {@link #invoke()} to execute a code block and obtain invoke result
 * {@link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next
 * {@code ActionTask} via {@link ActionTaskResult#getGeneratedActionTask()} and continue executing
 * it.
 */
public abstract class ActionTask {
    protected final Object key;
    protected final Event event;
    protected final Action action;
    /**
     * Since RunnerContextImpl contains references to the Operator and state, it should not be
     * serialized and included in the state with ActionTask. Instead, we should check if a valid
     * RunnerContext exists before each ActionTask invocation and create a new one if necessary.
     */
    protected transient RunnerContextImpl runnerContext;

    public ActionTask(Object key, Event event, Action action) {
        this.key = key;
        this.event = event;
        this.action = action;
    }

    public RunnerContextImpl getRunnerContext() {
        return runnerContext;
    }

    public void setRunnerContext(RunnerContextImpl runnerContext) {
        this.runnerContext = runnerContext;
    }

    public Object getKey() {
        return key;
    }

    /** Invokes the action task. */
    public abstract ActionTaskResult invoke() throws Exception;

    public class ActionTaskResult {
        private final boolean finished;
        private final List<Event> outputEvents;
        private final Optional<ActionTask> generatedActionTaskOpt;

        public ActionTaskResult(
                boolean finished,
                List<Event> outputEvents,
                @Nullable ActionTask generatedActionTask) {
            this.finished = finished;
            this.outputEvents = outputEvents;
            this.generatedActionTaskOpt = Optional.ofNullable(generatedActionTask);
        }

        public boolean isFinished() {
            return finished;
        }

        public List<Event> getOutputEvents() {
            return outputEvents;
        }

        public Optional<ActionTask> getGeneratedActionTask() {
            return generatedActionTaskOpt;
        }
    }
}

1.3 PythonActionTask

PythonActionTask 是一个专门用于执行 Python 动作任务的特殊 ActionTask 实现。它的主要作用包括:

  • 执行 Python 函数:调用 Python 函数来处理事件
  • 处理异步操作:支持 Python 中的异步操作,通过生成器机制实现
  • 桥接 Java 和 Python:作为 Java 端和 Python 端之间的桥梁,协调两者间的交互
1.3.1 定义

PythonActionTask 对应一个 Python 函数(更准确地说是一个 PythonFunction 对象),这个函数是在创建 Action 时定义的,存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装,而是使其能够在 Flink Agents 框架中正确执行,并支持框架所需的高级特性。它提供了以下附加价值:

  • 复杂逻辑:PythonActionTask 不仅仅是执行函数,还负责处理复杂的交互逻辑
  • 执行环境管理:为函数提供合适的执行上下文
  • 异步支持:通过生成器机制支持长时间运行的操作
  • 事件处理:管理和传递执行过程中产生的事件
  • 状态维护:在整个执行过程中维护必要的状态信息

PythonActionTask 在系统架构中的位置和交互关系如下:

Flink-6-2

代码如下:

public class PythonActionTask extends ActionTask {
    public ActionTaskResult invoke() throws Exception {
        PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();

        // 这里执行实际的 Python 函数
        String pythonGeneratorRef =
            pythonActionExecutor.executePythonFunction(
                (PythonFunction) action.getExec(), // <-- 这就是对应的函数
                (PythonEvent) event,
                runnerContext);

        // 处理异步情况
        if (pythonGeneratorRef != null) {
            // 如果函数返回了生成器,则创建新的任务继续执行
            ActionTask tempGeneratedActionTask =
                new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
            tempGeneratedActionTask.setRunnerContext(runnerContext);
        if (pythonGeneratorRef != null) {
            // 如果函数返回了生成器,则创建新的任务继续执行
            ActionTask tempGeneratedActionTask = 
                new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
            tempGeneratedActionTask.setRunnerContext(runnerContext);
            return tempGeneratedActionTask.invoke();
        }
        // 否则表示函数已执行完毕
        return new ActionTaskResult(
            true, 
            runnerContext.drainEvents(event.getSourceTimestamp()), 
            null);            
1.3.2 PythonActionTask 与 Function 的关系

PythonActionTask 与 Function 的关系的如下

Flink-6-3

1.3.3 与其他组件的关系

PythonActionTask 与其他组件的关系如下图所示。

Flink-6-4

1.3.4 调用流程

PythonActionTask.invoke() 流程如下图所示,其关键特点为:

  • 异步支持:通过 Python 生成器机制支持长时间运行的操作
  • 状态管理:与 ActionExecutionOperator 协作管理执行状态
  • 错误处理:适当地处理 Python 执行过程中可能出现的异常
  • 内存管理:与 RunnerContext 集成,管理短期记忆和其他状态

Flink-6-5

PythonActionTask 在整个系统中起到了至关重要的作用,它使得 Flink Agents 能够无缝集成 Python 生态系统中的各种 AI 工具和库,同时保持与 Flink 流处理引擎的良好集成。

1.4 PythonGeneratorActionTask

PythonGeneratorActionTask 是 PythonActionTask 的派生类。

  • 当 Python 函数中使用了 yield 或异步操作时,Python 执行器会检测到这种情况并返回一个 Generator 引用
  • 系统会创建 PythonGeneratorActionTask 来继续执行
/** An {@link ActionTask} wrapper a Python Generator to represent a code block in Python action. */
public class PythonGeneratorActionTask extends PythonActionTask {
    private final String pythonGeneratorRef;

    public PythonGeneratorActionTask(
            Object key, Event event, Action action, String pythonGeneratorRef) {
        super(key, event, action);
        this.pythonGeneratorRef = pythonGeneratorRef;
    }

    @Override
    public ActionTaskResult invoke() {
        boolean finished = getPythonActionExecutor().callPythonGenerator(pythonGeneratorRef);
        ActionTask generatedActionTask = finished ? null : this;
        return new ActionTaskResult(
                finished,
                runnerContext.drainEvents(event.getSourceTimestamp()),
                generatedActionTask);
    }
}

1.5 JavaActionTask

JavaActionTask 执行 Java ActionTask。

/**
 * A special {@link ActionTask} designed to execute a Java action task.
 *
 * <p>Note that Java action currently do not support asynchronous execution. As a result, a Java
 * action task will be invoked only once.
 */
public class JavaActionTask extends ActionTask {

    private final ClassLoader userCodeClassLoader;

    public JavaActionTask(Object key, Event event, Action action, ClassLoader userCodeClassLoader) {
        super(key, event, action);
        checkState(action.getExec() instanceof JavaFunction);
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    public ActionTaskResult invoke() throws Exception {
        runnerContext.checkNoPendingEvents();
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
            action.getExec().call(event, runnerContext);
        } finally {
            Thread.currentThread().setContextClassLoader(cl);
        }
        return new ActionTaskResult(
                true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
    }
}

1.6 ActionTaskResult 结构

每次执行 ActionTask 后会返回一个 ActionTaskResult 对象:

public class ActionTaskResult {
    private final boolean finished;                 // 是否已完成
    private final List<Event> outputEvents;         // 输出事件
    private final Optional<ActionTask> generatedActionTaskOpt; // 下一个 ActionTask(如果有)
// ...
}

0x02 ActionTask 切分机制

在 Flink Agents 框架中,ActionTask 的切分是为了支持长时间运行的操作和异步执行。切分的好处如下:

  • 避免阻塞:长时间运行的操作不会阻塞整个操作符
  • 提高并发性:允许其他 key 的任务同时执行
  • 容错能力:每个 ActionTask 可以单独失败和恢复
  • 资源管理:更好地管理内存和其他资源

这种切分机制使得 Flink Agents 能够高效地处理复杂的,长时间运行的 AI Agent 任务,同时保持系统的响应的稳定性。

2.1 切分方式

主要切分方式如下:

概念上的拆分

  • 一个 Action 在概念上可被拆成多个顺序执行的代码块,每个代码块成为一个 ActionTask 实例。
  • 设计目的在于细粒度控制,尤其适用于异步操作。

创建与流程

  • 初始触发:ActionExecutionOperator 通过 createActionTask() 为每个动作生成首个 ActionTask。
  • 执行过程:若动作产生生成器(异步),可再实例化新的 ActionTask 继续后续代码块。
    /**
     * Processes an incoming event for the given key and may submit a new mail
     * `tryProcessActionTaskForKey` to continue processing.
     */
    private void processEvent(Object key, Event event) throws Exception {
        notifyEventProcessed(event);

        boolean isInputEvent = EventUtil.isInputEvent(event);
        if (EventUtil.isOutputEvent(event)) {
        } else {
            // We then obtain the triggered action and add ActionTasks to the waiting processing
            // queue.
            List<Action> triggerActions = getActionsTriggeredBy(event);
            if (triggerActions != null && !triggerActions.isEmpty()) {
                for (Action triggerAction : triggerActions) {
                    actionTasksKState.add(createActionTask(key, triggerAction, event));
                }
            }
        }
    }

createActionTask 代码如下。

    private ActionTask createActionTask(Object key, Action action, Event event) {
        if (action.getExec() instanceof JavaFunction) {
            return new JavaActionTask(
                    key, event, action, getRuntimeContext().getUserCodeClassLoader());
        } else if (action.getExec() instanceof PythonFunction) {
            return new PythonActionTask(key, event, action);
        } else {
            throw new IllegalStateException(
                    "Unsupported action type: " + action.getExec().getClass());
        }
    }

2.2 实现细节

ActionTask.java 中有如下,这意味着:

  • 一个 Action 可能被拆分成多个代码块
  • 每个代码块由一个 ActionTask 表示
  • 通过调用 invoke () 执行代码块并获得结果
  • 如果 Action 包含更多代码块,可以从 ActionTaskResult 中获取下一个 ActionTask
/**
 * 此类表示在 ActionExecutionOperator 中执行的动作任务。
 *
 * <p>一个动作会被拆分为多个代码块,每个代码块都由一个 ActionTask 表示。
 * 可以调用 #invoke() 来执行一个代码块并获得执行结果(ActionTaskResult)。
 * 如果动作包含额外的代码块,可通过 ActionTaskResult#getGeneratedActionTask()
 * 获取下一个 ActionTask 并继续执行它。
 */

Python 集成(PythonActionTask.java)

  • 当 Python 函数中使用了 yield 或异步操作时,Python 执行器会检测到这种情况并返回一个 Generator 引用
  • 系统会创建 PythonGeneratorActionTask 来继续执行

这说明在处理异步 Python 函数时,一个 Action 可能会产生多个 ActionTask:

  • 初始的 PythonActionTask
  • 后续的 PythonGeneratorActionTask(如果需要)
/**
 * 专门用于执行 Python 动作任务的特殊 ActionTask。
 *
 * <p>在 Python 中进行异步执行期间,PythonActionTask 可以生成一个
 * PythonGeneratorActionTask 来代表后续需要的代码块。
 */
public class PythonActionTask extends ActionTask {
    public ActionTaskResult invoke() throws Exception {
        ...
        String pythonGeneratorRef = 
            pythonActionExecutor.executePythonFunction(
                (PythonFunction)action.getExec(),
                (PythonEvent)event,
                runnerContext);

        // 如果用户定义的动作使用接口提交了异步任务,
        // 它将在第一次执行后返回一个 Python 生成器对象实例。
        // 否则意味着没有提交异步任务且动作已经完成。
        if (pythonGeneratorRef != null) {
            // Python 动作生成了一个生成器。我们需要执行一次它,
            // 这将提交一个异步任务并返回动作是否已完成。
            ActionTask tempGeneratedActionTask = 
                new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
            tempGeneratedActionTask.setRunnerContext(runnerContext);
            return tempGeneratedActionTask.invoke();
        }
        return new ActionTaskResult(
            true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
    }
}

2.3 关键点

ActionTask 拆分的关键点如下:

  • 顺序执行:对于给定的键,任务按顺序执行,以保持顺序性和一致性
  • 异步处理:当动作涉及异步操作时:
    • 初始任务执行到异步操作为止;
    • 返回一个新的 ActionTask 来表示后续的操作;
    • 后续任务处理异步结果并继续执行
  • 状态管理:每个 ActionTask 通过 RunnerContext 维护其状态,确保拆分之间的连续性
  • 基于邮箱的处理:使用 Flink 的邮箱执行器来调度后续任务的处理:
// 提交新邮件以继续处理
mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");

这种设计允许将带有异步操作的复杂动作分解为可管理的单元,同时保持执行语义和状态一致性。

0xEE 个人信息

★★★★★★关于生活和技术的思考★★★★★★

微信公众账号:罗西的思考

如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐