【大数据 & AI】Flink Agents 源码解读 — (7) — AgentsExecutionEnvironment

0x00 概要

AgentsExecutionEnvironment 是在Flink基础上构建的一个更高层次的执行环境,专门为Agent而设计,同时保留了与原生 Flink API 的兼容性。

0x01 基础知识

可以把 Flink 原生的 StreamExecutionEnvironment / TableEnvironment 理解成“Agent 话题里所说的执行环境(Execution Environment)”,但是其无法直接被 Agent 使用。因此需要在其上做一些封装和适配,这就是 AgentsExecutionEnvironment。

  1. Flink Environment = 纯粹的计算资源与运行时容器
    • 负责申请 Slot、管理 Checkpoint、调度算子链、网络 Shuffle
    • 对“业务语义”一无所知,也不管用户写的是 ETL、CEP 还是 AI 推理
    • 对应 Agent 词汇表里的“Runtime / Cluster / Engine”这一层
  2. Agent Environment = 在 Flink Runtime 之上包了一层语义抽象
    • 替用户注册 Chat Model、Tools、Memory、Actions、Event Schema
    • 把“用户消息”或“传感器事件”封装成 Event,按 AgentPlan 去调大模型、执行业务动作
    • 内部仍然用 StreamExecutionEnvironment 去提交算子,只是看不到显式的 keyBy()process()——框架帮用户生成了 ActionExecutionOperator

类比关系如下:

传统 Flink 程序
├─ StreamExecutionEnvironment   ← 纯运行时
├─ your ProcessFunction         ← 用户的业务逻辑
└─ DataStream

Flink Agents 程序
├─ AgentsExecutionEnvironment   ← Agent 语义环境,内部仍持有 StreamExecutionEnvironment
├─ AgentPlan(your business)   ← 定义“遇到啥事件该干啥”
└─ ActionExecutionOperator     ← 框架替用户生成的算子,负责调大模型、更新记忆

1.1 定义

AgentsExecutionEnvironment 的代码如下。

/**
 * Base class for agent execution environment.
 *
 * <p>This class provides the main entry point for integrating Flink Agents with different types of
 * Flink data sources (DataStream, Table, or simple lists).
 */
public abstract class AgentsExecutionEnvironment {
    protected final Map<ResourceType, Map<String, Object>> resources;

    protected AgentsExecutionEnvironment() {
        this.resources = new HashMap<>();
        for (ResourceType type : ResourceType.values()) {
            this.resources.put(type, new HashMap<>());
        }
    }
    
    /**
     * Get agents execution environment.
     *
     * <p>Factory method that creates an appropriate execution environment based on the provided
     * StreamExecutionEnvironment. If no environment is provided, a local execution environment is
     * returned for testing and development.
     *
     * <p>When integrating with Flink DataStream/Table APIs, users should pass the Flink
     * StreamExecutionEnvironment to enable remote execution capabilities.
     *
     * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local
     *     execution environment will be created.
     * @param tEnv Optional StreamTableEnvironment for table-to-stream conversion.
     * @return AgentsExecutionEnvironment appropriate for the execution context.
     */
    public static AgentsExecutionEnvironment getExecutionEnvironment(
            StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) {
        if (env == null) {
            // Return local execution environment for testing/development
            try {
                Class<?> localEnvClass =
                        Class.forName(                                "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment");
                return (AgentsExecutionEnvironment)
                        localEnvClass.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Failed to create LocalExecutionEnvironment", e);
            }
        } else {
            // Return remote execution environment for Flink integration
            try {
                Class<?> remoteEnvClass =
                        Class.forName(                                "org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment");
                return (AgentsExecutionEnvironment)
                        remoteEnvClass
                                .getDeclaredConstructor(
                                        StreamExecutionEnvironment.class,
                                        StreamTableEnvironment.class)
                                .newInstance(env, tEnv);
            } catch (Exception e) {
                throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e);
            }
        }
    }    

1.2 功能

AgentsExecutionEnvironment 的功能如下:

  • 统一入口:
    • 提供getExecutionEnvironment()系列静态工厂方法,可以依据是否传入Flink的 StreamExecutionEnvironment来创建本地或远程执行环境。
    • 支持从不同数据源构建构建Agent管道,包括List,DataStream和Table。
  • 资源配置管理:
    • 内置 resources 映射结构,支持按照资源类型管理各种资源。
    • 提供 addResource() 方法注册可序列化的资源或者资源描述符。
  • 多输入源支持:
    • fromList():支持从简单列表创建本地执行环境。
    • fromDataStream():集成Flink DataStream API
    • fromTable():集成Flink Table API
  • 配置管理:
    • 提供getConfig()抽象方法获取可写的配置对象。

