1. 先把“部署积木”想清楚:Client、JobManager、TaskManager

Flink 的核心组件永远是这三个:

  • Client:把你的作业(Jar/SQL)编译成 JobGraph 并提交
  • JobManager:统筹调度、容错、Web UI、REST
  • TaskManager:真正跑算子(source/transform/sink)

你后面选 Standalone、Docker、K8s,本质只是“这些进程在哪里跑、怎么拉起、怎么伸缩、怎么高可用”。

2. Session vs Application:这是所有部署方式的“分水岭”

你贴的部署文档里反复强调两件事:资源隔离与 main() 执行位置。

  • Session Mode
    集群长期存在,多个作业共享 TaskManager 资源,提交快、成本低,但隔离差;一个作业把 TM 打崩,可能牵连同机其他作业。
  • Application Mode
    每个应用独占一个集群,JobManager 里直接执行 main();隔离更好、治理更清晰,但每个应用都要有自己的“集群生命周期”。

生产上常见策略:

  • 多租户/多团队、作业很多:Session(配好队列隔离/限额)
  • 关键链路/重作业:Application(隔离+可控)

3. Java 兼容性:别等上线再被 JDK 模块化“打脸”

你贴的兼容性要点可以总结成三句话:

  • Flink 2.0 起默认/推荐 Java 17;2.0 起对 Java 21 属于实验支持
  • Java 16+ 的 Jigsaw 模块化会影响反射(Kryo 序列化常见),需要用 --add-opens/--add-exports,建议通过 env.java.opts.all 统一配置
  • 不要“缩短”官方默认的 opens/export 列表,只能追加,否则 Flink 自己都可能不稳定

4. Standalone:最“裸”的方式,适合本机/小规模自管

4.1 Session(最常用本地起步)

./bin/start-cluster.sh
# Web UI: http://localhost:8081
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
./bin/stop-cluster.sh

4.2 Application(把作业当“集群的一部分”来跑)

把 jar 放进 lib/ 或用 --jars 拉取,再用 standalone-job.sh 启动 JobManager,TaskManager 另起:

cp ./examples/streaming/TopSpeedWindowing.jar lib/
./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
./bin/taskmanager.sh start

4.3 Standalone 的现实代价

  • 进程挂了要你自己拉起、资源扩缩容你自己做
  • HA 要你自己配 ZK、masters/workers、共享存储

5. Working Directory:把“可恢复的本地痕迹”放到正确的盘

Flink 进程可以配置 working directory,用来存这些东西:

  • BlobServer/BlobCache 的 blob
  • 开启 state.backend.local-recovery 时的本地 state
  • RocksDB 工作目录

核心配置思路:

  • process.working-dir(或分别指定 process.jobmanager.working-dir / process.taskmanager.working-dir
  • 想跨进程重启也能本地恢复:必须
    1)state.backend.local-recovery: true
    2)TaskManager resource-id 固定taskmanager.resource-id
    3)重启后仍能挂回同一个 working-dir 盘

典型配置:

process.working-dir: /data/flink/workdir
state.backend.local-recovery: true
taskmanager.resource-id: TaskManager_1

一句话:本地恢复要“同一个 TM 身份 + 同一块盘”。

6. Docker:把 Standalone 变成“可复现的容器化 Standalone”

6.1 Docker Session(最快跑起来)

思路:先建网络、设 jobmanager.rpc.address,再拉起 JM/TM。

FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
docker network create flink-network

docker run --rm --name=jobmanager --network flink-network -p 8081:8081 \
  --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:2.2.0-scala_2.12 jobmanager

docker run --rm --name=taskmanager --network flink-network \
  --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:2.2.0-scala_2.12 taskmanager

6.2 Docker Compose:把“集群拓扑”固化成一份 yaml

你贴的 compose 模板很标准:JM 暴露 8081,TM 按需 scale。
如果要 SQL Client 同容器使用,记得连接器 jar 必须在 集群和 client 都可见(通常做自定义镜像)。

6.3 Docker Application:更接近生产的隔离模式

要点只有一个:作业 jar 必须在容器 classpath / usrlib 可见
三种做法你贴得很全:挂载 volume、构建自定义镜像、或 --jars 指向远端(S3/HTTP)。

7. Kubernetes 部署:Standalone on K8s vs Native K8s

很多人第一次上 K8s 会把两者混在一起:

  • Standalone on K8s:本质仍是 Standalone,只是用 Deployment/Service 把 JM/TM 跑在 K8s 上
  • Native Kubernetes:Flink 直接调用 K8s API 动态申请/释放 TaskManager,更像真正的“资源提供方集成”

下面重点讲你最后贴的 Native Kubernetes。

8. Native Kubernetes:Flink 直连 K8s API 的“正统玩法”

8.1 Session 模式:一套长期集群跑多作业

启动 Session:

./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

提交作业:

./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  ./examples/streaming/TopSpeedWindowing.jar

停止:直接删 deployment

kubectl delete deployment/my-first-flink-cluster

(nightlies.apache.org)

补充:Session 还有 detached/attached 两种控制方式;attached 可以在脚本里输入 stop/help 做交互管理。(nightlies.apache.org)

8.2 Application 模式:生产更推荐的隔离方式

Application 模式的关键约束:用户代码要能被集群侧拿到(因为 main() 在集群里跑)。

两条主路:

1)自定义镜像把 jar 打进 usrlib/
2)开启 Artifact Management:本地 jar 上传到远端,再由 JM pod 拉取(也可以混合本地/远端 artifact-list)(nightlies.apache.org)

你还能用 ./bin/flink list/cancel --target kubernetes-application 去管理集群侧作业。(nightlies.apache.org)

