1. 支持的 Connectors 一览

Source

  • MySQL(5.6/5.7/8.0.x,兼容 RDS/Aurora/PolarDB/MariaDB/PolarDB-X 等)
  • Postgres(3.5.0 起)

Sink

  • StarRocks(2.x/3.x)
  • Apache Doris(1.2.x/2.x.x/3.x.x)
  • Paimon(0.6–1.0)
  • Kafka
  • Elasticsearch(6/7/8)
  • Iceberg
  • OceanBase(3.x/4.x)
  • MaxCompute
  • Fluss

下载 JAR:使用对应 Flink CDC 版本线(3.1/3.2/3.3/3.4/3.5)。Source 与 Sink JAR 放置位置见 §4。

2. 版本兼容矩阵(精简速查)

Flink CDC 版本 Flink 版本 Pipeline Source Pipeline Sink(节选) 备注
3.0.x 1.17/1.18 MySQL StarRocks、Doris
3.1.x 1.17/1.18/1.19* MySQL StarRocks、Doris、Paimon、Kafka *仅 3.1.1 支持 1.19
3.2.x 1.17/1.18/1.19 MySQL + Elasticsearch
3.3.x 1.19/1.20 MySQL + OceanBase、MaxCompute
3.4.x 1.19/1.20 MySQL + Iceberg
3.5.x 1.19/1.20 MySQL、Postgres StarRocks、Doris、Paimon、Kafka、Elasticsearch、OceanBase、MaxCompute、Iceberg、Fluss 最新线

建议:新项目优先选择 3.5.x + Flink 1.19/1.20。老项目升级时先对齐小版本,做一次 Savepoint + 回滚演练。

3. 如何选型:用场景倒推 Connector

  • OLAP 明细/聚合StarRocks / Doris / Paimon / Iceberg

    • 需要主键 Upsert:倾向 StarRocks 主键模型Doris Unique/Primary Key
    • 湖仓/批流一体:Iceberg / Paimon
  • 检索与近实时搜索Elasticsearch(upsert 文档,ID=业务主键)

  • 消息分发/总线Kafka(下游自取)

  • 多引擎协同/企业存储MaxCompute / OceanBase

  • 二方系统/自建流平台Fluss 或自研 Sink

Checklist

  1. 是否需要 Upsert 语义?目标库主键模型是否可配?
  2. Schema 演进策略(lenient/evolve…)在目标端是否支持?
  3. 延迟与吞吐目标(批写/刷盘参数、并行度)
  4. 安全与权限(白名单、密钥、网络)

4. 安装与提交流程(标准做法)

  1. 准备 Flink 集群(1.19/1.20 建议)

  2. 解压 Flink CDC 二进制(如 flink-cdc-3.5.0

  3. 拷贝 JAR

    • CDC Pipeline Connectors(Source/Sink) → 放到 Flink CDC 的 lib/
    • JDBC/MySQL 驱动 → 放到 Flink(非 CDC)的 lib/(或提交时 --jar
  4. 编写 YAML 管道(见 §5)

  5. 提交作业

    # 常见两种写法,按你的 CLI 版本选择
    bin/flink-cdc.sh run -f pipeline.yaml
    # 或
    bin/flink-cdc.sh pipeline.yaml
    
  6. Web UI 观测:吞吐、延迟、反压、Checkpoint 成功率

你也可以用 CdcUp CLI 一键起本地演练环境(见你之前的 CdcUp 教程)。

5. 可拷贝 YAML 模板(五连发)

5.1 MySQL → StarRocks(整库同步,演进友好)

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: 'app_db\..*'
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: starrocks
  jdbc-url: 'jdbc:mysql://127.0.0.1:9030'
  load-url: '127.0.0.1:8030'     # 若镜像暴露为 8080,请改 8080
  username: root
  password: ""
  table.create.properties.replication_num: 1
  # 可选:主键模型启用 upsert
  # table.create.properties.duplicate_key: false
  # table.create.properties.primary_key: "id"

pipeline:
  name: MySQL to StarRocks
  parallelism: 2
  schema.change.behavior: lenient

5.2 MySQL → Doris(加轻量级 schema 变更)

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: 'app_db\..*'
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: MySQL to Doris
  parallelism: 2
  schema.change.behavior: lenient

5.3 MySQL → Paimon(湖仓明细,支持 upsert)

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: 'ods\..*'
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: paimon
  warehouse: 'hdfs:///warehouse/paimon'
  database: 'ods'
  table.create.options: 'primary-key=id'   # 视数据模型设置

pipeline:
  name: MySQL to Paimon
  parallelism: 2
  schema.change.behavior: lenient

5.4 MySQL → Elasticsearch(检索)

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: 'search_db\.(items|orders)'
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: elasticsearch
  hosts: 'http://es1:9200,http://es2:9200'
  index: 'items'              # 或在 route 中做多表多索引映射
  id.key: 'id'                # 文档 ID = 业务主键
  write.mode: 'upsert'

pipeline:
  name: MySQL to ES
  parallelism: 2
  schema.change.behavior: try_evolve

5.5 Postgres → Iceberg(3.5.x 起支持 PG Source)

source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: secret
  tables: 'app\.public\..*'
  server-time-zone: UTC

sink:
  type: iceberg
  catalog: 'hive'
  namespace: 'ods'
  warehouse: 'hdfs:///warehouse/iceberg'
  write.upsert.enabled: true

pipeline:
  name: PG to Iceberg
  parallelism: 2
  schema.change.behavior: lenient

需要 分表并表/改名 时,叠加 route:;需要字段治理/过滤/软删除时,叠加 transform:(参考你已有的 Transform/Route 文档)。

6. 常见坑位与优化清单

  • 版本不匹配:确认 Flink 版本 ↔ Flink CDC 版本线,连接器 JAR 与 CDC 版本一致。
  • Jar 放错:CDC connectors 放 CDC lib/;驱动(如 MySQL Connector/J)放 Flink lib/--jar
  • 端口混淆:StarRocks FE HTTP 端口常见 8030,部分 all-in-one 镜像用 8080load-url 要一致。
  • Upsert 语义:下游无主键或唯一键时,幂等写入无法保证;请在 Sink 侧配置主键模型或唯一键。
  • Schema 演进:默认 lenientdrop.table/truncate.table 在 lenient 会被忽略;按需用 include/exclude 放开。
  • 并行与批写:提升 pipeline.parallelism、调大 Sink 批次与 flush 以增吞吐;同时观察反压。
  • 时区:统一 server-time-zone(常用 UTC),避免时间列偏移。
  • 安全:最小权限账号;敏感配置用 Secret 注入;日志与位点审计留痕。
  • 回滚:重要升级/大改前先打 Savepoint,演练恢复流程。

7. 开发自研 Connector:入门路线图

当官方 Connector 无法满足需求(专有系统/内部总线/定制写入协议):

  1. 研读 Flink CDC APIs:理解 Source/Sink SPIDataChangeEventSchemaChangeEventSchemaOperator
  2. 最小实现:从一个“打印 Sink”或“内存 Sink”起步,打通事件生命周期与 Checkpoint 协议。
  3. 语义与容错:明确 Exactly-Once/At-Least-Once 语义、两阶段提交(若需要)、重试与幂等策略。
  4. Schema 演进:实现/映射 create/add/rename/alter/drop/truncate;决定在 lenient 下的“保留旧列 + 新增新列”策略。
  5. 性能:批写聚合、压缩、序列化、分区与并行度、热点打散。
  6. 打包分发:产出独立 JAR,注册到 CDC lib/,并编写 YAML 选项说明与验收用例。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