1.3 与原生 Flink Environment 的区别

  • 抽象层级更高:AgentExecutionEnvrionment是对Flink原生环境的封装,在其之上提供了面向Agent的编程模型。主要用于运行Agent,而非 直接处理数据流。
  • 执行模式:AgentExecutionEnvrionment 通过 LocalExecutionEnvironment 和 RemoteExecutionEnvironment 分别支持本地测试和远程集群执行。原生 Flink 主要通过配置参数控制执行模式。
  • 资源管理机制:AgentExecutionEnvrionment 内置了专门的资源注册和管理机制,支持按类型分类管理支援。原生 Flink 没有这种结构化的资源管理方式。
  • API 设计目标:面向 Agent 编程范式,强调事件驱动的自主行为体模型。原生Flink 更关注数据流处理和批处理操作。

0x02 LocalExecutionEnvironment

LocalExecutionEnvironment 是 AgentsExecutionEnvironment 的一种实现形式。

Class<?> localEnvClass = Class.forName(                                "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment");
                return (AgentsExecutionEnvironment)
                        localEnvClass.getDeclaredConstructor().newInstance();

LocalExecutionEnvironment 主要用于

  • 开发阶段的快速测试和调试
  • 无需 Flink 集群即可验证代理逻辑
  • 简化Agent应用的本地开发流程
  • 它与远程执行环境形成对比,后者支持完整的 Flink DataStream 和 Table API 集成。

2.1 定义

2.1.1 主要功能

LocalExecutionEnvironment 的主要功能如下:

  • 本地执行环境实现
    • 集成自 AgentExecutionEnvrionment,为本地测试和开发提供执行环境
    • 不依赖 Flink 集群,可以在本地环境中运行和调试代理
  • 数据源支持
    • 通过from_list方法支持从列表数据源读取输入数据
    • 不支持 Flink 的DataStream 和 Table API(这些在远程执行环境中使用)
  • 资源配置和管理
    • 存储和管理通过 add_resource 方法注册的资源
    • 在构建Agent时,将环境中的资源注入到Agent实例中
  • 代理执行管理
    • 通过 set_agent 方法设置待执行的Agent、输入和输出
    • 使用 LocalRunner 在本地执行代理逻辑
    • 通过 execute 方法触发代理执行
  • 结果收集
    • 收集Agent执行的输出结果
    • 通过 to_list 方法返回执行结果

根据代码分析, execute 函数在以下情况下被调用:

  • 用户手动调用。当用户完成代理配置和输入数据设置后,需要显式调用execute() 方法来启动Agent 执行流程,这通常式配置完所有必要组件后的最后一步。
2.1.2 代码

LocalExecutionEnvironment 的定义如下,其中 LocalRunner 是核心。

class LocalExecutionEnvironment(AgentsExecutionEnvironment):
    """Implementation of AgentsExecutionEnvironment for local execution environment."""

    __input: List[Dict[str, Any]] = None
    __output: List[Any] = None
    __runner: LocalRunner = None
    __executed: bool = False
    __config: AgentConfiguration = AgentConfiguration()
        
    def set_agent(self, input: list, output: list, runner: LocalRunner) -> None:
        """Set agent input, output and runner."""
        self.__input = input
        self.__runner = runner
        self.__output = output

    def execute(self) -> None:
        """Execute agent individually."""
        if self.__executed:
            err_msg = (
                "LocalExecutionEnvironment doesn't support execute multiple times."
            )
            raise RuntimeError(err_msg)
        self.__executed = True
        for input in self.__input:
            self.__runner.run(**input)
        outputs = self.__runner.get_outputs()
        for output in outputs:
            self.__output.append(output)        
2.1.3 执行流程

在 LocalExecutionEnvironment 中,execute 方法会:

  • 遍历所有输入数据项
  • 对每个输入调用 LocalRunner 的run方法进行处理
  • 收集所有输出结果

典型的调用序列如下:

# 1. 获取执行环境
env = StreamExecutionEnvironment.get_execution_environment()
# 2. 添加所需资源
env.add_resource(...)
# 3. 设置输入数据
output_data = env.from_list(input_data) 
# 4. 应用代理
builder.apply(agent) 
# 5. 执行代理
env.execute()
# 6. 获取结果
results = builder.to_list()

关键特点:

  • 手动触发: execute() 不会自动调用,必须由用户显式调用
  • 一次性执行:只能调用一次,多次调用会抛出异常
  • 阻塞操作:在本地环境中,这是个同步阻塞操作,会等待所有输入处理完成
2.1.4 组件关系

组件关系图如下:

LocalExecutionEnvironmet
    ↓
    ↓ creates
LocalAgentBuilder
    ↓
    ↓ creates
LocalRunner
    ↓
    ↓ creates & run(data)
LocalRunnerContext

关系说明:

  • LocalExecutionEnvironmet 是入口点,管理整个执行环境。通过 from_list() 创建LocalAgentBuilder
  • LocalAgentBuilder 负责构建Agent 执行管道,通过 apply() 创建 LocalRunner
  • LocalRunner 是实际的执行器
  • LocalRunner 为每个处理的记录创建 LocalRunnerContext

