Network Security Tools on CentOS (36): Portainer Kafka-ClickHouse Deployment (5) — a docker + kafka + flink single-node offline test and development environment
This post covers installing docker, portainer, flink, kafka, openjdk, maven and so on in an offline setting, to build an offline development and test environment for Flink programs, and presents a minimal Flink-Kafka skeleton.
In the previous posts we got Flink deployed successfully; here we build on that to set up a single-node Flink development environment, so that the Kafka-in, Flink-processing, Kafka-out pipeline can be implemented in code.
I. Manual Docker installation in an offline environment
1. Setting up a local offline package source
The reason the offline configuration of the CentOS installation image comes up again while installing Docker offline is that, when podman and buildah are removed, yum also removes their dependencies by default, among them container-selinux and libcgroup, both of which Docker needs. You could keep the dependencies with yum remove --noautoremove, or force-remove the packages one by one with rpm -e, but the former leaves orphaned packages on the system and the latter risks breaking the dependency chain. The safer route is to remove podman and buildah and then put the two needed packages back, as sketched below.
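In script form, the two approaches look roughly like this (a sketch; this article takes option B, whose steps are shown in full below):
# Option A: remove podman/buildah but keep their dependencies installed
yum remove --noautoremove podman buildah -y
# Option B (used here): let yum prune the unused dependencies,
# then reinstall the two packages Docker needs
yum remove podman buildah -y
yum install -y container-selinux libcgroup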
Of course, you could also simply fetch the RPMs from pkgs.org instead:

Following CENTOS上的网络安全工具(一)Suricata 离线部署, set the installation DVD up as the system's local mirror. The DVD contains two package repositories, BaseOS and AppStream:

(1) Create /etc/yum.repos.d/myBaseOS.repo
[baseos]
name=CentOS Stream $releasever - BaseOS
#mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo=BaseOS&infra=$infra
baseurl=file:///run/media/root/CentOS-Stream-8-x86_64-dvd/BaseOS
enabled=1
(2) Create /etc/yum.repos.d/myAppStream.repo
[appstream]
name=CentOS Stream $releasever - AppStream
#mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo=AppStream&infra=$infra
baseurl=file:///run/media/root/CentOS-Stream-8-x86_64-dvd/AppStream
enabled=1
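Both baseurl paths above assume the installation DVD is auto-mounted under /run/media/root/. If it is not, a manual loop mount along these lines does the job (the ISO filename is an example):
mkdir -p /run/media/root/CentOS-Stream-8-x86_64-dvd
mount -o loop,ro CentOS-Stream-8-x86_64-dvd1.iso /run/media/root/CentOS-Stream-8-x86_64-dvd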
(3) Rebuild the yum cache
[root@localhost yum.repos.d]# yum clean all
13 个文件已删除
[root@localhost yum.repos.d]# yum makecache
CentOS Stream 8 - AppStream 306 MB/s | 7.7 MB 00:00
CentOS Stream 8 - BaseOS 195 MB/s | 2.6 MB 00:00
元数据缓存已建立。
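As a quick check, yum repolist should now list only the two local repositories, baseos and appstream:
yum repolist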
2. Removing podman and buildah
(1) Removal
Refer to Chapter 1 of CENTOS上的网络安全工具(三十三); but on CentOS Stream 8 and later, first check whether podman and buildah are already installed, and if so remove them:
[root@localhost ~]# yum rm podman buildah -y
依赖关系解决。
===========================================================================================================================================================================================================
软件包 架构 版本 仓库 大小
===========================================================================================================================================================================================================
移除:
buildah x86_64 2:1.33.6-2.module_el8+976+11360731 @appstream 31 M
podman x86_64 3:4.9.4-0.1.module_el8+971+3d3df00d @appstream 52 M
移除依赖的软件包:
cockpit-podman noarch 84.1-1.module_el8+977+8ab54275 @appstream 682 k
清除未被使用的依赖关系:
conmon x86_64 3:2.1.10-1.module_el8+804+f131391c @appstream 172 k
container-selinux noarch 2:2.229.0-2.module_el8+847+7863d4e6 @appstream 67 k
containers-common x86_64 2:1-81.module_el8+968+fbb249c7 @appstream 580 k
criu x86_64 3.18-4.module_el8+804+f131391c @appstream 1.5 M
fuse-overlayfs x86_64 1.13-1.module_el8+804+f131391c @appstream 128 k
fuse3 x86_64 3.3.0-19.el8 @baseos 100 k
fuse3-libs x86_64 3.3.0-19.el8 @baseos 274 k
libnet x86_64 1.1.6-15.el8 @AppStream 170 k
libslirp x86_64 4.4.0-1.module_el8+804+f131391c @appstream 134 k
podman-catatonit x86_64 3:4.9.4-0.1.module_el8+971+3d3df00d @appstream 794 k
podman-gvproxy x86_64 3:4.9.4-0.1.module_el8+971+3d3df00d @appstream 13 M
podman-plugins x86_64 3:4.9.4-0.1.module_el8+971+3d3df00d @appstream 3.6 M
runc x86_64 1:1.1.12-1.module_el8+885+7da147f3 @appstream 9.9 M
shadow-utils-subid x86_64 2:4.6-22.el8 @baseos 205 k
slirp4netns x86_64 1.2.3-1.module_el8+951+32019cde @appstream 107 k
事务概要
===========================================================================================================================================================================================================
移除 18 软件包
将会释放空间:113 M
运行事务检查
事务检查成功。
运行事务测试
事务测试成功。
运行事务
准备中 : 1/1
运行脚本: buildah-2:1.33.6-2.module_el8+976+11360731.x86_64 1/1
删除 : buildah-2:1.33.6-2.module_el8+976+11360731.x86_64 1/18
删除 : cockpit-podman-84.1-1.module_el8+977+8ab54275.noarch 2/18
运行脚本: podman-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64 3/18
删除 : podman-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64 3/18
运行脚本: podman-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64 3/18
删除 : containers-common-2:1-81.module_el8+968+fbb249c7.x86_64 4/18
删除 : fuse-overlayfs-1.13-1.module_el8+804+f131391c.x86_64 5/18
删除 : fuse3-3.3.0-19.el8.x86_64 6/18
删除 : slirp4netns-1.2.3-1.module_el8+951+32019cde.x86_64 7/18
删除 : runc-1:1.1.12-1.module_el8+885+7da147f3.x86_64 8/18
删除 : criu-3.18-4.module_el8+804+f131391c.x86_64 9/18
删除 : podman-plugins-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64 10/18
删除 : container-selinux-2:2.229.0-2.module_el8+847+7863d4e6.noarch 11/18
运行脚本: container-selinux-2:2.229.0-2.module_el8+847+7863d4e6.noarch
(2) Reinstall container-selinux
[root@localhost ~]# yum install container-selinux.noarch
上次元数据过期检查:0:34:37 前,执行于 2026年01月09日 星期五 22时20分11秒。
依赖关系解决。
==================================================================================================================================================================================
软件包 架构 版本 仓库 大小
==================================================================================================================================================================================
安装:
container-selinux noarch 2:2.180.0-1.module_el8.7.0+1106+45480ee0 appstream 59 k
事务概要
==================================================================================================================================================================================
安装 1 软件包
总计:59 k
安装大小:54 k
确定吗?[y/N]: y
下载软件包:
运行事务检查
事务检查成功。
运行事务测试
事务测试成功。
运行事务
准备中 : 1/1
运行脚本: container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch 1/1
安装 : container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch 1/1
运行脚本: container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch 1/1
验证 : container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch 1/1
已安装:
container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch
完毕!
(3) Reinstall libcgroup
[root@localhost dockerrpm]# yum install libcgroup
上次元数据过期检查:0:37:23 前,执行于 2026年01月09日 星期五 22时20分11秒。
依赖关系解决。
==================================================================================================================================================================================
软件包 架构 版本 仓库 大小
==================================================================================================================================================================================
安装:
libcgroup x86_64 0.41-19.el8 baseos 70 k
事务概要
==================================================================================================================================================================================
安装 1 软件包
总计:70 k
安装大小:136 k
确定吗?[y/N]: y
下载软件包:
运行事务检查
事务检查成功。
运行事务测试
事务测试成功。
运行事务
准备中 : 1/1
运行脚本: libcgroup-0.41-19.el8.x86_64 1/1
[sss_cache] [sysdb_domain_cache_connect] (0x0010): DB version too old [0.23], expected [0.24] for domain implicit_files!
Higher version of database is expected!
In order to upgrade the database, you must run SSSD.
Removing cache files in /var/lib/sss/db should fix the issue, but note that removing cache files will also remove all of your cached credentials.
Could not open available domains
[sss_cache] [sysdb_domain_cache_connect] (0x0010): DB version too old [0.23], expected [0.24] for domain implicit_files!
Higher version of database is expected!
In order to upgrade the database, you must run SSSD.
Removing cache files in /var/lib/sss/db should fix the issue, but note that removing cache files will also remove all of your cached credentials.
Could not open available domains
安装 : libcgroup-0.41-19.el8.x86_64 1/1
运行脚本: libcgroup-0.41-19.el8.x86_64 1/1
验证 : libcgroup-0.41-19.el8.x86_64 1/1
已安装:
libcgroup-0.41-19.el8.x86_64
完毕!
3. Installing Docker
(1) Download the packages
Docker can be installed manually either from RPM packages or from static binaries; here we only use the RPM route, which avoids error-prone steps such as writing the service unit file and setting user group permissions.
Download the RPMs from the official Docker site:

The packages have to match the OS version, which can be checked in os-release:
[root@localhost ~]# cat /etc/os-release
NAME="CentOS Stream"
VERSION="8"
ID="centos"
ID_LIKE="rhel fedora"
VERSION_ID="8"
PLATFORM_ID="platform:el8"
PRETTY_NAME="CentOS Stream 8"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:centos:centos:8"
HOME_URL="https://centos.org/"
BUG_REPORT_URL="https://bugzilla.redhat.com/"
REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8"
REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream"
The corresponding download directory:
https://download.docker.com/linux/centos/8/x86_64/stable/Packages/
Watch the versions when downloading: in the tests below, installing docker-ce-cli-26.1.3-1 demands a sufficiently new containerd.io (at least the 1.6.24-3.1 build used here).

The following combination was verified to install successfully on CentOS Stream 8:
[root@localhost dockerrpm]# ll
总用量 100660
-rw-r--r--. 1 root root 35543060 1月 9 23:02 containerd.io-1.6.24-3.1.el8.x86_64.rpm
-rw-r--r--. 1 root root 14279736 1月 9 22:44 docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm
-rw-r--r--. 1 root root 28542392 1月 9 22:44 docker-ce-26.1.3-1.el8.x86_64.rpm
-rw-r--r--. 1 root root 8181560 1月 9 22:44 docker-ce-cli-26.1.3-1.el8.x86_64.rpm
-rw-r--r--. 1 root root 5210852 1月 9 22:44 docker-ce-rootless-extras-26.1.3-1.el8.x86_64.rpm
-rw-r--r--. 1 root root 7370924 1月 9 22:44 docker-compose-plugin-2.6.0-3.el8.x86_64.rpm
-rw-r--r--. 1 root root 3930488 1月 9 22:44 docker-scan-plugin-0.9.0-3.el8.x86_64.rpm
(2) Install step by step
Install the components listed on the official site.
docker-ce depends on docker-ce-cli and libcgroup, docker-ce-cli depends on containerd.io, and containerd.io in turn depends on container-selinux, so just install them one by one in reverse dependency order (or in a single transaction, as sketched below):
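Alternatively, rpm can resolve the ordering by itself if the same five packages are passed in a single transaction (a sketch, run from the directory holding the downloaded RPMs):
rpm -ivh containerd.io-1.6.24-3.1.el8.x86_64.rpm \
    docker-ce-cli-26.1.3-1.el8.x86_64.rpm \
    docker-ce-26.1.3-1.el8.x86_64.rpm \
    docker-compose-plugin-2.6.0-3.el8.x86_64.rpm \
    docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm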
containerd.io
[root@localhost dockerrpm]# rpm -ivhU containerd.io-1.6.24-3.1.el8.x86_64.rpm
警告:containerd.io-1.6.24-3.1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying... ################################# [100%]
准备中... ################################# [100%]
正在升级/安装...
1:containerd.io-1.6.24-3.1.el8 ################################# [ 50%]
docker-ce-cli
[root@localhost dockerrpm]# rpm -ivh docker-ce-cli-26.1.3-1.el8.x86_64.rpm
警告:docker-ce-cli-26.1.3-1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying... ################################# [100%]
准备中... ################################# [100%]
正在升级/安装...
1:docker-ce-cli-1:26.1.3-1.el8 ################################# [100%]
docker-ce
[root@localhost dockerrpm]# rpm -ivh docker-ce-26.1.3-1.el8.x86_64.rpm
警告:docker-ce-26.1.3-1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying... ################################# [100%]
准备中... ################################# [100%]
正在升级/安装...
1:docker-ce-3:26.1.3-1.el8 ################################# [100%]
docker-compose-plugin
[root@localhost dockerrpm]# rpm -ivh docker-compose-plugin-2.6.0-3.el8.x86_64.rpm
警告:docker-compose-plugin-2.6.0-3.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying... ################################# [100%]
准备中... ################################# [100%]
正在升级/安装...
1:docker-compose-plugin-0:2.6.0-3.e################################# [100%]
docker-buildx-plugin
[root@localhost dockerrpm]# rpm -ivh docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm
警告:docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying... ################################# [100%]
准备中... ################################# [100%]
正在升级/安装...
1:docker-buildx-plugin-0:0.14.0-1.e################################# [100%]
(3) Start the service
[root@localhost dockerrpm]# systemctl enable --now docker
Created symlink /etc/systemd/system/multi-user.target.wants/docker.service → /usr/lib/systemd/system/docker.service.
[root@localhost ~]# systemctl start docker
[root@localhost ~]# systemctl status docker
● docker.service - Docker Application Container Engine
Loaded: loaded (/usr/lib/systemd/system/docker.service; enabled; vendor preset: disabled)
Active: active (running) since Fri 2026-01-09 23:12:56 EST; 2min 22s ago
Docs: https://docs.docker.com
Main PID: 1321 (dockerd)
Tasks: 13
Memory: 111.2M
CGroup: /system.slice/docker.service
└─1321 /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock
1月 09 23:12:54 localhost.localdomain systemd[1]: Starting Docker Application Container Engine...
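A quick sanity check that both the client and the daemon respond, which works fully offline:
docker version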
II. Deploying Kafka with Portainer
1. Installing Portainer
Refer to Chapter 2 of CENTOS上的网络安全工具(三十三).
As of January 2026 the following mirror still works:
[root@localhost ~]# docker pull docker.1ms.run/portainer/portainer-ce:alpine
alpine: Pulling from portainer/portainer-ce
014e56e61396: Already exists
97d46cc86f33: Already exists
1b6da1229ec5: Already exists
37dcf0e5163d: Already exists
5f39d7b36694: Already exists
4dc5fe4c57e2: Already exists
4f4fb700ef54: Already exists
Digest: sha256:a04e0ac3e99172e451055419e2ed46c67f24bff72209ab09235079d7642e87d8
Status: Downloaded newer image for docker.1ms.run/portainer/portainer-ce:alpine
docker.1ms.run/portainer/portainer-ce:alpine
Tag the image with a shorter name:
[root@localhost ~]# docker tag docker.1ms.run/portainer/portainer-ce:alpine portainer/portainer-ce
[root@localhost ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
docker.1ms.run/portainer/portainer-ce alpine 1fc1bbda74d9 2 weeks ago 192MB
portainer/portainer-ce latest 1fc1bbda74d9 2 weeks ago 192MB
Portainer can then be started the same way as in article (三十三):
[root@localhost ~]# docker run -d -p 8000:8000 -p 9443:9443 --name portainer --restart=always -v /var/run/docker.sock:/var/run/docker.sock -v portainer_data:/data portainer/portainer-ce
4e2496b162f045557ddd9440589338e3ca205071d626cb6b8e6032f340de7d50
[root@localhost ~]#
[root@localhost ~]# docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4e2496b162f0 portainer/portainer-ce "/portainer" 10 seconds ago Up 10 seconds 0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp portainer
[root@localhost ~]# ls /var/lib/docker/volumes/
backingFsBlockDev metadata.db portainer_data
Starting Portainer does not actually require creating the volume in advance. The swarm, however, does need an init, or stacks cannot be started later:
[root@localhost ~]# docker swarm init
Swarm initialized: current node (eec1ag50r9k7odxxntvb4sqvv) is now a manager.
…… …… ……
2. Single-node Kafka deployment with Portainer
(1) Pull Apache Kafka
We used bitnami/kafka before, but after only a month or two it could no longer be pulled; the official apache/kafka image looks more dependable, so this time we try that instead.
As of January 2026 the following mirror still works:
[root@localhost ~]# docker pull docker.1ms.run/apache/kafka:4.1.1
4.1.1: Pulling from apache/kafka
2d35ebdb57d9: Pull complete
34074eb54496: Pull complete
1efb71aa46e6: Pull complete
c12c21783ef0: Pull complete
e0d459441fab: Pull complete
0c148fd1a481: Pull complete
4be6c1affd0e: Pull complete
ebab522082c7: Pull complete
f8b484aae80d: Pull complete
b69a77a99b62: Pull complete
e4b07b2631b1: Pull complete
Digest: sha256:0bc1bb2478f45b6cea78864df86acdc11e8df2c5172477819a4d12942cbe5d40
Status: Downloaded newer image for docker.1ms.run/apache/kafka:4.1.1
docker.1ms.run/apache/kafka:4.1.1
(2) Start the Kafka container
Kafka deployment can still follow Chapter 4 of CENTOS上的网络安全工具(三十三). Although apache/kafka and bitnami/kafka are different images, the yml file is largely the same; only some configuration variable names differ. (One correction: with a single broker, the transaction state log min ISR can only be 1.)
version: '3.8'
services:
  kafka:
    image: apache/kafka:latest
    networks:
      - developnet
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
        max_attempts: 3
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      - KAFKA_PROCESS_ROLES=broker,controller
      - KAFKA_NODE_ID=1
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:19092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://192.168.76.128:19092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_NUM_PARTITIONS=3
      - KAFKA_LOG_DIRS=/var/lib/kafka/data
    volumes:
      - kafka-data:/var/lib/kafka/data
networks:
  developnet:
    driver: overlay
    attachable: true
volumes:
  kafka-data:
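The stack can be deployed from Portainer's UI, or equally from the CLI (the file name and stack name here are examples; a stack named kafka produces the kafka-kafka-1 container seen below):
docker stack deploy -c kafka-stack.yml kafka
docker stack services kafka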
Once it is up, enter the container and create the topics. We will be testing both Kafka input and output later, so create two topics, testin and testout:
[root@localhost ~]# docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
57313536ff91 portainer/portainer-ce "/portainer" 4 seconds ago Up 4 seconds 0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp portainer
[root@localhost ~]# docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f409097532ce apache/kafka:latest "/__cacert_entrypoin…" About a minute ago Up About a minute 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:19092->19092/tcp, :::19092->19092/tcp kafka-kafka-1
[root@localhost ~]# docker exec -it f409 bash
f409097532ce:/$ cd /opt/kafka/
f409097532ce:/opt/kafka$ bin/kafka-topics.sh --create --topic testin --bootstrap-server kafka:9092
Created topic testin.
f409097532ce:/opt/kafka$ bin/kafka-topics.sh --create --topic testout --bootstrap-server kafka:9092
Created topic testout.
f409097532ce:/opt/kafka$
Use kafka-console-producer.sh and kafka-console-consumer.sh to confirm that both topics can be produced to and consumed from, then move on. A minimal smoke test follows.
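A minimal smoke test, run inside the container from /opt/kafka (Ctrl-C exits each command; a line typed into the producer should appear at the consumer):
bin/kafka-console-producer.sh --topic testin --bootstrap-server kafka:9092
bin/kafka-console-consumer.sh --topic testin --from-beginning --bootstrap-server kafka:9092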
III. Local deployment of a single Flink node
Just follow parts 1 and 2 of the previous post, CENTOS上的网络安全工具(三十五)Portainer Kafka-Clickhouse部署(4) Flink安装部署与样例测试, and install Flink 2.2.0, Java 11 and Maven.
1. Manual OpenJDK 11 installation
Here we try installing the JDK by hand from a tarball rather than via yum. Downloading the JDK from the Oracle site requires registering and logging in; the OpenJDK archive works without an account.

Unpack it:
[root@localhost ~]# tar -xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /opt
Set JAVA_HOME, PATH and the related environment variables:
export JAVA_HOME=/opt/jdk-11.0.2
export PATH=$JAVA_HOME/bin:$PATH
And that is all it takes:
[root@localhost ~]# java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
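The two exports above only last for the current shell. One way to make them permanent is a profile.d snippet (the path follows the tarball location above):
cat > /etc/profile.d/jdk11.sh <<'EOF'
export JAVA_HOME=/opt/jdk-11.0.2
export PATH=$JAVA_HOME/bin:$PATH
EOF
source /etc/profile.d/jdk11.sh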
2. Manual Maven installation
(1) Installing the Maven environment
Install it as described in CENTOS上的网络安全工具(三十五)Portainer Kafka-Clickhouse部署(4) Flink安装部署与样例测试.
[root@localhost flink-2.2.0]# mvn -version
Apache Maven 3.9.12 (848fbb4bf2d427b72bdb2471c22fced7ebd9a7a1)
Maven home: /opt/apache-maven-3.9.12
Java version: 11.0.20.1, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.1.1-2.el8.x86_64
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "4.18.0-553.6.1.el8.x86_64", arch: "amd64", family: "unix"
[root@localhost flink-2.2.0]#
(2) Copying the dependencies
In an offline environment mvn cannot download jars by itself, so the dependencies are best fetched in an online environment by successfully building a project with the same pom.xml, then packing up the repository under ~/.m2 (the default location; otherwise check settings.xml under the Maven conf directory) and copying it to the corresponding directory on the offline machine, for example:
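A sketch of the round trip (archive name and paths follow the transcript below; dependency:go-offline pre-fetches the plugins and dependencies the build will need):
# on the online machine, from the project directory
mvn dependency:go-offline
tar cf repo_bak.tar -C ~ .m2
# on the offline machine, from the home directory
tar xf repo_bak.tar .m2/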
[root@localhost ~]# tar xf repo_bak.tar .m2/
[root@localhost ~]# ll .m2/repository/
总用量 17076
drwxr-xr-x 3 root root 19 12月 22 20:23 antlr
drwxr-xr-x 3 root root 25 12月 29 19:18 aopalliance
-rw-r--r-- 1 root root 17469929 12月 31 01:01 archetype-catalog-central.xml
-rw-r--r-- 1 root root 40 1月 3 19:39 archetype-catalog-central.xml.sha1
drwxr-xr-x 8 root root 106 12月 29 20:39 asm
drwxr-xr-x 3 root root 30 12月 23 04:09 avalon-framework
drwxr-xr-x 3 root root 38 12月 22 20:35 backport-util-concurrent
drwxr-xr-x 3 root root 25 12月 22 20:34 classworlds
drwxr-xr-x 12 root root 162 12月 30 02:29 com
drwxr-xr-x 3 root root 31 12月 23 04:09 commons-beanutils
drwxr-xr-x 3 root root 27 12月 23 04:09 commons-chain
drwxr-xr-x 3 root root 25 12月 22 20:34 commons-cli
drwxr-xr-x 3 root root 27 12月 22 20:38 commons-codec
drwxr-xr-x 3 root root 33 12月 22 20:22 commons-collections
drwxr-xr-x 3 root root 30 12月 23 04:09 commons-digester
drwxr-xr-x 3 root root 24 12月 22 20:22 commons-io
drwxr-xr-x 3 root root 26 12月 22 20:22 commons-lang
drwxr-xr-x 4 root root 56 12月 23 04:08 commons-logging
drwxr-xr-x 3 root root 19 12月 23 04:09 dom4j
drwxr-xr-x 6 root root 63 12月 30 02:41 io
drwxr-xr-x 3 root root 22 12月 30 02:41 jakarta
drwxr-xr-x 5 root root 56 12月 29 19:18 javax
drwxr-xr-x 3 root root 18 12月 29 20:39 jdom
drwxr-xr-x 3 root root 19 12月 22 20:34 junit
drwxr-xr-x 3 root root 20 12月 29 19:19 lib
drwxr-xr-x 3 root root 19 12月 22 20:35 log4j
drwxr-xr-x 3 root root 20 12月 23 04:09 logkit
drwxr-xr-x 5 root root 52 1月 4 01:44 net
drwxr-xr-x 32 root root 4096 12月 30 03:15 org
drwxr-xr-x 3 root root 17 12月 23 04:09 oro
-rw-r--r-- 1 root root 196 1月 3 19:39 resolver-status.properties
drwxr-xr-x 3 root root 22 12月 31 03:47 tools
drwxr-xr-x 3 root root 20 12月 29 20:58 trans
drwxr-xr-x 3 root root 22 12月 23 04:09 xml-apis
drwxr-xr-x 3 root root 21 12月 30 00:22 xmlpull
[root@localhost ~]#
Try compiling a project; if the build passes, the copy worked:
[root@localhost metaCongress]# mvn compile
[INFO] Scanning for projects...
[INFO]
[INFO] -----------------------< com.TLLIB:metaCongress >-----------------------
[INFO] Building Flink Quickstart Job 1.0
[INFO] from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- resources:3.3.1:resources (default-resources) @ metaCongress ---
[INFO] Copying 1 resource from src/main/resources to target/classes
[INFO]
[INFO] --- compiler:3.1:compile (default-compile) @ metaCongress ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 6 source files to /root/share/metaCongress/target/classes
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.742 s
[INFO] Finished at: 2026-01-10T06:40:58-05:00
[INFO] ------------------------------------------------------------------------
[root@localhost metaCongress]#
3. Install and start the Flink service
Again as in the previous post: download the tarball, unpack and install; not repeated here.
IV. A Kafka + Flink stream-processing skeleton
With the two steps above complete, we can basically start coding. We again take the helloFlink project created in the previous post and adapt it, but before any real development we first test a Kafka-in, Kafka-out pass-through in Flink, to get a first grip on the framework.
1. Import the Kafka connector dependency
First enable the flink-connector-kafka dependency in pom.xml by removing the comment around it, so that KafkaSource and KafkaSink become usable (this has to happen in the online environment: download there, then pack the repository and copy it in, as in section III.2):
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.0.0-1.17</version>
</dependency>
-->
Mind the version here:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>4.0.0-2.0</version>
</dependency>
Be sure to change the version from this auto-generated comment block of pom.xml to 4.0.0-2.0, otherwise compilation fails:
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.417 s
[INFO] Finished at: 2026-01-10T07:52:46-05:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project helloFlink: Compilation failure: Compilation failure:
[ERROR] /root/share/helloFlink/src/main/java/com/pighome/DataStreamJob.java:[51,43] 无法访问org.apache.flink.api.connector.sink2.StatefulSink
[ERROR] 找不到org.apache.flink.api.connector.sink2.StatefulSink的类文件
[ERROR] /root/share/helloFlink/src/main/java/com/pighome/DataStreamJob.java:[61,14] 无法访问org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink
[ERROR] 找不到org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink的类文件
Then add the flink-connector-base dependency to pom.xml as well, which provides DeliveryGuarantee:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
2. Import the connector classes
For a project developed in Java, the required dependency packages can be looked up from the Flink homepage via the link shown below:

For the KafkaSource and KafkaSink APIs of Flink 1.14 and later, at least the following imports are needed:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; // Flink execution environment

import org.apache.flink.connector.kafka.source.KafkaSource; // KafkaSource, for input
import org.apache.flink.connector.kafka.sink.KafkaSink;     // KafkaSink, for output

import org.apache.flink.streaming.api.datastream.DataStream; // DataStream, for the processing in between

import org.apache.flink.api.common.serialization.SimpleStringSchema;         // (de)serialization schema for plain Strings
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; // record serializer for the Kafka sink

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; // starting offsets: earliest is like --from-beginning; latest is the usual choice
import org.apache.flink.api.common.eventtime.WatermarkStrategy; // watermark strategy; none needed here, hence noWatermarks
import org.apache.flink.connector.base.DeliveryGuarantee;       // delivery guarantee: at-least-once (AT_LEAST_ONCE)
3. A minimal Flink Kafka input/output skeleton
package com.pighome;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; // Flink execution environment

import org.apache.flink.connector.kafka.source.KafkaSource; // KafkaSource, for input
import org.apache.flink.connector.kafka.sink.KafkaSink;     // KafkaSink, for output

import org.apache.flink.streaming.api.datastream.DataStream; // DataStream, for the processing in between

import org.apache.flink.api.common.serialization.SimpleStringSchema;         // (de)serialization schema for plain Strings
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; // record serializer for the Kafka sink

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; // starting offsets: earliest is like --from-beginning; latest is the usual choice
import org.apache.flink.api.common.eventtime.WatermarkStrategy; // watermark strategy; none needed here, hence noWatermarks
import org.apache.flink.connector.base.DeliveryGuarantee;       // delivery guarantee: at-least-once (AT_LEAST_ONCE)

public class DataStreamJob {
    private static final String brokers = "192.168.76.128:19092";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("testin")
                .setGroupId("pighome")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> lines = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        lines.print();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("testout")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        lines.sinkTo(sink);

        // Execute program, beginning computation.
        env.execute("Flink Kafka框架程序");
    }
}
4. Testing the Kafka input and output
Using the simplest type, String, for Kafka input and output lets us set serialization and deserialization aside for now, and test the basic Kafka plumbing with the org.apache.flink.api.common.serialization.SimpleStringSchema string serializer.
(1) Kafka input side
Enter the Kafka container and start a producer on the testin topic, to feed data to the Flink program's input:
[root@localhost ~]# docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5e05b01ff9c2 apache/kafka:4.1.1-rc2 "/__cacert_entrypoin…" 4 hours ago Up 4 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:19092->19092/tcp, :::19092->19092/tcp kafka-kafka-1
db817f5a31a9 portainer/portainer-ce "/portainer" 4 hours ago Up 4 hours 0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp portainer
[root@localhost ~]# docker exec -it 5e05 bash
5e05b01ff9c2:/$ cd /opt
5e05b01ff9c2:/opt$ cd kafka/
5e05b01ff9c2:/opt/kafka$ bin/kafka-console-producer.sh --topic testin --bootstrap-server kafka:9092
>
(2) Kafka output side
Enter the Kafka container and start a consumer on the testout topic, to check that the Flink program is producing output:
[root@localhost conf]# docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5e05b01ff9c2 apache/kafka:4.1.1-rc2 "/__cacert_entrypoin…" 4 hours ago Up 4 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:19092->19092/tcp, :::19092->19092/tcp kafka-kafka-1
db817f5a31a9 portainer/portainer-ce "/portainer" 4 hours ago Up 4 hours 0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp portainer
[root@localhost conf]# docker exec -it 5e05 bash
5e05b01ff9c2:/$ cd /opt/kafka/
5e05b01ff9c2:/opt/kafka$ bin/kafka-console-consumer.sh --topic testout --bootstrap-server kafka:9092
(3) Launch the Flink program
Package the project and submit helloFlink.jar from the command line, to save copying it back and forth:
[root@localhost helloFlink]# mvn compile
[INFO] Scanning for projects...
[INFO]
[INFO] -----------------------< com.pighome:helloFlink >-----------------------
……… …… ………
[INFO] Excluding org.apache.logging.log4j:log4j-api:jar:2.24.3 from the shaded jar.
……… …… ………
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.195 s
[INFO] Finished at: 2026-01-10T08:36:50-05:00
[INFO] ------------------------------------------------------------------------
[root@localhost helloFlink]# flink run target/helloFlink-1.0.jar
Job has been submitted with JobID 7cd4b1c54964982c3cff38063c056bb0
The submitted job shows up in the web UI:

(4) Watch the logs under the Flink log directory
[root@localhost log]# watch -n 1 tail -n 1 *.out

(5) Type messages into the producer and watch them come through
As you can see, the message appears both in the log and on testout (please ignore the assorted typos; the console here would not take the delete key :P).
Producer:

Processing log:

Consumer:

And the web UI shows that one message has been emitted:

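When the test is done, the job can be listed and cancelled from the same CLI (the JobID is the one printed at submission):
flink list
flink cancel 7cd4b1c54964982c3cff38063c056bb0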