8.3 Web UI 暴露方式:ClusterIP / NodePort / LoadBalancer

kubernetes.rest-service.exposed.type 支持:

  • ClusterIP:集群内访问,通常配 kubectl port-forward
  • NodePort:节点 IP:NodePort
  • LoadBalancer:云厂商 LB

注意文档里的安全提醒:如果你把 REST/UI 暴露到公网,通常意味着“可执行任意用户代码”的风险。(nightlies.apache.org)

8.4 日志与排障:别只盯 UI

  • 直接看 pod 日志:kubectl logs <pod>
  • TM 会自动回收空闲实例,想留足排障时间可调大 resourcemanager.taskmanager-timeout(nightlies.apache.org)

8.5 Plugins、Secrets、RBAC、Pod Template:上生产绕不开

  • Plugins:通过环境变量把内置插件(例如 S3)挂进 JM/TM (nightlies.apache.org)
  • Secrets:既可按文件挂载,也可注入环境变量 (nightlies.apache.org)
  • HA:把 JM replicas 调大只是“更快重启”,真正 HA 还要启用 HA 配置(文档同页有说明)(nightlies.apache.org)
  • RBAC:默认 service account 可能没权限创建/删除 pod,需要 role binding (nightlies.apache.org)
  • Pod Template:用 kubernetes.pod-template-file.default 支持更复杂的 pod spec,但要知道哪些字段会被 Flink 覆盖 (nightlies.apache.org)

9. Hive Functions:把 Hive 的函数生态“搬”到 Flink SQL

9.1 HiveModule:Hive 内置函数变成 Flink 系统函数

加载方式(Java):

String name = "myhive";
String version = "2.3.4";
tableEnv.loadModule(name, new HiveModule(version));

注意:老版本 Hive 内置函数有线程安全问题,文档建议自行打补丁。

9.2 原生 Hive 聚合:性能开关(Flink 1.17+)

如果 HiveModule 优先级高于 CoreModule,默认会优先用 Hive 聚合,但目前只能走 sort-based 聚合。
开启 table.exec.hive.native-agg-function.enabled=true 后,sum/count/avg/min/max 这 5 个可以走 hash-based 聚合,通常对性能提升很明显。

经验建议:

  • 性能不是瓶颈就别开(能力不完全对齐 Hive)
  • SQL Client 场景下目前不能按 job 单独开,只能模块级别先开再 load module(文档也说未来会修)

9.3 复用 Hive UDF/UDAF/UDTF

前置条件你贴得很明确:

  • 当前 session 的 catalog 要指向含该函数的 HiveCatalog(连 Hive Metastore)
  • 函数 jar 要在 Flink classpath

Flink 会在规划/执行阶段把 Hive 的 UDF/GenericUDF/UDTF/UDAF 自动翻译成 Flink 的 ScalarFunction/TableFunction/AggregateFunction 来跑。

10. Flink SQL 调 OpenAI:把推理当作一等公民接入流批任务

你贴的 “OpenAI Model Function” 思路很像一个可配置的远程模型算子:在 SQL 里 CREATE MODEL,再用 ML_PREDICT 批量推理。

10.1 情感分类示例(Chat 类任务)

CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
  'provider'='openai',
  'endpoint'='https://api.openai.com/v1/chat/completions',
  'api-key' = '<YOUR KEY>',
  'model'='gpt-3.5-turbo',
  'system-prompt'='Classify ... Output only the label.'
);

然后:

INSERT INTO print_sink
SELECT id, movie_name, content as predicit_label, actual_label
FROM ML_PREDICT(
  TABLE movie_comment,
  MODEL ai_analyze_sentiment,
  DESCRIPTOR(user_comment)
);

工程提醒(很关键):

  • API Key 别硬编码在 SQL 文件里,生产建议走 K8s Secret/环境变量注入,再由 Flink 配置引用(跟上一节 K8s secrets 串起来)
  • 成本与吞吐:n 记得保持 1;max-tokensmax-context-size 控制预算;批量推理要盯住并发与重试策略

10.2 错误处理与上下文溢出:先定策略再跑生产

你贴的 options 里最影响“作业稳定性”的是两类:

  • context-overflow-action:截断头/尾、跳过并记录
  • error-handling-strategy + retry:RETRY/FAILOVER/IGNORE + 重试次数与兜底策略

强烈建议:流任务默认别轻易 FAILOVER,把“可恢复错误”做成 IGNORE+日志/指标,避免外部 API 抖动导致作业雪崩。

10.3 Embeddings:向量输出的 schema 要对齐

Embeddings 输出是 ARRAY<FLOAT>,在下游你通常会:

  • 写入向量库/ES 向量字段
  • 或在 Flink 里做相似度计算(注意状态大小与算子开销)

OpenAI 官方 API 参考里仍保留 Embeddings 接口(可选 dimensions 等参数)。(OpenAI 平台)
另外,OpenAI 也提供了更统一的 Responses API(不少新能力会往这里收敛),如果你后续要升级端点,可以优先对齐 Responses 的语义与返回结构。(OpenAI 平台)

11. 最后给一份“上线前检查清单”

  • 选型

    • 多作业共享:Session
    • 强隔离/关键链路:Application
  • Java

    • 推荐 Java 17
    • Java 16+ 模块化 opens/export 配好且不删默认项
  • 状态与恢复

    • RocksDB + 本地恢复要固定 TM resource-id + 持久化 working-dir
  • K8s 安全

    • REST/UI 暴露不要裸奔公网
    • RBAC、Secrets、插件路径一次性打通
  • Hive/AI 扩展

    • HiveModule 优先级与 native agg 开关要评估一致性
    • OpenAI 推理要把重试/忽略/截断策略定好,指标打好
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