2.2 LocalRunner

LocalRunner 是本地执行环境中的核心执行器,负责实际运行Agent逻辑。LocalRunner 提供了一个完整的本地执行环境,模拟了Flink 流处理的行为,使得Agent可以在本地进行开发和测试。

2.2.1 LocalRunner 的主要功能

LocalRunner 的主要功能:

  • 代理执行:将Agent转换为可执行的计划并执行
  • 上下文管理:为每个处理单元创建和管理 LocalRunnerContext
  • 事件处理:管理事件队列并驱动Agent的执行流程
  • 结果收集:收集和存储Agent执行的输出结果

run函数是LocalRunner的核心方法,负责处理单个输入记录:

  • 从输入数据中提取键值,用于上下文管理
  • 为每个键创建或复用 LocalRunnerContext
  • 将输入数据包装成 InputEvent 并发送到事件队列
  • 事件循环处理,LocalRunner 的 run 函数使用 while 循环是为了处理事件驱动的代理执行模型。这种设计反映了代理执行的核心机制
    • 持续处理事件队列直到为空
    • 如果是输出事件、收集结果
    • 根据事件类型查找对应的动作
    • 执行动作
    • 处理异步处理结果
    @override
    def run(self, **data: Dict[str, Any]) -> Any:
        """Execute the agent to process the given data.

        Parameters
        ----------
        **data : dict[str, Any]
            input record from upstream.

        Returns:
        -------
        key
            The key of the input that was processed.
        """
        if "key" in data:
            key = data["key"]
        elif "k" in data:
            key = data["k"]
        else:
            key = uuid.uuid4()

        if key not in self.__keyed_contexts:
            self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan, key, self.__config)
        context = self.__keyed_contexts[key]

        if "value" in data:
            input_event = InputEvent(input=data["value"])
        elif "v" in data:
            input_event = InputEvent(input=data["v"])
        else:
            msg = "Input data must be dict has 'v' or 'value' field"
            raise RuntimeError(msg)

        context.send_event(input_event)

        while len(context.events) > 0:
            event = context.events.popleft()
            if isinstance(event, OutputEvent):
                self.__outputs.append({key: event.output})
                continue
            event_type = f"{event.__class__.__module__}.{event.__class__.__name__}"
            for action in self.__agent_plan.get_actions(event_type):
                context.action_name = action.name
                func_result = action.exec(event, context)
                if isinstance(func_result, Generator):
                    try:
                        for _ in func_result:
                            pass
                    except Exception:
                        logger.exception("Error in async execution")
                        raise
        return key
2.2.2 事件驱动执行模型

在 Flink Agents 框架中,代理的执行是基于事件的。每个动作 (action)处理一个事件并可能产生新的事件,这些新事件又会触发其他动作的执行。因此需要一个循环来持续处理事件队列中的事件,直到没有更多事件需要处理。

这种设计使得代理可以处理复杂的、多步骤的交互过程,而不需要预先确定执行步骤的数量。while 循环确保所有相关的事件都被处理完毕,形成了一个完整的事件处理链:

  • 持续从事件队列 context.events 中取出事件进行处理
  • 每个事件可能触发一个或多个动作的执行
  • 动作执行可能产生新的事件,这些事件被添加回队列

递归事件处理:

  • 初始时只有一个 InputEvent
  • 动作处理 InputEvent 可能产生 ChatRequestEvent 等中间事件
  • 中间事件可能触发其他动作,产生更多事件
  • 最终产生 OutputEvent 完成处理

完整处理链:

InputEvent  → →   start_action  → →   ChatRequestEvent  →  →  LLM处理  → →  ChatResponseEvent  → →   stop_action  → →   OutputEvent

具体执行流程

  • 初始化:

    • 将输入数据包装成 InputEvent 并添加到事件队列
  • 循环处理:

    • 从队列取出事件
    • 根据事件类型找到对应的 actions
    • 执行所有监听该事件类型的动作
    • 动作可能通过 send_event 发送新事件到队列
  • 终止条件:

    • 当事件队列为空时,处理完成
2.2.3 AgentPlan 和 LocalRunner 的关系
  • LocalRunner 在初始化时候使用 AgentPlan
  • 在执行过程中,LocalRunnerContext 通过 AgentPlan 获取资源和动作
  • 在事件处理中查找对应的动作

具体为:LocalRunner 在初始化时候使用 AgentPlan

class LocalRunner(AgentRunner):
    """Agent runner implementation for local execution, which is
    convenient for debugging.

    Attributes:
    ----------
    __agent_plan : AgentPlan
        Internal agent plan.
    __keyed_contexts : dict[Any, LocalRunnerContext]
        Dictionary of active contexts indexed by key.
    __outputs:
        Outputs generated by agent execution.
    __config:
        Internal configration.
    """

    __agent_plan: AgentPlan
    __keyed_contexts: Dict[Any, LocalRunnerContext]
    __outputs: List[Dict[str, Any]]
    __config: AgentConfiguration

    def __init__(self, agent: Agent, config: AgentConfiguration) -> None:
        """Initialize the runner with the provided agent.

        Parameters
        ----------
        agent : Agent
            The agent class to convert and run.
        """
        self.__agent_plan = AgentPlan.from_agent(agent, config)
        self.__keyed_contexts = {}
        self.__outputs = []
        self.__config = config


