Flume 基础

1、Flume的配置

(1)、 flume使用netcat方法接收数据 的 配置:
注:flume 1.7.0 左右就不能在控制台上输出了
可以参考一下这篇文章:https://blog.csdn.net/qq_36157087/article/details/129746924
属性名称 默认值 描述
type - 必须设置为 netcat(固定值,标识此 Source 类型)。
bind - 监听的主机地址(如 localhost 或具体 IP 地址 192.168.1.100)。
port - 监听的端口号(如 10050,需确保端口未被占用且防火墙允许访问)。
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1 = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)、flume实时读取文件及文件组的配置:
属性名称 默认值 描述
type - 必须设置为 TAILDIR(固定值,标识此 Source 类型)。
filegroups - 定义文件组的名称列表(多个用空格分隔),用于区分不同的文件集合(如 f1 f2)。
filegroups. - 每个文件组对应的文件路径匹配规则(支持正则),如 f1 = /var/log/*.log 表示监控 /var/log 下所有 .log 结尾的文件。
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sourecs.r1.filegroups = f1 f2
a1.sourecs.r1.filegroups.f1 = 文件路径
a1.sources.r1.filegroups.f2 = 文件路径

a1.sinks.k1 = logger

a1.sources.r1.channels = c1
a1.sinks.r1.channel = c1
(3)、flume 通过netcat或读取文件日志的方式存入hdfs的配置:
属性名称 默认值 描述
type - 必须设置为 hdfs(固定值,标识此 Sink 类型)。
hdfs.path - HDFS 中存储文件的基础路径(如 hdfs://namenode:8020/flume/data)。
hdfs.filePrefix FlumeData 生成文件的前缀(如 log-,最终文件名为 log-12345)。
hdfs.rollInterval 30 触发文件滚动的时间间隔(秒),0 表示禁用时间滚动。
hdfs.rollSzie 1024 触发文件滚动的文件大小阈值(字节),0 表示禁用大小滚动。
hdfs.rollCount 10 触发文件滚动的事件数量阈值,0 表示禁用事件数滚动。
hdfs.userLocalTimeStamp false 是否使用本地时间戳生成文件路径(默认使用 HDFS 集群时间)。
hdfs:fileType SequenceFile 文件存储格式,可选 Text(文本)、SequenceFile(序列化)、DataStream(流)。

🔴 注:一定要配置rollIntervalrollSizerollCountfileType 前三个配置没有配好,那么hdfs一定会出现小文件问题,如果不配置好fileType用web的hdfs查看文件内容的时候会乱码所以一般要设置成DataStream0

a1.sources = r1
a1.sinks = k1 
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = 文件路径

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:8080/flume/%y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.userLocalTiemStamp = true
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)、flume通过netcat或者文件读取的方式发送给kafka
属性名称 默认值 描述
type - 必须设置为 org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers - Kafka Sink 要连接的代理列表,用于获取主题分区列表。这可以是代理的部分列表,但我们建议至少两个。格式为逗号分隔的 hostname:port 列表。
kafka.topic default-flume-topic 消息将在 Kafka 中发布到的主题。如果配置了此参数,消息将发布到该主题。如果事件头包含 “topic” 字段,事件将发布到该主题,覆盖此处配置的主题。支持任意头替换(例如,%{header} 会被名为 “header” 的事件头的值替换。如果使用替换,建议将 auto.create.topics.enable 属性设置为 true 以应对 Kafka 代理事件。)
flumeBatchSize 100 一批处理的消息数。较大的批次可以提高吞吐量,延迟。
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = order
a1.sinks.k1.kafka.batchSize = 1

a1.channels.c1.type = memory
a1.channesl.c1.capacity = 1000

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(5)、多路复用avro

在 Flume 中,Avro SourceAvro Sink 是用于基于 Avro 协议 实现不同 Flume 代理(Agent)之间或 Flume 与外部系统之间数据传输的组件,主要用于构建分布式、多节点的 Flume 数据管道。

一、Avro Source

核心功能

  • 作为 服务端,监听指定端口,接收来自外部 Avro 客户端(如其他 Flume 代理的 Avro Sink、自定义 Avro 程序)发送的 Avro 格式数据。

  • 将接收到的 Avro 数据转换为 Flume 事件(Event),传递给后续的 Channel 进行处理。

  • 支持高吞吐量的二进制数据传输,适合分布式场景下的流式数据采集。

常用配置属性

属性名称 默认值 描述
type - 必须设置为 avro(固定值,标识此 Source 类型)。
bind - 监听的主机地址(如 0.0.0.0 表示监听所有网卡,或具体 IP 如 192.168.1.100)。
port - 监听的端口号(需确保端口未被占用且防火墙允许访问)。
channels - 绑定的 Channel 名称,用于将事件传递给后续组件。
threads 5 处理请求的线程数,可根据并发量调整。
compression-type none 压缩类型,可选 none(不压缩)、deflate(Deflate 压缩)。

配置示例

agent.sources = avroSource

agent.channels = memoryChannel

agent.sources.avroSource.type = avro

agent.sources.avroSource.bind = 0.0.0.0  # 监听所有网卡

agent.sources.avroSource.port = 41414    # 监听端口 41414

agent.sources.avroSource.channels = memoryChannel

使用场景

  • 分布式 Flume 架构:多个 Agent 通过 Avro Source/Sink 级联,实现数据的分布式采集和处理(如前端 Agent 采集数据,后端 Agent 聚合存储)。

  • 与外部系统集成:自定义程序(如 Java、Python 程序)通过 Avro 协议向 Flume 发送数据。

二、Avro Sink

核心功能

  • 作为 客户端,将当前 Flume 代理的 Channel 中事件(Event)序列化为 Avro 格式,发送到指定的 Avro 服务端(通常是另一个 Flume 代理的 Avro Source)。

  • 支持数据压缩,减少网络传输开销。

  • 保证数据传输的可靠性(结合 Channel 的持久化能力)。

常用配置属性

属性名称 默认值 描述
type - 必须设置为 avro(固定值,标识此 Sink 类型)。
hostname - 目标 Avro Source 的主机地址(如 192.168.1.200)。
port - 目标 Avro Source 监听的端口号(如 41414)。
channels - 绑定的 Channel 名称,用于从 Channel 拉取事件。
batch-size 100 每次发送的事件数量(批量发送,提高吞吐量)。
compression-type none 压缩类型,可选 none、deflate,需与 Avro Source 配置一致。

配置示例

agent.sinks = avroSink

agent.channels = memoryChannel

agent.sinks.avroSink.type = avro

agent.sinks.avroSink.hostname = 192.168.1.200  # 目标 Avro Source 主机

agent.sinks.avroSink.port = 41414             # 目标 Avro Source 端口

agent.sinks.avroSink.batch-size = 200         # 每次发送 200 个事件

agent.sinks.avroSink.channel = memoryChannel

使用场景

  • 分布式 Flume 管道:将数据从 “采集层” Agent 转发到 “处理层” 或 “存储层” Agent,实现分层架构。

  • 跨网络数据传输:在不同网络环境的节点间传输数据(如从边缘节点到中心节点)。

2、Flume事务流程

Put 事务流程

doPut:将批数据先写入临时缓冲区putlist

doCommit:检查channel内存队列是否足够合并

doRollback:channel内存队列空间不足,回滚数据

Take 事务

doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS

doCommit:如果数据全部发送成功,则清除临时缓冲区takeList

doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

3、Flume agent 内部原理

单数据源多出口

4、flume拦截器

利用idea需要先配置好pom.xml的配置文件

<dependencies>
	<dependency>
    	<groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>这里输入你的版本</version>
    </dependency>
</dependencies>

拦截器的代码

/**
* 1. 继承flume的拦截器接口
* 2. 重写4个抽象方法
* 3. 编写静态内部类 builder
*
*/
public class MyInterceptor implements Interceptor {
    // 初始化
    @Override
    public void initalize() {
        
    }
    // 处理单个event
    @Override
    public Event intercept(Event event) {
        // 需求:在event的头信息中添加标记
        // 提供给channel selector 选择发送到不同的channel中
        Map<String, String> header4s = event.getHeaders();
        String log = new String(event.getBody());
        
        // 判断log的开头第一个字符 如果是字母发送到channel 1
        // 如果是数字发送到channel 2 
        char c =  log.CharAt(0);
        if (c >= '0' && c <= '9'){
            // c为数字
            headers.put("type", "number");
        }else if (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z'){
            headers.put("type", "letter");
        }
        
        return event;
    }
    // 处理多个event 系统使用这个方法
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
    	return null;    
    }
    // 关闭
    @Override
    public void close() {
        
    }
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
			return new MyInterceptor;
        }
        // 配置方法
        @Override
        public void configure(Context context){
            
        }
    }
}

注: 最后打包将jar包上传到flume/lib目录下

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