基于Flink CDC实现Oracle到MySQL数据实时同步实践分享
实践发现,本次同步任务涉及到9个表,总数据量接近4千万,当初次同步完之后,以及后续的增量同步时,总会丢几条数据,当找出丢失的几条数据之后,发现与其他的没有什么区别,排查很久的原因都没找到。使用大模型,将数据表的DDL给到大模型,能够很快就编写出定时任务版本的方案,两者搭配,能够达到几乎实时同步的效果,即使存在少量数据丢失的问题,也能通过定时任务及时弥补。上面也提到了,源表需要有一个主键,正常应该会
背景
在项目开发中,我们经常遇到需要将数据同步到其他数据库的场景,例如不同的业务系统需要数据共享,或者将本平台的数据同步到第三方平台。
传统同步方案的痛点
传统的数据同步方案通常采用定时任务调用接口的方式,但存在以下问题:
-
• 实时性差:通常只能做到分钟级或小时级同步,数据延迟高
-
• 协调成本高:需要各部门系统配合开发数据上报接口,沟通协调工作量大
-
• 开发工作量大:每个对接部门都需要定制化开发接口,维护成本高
-
• 可靠性问题:接口调用失败时数据容易丢失,需要复杂的重试和补偿机制
为什么选择Flink CDC
通过实际项目应用,总结出Flink CDC在数据同步方面的核心优势:
1. 同步模式灵活
-
• 全量同步:支持历史数据一次性迁移,适合初始化场景
-
• 增量同步:实时捕获数据变更,保持数据同步状态
-
• 混合模式:先全量后增量,无缝切换,业务无感知
2. 实时性能卓越
-
• 毫秒级延迟:源表数据的增删改操作,几乎实时同步到目标表
-
• 高吞吐量:单任务可处理万级TPS,满足大部分业务场景
-
• 资源可控:支持背压机制,自动调节同步速度
3. 使用门槛低
-
• 配置驱动:通过SQL配置即可完成同步任务,无需编程
-
• 可视化管理:Web界面监控任务状态,运维友好
需要注意的是,使用Flink CDC的前提条件是确保Flink服务器与源数据库(Oracle)和目标数据库(MySQL)之间具备稳定的网络连通性。下面将详细介绍Flink CDC的部署配置和使用方法。
1. Flink环境搭建
1.1 部署模式选择
1、Flink安装、配置
Flink支持多种部署模式,根据业务场景选择合适的部署方案:
-
• Standalone模式:适合开发测试和小规模生产环境
-
• Yarn模式:适合大数据集群环境,资源管理更灵活
-
• Kubernetes模式:适合容器化部署,弹性伸缩能力强
本次实践采用Standalone独立部署模式,具有部署简单、维护方便的特点。
1.2 安装步骤
下载地址:https://flink.apache.org/downloads/
下载 flink-1.19.3-bin-scala_2.12.tgz
tar -xzf flink-*.tgz
export FLINK_HOME=/path/flink-*
# 启动集群
cd /path/flink-*
./bin/start-cluster.sh
# 关闭集群
./bin/stop-cluster.sh
如果需要在web查看状态,需要修改安装路径下/conf/config.yaml,然后重启集群
rest:
# The address to which the REST client will connect to
address: 172.29.236.202
bind-address: 0.0.0.0
# 默认是8081,由于被占用,改成了8082
port: 8082
如果是单机部署,任务数 > 1的话,需要修改slot数量,slot可以简单理解为Flink中的执行单元,一个slot执行能够一个任务,配置时可以根据服务器核心数来配置
同样还是修改/conf/config.yaml
taskmanager:
# 默认是1
numberOfTaskSlots: 16
memory:
process:
size: 8192m # 默认1728m,建议调大内存,不然数据量过大时容易OOM
web查看状态
2. 依赖驱动准备
使用Flink之前需要准备一些必要的驱动包,放到 /安装目录/lib/。根据自己的任务所需的驱动包有所不同,本次任务需要将Oracle的数据同步到Mysql,需要如下的驱动包,可在Flink CDC文档中查看需要哪些驱动(文末会给出文档链接)。
下图标出了本次任务下载的驱动
3. 数据库环境配置
3.1 数据接收端(MySQL)配置
3.1.1 用户权限配置
出于安全考虑,MySQL默认拒绝远程root用户访问,需要创建专用的同步账户并进行精确授权:
-- 创建专用账号(仅限来自 Flink 主机的访问),假设安装flink的机器ip:172.29.236.202
CREATEUSER'flink'@'172.29.236.202' IDENTIFIED BY'Syzxzs@2024';
-- 赋权(按需最小权限,JDBC upsert 需要 SELECT/INSERT/UPDATE/DELETE),假设mysql模式名为flow
GRANTSELECT, INSERT, UPDATE, DELETEON flow.*TO'flink'@'172.29.236.202';
FLUSH PRIVILEGES;
-- 验证用户创建结果:
SELECT host, user, plugin FROM mysql.user WHEREuserIN ('flink','root');
3.1.2 连接器配置
配置MySQL连接器,用于接收同步数据
CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
ID STRING NOT NULL,
BANK_DEPOSIT STRING,
NUM int,
NAME STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='flink',
'password'='Syzxzs@2024',
'table-name'='test_flink_object',
'scan.fetch-size'='500'
);
3.2 数据发送端(Oracle)配置
3.2.1 系统环境准备
使用root用户创建Oracle操作用户和日志归档目录,确保CDC功能正常运行:
# 如果有,可省略;
sudo adduser oracle
# 找个磁盘空间大的目录,日志很多
mkdir -p '/u03/oracle/oradata/recovery_area'
chown -R oracle:oinstall '/u03/oracle'
chmod 750 '/u03/oracle/oradata/recovery_area'
# 路径中的 SID 需替换为实际实例/库目录;目录必须存在且 oracle 用户可写。
mkdir -p '/opt/oracle/oradata/SID/'
chown -R oracle:oinstall '/opt/oracle/oradata/SID/'
chmod 750 '/opt/oracle/oradata/SID/'
3.2.2 数据库归档配置
使用Oracle管理员账号登录,配置数据库归档模式和补充日志:
alter systemset db_recovery_file_dest_size =10G;
altersystemset db_recovery_file_dest ='/u03/oracle/oradata/recovery_area'scope=spfile;
-- 需要重启数据库
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
--查看数据归档模式,应该输出 "Database log mode: Archive Mode"
archive log list;
-- 查看归档目录
SELECT name, isdefault, valueFROM v$parameterWHERE name LIKE'db_recovery_file_dest%';
-- 开启补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 开启指定表所有列,补充日志
ALTER TABLE 模式.表名 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 需要提前使用root用户创建目录'/opt/oracle/oradata/SID/'
-- 路径中的 SID 需替换为实际实例/库目录;目录必须存在且 oracle 用户可写。
CREATE TABLESPACE logminer_tbs DATAFILE '/u03/oracle/oradata/coredb/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
3.2.3 补充日志说明
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
是CDC功能的核心配置,其重要作用如下:
启用后,Oracle会在Redo Log中记录:
✅ 变更前的数据值(Before Image)
✅ 变更后的数据值(After Image)
✅ 主键信息
✅ 唯一索引信息,当数据量很大的时候,需要自定义分片的主键名,必须要开启
开启前后对比
-- 没有Supplemental Logging时:
-- Redo Log只记录:UPDATE table SET col1='new_value' WHERE rowid='xxx'
-- 有Supplemental Logging后:
-- Redo Log记录:
-- Before: col1='old_value', col2='value2', primary_key='123'
-- After: col1='new_value', col2='value2', primary_key='123'
-- 检查是否已启用
SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
-- 返回 YES 表示已启用
-- 查看详细的补充日志状态
SELECT*FROM V$DATABASE;
3.2.4 CDC用户创建
创建专用的CDC同步用户,并授予必要的权限:
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANTCREATE SESSION TO flinkuser;
GRANTSET CONTAINER TO flinkuser;
GRANTSELECTON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANYTABLETO flinkuser;
GRANTSELECTANYTABLETO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANTSELECTANY TRANSACTION TO flinkuser;
-- 下面这个执行可能报错 不影响(Oracle 版本不支持 LOGMINING 系统权限(11g/12c 没有该权限))
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANYTO flinkuser;
GRANTCREATE TABLETO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANYTABLETO flinkuser;
GRANTALTERANYTABLETO flinkuser;
GRANTCREATE SEQUENCE TO flinkuser;
GRANTEXECUTEON DBMS_LOGMNR TO flinkuser;
GRANTEXECUTEON DBMS_LOGMNR_D TO flinkuser;
GRANTSELECTON V_$LOG TO flinkuser;
GRANTSELECTON V_$LOG_HISTORY TO flinkuser;
GRANTSELECTON V_$LOGMNR_LOGS TO flinkuser;
GRANTSELECTON V_$LOGMNR_CONTENTS TO flinkuser;
GRANTSELECTON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANTSELECTON V_$LOGFILE TO flinkuser;
GRANTSELECTON V_$ARCHIVED_LOG TO flinkuser;
GRANTSELECTON V_$ARCHIVE_DEST_STATUS TO flinkuser;
注意:需要同步的数据表,必须要由一个字段表示主键,原因后面会讲。
4、同步任务实现
首先模拟一个Oracle数据库的一个表
数据同步脚本主要分为三个部分
4.1 源表连接器配置
CREATE TABLE TEST_FLINK_OBJECT_ORACLE
(ID STRING NOT NULL,
BANK_DEPOSIT STRING,
NUM int,
NAME STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='oracle-cdc',
'hostname'='172.31.236.39',
'port'='1521',
'username'='flinkuser',
'password'='flinkpw',
'database-name'='cnywdb',
'schema-name'='U_CMS',
'table-name'='TEST_FLINK_OBJECT', --映射的真实数据库中的表名
'scan.startup.mode'='initial',
'scan.incremental.snapshot.enabled'='true',
-- 这个配置很重要,但是官方文档没有重点提,实践中如果没有配置分片主键,数据不会分片,当数据读到80多万条时就OOM了
'scan.incremental.snapshot.chunk.key-column'='ID',
'debezium.database.connection.adapter'='logminer',
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true',
);
4.2 目标表连接器配置
CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
ID STRING NOT NULL,
BANK_DEPOSIT STRING,
NUM int,
NAME STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='flink',
'password'='Syzxzs@2024',
'table-name'='test_flink_object',
'scan.fetch-size'='500'
);
4.3 数据同步任务
INSERT INTO TEST_FLINK_OBJECT_MYSQL (select * from TEST_FLINK_OBJECT_ORACLE);
4.4 任务提交与执行
接下来启动sql-client,提交上面编写的任务。切换到 /安装目录/bin/,看到下面的表示即表示启动成功。
分别执行上面编写的MySQL连接器和Oracle连接器:
最后执行一条数据插入语句,提交任务:
也可以将上面的脚本放在一个sql文件中,通过命令./sql-client.sh -f oracle-to-mysql-object.sql
执行
oracle-to-mysql-object.sql文件内容如下:
CREATE TABLE TEST_FLINK_OBJECT_ORACLE
(ID STRING NOT NULL,
BANK_DEPOSIT STRING,
NUM int,
NAME STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='oracle-cdc',
'hostname'='172.31.236.39',
'port'='1521',
'username'='flinkuser',
'password'='flinkpw',
'database-name'='cnywdb',
'schema-name'='U_CMS',
'table-name'='TEST_FLINK_OBJECT',
'scan.startup.mode'='initial',
'scan.incremental.snapshot.enabled'='true',
'scan.incremental.snapshot.chunk.key-column'='ID',
'debezium.database.connection.adapter'='logminer',
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true',
);
CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
ID STRING NOT NULL,
BANK_DEPOSIT STRING,
NUM int,
NAME STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='flink',
'password'='Syzxzs@2024',
'table-name'='test_flink_object',
'scan.fetch-size'='500'
);
-- 适用于两张表字段完全一致;
INSERT INTO TEST_FLINK_OBJECT_MYSQL select*from TEST_FLINK_OBJECT_ORACLE;
-- 如果两张表字段不一致,通过起别名的方式
INSERT INTO TEST_FLINK_OBJECT_MYSQL
SELECT
s.ID AS id,
s.NAME AS BANK_DEPOSIT,
s.NUM AS NUM,
s.BANK_DEPOSIT AS NAME
FROM TEST_FLINK_OBJECT_ORACLE AS s;
下面是对字段起了别名的同步结果,将BANK_DEPOSIT
和NAME
字段互换的结果
5、生产实践经验
5.1 关键配置优化
scan.incremental.snapshot.chunk.key-column = 'ID'
上面也提到了,源表需要有一个主键,正常应该会默认按照主键分片,但是实践中发现必须要指定分片的主键,否则会导致持续读取数据,直到内存溢出。下面是当时没有配置分片的主键而导致内存溢出的报错截图。从日志中可以看出,没有使用主键进行分片,而是使用Oracle默认的隐藏列ROWID
进行分片,导致数据量一直再增加,最终到81万的时候,内存溢出。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
:开启数据库级别的日志归档
配置了主键分片之后,必须要开启数据库日志归档,官方文档中提到要开启数据库级日志归档或表级日志归档,所以一开始只开启了表的日志归档,因为配置主键分片之后,就报错了,提示要开启数据库级日志归档。
5.2 数据一致性保障
实践发现,本次同步任务涉及到9个表,总数据量接近4千万,当初次同步完之后,以及后续的增量同步时,总会丢几条数据,当找出丢失的几条数据之后,发现与其他的没有什么区别,排查很久的原因都没找到。目前怀疑是Oracle日志的问题,前面提到Oracle11g 没有LOGMINING权限。当然,估计这个问题只在Oracle中出现,因为当时问了其他用过的同事,使用其他的数据库,没有发现这种问题。最终不得己,用大模型以最快的速度编写了定时任务进行兜底处理。
因此建议,当有类似的数据同步任务时,可以优先考虑使用Flink,不过如果发现有这种莫名丢失数据的问题,应该及时准备兜底方案。使用大模型,将数据表的DDL给到大模型,能够很快就编写出定时任务版本的方案,两者搭配,能够达到几乎实时同步的效果,即使存在少量数据丢失的问题,也能通过定时任务及时弥补。
更多推荐
所有评论(0)