具体为:在执行过程中,LocalRunnerContext 通过 AgentPlan 获取资源和动作

class LocalRunnerContext(RunnerContext):
    """Implementation of RunnerContext for local agent execution.

    Attributes:
    ----------
    __agent_plan : AgentPlan
        Internal agent plan for this context.
    __key : Any
        Unique identifier for the context, correspond to the key in flink KeyedStream.
    events : deque[Event]
        Queue of events to be processed in this context.
    action_name: str
        Name of the action being executed.
    """

    __agent_plan: AgentPlan
    __key: Any
    events: deque[Event]
    action_name: str
    _store: dict[str, Any]
    _short_term_memory: MemoryObject
    _config: AgentConfiguration

    def __init__(self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration) -> None:
        """Initialize a new context with the given agent and key.

        Parameters
        ----------
        agent_plan : AgentPlan
            Agent plan used for this context.
        key : Any
            Unique context identifier, which is corresponding to the key in flink
            KeyedStream.
        """
        self.__agent_plan = agent_plan
        self.__key = key
        self.events = deque()
        self._store = {}
        self._short_term_memory = LocalMemoryObject(
            self._store, LocalMemoryObject.ROOT_KEY
        )
        self._config = config
        
    @override
    def get_resource(self, name: str, type: ResourceType) -> Resource:
        return self.__agent_plan.get_resource(name, type)        


具体为:在事件处理中查找对应的动作

            for action in self.__agent_plan.get_actions(event_type):
                context.action_name = action.name
                func_result = action.exec(event, context) # 执行
                if isinstance(func_result, Generator):
                    try:
                        for _ in func_result:
                            pass
                    except Exception:
                        logger.exception("Error in async execution")
                        raise

2.2.4 为什么要为每种 key 维护独立的上下文

在 LocalRunner 中,每种 key 都有自己独立的上下文是为了模拟 Flink 流处理环境中 keyed state 的行为,并支持状态隔离和并发处理。

  • 状态隔离:每个 key 对应的数据流需要维护自己的状态,避免不同 key 的数据相互干扰:
# 不同 key 的状态完全隔离
key1_context.short_term_memory.set("user_name", "Alice")
key2_context.short_term_memory.set("user_name", "Bob")
# 两个 key 有不同的状态值,互不影响


  • 模拟 Flink 的 KeyedStream 行为:在 Flink 中,当使用 keyBy() 操作时,每个 key 会有独立的状态管理。LocalRunner 通过这种方式模拟了相同的行为:

    • 相同 key 的事件会在同一个上下文中处理
    • 不同 key 的事件拥有各自独立的内存和事件队列

    这样保证了本地调试环境与实际 Flink 环境的一致性

  • 支持并发处理:虽然 LocalRunner 是单线程运行的,但它模拟了多 key 并发处理的情况:

  • 支持并发处理:虽然 LocalRunner 是单线程运行的,但它模拟了多 key 并发处理的情况:

# 可以同时处理多个 key 的数据
for input_record in inputs:
    runner.run(**input_record)  # 每个 key 有独立上下文

  • 事件队列隔离:每个 key 都有自己的事件队列(events),这样可以保证:

    • 同一 key 的事件按顺序处理
    • 不同 key 的事件不会互相影响处理顺序
    • 每个 key 可以独立地维护待处理事件队列
  • 内存状态管理:每个 LocalRunnerContext 都有自己的短期记忆对象(_short_term_memory),允许:

# 每个 key 可以存储和检索自己的状态
context.short_term_memory.set("step_count", 1)
context.short_term_memory.get("step_count")  # 获取该 key 特定的状态

2.2.5 实际应用场景示例

考虑一个客户服务聊天机器人应用:

# 用户 Alice 和 Bob 的对话分别用不同的 key 处理
runner.run(key="user_alice", value={"message": "Hello"})
runner.run(key="user_bob", value={"message": "Hi there"})
# 每个用户的对话历史和状态都独立保存
# Alice 的上下文不会影响 Bob 的上下文,反之亦然

这种设计使 LocalRunner 能够准确模拟真实分布式环境中的行为,方便开发者在本地测试和调试复杂的状态依赖型代理应用。

2.3 LocalRunnerContext

LocalRunnerContext 是 Flink Agents 框架中用于本地执行环境的运行上下文实现。它的主要功能是为每个 key 提供独立的执行环境,模拟 Flink 分布式环境中 keyed state 的行为,确保了在本地环境中能够准确模拟基于 key 的状态管理和事件处理流程。

