背景

在项目开发中,我们经常遇到需要将数据同步到其他数据库的场景,例如不同的业务系统需要数据共享,或者将本平台的数据同步到第三方平台。

传统同步方案的痛点

传统的数据同步方案通常采用定时任务调用接口的方式,但存在以下问题:

  • • 实时性差:通常只能做到分钟级或小时级同步,数据延迟高

  • • 协调成本高:需要各部门系统配合开发数据上报接口,沟通协调工作量大

  • • 开发工作量大:每个对接部门都需要定制化开发接口,维护成本高

  • • 可靠性问题:接口调用失败时数据容易丢失,需要复杂的重试和补偿机制

为什么选择Flink CDC

通过实际项目应用,总结出Flink CDC在数据同步方面的核心优势:

1. 同步模式灵活
  • • 全量同步:支持历史数据一次性迁移,适合初始化场景

  • • 增量同步:实时捕获数据变更,保持数据同步状态

  • • 混合模式:先全量后增量,无缝切换,业务无感知

2. 实时性能卓越
  • • 毫秒级延迟:源表数据的增删改操作,几乎实时同步到目标表

  • • 高吞吐量:单任务可处理万级TPS,满足大部分业务场景

  • • 资源可控:支持背压机制,自动调节同步速度

3. 使用门槛低
  • • 配置驱动:通过SQL配置即可完成同步任务,无需编程

  • • 可视化管理:Web界面监控任务状态,运维友好

需要注意的是,使用Flink CDC的前提条件是确保Flink服务器与源数据库(Oracle)和目标数据库(MySQL)之间具备稳定的网络连通性。下面将详细介绍Flink CDC的部署配置和使用方法。

1. Flink环境搭建

1.1 部署模式选择

1、Flink安装、配置

Flink支持多种部署模式,根据业务场景选择合适的部署方案:

  • • Standalone模式:适合开发测试和小规模生产环境

  • • Yarn模式:适合大数据集群环境,资源管理更灵活

  • • Kubernetes模式:适合容器化部署,弹性伸缩能力强

本次实践采用Standalone独立部署模式,具有部署简单、维护方便的特点。

1.2 安装步骤

下载地址:https://flink.apache.org/downloads/
下载 flink-1.19.3-bin-scala_2.12.tgz

tar -xzf flink-*.tgz
export FLINK_HOME=/path/flink-*

# 启动集群
cd /path/flink-*
./bin/start-cluster.sh

# 关闭集群
./bin/stop-cluster.sh

如果需要在web查看状态,需要修改安装路径下/conf/config.yaml,然后重启集群

rest:
  # The address to which the REST client will connect to
  address: 172.29.236.202
  bind-address: 0.0.0.0
  # 默认是8081,由于被占用,改成了8082
  port: 8082

如果是单机部署,任务数 > 1的话,需要修改slot数量,slot可以简单理解为Flink中的执行单元,一个slot执行能够一个任务,配置时可以根据服务器核心数来配置

同样还是修改/conf/config.yaml

taskmanager:
  # 默认是1
  numberOfTaskSlots: 16
  memory:
    process:
      size: 8192m # 默认1728m,建议调大内存,不然数据量过大时容易OOM

web查看状态

2. 依赖驱动准备

使用Flink之前需要准备一些必要的驱动包,放到 /安装目录/lib/。根据自己的任务所需的驱动包有所不同,本次任务需要将Oracle的数据同步到Mysql,需要如下的驱动包,可在Flink CDC文档中查看需要哪些驱动(文末会给出文档链接)。

下图标出了本次任务下载的驱动

 

3. 数据库环境配置

3.1 数据接收端(MySQL)配置

3.1.1 用户权限配置

出于安全考虑,MySQL默认拒绝远程root用户访问,需要创建专用的同步账户并进行精确授权:

-- 创建专用账号(仅限来自 Flink 主机的访问),假设安装flink的机器ip:172.29.236.202
CREATEUSER'flink'@'172.29.236.202' IDENTIFIED BY'Syzxzs@2024';

-- 赋权(按需最小权限,JDBC upsert 需要 SELECT/INSERT/UPDATE/DELETE),假设mysql模式名为flow
GRANTSELECT, INSERT, UPDATE, DELETEON flow.*TO'flink'@'172.29.236.202';

FLUSH PRIVILEGES;

-- 验证用户创建结果:
SELECT host, user, plugin FROM mysql.user WHEREuserIN ('flink','root');
3.1.2 连接器配置

配置MySQL连接器,用于接收同步数据

CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
  ID STRING NOT NULL,
  BANK_DEPOSIT STRING,
  NUM int,
  NAME STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='flink',
'password'='Syzxzs@2024',
'table-name'='test_flink_object',
'scan.fetch-size'='500'
);

