Flink CDC Connectors 选型、版本、安装与最佳实践
本文介绍了Flink CDC的connectors支持情况,包括MySQL、Postgres等source连接器,以及StarRocks、Doris、Elasticsearch等多种sink连接器。提供了版本兼容矩阵和选型建议,推荐新项目使用3.5.x+Flink 1.19/1.20组合。详细说明了安装部署流程,并给出5个典型场景的YAML配置模板,包括MySQL到StarRocks/Doris/
1. 支持的 Connectors 一览
Source
- MySQL(5.6/5.7/8.0.x,兼容 RDS/Aurora/PolarDB/MariaDB/PolarDB-X 等)
- Postgres(3.5.0 起)
Sink
- StarRocks(2.x/3.x)
- Apache Doris(1.2.x/2.x.x/3.x.x)
- Paimon(0.6–1.0)
- Kafka
- Elasticsearch(6/7/8)
- Iceberg
- OceanBase(3.x/4.x)
- MaxCompute
- Fluss
下载 JAR:使用对应 Flink CDC 版本线(3.1/3.2/3.3/3.4/3.5)。Source 与 Sink JAR 放置位置见 §4。
2. 版本兼容矩阵(精简速查)
| Flink CDC 版本 | Flink 版本 | Pipeline Source | Pipeline Sink(节选) | 备注 |
|---|---|---|---|---|
| 3.0.x | 1.17/1.18 | MySQL | StarRocks、Doris | |
| 3.1.x | 1.17/1.18/1.19* | MySQL | StarRocks、Doris、Paimon、Kafka | *仅 3.1.1 支持 1.19 |
| 3.2.x | 1.17/1.18/1.19 | MySQL | + Elasticsearch | |
| 3.3.x | 1.19/1.20 | MySQL | + OceanBase、MaxCompute | |
| 3.4.x | 1.19/1.20 | MySQL | + Iceberg | |
| 3.5.x | 1.19/1.20 | MySQL、Postgres | StarRocks、Doris、Paimon、Kafka、Elasticsearch、OceanBase、MaxCompute、Iceberg、Fluss | 最新线 |
建议:新项目优先选择 3.5.x + Flink 1.19/1.20。老项目升级时先对齐小版本,做一次 Savepoint + 回滚演练。
3. 如何选型:用场景倒推 Connector
-
OLAP 明细/聚合 → StarRocks / Doris / Paimon / Iceberg
- 需要主键 Upsert:倾向 StarRocks 主键模型 或 Doris Unique/Primary Key
- 湖仓/批流一体:Iceberg / Paimon
-
检索与近实时搜索 → Elasticsearch(upsert 文档,ID=业务主键)
-
消息分发/总线 → Kafka(下游自取)
-
多引擎协同/企业存储 → MaxCompute / OceanBase
-
二方系统/自建流平台 → Fluss 或自研 Sink
Checklist:
- 是否需要 Upsert 语义?目标库主键模型是否可配?
- Schema 演进策略(lenient/evolve…)在目标端是否支持?
- 延迟与吞吐目标(批写/刷盘参数、并行度)
- 安全与权限(白名单、密钥、网络)
4. 安装与提交流程(标准做法)
-
准备 Flink 集群(1.19/1.20 建议)
-
解压 Flink CDC 二进制(如
flink-cdc-3.5.0) -
拷贝 JAR
- CDC Pipeline Connectors(Source/Sink) → 放到 Flink CDC 的
lib/ - JDBC/MySQL 驱动 → 放到 Flink(非 CDC)的
lib/(或提交时--jar)
- CDC Pipeline Connectors(Source/Sink) → 放到 Flink CDC 的
-
编写 YAML 管道(见 §5)
-
提交作业
# 常见两种写法,按你的 CLI 版本选择 bin/flink-cdc.sh run -f pipeline.yaml # 或 bin/flink-cdc.sh pipeline.yaml -
Web UI 观测:吞吐、延迟、反压、Checkpoint 成功率
你也可以用 CdcUp CLI 一键起本地演练环境(见你之前的 CdcUp 教程)。
5. 可拷贝 YAML 模板(五连发)
5.1 MySQL → StarRocks(整库同步,演进友好)
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: 'app_db\..*'
server-id: 5400-5404
server-time-zone: UTC
sink:
type: starrocks
jdbc-url: 'jdbc:mysql://127.0.0.1:9030'
load-url: '127.0.0.1:8030' # 若镜像暴露为 8080,请改 8080
username: root
password: ""
table.create.properties.replication_num: 1
# 可选:主键模型启用 upsert
# table.create.properties.duplicate_key: false
# table.create.properties.primary_key: "id"
pipeline:
name: MySQL to StarRocks
parallelism: 2
schema.change.behavior: lenient
5.2 MySQL → Doris(加轻量级 schema 变更)
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: 'app_db\..*'
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: MySQL to Doris
parallelism: 2
schema.change.behavior: lenient
5.3 MySQL → Paimon(湖仓明细,支持 upsert)
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: 'ods\..*'
server-id: 5400-5404
server-time-zone: UTC
sink:
type: paimon
warehouse: 'hdfs:///warehouse/paimon'
database: 'ods'
table.create.options: 'primary-key=id' # 视数据模型设置
pipeline:
name: MySQL to Paimon
parallelism: 2
schema.change.behavior: lenient
5.4 MySQL → Elasticsearch(检索)
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: 'search_db\.(items|orders)'
server-id: 5400-5404
server-time-zone: UTC
sink:
type: elasticsearch
hosts: 'http://es1:9200,http://es2:9200'
index: 'items' # 或在 route 中做多表多索引映射
id.key: 'id' # 文档 ID = 业务主键
write.mode: 'upsert'
pipeline:
name: MySQL to ES
parallelism: 2
schema.change.behavior: try_evolve
5.5 Postgres → Iceberg(3.5.x 起支持 PG Source)
source:
type: postgres
hostname: localhost
port: 5432
username: postgres
password: secret
tables: 'app\.public\..*'
server-time-zone: UTC
sink:
type: iceberg
catalog: 'hive'
namespace: 'ods'
warehouse: 'hdfs:///warehouse/iceberg'
write.upsert.enabled: true
pipeline:
name: PG to Iceberg
parallelism: 2
schema.change.behavior: lenient
需要 分表并表/改名 时,叠加
route:;需要字段治理/过滤/软删除时,叠加transform:(参考你已有的 Transform/Route 文档)。
6. 常见坑位与优化清单
- 版本不匹配:确认 Flink 版本 ↔ Flink CDC 版本线,连接器 JAR 与 CDC 版本一致。
- Jar 放错:CDC connectors 放 CDC
lib/;驱动(如 MySQL Connector/J)放 Flinklib/或--jar。 - 端口混淆:StarRocks FE HTTP 端口常见 8030,部分 all-in-one 镜像用 8080;
load-url要一致。 - Upsert 语义:下游无主键或唯一键时,幂等写入无法保证;请在 Sink 侧配置主键模型或唯一键。
- Schema 演进:默认
lenient,drop.table/truncate.table在 lenient 会被忽略;按需用 include/exclude 放开。 - 并行与批写:提升
pipeline.parallelism、调大 Sink 批次与 flush 以增吞吐;同时观察反压。 - 时区:统一
server-time-zone(常用 UTC),避免时间列偏移。 - 安全:最小权限账号;敏感配置用 Secret 注入;日志与位点审计留痕。
- 回滚:重要升级/大改前先打 Savepoint,演练恢复流程。
7. 开发自研 Connector:入门路线图
当官方 Connector 无法满足需求(专有系统/内部总线/定制写入协议):
- 研读 Flink CDC APIs:理解 Source/Sink SPI、
DataChangeEvent、SchemaChangeEvent与 SchemaOperator。 - 最小实现:从一个“打印 Sink”或“内存 Sink”起步,打通事件生命周期与 Checkpoint 协议。
- 语义与容错:明确 Exactly-Once/At-Least-Once 语义、两阶段提交(若需要)、重试与幂等策略。
- Schema 演进:实现/映射
create/add/rename/alter/drop/truncate;决定在lenient下的“保留旧列 + 新增新列”策略。 - 性能:批写聚合、压缩、序列化、分区与并行度、热点打散。
- 打包分发:产出独立 JAR,注册到 CDC
lib/,并编写 YAML 选项说明与验收用例。
更多推荐


所有评论(0)