2.3.1 设计目的

LocalRunnerContext 的设计主要是为了:

  • 本地调试:在本地环境中模拟 Flink 分布式执行的行为
  • 状态隔离:确保不同 key 的处理状态完全隔离,避免相互干扰
  • 行为一致性:保证本地测试环境与实际 Flink 执行环境的行为一致
  • 简化开发:提供与生产环境相同的 API 接口,方便开发者测试和调试
2.3.2 主要功能
  • 状态管理
    • 为每个 key 维护独立的短期内存状态(_short_term_memory)
    • 提供内存对象的读写操作,确保不同 key 的状态隔离
  • 事件处理
    • 维护每个 key 的事件队列(events)
    • 提供 send_event 方法将事件添加到处理队列中
    • 记录事件处理日志,便于调试
  • 资源访问
    • 提供 get_resource 方法访问 agent 所需的资源(如模型、工具等)
    • 根据资源名称和类型获取相应的资源实例
  • 配置管理
    • 提供对 action 配置的访问(action_config, get_action_config_value)
    • 提供全局配置信息(config)
  • 度量和监控
    • 提供度量组访问接口(agent_metric_group, action_metric_group)
    • 目前在本地环境中尚未完全实现
  • 异步执行支持
    • 提供 execute_async 方法支持异步函数执行
    • 在本地环境中降级为同步执行并给出警告
2.3.3 定义
class LocalRunnerContext(RunnerContext):
    """Implementation of RunnerContext for local agent execution.

    Attributes:
    ----------
    __agent_plan : AgentPlan
        Internal agent plan for this context.
    __key : Any
        Unique identifier for the context, correspond to the key in flink KeyedStream.
    events : deque[Event]
        Queue of events to be processed in this context.
    action_name: str
        Name of the action being executed.
    """

    __agent_plan: AgentPlan
    __key: Any
    events: deque[Event]
    action_name: str
    _store: dict[str, Any]
    _short_term_memory: MemoryObject
    _config: AgentConfiguration

    def __init__(self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration) -> None:
        """Initialize a new context with the given agent and key.

        Parameters
        ----------
        agent_plan : AgentPlan
            Agent plan used for this context.
        key : Any
            Unique context identifier, which is corresponding to the key in flink
            KeyedStream.
        """
        self.__agent_plan = agent_plan
        self.__key = key
        self.events = deque()
        self._store = {}
        self._short_term_memory = LocalMemoryObject(
            self._store, LocalMemoryObject.ROOT_KEY
        )
        self._config = config

    @property
    def key(self) -> Any:
        """Get the unique identifier for this context.

        Returns:
        -------
        Any
            The unique identifier for this context.
        """
        return self.__key

    @override
    def send_event(self, event: Event) -> None:
        """Send an event to the context's event queue and log it.

        Parameters
        ----------
        event : Event
            The event to be added to the queue.
        """
        logger.info("key: %s, send_event: %s", self.__key, event)
        self.events.append(event)

    @override
    def get_resource(self, name: str, type: ResourceType) -> Resource:
        return self.__agent_plan.get_resource(name, type)

    @property
    @override
    def action_config(self) -> Dict[str, Any]:
        """Get config of the action."""
        return self.__agent_plan.get_action_config(action_name=self.action_name)

    @override
    def get_action_config_value(self, key: str) -> Any:
        """Get config option value of the key."""
        return self.__agent_plan.get_action_config_value(
            action_name=self.action_name, key=key
        )

    @property
    @override
    def short_term_memory(self) -> MemoryObject:
        """Get the short-term memory object associated with this context.

        Returns:
        -------
        MemoryObject
            The root object of the short-term memory.
        """
        return self._short_term_memory

    @property
    @override
    def agent_metric_group(self) -> MetricGroup:
        # TODO: Support metric mechanism for local agent execution.
        err_msg = "Metric mechanism is not supported for local agent execution yet."
        raise NotImplementedError(err_msg)

    @property
    @override
    def action_metric_group(self) -> MetricGroup:
        # TODO: Support metric mechanism for local agent execution.
        err_msg = "Metric mechanism is not supported for local agent execution yet."
        raise NotImplementedError(err_msg)

    def execute_async(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Asynchronously execute the provided function. Access to memory
        is prohibited within the function.
        """
        logger.warning(
            "Local runner does not support asynchronous execution; falling back to synchronous execution."
        )
        func_result = func(*args, **kwargs)
        yield
        return func_result

    @property
    @override
    def config(self) -> AgentConfiguration:
        return self._config

0x03 RemoteExecutionEnvironment

RemoteExecutionEnvironment 是 Flink Agents 对原生 Flink RemoteEnvironment 的封装(适配 Agent 语义),是 Agent(智能体)与 Flink 集群融合的核心载体,核心目标是让 Agent 能够在 Flink 集群中处理 DataStream 和 Table 类型的流式数据,具体承担以下关键职责:

  • RemoteExecutionEnvironment 是 Flink Agents 实现 Agent 远程执行的核心环境组件,封装了远程集群连接、作业提交、资源管理等能力;

  • 核心价值是屏蔽 Flink 远程执行的底层细节,让用户聚焦 Agent 逻辑定义,而非集群操作;

  • 关键特性是兼容本地调试与远程执行,同时适配 Agent 特有的状态、事件、资源需求,是 Flink Agents 从 “本地单机” 走向 “分布式集群” 的核心支撑。

  • 桥接 Agent 框架与 Flink 运行时:将 Agent 的执行逻辑嵌入 Flink 的 StreamExecutionEnvironment/StreamTableEnvironment,使 Agent 能利用 Flink 的分布式计算能力处理流式数据;

  • 标准化 Agent 执行流程:提供统一的入口(fromDataStream/fromTable)、处理(apply Agent)、输出(toDataStream/toTable)接口,规范 Agent 在 Flink 中的数据处理链路;

  • 配置管理:加载 Flink 集群中 Agent 的专属配置文件(config.yaml),为 Agent 执行提供环境配置支撑;

  • 执行调度:最终触发 Flink 作业的执行(env.execute ()),完成 Agent 处理逻辑的分布式运行。

3.1 核心特色

特色维度 具体说明
环境适配性 专门面向 Flink 集群的远程执行场景设计,依赖 Flink 的流执行环境(StreamExecutionEnvironment)和表环境(StreamTableEnvironment),而非本地执行;
数据类型聚焦 仅支持 Flink 原生的 DataStream/Table 作为输入输出,明确禁用本地场景的 List 类型(fromList/toList),贴合流式计算场景;
分层设计 采用 “环境类(RemoteExecutionEnvironment)+ 构建器类(RemoteAgentBuilder)” 分层模式:环境类管控全局配置和 Flink 环境,构建器类聚焦单个 Agent 的执行链路;
灵活的 Key 选择 支持自定义 KeySelector 对输入数据分片,无自定义 Key 时默认使用数据自身作为 Key,适配不同的分布式处理需求;
配置解耦 从 Flink 配置目录加载 Agent 专属配置,配置文件与代码解耦,便于集群环境下的配置管理;
容错与校验 包含关键流程校验(如调用 toDataStream 前必须先 apply Agent)、空值处理(TableEnvironment 懒加载)、异常封装(配置加载 / Agent 执行异常统一抛出 RuntimeException);
表与流互通 支持 Table 与 DataStream 的双向转换(Table 转 DataStream 作为 Agent 输入、DataStream 转 Table 作为输出),适配 Flink SQL / 流处理双场景;
资源关联 为 Agent 绑定运行时资源(resources),保障 Agent 执行所需的资源依赖;

3.2 与原生 Flink RemoteEnvironment 的对比

特性 Flink 原生 RemoteEnvironment Flink Agents RemoteExecutionEnvironment
核心定位 通用 Flink Job 的远程执行环境 Agent 语义专属的远程执行环境
封装层级 底层 Flink Job 提交 上层 Agent/AgentPlan 提交
核心适配 无 Agent 语义,仅处理通用 JobGraph 适配 AgentPlan → JobGraph 编译、Agent 状态管理
资源管理 通用 Slot / 资源分配 按 Agent 实例隔离资源,适配工具 / 动作资源需求
事件 / 状态处理 无内置事件语义 封装 Agent 事件(Event)的跨集群传输、状态序列化

3.3 如何使用 ActionExecutionOperator

RemoteExecutionEnvironment 通过 Python 层的 RemoteAgentBuilder.to_datastream() 方法间接使用ActionExecutionOperator,过程为:

  • 用户定义了一个Agent并应用到执行环境中
  • 调用 to_datastream() 方法触发实际的操作符创建
  • 通过 JNI 调用 Java 层的 CompileUtils.connectToAgent() 方法
  • 最终在 Flink 作业图中创建并配置 ActionExecutionOperator

connectToAgent()中会:

  • 接收输入的 Java DATa Stream 对象
  • 接收序列化的 AgentPlan(包含所有动作和资源配置)
  • 创建并连接 ActionExecutionOperator 到数据流处理图中
    // ============================ basic ====================================
    /**
     * Connects the given KeyedStream to the Flink Agents agent.
     *
     * <p>This method accepts a keyed DataStream and applies the specified agent plan to it. The
     * source of the input stream determines the data format: Java streams provide Objects, while
     * Python streams use serialized byte arrays.
     *
     * @param keyedInputStream The input keyed DataStream.
     * @param agentPlan The agent plan to be executed.
     * @param inputIsJava A flag indicating whether the input stream originates from Java. - If
     *     true, input and output types are Java Objects. - If false, input and output types are
     *     byte[].
     * @param <K> The type of the key used in the keyed DataStream.
     * @param <IN> The type of the input data (Object or byte[]).
     * @param <OUT> The type of the output data (Object or byte[]).
     * @return The processed DataStream as the result of the agent.
     */
    private static <K, IN, OUT> DataStream<OUT> connectToAgent(
            KeyedStream<IN, K> keyedInputStream,
            AgentPlan agentPlan,
            TypeInformation<OUT> outTypeInformation,
            boolean inputIsJava) {
        return (DataStream<OUT>)
                keyedInputStream
                        .transform(
                                "action-execute-operator",
                                outTypeInformation,
                                new ActionExecutionOperatorFactory(agentPlan, inputIsJava))
                        .setParallelism(keyedInputStream.getParallelism());
    }

3.4 典型流程

当用户编写基于 RemoteExecutionEnvironment 的应用程序时,典型流程如下:

# 获取Flink执行环境
env = StreamExecutionEnvironment.get_execution_environment()
# 创建 Agent 执行环境
agent_env = AgentsExecutionEnvironment.get_execution_environment(env)
# 添加所需资源
agent_env.add_resource(...)
# 设置输入数据流到Agent
input_stream = agent_env.from_Collection(input_data) 
builder = agent_env.from_datastream(input_stream)
# 应用代理逻辑
agent = MyCustomAgent()
builder.apply(agent) 
# 执行代理
output_stream = builder.to_datastream()
env.execute()

3.5 交互逻辑

RemoteExecutionEnvironment、ActionExecutionOperator 和 ActionTask 之间的交互逻辑如下。

这三个组件在 Flink Agents 框架中扮演不同的角色,协同工作来执行 Agent 逻辑:

  • RemoteExecutionEnvironment:提供远程执行环境,负责构建和连接 Agent 到 Flink 数据流
  • ActionExecutionOperator:Flink 流处理操作符,实际执行 Agent 的动作逻辑
  • ActionTask:表示单个动作执行任务的抽象概念

交互流程图解

Flink-7-1

详细交互流程如下:

  1. 初始化阶段

Flink-7-2

  1. 运行时事件处理流程

Flink-7-3

3.6 实现

RemoteAgentBuilder 的代码如下

class RemoteAgentBuilder(AgentBuilder):
    """RemoteAgentBuilder for integrating datastream/table and agent."""

    __input: DataStream
    __agent_plan: AgentPlan = None
    __output: DataStream = None
    __t_env: StreamTableEnvironment
    __config: AgentConfiguration
    __resources: Dict[ResourceType, Dict[str, Any]] = None

    def __init__(
        self,
        input: DataStream,
        config: AgentConfiguration,
        t_env: StreamTableEnvironment | None = None,
        resources: Dict[ResourceType, Dict[str, Any]] | None = None,
    ) -> None:
        """Init method of RemoteAgentBuilder."""
        self.__input = input
        self.__t_env = t_env
        self.__config = config
        self.__resources = resources

    @property
    def t_env(self) -> StreamTableEnvironment:
        """Get or crate table environment."""
        if self.__t_env is None:
            self.__t_env = StreamTableEnvironment.create(
                stream_execution_environment=self.__env
            )
        return self.__t_env

    def apply(self, agent: Agent) -> "AgentBuilder":
        """Set agent of execution environment.

        Parameters
        ----------
        agent : Agent
            The agent user defined to run in execution environment.
        """
        if self.__agent_plan is not None:
            err_msg = "RemoteAgentBuilder doesn't support apply multiple agents yet."
            raise RuntimeError(err_msg)

        # inspect refer actions and resources from env to agent.
        for type, name_to_resource in self.__resources.items():
            agent.resources[type] = name_to_resource | agent.resources[type]

        self.__agent_plan = AgentPlan.from_agent(agent, self.__config)

        return self

    def to_datastream(self, output_type: TypeInformation | None = None) -> DataStream:
        """Get output datastream of agent execution.

        Returns:
        -------
        DataStream
            Output datastream of agent execution.
        """
        if self.__agent_plan is None:
            err_msg = "Must apply agent before call to_datastream/to_table."
            raise RuntimeError(err_msg)

        # return the same output datastream when call to_datastream multiple.
        if self.__output is None:
            j_data_stream_output = invoke_method(
                None,
                "org.apache.flink.agents.runtime.CompileUtils",
                "connectToAgent",
                [
                    self.__input._j_data_stream,
                    self.__agent_plan.model_dump_json(serialize_as_any=True),
                ],
                [
                    "org.apache.flink.streaming.api.datastream.KeyedStream",
                    "java.lang.String",
                ],
            )
            output_stream = DataStream(j_data_stream_output)
            self.__output = output_stream.map(
                lambda x: cloudpickle.loads(x), output_type=output_type
            )
        return self.__output

    def to_table(self, schema: Schema, output_type: TypeInformation) -> Table:
        """Get output Table of agent execution.

        Parameters
        ----------
        schema : Schema
            Indicate schema of the output table.
        output_type : TypeInformation
            Indicate schema corresponding type information.

        Returns:
        -------
        Table
            Output Table of agent execution.
        """
        return self.t_env.from_data_stream(self.to_datastream(output_type), schema)

    def to_list(self) -> List[Dict[str, Any]]:
        """Get output list of agent execution.

        This method is not supported for remote execution environments.
        """
        msg = "RemoteAgentBuilder does not support to_list."
        raise NotImplementedError(msg)


RemoteExecutionEnvironment 的代码如下。

class RemoteExecutionEnvironment(AgentsExecutionEnvironment):
    """Implementation of AgentsExecutionEnvironment for execution with DataStream."""

    __env: StreamExecutionEnvironment
    __t_env: StreamTableEnvironment
    __config: AgentConfiguration

    def __init__(
        self,
        env: StreamExecutionEnvironment,
        t_env: StreamTableEnvironment | None = None,
    ) -> None:
        """Init method of RemoteExecutionEnvironment."""
        super().__init__()
        self.__env = env
        self.__t_env = t_env
        self.__config = AgentConfiguration()
        self.__load_config_from_flink_conf_dir()

    @property
    def t_env(self) -> StreamTableEnvironment:
        """Get or crate table environment."""
        if self.__t_env is None:
            self.__t_env = StreamTableEnvironment.create(
                stream_execution_environment=self.__env
            )
        return self.__t_env

    def get_config(self, path: str | None = None) -> AgentConfiguration:
        """Get the writable configuration for flink agents.

        Returns:
        -------
        LocalConfiguration
            The configuration for flink agents.
        """
        return self.__config

    @staticmethod
    def __process_input_datastream(
        input: DataStream, key_selector: KeySelector | Callable | None = None
    ) -> KeyedStream:
        if isinstance(input, KeyedStream):
            return input
        else:
            if key_selector is None:
                msg = "KeySelector must be provided."
                raise RuntimeError(msg)
            input = input.key_by(key_selector)
            return input

    def from_datastream(
        self, input: DataStream, key_selector: KeySelector | Callable | None = None
    ) -> RemoteAgentBuilder:
        """Set input datastream of agent.

        Parameters
        ----------
        input : DataStream
            Receive a DataStream as input.
        key_selector : KeySelector
            Extract key from each input record, must not be None when input is
            not KeyedStream.
        """
        input = self.__process_input_datastream(input, key_selector)

        return RemoteAgentBuilder(
            input=input,
            config=self.__config,
            t_env=self.__t_env,
            resources=self.resources,
        )

    def from_table(
        self,
        input: Table,
        key_selector: KeySelector | Callable | None = None,
    ) -> AgentBuilder:
        """Set input Table of agent.

        Parameters
        ----------
        input : Table
            Receive a Table as input.
        key_selector : KeySelector
            Extract key from each input record.
        """
        input = self.t_env.to_data_stream(table=input)

        input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())

        input = self.__process_input_datastream(input, key_selector)
        return RemoteAgentBuilder(
            input=input,
            config=self.__config,
            t_env=self.t_env,
            resources=self.resources,
        )

    def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment":
        """Set input list of agent execution.

        This method is not supported for remote execution environments.
        """
        msg = "RemoteExecutionEnvironment does not support from_list."
        raise NotImplementedError(msg)

    def execute(self) -> None:
        """Execute agent."""
        self.__env.execute()


    def __load_config_from_flink_conf_dir(self) -> None:
        """Load agent configuration from FLINK_CONF_DIR if available."""
        flink_conf_dir = os.environ.get("FLINK_CONF_DIR")
        if flink_conf_dir is None:
            return

        # Try to find config file, with fallback to legacy name
        config_path = self.__find_config_file(flink_conf_dir)

        if config_path is None:
            logging.error(f"Config file not found in {flink_conf_dir}")
        else:
            self.__config.load_from_file(str(config_path))

    def __find_config_file(self, flink_conf_dir: str) -> Path | None:
        """Find config file in the given directory, checking both new and legacy names.

        Parameters
        ----------
        flink_conf_dir : str
            Directory to search for config files.

        Returns:
        -------
        Path | None
            Path to the config file if found, None otherwise.
        """
        # Try legacy config file name first
        legacy_config_path = Path(flink_conf_dir).joinpath(_LEGACY_CONFIG_FILE_NAME)
        if legacy_config_path.exists():
            logging.warning(
                f"Using legacy config file {_LEGACY_CONFIG_FILE_NAME}"
            )
            return legacy_config_path

        # Try new config file name as fallback
        primary_config_path = Path(flink_conf_dir).joinpath(_CONFIG_FILE_NAME)
        if primary_config_path.exists():
            return primary_config_path

        return None

0xEE 个人信息

★★★★★★关于生活和技术的思考★★★★★★

微信公众账号:罗西的思考

如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

在这里插入图片描述

0xFF 参考

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