3.2 数据发送端(Oracle)配置

3.2.1 系统环境准备

使用root用户创建Oracle操作用户和日志归档目录,确保CDC功能正常运行:

# 如果有,可省略;
sudo adduser oracle
# 找个磁盘空间大的目录,日志很多
mkdir -p '/u03/oracle/oradata/recovery_area'
chown -R oracle:oinstall '/u03/oracle'
chmod 750 '/u03/oracle/oradata/recovery_area'

# 路径中的 SID 需替换为实际实例/库目录;目录必须存在且 oracle 用户可写。
mkdir -p '/opt/oracle/oradata/SID/'
chown -R oracle:oinstall '/opt/oracle/oradata/SID/'
chmod 750 '/opt/oracle/oradata/SID/'
3.2.2 数据库归档配置

使用Oracle管理员账号登录,配置数据库归档模式和补充日志:

alter systemset db_recovery_file_dest_size =10G;
altersystemset db_recovery_file_dest ='/u03/oracle/oradata/recovery_area'scope=spfile;
-- 需要重启数据库
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

--查看数据归档模式,应该输出 "Database log mode: Archive Mode"
archive log list;
-- 查看归档目录
SELECT name, isdefault, valueFROM v$parameterWHERE name LIKE'db_recovery_file_dest%';

-- 开启补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 开启指定表所有列,补充日志
ALTER TABLE 模式.表名 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 需要提前使用root用户创建目录'/opt/oracle/oradata/SID/'
-- 路径中的 SID 需替换为实际实例/库目录;目录必须存在且 oracle 用户可写。
CREATE TABLESPACE logminer_tbs DATAFILE '/u03/oracle/oradata/coredb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
3.2.3 补充日志说明

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA是CDC功能的核心配置,其重要作用如下:

启用后,Oracle会在Redo Log中记录:
✅ 变更前的数据值(Before Image)
✅ 变更后的数据值(After Image)  
✅ 主键信息
✅ 唯一索引信息,当数据量很大的时候,需要自定义分片的主键名,必须要开启

开启前后对比
-- 没有Supplemental Logging时:
-- Redo Log只记录:UPDATE table SET col1='new_value' WHERE rowid='xxx'

-- 有Supplemental Logging后:
-- Redo Log记录:
-- Before: col1='old_value', col2='value2', primary_key='123'
-- After:  col1='new_value', col2='value2', primary_key='123'

-- 检查是否已启用
SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
-- 返回 YES 表示已启用
-- 查看详细的补充日志状态
SELECT*FROM V$DATABASE;
3.2.4 CDC用户创建

创建专用的CDC同步用户,并授予必要的权限:

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANTCREATE SESSION TO flinkuser;
GRANTSET CONTAINER TO flinkuser;
GRANTSELECTON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANYTABLETO flinkuser;
GRANTSELECTANYTABLETO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANTSELECTANY TRANSACTION TO flinkuser;
-- 下面这个执行可能报错 不影响(Oracle 版本不支持 LOGMINING 系统权限(11g/12c 没有该权限))
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANYTO flinkuser;

GRANTCREATE TABLETO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANYTABLETO flinkuser;
GRANTALTERANYTABLETO flinkuser;
GRANTCREATE SEQUENCE TO flinkuser;

GRANTEXECUTEON DBMS_LOGMNR TO flinkuser;
GRANTEXECUTEON DBMS_LOGMNR_D TO flinkuser;

GRANTSELECTON V_$LOG TO flinkuser;
GRANTSELECTON V_$LOG_HISTORY TO flinkuser;
GRANTSELECTON V_$LOGMNR_LOGS TO flinkuser;
GRANTSELECTON V_$LOGMNR_CONTENTS TO flinkuser;
GRANTSELECTON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANTSELECTON V_$LOGFILE TO flinkuser;
GRANTSELECTON V_$ARCHIVED_LOG TO flinkuser;
GRANTSELECTON V_$ARCHIVE_DEST_STATUS TO flinkuser;

注意:需要同步的数据表,必须要由一个字段表示主键,原因后面会讲。

4、同步任务实现

首先模拟一个Oracle数据库的一个表

数据同步脚本主要分为三个部分

4.1 源表连接器配置

CREATE TABLE TEST_FLINK_OBJECT_ORACLE
   (ID STRING NOT NULL,
    BANK_DEPOSIT STRING,
    NUM int,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
   ) WITH (
'connector'='oracle-cdc',
'hostname'='172.31.236.39',
'port'='1521',
'username'='flinkuser',
'password'='flinkpw',
'database-name'='cnywdb',
'schema-name'='U_CMS',
'table-name'='TEST_FLINK_OBJECT', --映射的真实数据库中的表名
'scan.startup.mode'='initial',
'scan.incremental.snapshot.enabled'='true',
-- 这个配置很重要,但是官方文档没有重点提,实践中如果没有配置分片主键,数据不会分片,当数据读到80多万条时就OOM了
'scan.incremental.snapshot.chunk.key-column'='ID',
'debezium.database.connection.adapter'='logminer',
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true',
);

4.2 目标表连接器配置

CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
  ID STRING NOT NULL,
      BANK_DEPOSIT STRING,
      NUM int,
      NAME STRING,
      PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='flink',
'password'='Syzxzs@2024',
'table-name'='test_flink_object',
'scan.fetch-size'='500'
);

4.3 数据同步任务

INSERT INTO TEST_FLINK_OBJECT_MYSQL (select * from TEST_FLINK_OBJECT_ORACLE);

4.4 任务提交与执行

接下来启动sql-client,提交上面编写的任务。切换到 /安装目录/bin/,看到下面的表示即表示启动成功。

分别执行上面编写的MySQL连接器和Oracle连接器:

最后执行一条数据插入语句,提交任务:

也可以将上面的脚本放在一个sql文件中,通过命令./sql-client.sh -f oracle-to-mysql-object.sql执行

oracle-to-mysql-object.sql文件内容如下:

CREATE TABLE TEST_FLINK_OBJECT_ORACLE
   (ID STRING NOT NULL,
    BANK_DEPOSIT STRING,
    NUM int,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
   ) WITH (
'connector'='oracle-cdc',
'hostname'='172.31.236.39',
'port'='1521',
'username'='flinkuser',
'password'='flinkpw',
'database-name'='cnywdb',
'schema-name'='U_CMS',
'table-name'='TEST_FLINK_OBJECT',
'scan.startup.mode'='initial',
'scan.incremental.snapshot.enabled'='true',
'scan.incremental.snapshot.chunk.key-column'='ID',
'debezium.database.connection.adapter'='logminer',
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true',
);

CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
  ID STRING NOT NULL,
      BANK_DEPOSIT STRING,
      NUM int,
      NAME STRING,
      PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='flink',
'password'='Syzxzs@2024',
'table-name'='test_flink_object',
'scan.fetch-size'='500'
);
-- 适用于两张表字段完全一致;
INSERT INTO TEST_FLINK_OBJECT_MYSQL select*from TEST_FLINK_OBJECT_ORACLE;

-- 如果两张表字段不一致,通过起别名的方式
INSERT INTO TEST_FLINK_OBJECT_MYSQL
SELECT
  s.ID AS id,
  s.NAME AS BANK_DEPOSIT,
  s.NUM AS NUM,
  s.BANK_DEPOSIT AS NAME
FROM TEST_FLINK_OBJECT_ORACLE AS s;

下面是对字段起了别名的同步结果,将BANK_DEPOSIT 和NAME字段互换的结果

5、生产实践经验

5.1 关键配置优化

scan.incremental.snapshot.chunk.key-column = 'ID'

上面也提到了,源表需要有一个主键,正常应该会默认按照主键分片,但是实践中发现必须要指定分片的主键,否则会导致持续读取数据,直到内存溢出。下面是当时没有配置分片的主键而导致内存溢出的报错截图。从日志中可以看出,没有使用主键进行分片,而是使用Oracle默认的隐藏列ROWID进行分片,导致数据量一直再增加,最终到81万的时候,内存溢出。

 

 

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA:开启数据库级别的日志归档

配置了主键分片之后,必须要开启数据库日志归档,官方文档中提到要开启数据库级日志归档或表级日志归档,所以一开始只开启了表的日志归档,因为配置主键分片之后,就报错了,提示要开启数据库级日志归档。

5.2 数据一致性保障

实践发现,本次同步任务涉及到9个表,总数据量接近4千万,当初次同步完之后,以及后续的增量同步时,总会丢几条数据,当找出丢失的几条数据之后,发现与其他的没有什么区别,排查很久的原因都没找到。目前怀疑是Oracle日志的问题,前面提到Oracle11g 没有LOGMINING权限。当然,估计这个问题只在Oracle中出现,因为当时问了其他用过的同事,使用其他的数据库,没有发现这种问题。最终不得己,用大模型以最快的速度编写了定时任务进行兜底处理。

因此建议,当有类似的数据同步任务时,可以优先考虑使用Flink,不过如果发现有这种莫名丢失数据的问题,应该及时准备兜底方案。使用大模型,将数据表的DDL给到大模型,能够很快就编写出定时任务版本的方案,两者搭配,能够达到几乎实时同步的效果,即使存在少量数据丢失的问题,也能通过定时任务及时弥补。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