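In the previous post we successfully deployed Flink. Building on that, this post sets up a single-machine Flink development environment so that we can implement the full path in code: input from Kafka, processing in Flink, and output back to Kafka.

        I. Manual Offline Installation of Docker

        1. Point the system at an offline installation source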

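        The offline configuration of the CentOS system repositories comes up again here because, when podman and buildah are removed, yum by default also removes their dependencies that are no longer needed. Those include container-selinux and libcgroup, two packages that Docker requires. You can avoid this with yum remove --noautoremove, or force-remove packages one by one with rpm -e, but the former leaves orphaned dependency packages on the system and the latter risks breaking dependency chains. The safer approach is to remove podman and buildah first and then install the two packages back.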

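        Alternatively, you can download the rpm packages directly from pkgs.org.

        Following Network Security Tools on CentOS (1): Suricata Offline Deployment, configure the installation DVD as the system's local repository source. The DVD carries two repositories, BaseOS and AppStream:

        (1) Create /etc/yum.repos.d/myBaseOS.repo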

[baseos]
name=CentOS Stream $releasever - BaseOS
#mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo=BaseOS&infra=$infra
baseurl=file:///run/media/root/CentOS-Stream-8-x86_64-dvd/BaseOS
enabled=1

        (2) Create /etc/yum.repos.d/myAppStream.repo

[appstream]
name=CentOS Stream $releasever - AppStream
#mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo=AppStream&infra=$infra
baseurl=file:///run/media/root/CentOS-Stream-8-x86_64-dvd/AppStream
enabled=1

        (3) Rebuild the yum cache

[root@localhost yum.repos.d]# yum clean all
13 个文件已删除
[root@localhost yum.repos.d]# yum makecache
CentOS Stream 8 - AppStream                                                                                                                       306 MB/s | 7.7 MB     00:00    
CentOS Stream 8 - BaseOS                                                                                                                          195 MB/s | 2.6 MB     00:00    
元数据缓存已建立。
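
        The two repo files above differ only in the repository name, so they can be generated by a small helper. This is a minimal sketch, not part of the original procedure: the function name write_local_repo and its argument layout are illustrative, and it assumes the DVD is mounted at the path used above.

```shell
#!/bin/sh
# Hypothetical helper: write a local .repo file for one repository on the DVD.
#   $1 = repo id (e.g. baseos)
#   $2 = directory name on the DVD (e.g. BaseOS)
#   $3 = DVD mount point
#   $4 = output directory (normally /etc/yum.repos.d)
write_local_repo() {
    repo_id=$1; repo_dir=$2; mount_point=$3; out_dir=$4
    # \$releasever is escaped so that yum, not the shell, expands it.
    cat > "$out_dir/my$repo_dir.repo" <<EOF
[$repo_id]
name=CentOS Stream \$releasever - $repo_dir
baseurl=file://$mount_point/$repo_dir
enabled=1
EOF
}
```

        For example, write_local_repo baseos BaseOS /run/media/root/CentOS-Stream-8-x86_64-dvd /etc/yum.repos.d would produce myBaseOS.repo; calling it again with appstream and AppStream produces the second file. Run yum clean all and yum makecache afterwards, as shown above.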

        2. Remove podman and buildah

        (1) Removal

        See Chapter 1 of Network Security Tools on CentOS (33). On CentOS Stream 8 or later, first check whether podman and buildah are already installed, and remove them if so:

[root@localhost ~]# yum rm podman buildah -y
依赖关系解决。
===========================================================================================================================================================================================================
 软件包                                           架构                                 版本                                                                 仓库                                      大小
===========================================================================================================================================================================================================
移除:
 buildah                                          x86_64                               2:1.33.6-2.module_el8+976+11360731                                   @appstream                                31 M
 podman                                           x86_64                               3:4.9.4-0.1.module_el8+971+3d3df00d                                  @appstream                                52 M
移除依赖的软件包:
 cockpit-podman                                   noarch                               84.1-1.module_el8+977+8ab54275                                       @appstream                               682 k
清除未被使用的依赖关系:
 conmon                                           x86_64                               3:2.1.10-1.module_el8+804+f131391c                                   @appstream                               172 k
 container-selinux                                noarch                               2:2.229.0-2.module_el8+847+7863d4e6                                  @appstream                                67 k
 containers-common                                x86_64                               2:1-81.module_el8+968+fbb249c7                                       @appstream                               580 k
 criu                                             x86_64                               3.18-4.module_el8+804+f131391c                                       @appstream                               1.5 M
 fuse-overlayfs                                   x86_64                               1.13-1.module_el8+804+f131391c                                       @appstream                               128 k
 fuse3                                            x86_64                               3.3.0-19.el8                                                         @baseos                                  100 k
 fuse3-libs                                       x86_64                               3.3.0-19.el8                                                         @baseos                                  274 k
 libnet                                           x86_64                               1.1.6-15.el8                                                         @AppStream                               170 k
 libslirp                                         x86_64                               4.4.0-1.module_el8+804+f131391c                                      @appstream                               134 k
 podman-catatonit                                 x86_64                               3:4.9.4-0.1.module_el8+971+3d3df00d                                  @appstream                               794 k
 podman-gvproxy                                   x86_64                               3:4.9.4-0.1.module_el8+971+3d3df00d                                  @appstream                                13 M
 podman-plugins                                   x86_64                               3:4.9.4-0.1.module_el8+971+3d3df00d                                  @appstream                               3.6 M
 runc                                             x86_64                               1:1.1.12-1.module_el8+885+7da147f3                                   @appstream                               9.9 M
 shadow-utils-subid                               x86_64                               2:4.6-22.el8                                                         @baseos                                  205 k
 slirp4netns                                      x86_64                               1.2.3-1.module_el8+951+32019cde                                      @appstream                               107 k

事务概要
===========================================================================================================================================================================================================
移除  18 软件包

将会释放空间:113 M
运行事务检查
事务检查成功。
运行事务测试
事务测试成功。
运行事务
  准备中  :                                                                                                                                                                                            1/1 
  运行脚本: buildah-2:1.33.6-2.module_el8+976+11360731.x86_64                                                                                                                                          1/1 
  删除    : buildah-2:1.33.6-2.module_el8+976+11360731.x86_64                                                                                                                                         1/18 
  删除    : cockpit-podman-84.1-1.module_el8+977+8ab54275.noarch                                                                                                                                      2/18 
  运行脚本: podman-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64                                                                                                                                         3/18 
  删除    : podman-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64                                                                                                                                         3/18 
  运行脚本: podman-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64                                                                                                                                         3/18 
  删除    : containers-common-2:1-81.module_el8+968+fbb249c7.x86_64                                                                                                                                   4/18 
  删除    : fuse-overlayfs-1.13-1.module_el8+804+f131391c.x86_64                                                                                                                                      5/18 
  删除    : fuse3-3.3.0-19.el8.x86_64                                                                                                                                                                 6/18 
  删除    : slirp4netns-1.2.3-1.module_el8+951+32019cde.x86_64                                                                                                                                        7/18 
  删除    : runc-1:1.1.12-1.module_el8+885+7da147f3.x86_64                                                                                                                                            8/18 
  删除    : criu-3.18-4.module_el8+804+f131391c.x86_64                                                                                                                                                9/18 
  删除    : podman-plugins-3:4.9.4-0.1.module_el8+971+3d3df00d.x86_64                                                                                                                                10/18 
  删除    : container-selinux-2:2.229.0-2.module_el8+847+7863d4e6.noarch                                                                                                                             11/18 
  运行脚本: container-selinux-2:2.229.0-2.module_el8+847+7863d4e6.noarch     
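
        Because the removal also sweeps out packages that Docker needs later, it can help to check what is now missing before moving on. A small sketch under that assumption: missing_packages is an illustrative name, and the function relies only on rpm -q returning a non-zero status for a package that is not installed.

```shell
#!/bin/sh
# Print the name of every given package that rpm reports as not installed.
missing_packages() {
    for pkg in "$@"; do
        # rpm -q exits non-zero when the package is absent.
        rpm -q "$pkg" >/dev/null 2>&1 || echo "$pkg"
    done
}
```

        Calling missing_packages container-selinux libcgroup prints whichever of the two still has to be reinstalled from the local repository.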

        (2) Reinstall container-selinux

[root@localhost ~]# yum install container-selinux.noarch 
上次元数据过期检查:0:34:37 前,执行于 2026年01月09日 星期五 22时20分11秒。
依赖关系解决。
==================================================================================================================================================================================
 软件包                                   架构                          版本                                                               仓库                              大小
==================================================================================================================================================================================
安装:
 container-selinux                        noarch                        2:2.180.0-1.module_el8.7.0+1106+45480ee0                           appstream                         59 k

事务概要
==================================================================================================================================================================================
安装  1 软件包

总计:59 k
安装大小:54 k
确定吗?[y/N]: y
下载软件包:
运行事务检查
事务检查成功。
运行事务测试
事务测试成功。
运行事务
  准备中  :                                                                                                                                                                   1/1 
  运行脚本: container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch                                                                                                 1/1 
  安装    : container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch                                                                                                 1/1 
  运行脚本: container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch                                                                                                 1/1 
  验证    : container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch                                                                                                 1/1 

已安装:
  container-selinux-2:2.180.0-1.module_el8.7.0+1106+45480ee0.noarch                                                                                                               

完毕!

        (3) Reinstall libcgroup

[root@localhost dockerrpm]# yum install libcgroup
上次元数据过期检查:0:37:23 前,执行于 2026年01月09日 星期五 22时20分11秒。
依赖关系解决。
==================================================================================================================================================================================
 软件包                                     架构                                    版本                                            仓库                                     大小
==================================================================================================================================================================================
安装:
 libcgroup                                  x86_64                                  0.41-19.el8                                     baseos                                   70 k

事务概要
==================================================================================================================================================================================
安装  1 软件包

总计:70 k
安装大小:136 k
确定吗?[y/N]: y
下载软件包:
运行事务检查
事务检查成功。
运行事务测试
事务测试成功。
运行事务
  准备中  :                                                                                                                                                                   1/1 
  运行脚本: libcgroup-0.41-19.el8.x86_64                                                                                                                                      1/1 
[sss_cache] [sysdb_domain_cache_connect] (0x0010): DB version too old [0.23], expected [0.24] for domain implicit_files!
Higher version of database is expected!
In order to upgrade the database, you must run SSSD.
Removing cache files in /var/lib/sss/db should fix the issue, but note that removing cache files will also remove all of your cached credentials.
Could not open available domains
[sss_cache] [sysdb_domain_cache_connect] (0x0010): DB version too old [0.23], expected [0.24] for domain implicit_files!
Higher version of database is expected!
In order to upgrade the database, you must run SSSD.
Removing cache files in /var/lib/sss/db should fix the issue, but note that removing cache files will also remove all of your cached credentials.
Could not open available domains

  安装    : libcgroup-0.41-19.el8.x86_64                                                                                                                                      1/1 
  运行脚本: libcgroup-0.41-19.el8.x86_64                                                                                                                                      1/1 
  验证    : libcgroup-0.41-19.el8.x86_64                                                                                                                                      1/1 

已安装:
  libcgroup-0.41-19.el8.x86_64                                                                                                                                                    

完毕!

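        3. Install Docker

        (1) Download the packages

        Docker can be installed manually in two ways, from packages or from static binaries. Only the package route is used here, which avoids error-prone steps such as writing the service unit file and setting up user-group permissions.

        Download the rpm packages from the official Docker site.

        The packages must match the operating system version, which can be confirmed from os-release: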

[root@localhost ~]# cat /etc/os-release
NAME="CentOS Stream"
VERSION="8"
ID="centos"
ID_LIKE="rhel fedora"
VERSION_ID="8"
PLATFORM_ID="platform:el8"
PRETTY_NAME="CentOS Stream 8"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:centos:centos:8"
HOME_URL="https://centos.org/"
BUG_REPORT_URL="https://bugzilla.redhat.com/"
REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8"
REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream"

        The corresponding download location:

https://download.docker.com/linux/centos/8/x86_64/stable/Packages/

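        Pay attention to versions when downloading. In the test below, installing docker-ce-cli-26.1.3-1 required containerd.io 1.6.24 or later.

        The following combination of versions was verified to install successfully on CentOS Stream 8: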

[root@localhost dockerrpm]# ll
总用量 100660
-rw-r--r--. 1 root root 35543060 1月   9 23:02 containerd.io-1.6.24-3.1.el8.x86_64.rpm
-rw-r--r--. 1 root root 14279736 1月   9 22:44 docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm
-rw-r--r--. 1 root root 28542392 1月   9 22:44 docker-ce-26.1.3-1.el8.x86_64.rpm
-rw-r--r--. 1 root root  8181560 1月   9 22:44 docker-ce-cli-26.1.3-1.el8.x86_64.rpm
-rw-r--r--. 1 root root  5210852 1月   9 22:44 docker-ce-rootless-extras-26.1.3-1.el8.x86_64.rpm
-rw-r--r--. 1 root root  7370924 1月   9 22:44 docker-compose-plugin-2.6.0-3.el8.x86_64.rpm
-rw-r--r--. 1 root root  3930488 1月   9 22:44 docker-scan-plugin-0.9.0-3.el8.x86_64.rpm

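        (2) Step-by-step installation

        Install the components that the official documentation lists.

        docker-ce depends on docker-ce-cli and libcgroup, docker-ce-cli depends on containerd.io, and containerd.io in turn depends on container-selinux, so installing them one at a time in the reverse order works:

        containerd.io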
[root@localhost dockerrpm]# rpm -ivhU containerd.io-1.6.24-3.1.el8.x86_64.rpm 
警告:containerd.io-1.6.24-3.1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying...                          ################################# [100%]
准备中...                          ################################# [100%]
正在升级/安装...
   1:containerd.io-1.6.24-3.1.el8     ################################# [ 50%]
        docker-ce-cli
[root@localhost dockerrpm]# rpm -ivh docker-ce-cli-26.1.3-1.el8.x86_64.rpm 
警告:docker-ce-cli-26.1.3-1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying...                          ################################# [100%]
准备中...                          ################################# [100%]
正在升级/安装...
   1:docker-ce-cli-1:26.1.3-1.el8     ################################# [100%]
        docker-ce
[root@localhost dockerrpm]# rpm -ivh docker-ce-26.1.3-1.el8.x86_64.rpm 
警告:docker-ce-26.1.3-1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying...                          ################################# [100%]
准备中...                          ################################# [100%]
正在升级/安装...
   1:docker-ce-3:26.1.3-1.el8         ################################# [100%]
        docker-compose-plugin
[root@localhost dockerrpm]# rpm -ivh docker-compose-plugin-2.6.0-3.el8.x86_64.rpm 
警告:docker-compose-plugin-2.6.0-3.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying...                          ################################# [100%]
准备中...                          ################################# [100%]
正在升级/安装...
   1:docker-compose-plugin-0:2.6.0-3.e################################# [100%]
        docker-buildx-plugin
[root@localhost dockerrpm]# rpm -ivh docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm 
警告:docker-buildx-plugin-0.14.0-1.el8.x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID 621e9f35: NOKEY
Verifying...                          ################################# [100%]
准备中...                          ################################# [100%]
正在升级/安装...
   1:docker-buildx-plugin-0:0.14.0-1.e################################# [100%]
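
        The reverse-dependency order used above can also be derived mechanically. The following is a minimal sketch, assuming the dependency pairs are exactly those stated in this article; they are typed in by hand here, not read from the RPM files. docker_install_order is an illustrative name, and tsort is the topological-sort tool from coreutils.

```shell
#!/bin/sh
# Print an install order for the Docker RPMs, dependencies first.
# Each input line "A B" means: A must be installed before B.
# The pairs mirror the chain described in this article (an assumption,
# not something queried from the packages themselves).
docker_install_order() {
    tsort <<EOF
container-selinux containerd.io
containerd.io docker-ce-cli
docker-ce-cli docker-ce
libcgroup docker-ce
EOF
}
```

        Running docker_install_order prints one package name per line. To see what a downloaded file really requires, rpm -qpR followed by the rpm file name lists its dependencies.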

        (3) Start the service

[root@localhost dockerrpm]# systemctl enable --now docker
Created symlink /etc/systemd/system/multi-user.target.wants/docker.service → /usr/lib/systemd/system/docker.service.
[root@localhost ~]# systemctl start docker
[root@localhost ~]# systemctl status docker
● docker.service - Docker Application Container Engine
   Loaded: loaded (/usr/lib/systemd/system/docker.service; enabled; vendor preset: disabled)
   Active: active (running) since Fri 2026-01-09 23:12:56 EST; 2min 22s ago
     Docs: https://docs.docker.com
 Main PID: 1321 (dockerd)
    Tasks: 13
   Memory: 111.2M
   CGroup: /system.slice/docker.service
           └─1321 /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock

1月 09 23:12:54 localhost.localdomain systemd[1]: Starting Docker Application Container Engine...


        II. Deploying Kafka with Portainer

        1. Install Portainer

        See Chapter 2 of Network Security Tools on CentOS (33).

        As of January 2026, the following image can be pulled:

[root@localhost ~]# docker pull docker.1ms.run/portainer/portainer-ce:alpine
alpine: Pulling from portainer/portainer-ce
014e56e61396: Already exists 
97d46cc86f33: Already exists 
1b6da1229ec5: Already exists 
37dcf0e5163d: Already exists 
5f39d7b36694: Already exists 
4dc5fe4c57e2: Already exists 
4f4fb700ef54: Already exists 
Digest: sha256:a04e0ac3e99172e451055419e2ed46c67f24bff72209ab09235079d7642e87d8
Status: Downloaded newer image for docker.1ms.run/portainer/portainer-ce:alpine
docker.1ms.run/portainer/portainer-ce:alpine

        Tag it to shorten the name:

[root@localhost ~]# docker tag docker.1ms.run/portainer/portainer-ce:alpine portainer/portainer-ce
[root@localhost ~]# docker images
REPOSITORY                              TAG       IMAGE ID       CREATED        SIZE
docker.1ms.run/portainer/portainer-ce   alpine    1fc1bbda74d9   2 weeks ago    192MB
portainer/portainer-ce                  latest    1fc1bbda74d9   2 weeks ago    192MB

        Portainer can then be started the same way as in post (33):

[root@localhost ~]# docker run -d -p 8000:8000 -p 9443:9443 --name portainer --restart=always -v /var/run/docker.sock:/var/run/docker.sock -v portainer_data:/data portainer/portainer-ce
4e2496b162f045557ddd9440589338e3ca205071d626cb6b8e6032f340de7d50
[root@localhost ~]# 
[root@localhost ~]# docker container ls
CONTAINER ID   IMAGE                    COMMAND        CREATED          STATUS          PORTS                                                                                            NAMES
4e2496b162f0   portainer/portainer-ce   "/portainer"   10 seconds ago   Up 10 seconds   0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp   portainer
[root@localhost ~]# ls /var/lib/docker/volumes/
backingFsBlockDev  metadata.db  portainer_data

        Starting Portainer does not require creating the volume beforehand. Swarm, however, still has to be initialized, otherwise stacks cannot be started later:

[root@localhost ~]# docker swarm init
Swarm initialized: current node (eec1ag50r9k7odxxntvb4sqvv) is now a manager.
……  ……  ……
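
        When the setup is scripted or repeated, it may be convenient to initialize swarm only if the node is not already part of one. A minimal sketch under that assumption: ensure_swarm is an illustrative name, and the check reads the Swarm.LocalNodeState field that docker info exposes through --format.

```shell
#!/bin/sh
# Initialize swarm only when this node is not already an active swarm member.
ensure_swarm() {
    state=$(docker info --format '{{.Swarm.LocalNodeState}}')
    if [ "$state" != "active" ]; then
        docker swarm init
    fi
}
```

        Calling ensure_swarm a second time does nothing, because the state is already active.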

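        2. Single-node Kafka deployment with Portainer

        (1) Download Apache Kafka

        Previously we used bitnami/kafka, but within just a month or two it could no longer be downloaded. The official Apache image looks more dependable, so this time we switch to the apache/kafka image.

        As of January 2026, the following image can be pulled: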

[root@localhost ~]# docker pull docker.1ms.run/apache/kafka:4.1.1
4.1.1: Pulling from apache/kafka
2d35ebdb57d9: Pull complete 
34074eb54496: Pull complete 
1efb71aa46e6: Pull complete 
c12c21783ef0: Pull complete 
e0d459441fab: Pull complete 
0c148fd1a481: Pull complete 
4be6c1affd0e: Pull complete 
ebab522082c7: Pull complete 
f8b484aae80d: Pull complete 
b69a77a99b62: Pull complete 
e4b07b2631b1: Pull complete 
Digest: sha256:0bc1bb2478f45b6cea78864df86acdc11e8df2c5172477819a4d12942cbe5d40
Status: Downloaded newer image for docker.1ms.run/apache/kafka:4.1.1
docker.1ms.run/apache/kafka:4.1.1

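        (2) Start the Kafka container

        Kafka deployment can still follow Chapter 4 of Network Security Tools on CentOS (33). The apache/kafka image differs from bitnami/kafka, but the yml file is largely the same; only the names of the configuration variables differ slightly: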

version: '3.8'
services:
  kafka:
    image: apache/kafka:latest
    networks:
      - developnet
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
        max_attempts: 3
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      - KAFKA_PROCESS_ROLES=broker,controller
      - KAFKA_NODE_ID=1
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:19092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://192.168.76.128:19092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT     
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_NUM_PARTITIONS=3
      - KAFKA_LOG_DIRS=/var/lib/kafka/data
    volumes:
      - kafka-data:/var/lib/kafka/data
networks:
  developnet:
    driver: overlay
    attachable: true
volumes:
  kafka-data:

        Once it is running, enter the container and create the topics. We will test Kafka input and output later, so create two topics, testin and testout:

[root@localhost ~]# docker container ls
CONTAINER ID   IMAGE                    COMMAND        CREATED         STATUS         PORTS                                                                                            NAMES
57313536ff91   portainer/portainer-ce   "/portainer"   4 seconds ago   Up 4 seconds   0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp   portainer
[root@localhost ~]# docker container ls
CONTAINER ID   IMAGE                    COMMAND                   CREATED              STATUS              PORTS                                                                                            NAMES
f409097532ce   apache/kafka:latest      "/__cacert_entrypoin…"   About a minute ago   Up About a minute   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:19092->19092/tcp, :::19092->19092/tcp         kafka-kafka-1
[root@localhost ~]# docker exec -it f409 bash
f409097532ce:/$ cd /opt/kafka/
f409097532ce:/opt/kafka$ bin/kafka-topics.sh --create --topic testin --bootstrap-server kafka:9092
Created topic testin.
f409097532ce:/opt/kafka$ bin/kafka-topics.sh --create --topic testout --bootstrap-server kafka:9092
Created topic testout.
f409097532ce:/opt/kafka$ 

        Use kafka-console-producer.sh and kafka-console-consumer.sh to confirm that both topics can be produced to and consumed from, then move on to the next step.
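
        The produce/consume check can be wrapped in two small functions to run inside the Kafka container. This is a sketch, not the commands the article actually ran: produce_line and consume_first are illustrative names, and KAFKA_HOME and BOOTSTRAP default to the path and address used above.

```shell
#!/bin/sh
# Defaults follow the values used in this article; override them if needed.
KAFKA_HOME=${KAFKA_HOME:-/opt/kafka}
BOOTSTRAP=${BOOTSTRAP:-kafka:9092}

# Send one line of text to a topic.  $1 = topic, $2 = message
produce_line() {
    printf '%s\n' "$2" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --bootstrap-server "$BOOTSTRAP" --topic "$1"
}

# Read a topic from the beginning and stop after the first message.  $1 = topic
consume_first() {
    "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
        --bootstrap-server "$BOOTSTRAP" --topic "$1" \
        --from-beginning --max-messages 1
}
```

        A round trip would then be produce_line testin hello followed by consume_first testin, and the same again for testout.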

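        III. Local Deployment of Single-Node Flink

        Follow Parts 1 and 2 of the previous post, Network Security Tools on CentOS (35): Portainer Kafka-Clickhouse Deployment (4), Flink Installation, Deployment and Sample Test, to install Flink 2.2.0, Java 11, and mvn.

        1. Manual installation of OpenJDK 11

        Here we install the JDK manually from a tarball instead of through yum. Downloading the JDK from the Oracle website requires registering and logging in, whereas the OpenJDK build can be downloaded directly.

        Unpack: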

[root@localhost ~]# tar -xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /opt

        Set environment variables such as JAVA_HOME and PATH:

export JAVA_HOME=/opt/jdk-11.0.2
export PATH=$JAVA_HOME/bin:$PATH

        That is all it takes:

[root@localhost ~]# java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
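
        If the two exports are typed into an interactive shell, they last only for that session. One way to make them permanent is a script under /etc/profile.d, which login shells on CentOS source at startup. A minimal sketch under that assumption; write_java_profile and the file name jdk.sh are illustrative.

```shell
#!/bin/sh
# Hypothetical helper: write a profile script that exports JAVA_HOME and PATH.
#   $1 = JDK directory (e.g. /opt/jdk-11.0.2)
#   $2 = output file (e.g. /etc/profile.d/jdk.sh)
write_java_profile() {
    # \$JAVA_HOME and \$PATH are escaped so they are expanded at login time.
    cat > "$2" <<EOF
export JAVA_HOME=$1
export PATH=\$JAVA_HOME/bin:\$PATH
EOF
}
```

        For example, write_java_profile /opt/jdk-11.0.2 /etc/profile.d/jdk.sh, then log in again or source the file before running java -version.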

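        2. Manual installation of Maven

        (1) Install the Maven environment

        Install it as described in Network Security Tools on CentOS (35): Portainer Kafka-Clickhouse Deployment (4), Flink Installation, Deployment and Sample Test.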

[root@localhost flink-2.2.0]# mvn -version
Apache Maven 3.9.12 (848fbb4bf2d427b72bdb2471c22fced7ebd9a7a1)
Maven home: /opt/apache-maven-3.9.12
Java version: 11.0.20.1, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.1.1-2.el8.x86_64
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "4.18.0-553.6.1.el8.x86_64", arch: "amd64", family: "unix"
[root@localhost flink-2.2.0]# 

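        (2) Copy the dependencies

        In an offline environment mvn cannot download jar files itself. The dependencies are best downloaded in an online environment by building a project that compiles successfully with the same pom.xml. Then pack the repository directory under ~/.m2 (the default location; otherwise check settings.xml in the conf directory of the Maven installation) and copy it to the corresponding directory in the offline environment.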

[root@localhost ~]# tar xf repo_bak.tar .m2/
[root@localhost ~]# ll .m2/repository/
总用量 17076
drwxr-xr-x  3 root root       19 12月 22 20:23 antlr
drwxr-xr-x  3 root root       25 12月 29 19:18 aopalliance
-rw-r--r--  1 root root 17469929 12月 31 01:01 archetype-catalog-central.xml
-rw-r--r--  1 root root       40 1月   3 19:39 archetype-catalog-central.xml.sha1
drwxr-xr-x  8 root root      106 12月 29 20:39 asm
drwxr-xr-x  3 root root       30 12月 23 04:09 avalon-framework
drwxr-xr-x  3 root root       38 12月 22 20:35 backport-util-concurrent
drwxr-xr-x  3 root root       25 12月 22 20:34 classworlds
drwxr-xr-x 12 root root      162 12月 30 02:29 com
drwxr-xr-x  3 root root       31 12月 23 04:09 commons-beanutils
drwxr-xr-x  3 root root       27 12月 23 04:09 commons-chain
drwxr-xr-x  3 root root       25 12月 22 20:34 commons-cli
drwxr-xr-x  3 root root       27 12月 22 20:38 commons-codec
drwxr-xr-x  3 root root       33 12月 22 20:22 commons-collections
drwxr-xr-x  3 root root       30 12月 23 04:09 commons-digester
drwxr-xr-x  3 root root       24 12月 22 20:22 commons-io
drwxr-xr-x  3 root root       26 12月 22 20:22 commons-lang
drwxr-xr-x  4 root root       56 12月 23 04:08 commons-logging
drwxr-xr-x  3 root root       19 12月 23 04:09 dom4j
drwxr-xr-x  6 root root       63 12月 30 02:41 io
drwxr-xr-x  3 root root       22 12月 30 02:41 jakarta
drwxr-xr-x  5 root root       56 12月 29 19:18 javax
drwxr-xr-x  3 root root       18 12月 29 20:39 jdom
drwxr-xr-x  3 root root       19 12月 22 20:34 junit
drwxr-xr-x  3 root root       20 12月 29 19:19 lib
drwxr-xr-x  3 root root       19 12月 22 20:35 log4j
drwxr-xr-x  3 root root       20 12月 23 04:09 logkit
drwxr-xr-x  5 root root       52 1月   4 01:44 net
drwxr-xr-x 32 root root     4096 12月 30 03:15 org
drwxr-xr-x  3 root root       17 12月 23 04:09 oro
-rw-r--r--  1 root root      196 1月   3 19:39 resolver-status.properties
drwxr-xr-x  3 root root       22 12月 31 03:47 tools
drwxr-xr-x  3 root root       20 12月 29 20:58 trans
drwxr-xr-x  3 root root       22 12月 23 04:09 xml-apis
drwxr-xr-x  3 root root       21 12月 30 00:22 xmlpull
[root@localhost ~]# 
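
        The pack-and-copy step can be sketched as a pair of functions, one for each machine. This assumes the repository is in the default ~/.m2 location and that the archive keeps the .m2/ prefix, as the tar command above expects; pack_m2_repo and unpack_m2_repo are illustrative names.

```shell
#!/bin/sh
# Online machine: pack the local Maven repository.  $1 = output tar file
pack_m2_repo() {
    # -C keeps member names relative to the home directory (.m2/repository/...).
    tar cf "$1" -C "$HOME" .m2/repository
}

# Offline machine: unpack the archive into the home directory.  $1 = tar file
unpack_m2_repo() {
    tar xf "$1" -C "$HOME"
}
```

        Pass an absolute path for the archive, for example pack_m2_repo /tmp/repo_bak.tar. On the offline machine, running mvn with the -o (offline) option makes it resolve everything from the local repository instead of trying the network.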

        Try compiling a project; if it builds, the setup is good:

[root@localhost metaCongress]# mvn compile
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.TLLIB:metaCongress >-----------------------
[INFO] Building Flink Quickstart Job 1.0
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- resources:3.3.1:resources (default-resources) @ metaCongress ---
[INFO] Copying 1 resource from src/main/resources to target/classes
[INFO] 
[INFO] --- compiler:3.1:compile (default-compile) @ metaCongress ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 6 source files to /root/share/metaCongress/target/classes
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.742 s
[INFO] Finished at: 2026-01-10T06:40:58-05:00
[INFO] ------------------------------------------------------------------------
[root@localhost metaCongress]# 

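        3. Install and start the Flink service

        As in the previous post, download the tarball and unpack it; the details are not repeated here.

        IV. Kafka + Flink Stream-Processing Framework

        With the steps above complete, development can begin. We again start from the helloFlink project created in the previous post and modify it. Before the real development, though, we first test a plain pass-through from Kafka input to Kafka output in Flink, to get a first grip on the framework.

        1. Add the Kafka connector dependency

        First uncomment the flink-connector-kafka dependency in pom.xml so that KafkaSource and KafkaSink are available. (This should be done in the online environment; after the download, pack the repository and copy it over.)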

<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.0-1.17</version>
    </dependency>
-->

<!-- Note the version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>4.0.0-2.0</version>
</dependency>

        Be sure to change the version in the commented block of the auto-generated pom.xml to 4.0.0-2.0; otherwise compilation fails:

[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.417 s
[INFO] Finished at: 2026-01-10T07:52:46-05:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project helloFlink: Compilation failure: Compilation failure: 
[ERROR] /root/share/helloFlink/src/main/java/com/pighome/DataStreamJob.java:[51,43] 无法访问org.apache.flink.api.connector.sink2.StatefulSink
[ERROR]   找不到org.apache.flink.api.connector.sink2.StatefulSink的类文件
[ERROR] /root/share/helloFlink/src/main/java/com/pighome/DataStreamJob.java:[61,14] 无法访问org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink
[ERROR]   找不到org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink的类文件

        Next, add the flink-connector-base dependency to pom.xml as well, to support DeliveryGuarantee:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

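        2. Import the connector packages

        For a project written in Java, the packages can be looked up starting from the link on the Flink home page.

        For the KafkaSource and KafkaSink framework of Flink 1.14 and later, at least the following imports are needed: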

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;   // Flink execution environment

import org.apache.flink.connector.kafka.source.KafkaSource;                     // KafkaSource, used for input
import org.apache.flink.connector.kafka.sink.KafkaSink;                         // KafkaSink, used for output

import org.apache.flink.streaming.api.datastream.DataStream;                    // DataStream, used for intermediate processing

import org.apache.flink.api.common.serialization.SimpleStringSchema;            // (de)serializer for String values
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;    // serializer for records written to Kafka

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;   // starting offsets: earliest is like --from-beginning; latest is the usual choice
import org.apache.flink.api.common.eventtime.WatermarkStrategy;                             // watermark strategy; none is used here, hence noWatermarks
import org.apache.flink.connector.base.DeliveryGuarantee;                                   // delivery guarantee for output: AT_LEAST_ONCE
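
        The difference between the earliest and latest starting offsets mentioned in the comments can be pictured with a toy partition log. The sketch below is a plain-Java model rather than the Kafka or Flink API; StartingOffsetDemo, Start, and visibleBacklog are hypothetical names used only for illustration.

```java
import java.util.List;

/** Toy model of where a new consumer starts reading in a partition log. */
final class StartingOffsetDemo {

    enum Start { EARLIEST, LATEST }

    /** Returns the records already in the log that a consumer starting at start would read. */
    static List<String> visibleBacklog(List<String> log, Start start) {
        // EARLIEST begins at the first retained record; LATEST begins at the end of the log,
        // so only records produced afterwards are seen.
        int from = (start == Start.EARLIEST) ? 0 : log.size();
        return log.subList(from, log.size());
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2");
        System.out.println(visibleBacklog(log, Start.EARLIEST)); // [m0, m1, m2]
        System.out.println(visibleBacklog(log, Start.LATEST));   // []
    }
}
```

        For a test like the one in this article, earliest is convenient because messages typed before the job started are still picked up.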

        3. A minimal Flink Kafka input/output skeleton

package com.pighome;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;   // Flink execution environment

import org.apache.flink.connector.kafka.source.KafkaSource;                     // KafkaSource, used for input
import org.apache.flink.connector.kafka.sink.KafkaSink;                         // KafkaSink, used for output

import org.apache.flink.streaming.api.datastream.DataStream;                    // DataStream, used for intermediate processing

import org.apache.flink.api.common.serialization.SimpleStringSchema;            // (de)serializer for String values
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;    // serializer for records written to Kafka

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;   // starting offsets: earliest is like --from-beginning; latest is the usual choice
import org.apache.flink.api.common.eventtime.WatermarkStrategy;                             // watermark strategy; none is used here, hence noWatermarks
import org.apache.flink.connector.base.DeliveryGuarantee;                                   // delivery guarantee for output: AT_LEAST_ONCE

public class DataStreamJob {
    // Address of the Kafka broker as reachable from the Flink job
    private static final String brokers = "192.168.76.128:19092";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read String values from the "testin" topic, starting from the earliest offset
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics("testin")
            .setGroupId("pighome")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        DataStream<String> lines = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // Also print every record, so it shows up in the TaskManager .out log
        lines.print();

        // Write the same String values unchanged to the "testout" topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(brokers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("testout")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
                )
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

        lines.sinkTo(sink);

        // Execute program, beginning computation.
        env.execute("Flink Kafka框架程序");
    }
}

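        4. Testing Kafka input and output

        We use the simplest type, String, for Kafka input and output, so serialization and deserialization can be set aside for now; the string schema org.apache.flink.api.common.serialization.SimpleStringSchema is enough to test the basic Kafka input/output path.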
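
        Conceptually, a string schema of this kind only converts between String and byte[]. The sketch below shows that round trip with the JDK alone, assuming UTF-8 encoding (which, to our understanding, is also what SimpleStringSchema uses by default); StringRoundTrip is a hypothetical stand-in and not the Flink class itself.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in illustrating what a String (de)serialization schema does. */
final class StringRoundTrip {

    /** Encodes a String as the bytes that would be written as the Kafka record value. */
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes the bytes of a Kafka record value back into a String. */
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello flink");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
    }
}
```

        Anything richer than a plain string (JSON, Avro, and so on) needs its own schema, which is why starting with String keeps the first test simple.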

        (1) Kafka input side

        Enter the Kafka container and start a console producer on the testin topic to feed data to the input of the Flink program:

[root@localhost ~]# docker container ls
CONTAINER ID   IMAGE                    COMMAND                   CREATED       STATUS       PORTS                                                                                            NAMES
5e05b01ff9c2   apache/kafka:4.1.1-rc2   "/__cacert_entrypoin…"   4 hours ago   Up 4 hours   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:19092->19092/tcp, :::19092->19092/tcp         kafka-kafka-1
db817f5a31a9   portainer/portainer-ce   "/portainer"              4 hours ago   Up 4 hours   0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp   portainer
[root@localhost ~]# docker exec -it 5e05 bash
5e05b01ff9c2:/$ cd /opt
5e05b01ff9c2:/opt$ cd kafka/
5e05b01ff9c2:/opt/kafka$ bin/kafka-console-producer.sh --topic testin --bootstrap-server kafka:9092
>

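        (2) Kafka output side

        Enter the Kafka container and start a console consumer on the testout topic to check whether the Flink program produces output correctly: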

[root@localhost conf]# docker container ls
CONTAINER ID   IMAGE                    COMMAND                   CREATED       STATUS       PORTS                                                                                            NAMES
5e05b01ff9c2   apache/kafka:4.1.1-rc2   "/__cacert_entrypoin…"   4 hours ago   Up 4 hours   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:19092->19092/tcp, :::19092->19092/tcp         kafka-kafka-1
db817f5a31a9   portainer/portainer-ce   "/portainer"              4 hours ago   Up 4 hours   0.0.0.0:8000->8000/tcp, :::8000->8000/tcp, 0.0.0.0:9443->9443/tcp, :::9443->9443/tcp, 9000/tcp   portainer
[root@localhost conf]# docker exec -it 5e05 bash
5e05b01ff9c2:/$ cd /opt/kafka/
5e05b01ff9c2:/opt/kafka$ bin/kafka-console-consumer.sh --topic testout --bootstrap-server kafka:9092

        (3) Start the Flink program

        Package the project and submit helloFlink.jar from the command line, which saves copying the jar around:

[root@localhost helloFlink]# mvn package
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------------< com.pighome:helloFlink >-----------------------
……… …… ………
[INFO] Excluding org.apache.logging.log4j:log4j-api:jar:2.24.3 from the shaded jar.
……… …… ………
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  3.195 s
[INFO] Finished at: 2026-01-10T08:36:50-05:00
[INFO] ------------------------------------------------------------------------
[root@localhost helloFlink]# flink run target/helloFlink-1.0.jar 
Job has been submitted with JobID 7cd4b1c54964982c3cff38063c056bb0

        The web UI shows that the job has been submitted:

        (4) Check the logs in the Flink log directory

[root@localhost log]# watch -n 1 tail -n 1 *.out

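        (5) Type a message into the producer and check that it is forwarded

        As shown, the message appears both in the log and in testout (please ignore the typos; the Backspace key does not work here :P)

        Producer

          Log of the intermediate processing

          Consumer

        The web UI shows that one message was sent: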


       

     
