Summary

Flink is commonly deployed in two cluster modes: Flink on YARN and standalone.
YARN mode requires a Hadoop cluster; it relies on Hadoop's YARN resource scheduler to provide high availability for Flink and to allocate resources fully and sensibly. It is typically used in production.
Standalone mode runs jobs on Flink's own distributed cluster. Its advantage is that it needs no external components; its drawback is that resource shortages must be handled manually.
This article uses the standalone cluster mode as the example.

Tip: when Flink CDC reads an Oracle DATE column, the value arrives as a long (epoch milliseconds) and carries a time-zone offset (8 hours for GMT+8).
Option 1: correct it in Java code, for example:
preparedStatement.setObject(i, new Timestamp((Long) o - 8 * 60 * 60 * 1000));
Option 2: add the following to flink-conf.yaml (not verified):
env.java.opts.taskmanager: -Duser.timezone=GMT+08
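
For context, here is a minimal sketch of option 1 wired into a JDBC sink built with flink-connector-jdbc (included in the pom below). The target table, column, and PostgreSQL connection details are hypothetical placeholders, not part of the original setup:

import java.sql.Timestamp;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class DateOffsetFixSketch {

    // 8-hour correction for GMT+8; adjust to the server's actual zone.
    private static final long OFFSET_MS = 8 * 60 * 60 * 1000L;

    // A JDBC sink that writes the long epoch value emitted by Flink CDC
    // for an Oracle DATE column back as a corrected Timestamp.
    public static SinkFunction<Long> sink() {
        return JdbcSink.sink(
                "INSERT INTO target_table (event_time) VALUES (?)", // hypothetical table
                (ps, epochMillis) ->
                        ps.setTimestamp(1, new Timestamp(epochMillis - OFFSET_MS)),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:5432/test") // placeholder URL
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("postgres")
                        .withPassword("postgres")
                        .build());
    }
}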

1. Add the Flink dependencies to the project

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>test-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <fastjson.version>1.2.68</fastjson.version>
        <druid.version>1.2.8</druid.version>
        <flink.version>1.13.6</flink.version>
        <flinkcdc.version>3.0.0</flinkcdc.version>
        <scala.version>2.12</scala.version>
        <postgresql.version>42.2.12</postgresql.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgresql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.15</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc10</artifactId>
            <version>19.10.0.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.22</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Enable archive logging in Oracle

sqlplus / as sysdba

Enable archive logging:
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

Check whether archive logging is enabled:
archive log list;

Enable supplemental logging for the captured database so that captured data changes include the state of the affected rows before the change. The following shows how to configure this at the database level:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Create a tablespace:
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Create the user flinkcdc with LOGMINER_TBS as its default tablespace:
CREATE USER flinkcdc IDENTIFIED BY flinkcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

Grant the connect, resource, and dba roles to user flinkcdc:
grant connect,resource,dba to flinkcdc;

Then grant the following privileges:
  GRANT CREATE SESSION TO flinkcdc;
  GRANT SELECT ON V_$DATABASE to flinkcdc;
  GRANT FLASHBACK ANY TABLE TO flinkcdc;
  GRANT SELECT ANY TABLE TO flinkcdc;
  GRANT SELECT_CATALOG_ROLE TO flinkcdc;
  GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;
  GRANT SELECT ANY TRANSACTION TO flinkcdc;
  GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
  GRANT CREATE TABLE TO flinkcdc;
  GRANT LOCK ANY TABLE TO flinkcdc;
  GRANT ALTER ANY TABLE TO flinkcdc;
  GRANT CREATE SEQUENCE TO flinkcdc;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;

  GRANT SELECT ON V_$LOG TO flinkcdc;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;
  GRANT SELECT ON V_$LOGFILE TO flinkcdc;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;
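
A quick way to verify the grants and the archive-log mode is to connect as flinkcdc with plain JDBC and query v$database. A minimal sketch, assuming an Oracle JDBC driver on the classpath (e.g. the ojdbc10 dependency commented out in the pom) and placeholder host/SID values:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class GrantCheckSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder host, port, and SID of the Oracle instance.
        String url = "jdbc:oracle:thin:@10.201.1.100:1521:ORCL";
        try (Connection conn = DriverManager.getConnection(url, "flinkcdc", "flinkcdc");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT log_mode FROM v$database")) {
            while (rs.next()) {
                // Prints ARCHIVELOG once archive logging is enabled.
                System.out.println("log_mode = " + rs.getString(1));
            }
        }
    }
}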

Enable supplemental logging on the tables to be captured:

ALTER TABLE test.table1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE test.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE test.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
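
With archiving, supplemental logging, and the flinkcdc user in place, the capture job itself can be built with flink-connector-oracle-cdc. A minimal sketch; the hostname and SID are placeholders for your Oracle instance, and print() stands in for a real sink:

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class OracleCdcJobSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details for the Oracle instance set up above.
        SourceFunction<String> source = OracleSource.<String>builder()
                .hostname("10.201.1.100")
                .port(1521)
                .database("ORCL")
                .schemaList("TEST")
                .tableList("TEST.TABLE1", "TEST.TABLE2", "TEST.TABLE3")
                .username("flinkcdc")
                .password("flinkcdc")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // required for savepoints and recovery
        env.addSource(source).print();   // replace print() with a real sink
        env.execute("oracle-cdc-test");
    }
}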

3. Set up the Flink cluster

Component       Version
Project         Flink 1.13.6, Scala 2.12, Flink Oracle CDC 3.0.0
Flink cluster   Flink 1.15.3

Hostname   IP           Roles                                             Cores / Memory       Slots
yy1        10.201.1.1   StandaloneSessionClusterEntrypoint, TaskManager   64 cores / 512 GB    2
yy2        10.201.1.2   TaskManager                                       112 cores / 512 GB   10
yy3        10.201.1.3   TaskManager                                       112 cores / 512 GB   10

3.1 Download, install, and configure Flink
1) Log in to the Linux server
2) cd /usr/local/
3) wget https://archive.apache.org/dist/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
4) tar -zxvf flink-1.15.3-bin-scala_2.12.tgz
5) cd flink-1.15.3/conf
6) vi flink-conf.yaml
Note: there must be a space after every colon.
jobmanager.rpc.address: yy1

# Important: total JobManager process memory
jobmanager.memory.process.size: 10gb

# Total TaskManager process memory
taskmanager.memory.process.size: 3gb

# Uncomment and adjust to store savepoints in HDFS instead
# (typical when a Hadoop cluster is available):
#state.savepoints.dir: hdfs://yy1:9000/flink/cdc

# Store state on the local filesystem:
state.checkpoints.dir: file:///data1/flink/checkpoints

state.savepoints.dir: file:///data1/flink/savepoints

# Number of retained checkpoints (default is 1); uncomment to keep more:
#state.checkpoints.num-retained: 3

# Number of task slots per TaskManager
taskmanager.numberOfTaskSlots: 2

# To avoid Flink's default /tmp directories, configure your own and uncomment:
# io.tmp.dirs: /data1/flink/tmp
# env.pid.dir: /data1/flink/env
# web.tmpdir: /data1/flink/tmp
# Directory for uploaded job jars, so they need not be re-uploaded every time:
#web.upload.dir: /data1/flink/jar
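
Note that state.checkpoints.dir is only used once the job itself enables checkpointing. A minimal sketch of that job-side configuration (the interval values are examples, not from the original setup):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 s; state lands in state.checkpoints.dir from flink-conf.yaml.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 30 s between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        // ... add sources/sinks here, then call env.execute(...)
    }
}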

7) Edit the masters and workers files
masters:
yy1:8081

workers:
yy1
yy2
yy3

8) Copy the installation to the other nodes:
scp -rq flink-1.15.3 yy2:/usr/local
scp -rq flink-1.15.3 yy3:/usr/local

9) Create a symlink to the flink-1.15.3 directory (run on every node):
ln -s flink-1.15.3 flink

10) Configure the Flink environment variables (run on every node):
vi /etc/profile
# add the following

export JAVA_HOME=/usr/local/jdk18
export FLINK_HOME=/usr/local/flink
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$FLINK_HOME/bin

11) Apply the changes (run on every node):
source /etc/profile
12) Start the Flink cluster from the master node:
start-cluster.sh
13) Open the Flink web UI:

http://10.201.1.1:8081


14) Submit the job from the web UI

15) Result

4. Common commands for submitting Flink jobs

4.1 Standalone mode
flink run -m [host]:[port] -p [parallelism] -c [fully qualified main class] [absolute path to the jar]

flink run -m 10.201.1.1:8090 -p 1 -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

Savepoints in standalone mode. To cancel a job and take a savepoint at the same time:

flink cancel -s 282c334dd9dc9ae04c3d6cbe1bfdf8f2

To trigger a savepoint for a running job without cancelling it:

flink savepoint 282c334dd9dc9ae04c3d6cbe1bfdf8f2
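
The same trigger is also exposed over the JobManager REST API (POST /jobs/:jobid/savepoints), which is handy for scripting. A minimal Java sketch, reusing the host and job ID from the examples above:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SavepointRestSketch {
    public static void main(String[] args) throws Exception {
        // Equivalent of `flink savepoint <jobId>` via the JobManager REST API.
        String jobId = "282c334dd9dc9ae04c3d6cbe1bfdf8f2";
        URL url = new URL("http://10.201.1.1:8081/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        // cancel-job=true would mimic `flink cancel -s` instead.
        byte[] body = "{\"cancel-job\": false}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        // Expect 202 Accepted with a trigger id on success.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}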

4.2 Flink on YARN mode

flink run -t yarn-per-job -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

Taking a savepoint in Flink on YARN mode:

flink savepoint 8f1d21525dc3bebf22f9c3a617326142 hdfs:///flink/cdc -yid application_1657250519562_0007

Resume from a savepoint:
$ bin/flink run -s :savepointPath [:runArgs]

flink run -s hdfs:///flink/cdc/savepoint-a4f769-58ee3095ee02

5. Done

6. Troubleshooting

1) Error:
ERROR: Attempting to operate on hdfs namenode as root
ERROR: but there is no HDFS_NAMENODE_USER defined. Aborting operation.
Starting datanodes
ERROR: Attempting to operate on hdfs datanode as root
ERROR: but there is no HDFS_DATANODE_USER defined. Aborting operation.
Starting secondary namenodes [hadoop]
ERROR: Attempting to operate on hdfs secondarynamenode as root
ERROR: but there is no HDFS_SECONDARYNAMENODE_USER defined. Aborting operation.
2018-07-16 05:45:04,628 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting resourcemanager
ERROR: Attempting to operate on yarn resourcemanager as root
ERROR: but there is no YARN_RESOURCEMANAGER_USER defined. Aborting operation.
Starting nodemanagers
ERROR: Attempting to operate on yarn nodemanager as root
ERROR: but there is no YARN_NODEMANAGER_USER defined. Aborting operation.

Fix:
vi /etc/profile, add the lines below, then run source /etc/profile
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
export HADOOP_CLASSPATH=$(hadoop classpath)
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

2) Error: java.lang.IllegalStateException: Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Fix:
Edit the Flink configuration file: vi flink-conf.yaml
Add: classloader.check-leaked-classloader: false

3) File /tmp/logs/root/logs-tfile/application_1656991740104_0001 does not exist.
File /tmp/logs/root/bucket-logs-tfile/0001/application_1656991740104_0001 does not exist.

Can not find any log file matching the pattern: [ALL] for the application: application_1656991740104_0001
Can not find the logs for the application: application_1656991740104_0001 with the appOwner: root

Fix:
Add the following to yarn-site.xml:

<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>

4) Error: DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

Fix:
ALTER TABLE <table_name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

5) With Oracle 19c, the Flink UI reports:
Error: Caused by: java.sql.SQLException: ORA-44609: CONTINOUS_MINE is desupported for use with DBMS_LOGMNR.START_LOGMNR.
ORA-06512: at "SYS.DBMS_LOGMNR", line 72
Fix:
Comment out the following setting:
// proper.setProperty("log.mining.continuous.mine", "true");
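
For reference, a minimal sketch of where that property lives when the source is built as in section 2; on Oracle 19c the continuous-mine line must stay commented out (hostname and SID remain placeholders):

import java.util.Properties;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ContinuousMineFixSketch {
    public static SourceFunction<String> source() {
        Properties props = new Properties();
        // CONTINUOUS_MINE is desupported on Oracle 19c, so leave this out:
        // props.setProperty("log.mining.continuous.mine", "true");
        return OracleSource.<String>builder()
                .hostname("10.201.1.100") // placeholder host
                .port(1521)
                .database("ORCL")         // placeholder SID
                .schemaList("TEST")
                .tableList("TEST.TABLE1")
                .username("flinkcdc")
                .password("flinkcdc")
                .debeziumProperties(props)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}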
