一、Flink 中的计算资源

Flink 中的计算资源核心载体是具体执行的 Task,要实现生产环境的高效资源管理与问题定位,需先掌握 Slot、Chain、Task 等核心概念,理解资源的隔离与管理逻辑。

1.1 Task Slot

Flink 集群运行时包含两类核心进程,其中 TaskManager 是计算资源的主要提供方,其核心特性如下:

  • 一个 TaskManager 对应一个独立的 JVM 进程;

  • TaskManager 内部通过独立线程执行 Task;

  • Task Slot 是 TaskManager 的计算资源子集,用于控制 TaskManager 可接收的 Task 数量。

若一个 TaskManager 配置 5 个 Slot,则其计算资源会被平均划分为 5 份,不同 Task 运行在不同 Slot 中,避免资源竞争。需重点注意:Slot 仅实现内存隔离,对 CPU 不做隔离

同一 JVM 进程(同一 TaskManager)中的 Task 可共享 TCP 连接,减少网络传输开销,提升程序运行效率、降低资源消耗。

1.2 Slot 共享

Flink 默认允许同一 Job 的子任务共享 Slot,核心目的是提高集群资源利用率。

Flink 任务中不同算子的计算压力差异较大:简单算子(如 map、filter)资源需求低,复杂算子(如 window、group by)资源需求高。通过 Slot 共享,资源需求高的算子可复用其他算子的空闲 Slot,避免资源浪费,实现集群资源的高效利用。

1.3 Operator Chain

Operator Chain 是 Flink 内置的任务优化机制,核心逻辑是将多个算子的 Task 连接成一个新的 Task,让其在同一个线程中执行。

该优化可有效减少线程间上下文切换的开销,同时避免算子间数据序列化/反序列化带来的资源消耗,最终提升任务吞吐量。

1.4 并行度

并行度是 Flink 中定义“某一算子被切分成多少个子任务”的核心参数。Flink 代码会先转换为逻辑视图,运行时会根据并行度配置,将逻辑算子切分为对应数量的子任务并行执行。

并行度直接决定了算子的并行处理能力。

二、源码解析(资源申请与分配流程)

Flink Job 执行过程中,资源的申请与分配由多个核心类协同完成,核心流程是:JobManager 通过 SlotProvider 向 ResourceManager(RM)申请资源,RM 协调 TaskManager 满足资源请求,最终将资源分配给对应的 Task 执行。

2.1 核心类与交互流程

资源管理核心类包括 Scheduler(调度器)、SlotPool(资源池)、JobMaster,三者交互流程如下:

  1. Scheduler 向 SlotPool 申请或释放 Slot 资源;

  2. 若 SlotPool 中的空闲资源无法满足 Scheduler 的申请需求,则 SlotPool 向 ResourceManager 发起资源申请;

  3. ResourceManager 协调 TaskManager 提供资源,获取到资源后通过 JobMaster 分配给 SlotPool;

  4. SlotProvider 核心功能:通过 allocateSlot() 方法向 SlotPool 申请可用 Slot,通过 returnLogicSlot() 方法将空闲 Slot 释放回 SlotPool,完成资源循环利用。

三、并行度设置(生产环境重点)

Flink 支持 4 个级别设置并行度,不同级别优先级不同,生产环境需根据业务需求选择合适的设置方式,确保资源合理分配。

并行度优先级(从高到低):算子级别 > 环境级别 > 客户端级别 > 集群配置级别

生产环境推荐使用算子级别的并行度进行资源控制,可实现更精细的资源分配与管控。

3.1 算子级别(推荐)

通过显式调用 setParallelism() 方法,为每个算子单独指定并行度,可根据不同算子的计算压力精准分配资源。

实际生产中,推荐在算子级别显式指定各自的并行度,方便资源的可视化管理与精准控制,避免资源浪费或不足。

示例代码(以 map 算子为例):

// 为map算子单独设置并行度为5
dataStream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
}).setParallelism(5);

3.2 环境级别(不推荐)

通过调用 env.setParallelism() 方法,设置整个任务中所有算子的统一并行度。

特点:配置简单,但无法根据算子计算压力差异分配资源,容易造成资源浪费或瓶颈,生产环境不推荐使用。

示例代码:

// 设置整个任务所有算子的并行度为3
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

3.3 客户端级别

提交 Flink Job 时,通过命令行参数指定并行度,仅在代码中未设置任何并行度时生效。

核心命令:使用 \-p 参数指定并行度,示例如下:

./bin/flink run -p 5 ../wordCount-java*.jar

适用场景:临时调整任务并行度,无需修改代码。

3.4 集群配置级别(默认)

在 Flink 集群配置文件 flink\-conf\.yaml 中,通过 parallelism\.default 参数设置默认并行度。

生效条件:用户未设置算子级别、环境级别、客户端级别并行度时,该参数生效,作为全局默认值。

配置示例(flink-conf.yaml):

# 全局默认并行度,默认为1
parallelism.default: 3

四、生产环境配置注意事项

  • Slot 配置:根据 TaskManager 的内存大小合理设置 Slot 数量,避免单个 Slot 内存不足导致 OOM,一般建议每个 Slot 分配 1~4GB 内存;

  • 并行度规划:复杂算子(window、group by)并行度建议高于简单算子,结合数据量调整,避免并行度过高导致资源浪费,或并行度过低导致任务瓶颈;

  • 资源隔离:虽 Slot 不隔离 CPU,但可通过控制每个 TaskManager 的 Slot 数量,间接控制 CPU 使用率,避免单个 TaskManager 过载;

  • 并行度优先级:严格遵循“算子级别优先”,避免环境级别并行度覆盖算子的精准配置。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