flume
netcatlocalhost10050TAILDIRf1 f2/var/log.loghdfslog-log-12345000TextDataStream🔴 注:一定要配置rollSizerollCount和fileType前三个配置没有配好,那么hdfs一定会出现小文件问题,如果不配置好fileType用web的hdfs查看文件内容的时候会乱码所以一般要设置成。
Flume 基础
1、Flume的配置
(1)、 flume使用netcat方法接收数据 的 配置:
注:flume 1.7.0 左右就不能在控制台上输出了
可以参考一下这篇文章:https://blog.csdn.net/qq_36157087/article/details/129746924
| 属性名称 | 默认值 | 描述 |
|---|---|---|
| type | - | 必须设置为 netcat(固定值,标识此 Source 类型)。 |
| bind | - | 监听的主机地址(如 localhost 或具体 IP 地址 192.168.1.100)。 |
| port | - | 监听的端口号(如 10050,需确保端口未被占用且防火墙允许访问)。 |
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1 = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)、flume实时读取文件及文件组的配置:
| 属性名称 | 默认值 | 描述 |
|---|---|---|
| type | - | 必须设置为 TAILDIR(固定值,标识此 Source 类型)。 |
| filegroups | - | 定义文件组的名称列表(多个用空格分隔),用于区分不同的文件集合(如 f1 f2)。 |
| filegroups. | - | 每个文件组对应的文件路径匹配规则(支持正则),如 f1 = /var/log/*.log 表示监控 /var/log 下所有 .log 结尾的文件。 |
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sourecs.r1.filegroups = f1 f2
a1.sourecs.r1.filegroups.f1 = 文件路径
a1.sources.r1.filegroups.f2 = 文件路径
a1.sinks.k1 = logger
a1.sources.r1.channels = c1
a1.sinks.r1.channel = c1
(3)、flume 通过netcat或读取文件日志的方式存入hdfs的配置:
| 属性名称 | 默认值 | 描述 |
|---|---|---|
| type | - | 必须设置为 hdfs(固定值,标识此 Sink 类型)。 |
| hdfs.path | - | HDFS 中存储文件的基础路径(如 hdfs://namenode:8020/flume/data)。 |
| hdfs.filePrefix | FlumeData | 生成文件的前缀(如 log-,最终文件名为 log-12345)。 |
| hdfs.rollInterval | 30 | 触发文件滚动的时间间隔(秒),0 表示禁用时间滚动。 |
| hdfs.rollSzie | 1024 | 触发文件滚动的文件大小阈值(字节),0 表示禁用大小滚动。 |
| hdfs.rollCount | 10 | 触发文件滚动的事件数量阈值,0 表示禁用事件数滚动。 |
| hdfs.userLocalTimeStamp | false | 是否使用本地时间戳生成文件路径(默认使用 HDFS 集群时间)。 |
| hdfs:fileType | SequenceFile | 文件存储格式,可选 Text(文本)、SequenceFile(序列化)、DataStream(流)。 |
🔴 注:一定要配置rollInterval 、rollSize 、rollCount和fileType 前三个配置没有配好,那么hdfs一定会出现小文件问题,如果不配置好fileType用web的hdfs查看文件内容的时候会乱码所以一般要设置成DataStream0。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = 文件路径
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:8080/flume/%y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.userLocalTiemStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)、flume通过netcat或者文件读取的方式发送给kafka
| 属性名称 | 默认值 | 描述 |
|---|---|---|
| type | - | 必须设置为 org.apache.flume.sink.kafka.KafkaSink |
| kafka.bootstrap.servers | - | Kafka Sink 要连接的代理列表,用于获取主题分区列表。这可以是代理的部分列表,但我们建议至少两个。格式为逗号分隔的 hostname:port 列表。 |
| kafka.topic | default-flume-topic | 消息将在 Kafka 中发布到的主题。如果配置了此参数,消息将发布到该主题。如果事件头包含 “topic” 字段,事件将发布到该主题,覆盖此处配置的主题。支持任意头替换(例如,%{header} 会被名为 “header” 的事件头的值替换。如果使用替换,建议将 auto.create.topics.enable 属性设置为 true 以应对 Kafka 代理事件。) |
| flumeBatchSize | 100 | 一批处理的消息数。较大的批次可以提高吞吐量,延迟。 |
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = order
a1.sinks.k1.kafka.batchSize = 1
a1.channels.c1.type = memory
a1.channesl.c1.capacity = 1000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(5)、多路复用avro
在 Flume 中,Avro Source 和 Avro Sink 是用于基于 Avro 协议 实现不同 Flume 代理(Agent)之间或 Flume 与外部系统之间数据传输的组件,主要用于构建分布式、多节点的 Flume 数据管道。
一、Avro Source
核心功能
-
作为 服务端,监听指定端口,接收来自外部 Avro 客户端(如其他 Flume 代理的 Avro Sink、自定义 Avro 程序)发送的 Avro 格式数据。
-
将接收到的 Avro 数据转换为 Flume 事件(Event),传递给后续的 Channel 进行处理。
-
支持高吞吐量的二进制数据传输,适合分布式场景下的流式数据采集。
常用配置属性
| 属性名称 | 默认值 | 描述 |
|---|---|---|
| type | - | 必须设置为 avro(固定值,标识此 Source 类型)。 |
| bind | - | 监听的主机地址(如 0.0.0.0 表示监听所有网卡,或具体 IP 如 192.168.1.100)。 |
| port | - | 监听的端口号(需确保端口未被占用且防火墙允许访问)。 |
| channels | - | 绑定的 Channel 名称,用于将事件传递给后续组件。 |
| threads | 5 | 处理请求的线程数,可根据并发量调整。 |
| compression-type | none | 压缩类型,可选 none(不压缩)、deflate(Deflate 压缩)。 |
配置示例
agent.sources = avroSource
agent.channels = memoryChannel
agent.sources.avroSource.type = avro
agent.sources.avroSource.bind = 0.0.0.0 # 监听所有网卡
agent.sources.avroSource.port = 41414 # 监听端口 41414
agent.sources.avroSource.channels = memoryChannel
使用场景
-
分布式 Flume 架构:多个 Agent 通过 Avro Source/Sink 级联,实现数据的分布式采集和处理(如前端 Agent 采集数据,后端 Agent 聚合存储)。
-
与外部系统集成:自定义程序(如 Java、Python 程序)通过 Avro 协议向 Flume 发送数据。
二、Avro Sink
核心功能
-
作为 客户端,将当前 Flume 代理的 Channel 中事件(Event)序列化为 Avro 格式,发送到指定的 Avro 服务端(通常是另一个 Flume 代理的 Avro Source)。
-
支持数据压缩,减少网络传输开销。
-
保证数据传输的可靠性(结合 Channel 的持久化能力)。
常用配置属性
| 属性名称 | 默认值 | 描述 |
|---|---|---|
| type | - | 必须设置为 avro(固定值,标识此 Sink 类型)。 |
| hostname | - | 目标 Avro Source 的主机地址(如 192.168.1.200)。 |
| port | - | 目标 Avro Source 监听的端口号(如 41414)。 |
| channels | - | 绑定的 Channel 名称,用于从 Channel 拉取事件。 |
| batch-size | 100 | 每次发送的事件数量(批量发送,提高吞吐量)。 |
| compression-type | none | 压缩类型,可选 none、deflate,需与 Avro Source 配置一致。 |
配置示例
agent.sinks = avroSink
agent.channels = memoryChannel
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = 192.168.1.200 # 目标 Avro Source 主机
agent.sinks.avroSink.port = 41414 # 目标 Avro Source 端口
agent.sinks.avroSink.batch-size = 200 # 每次发送 200 个事件
agent.sinks.avroSink.channel = memoryChannel
使用场景
-
分布式 Flume 管道:将数据从 “采集层” Agent 转发到 “处理层” 或 “存储层” Agent,实现分层架构。
-
跨网络数据传输:在不同网络环境的节点间传输数据(如从边缘节点到中心节点)。
2、Flume事务流程
Put 事务流程
doPut:将批数据先写入临时缓冲区putlist
doCommit:检查channel内存队列是否足够合并
doRollback:channel内存队列空间不足,回滚数据
Take 事务
doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列
3、Flume agent 内部原理
单数据源多出口
4、flume拦截器
利用idea需要先配置好pom.xml的配置文件
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>这里输入你的版本</version>
</dependency>
</dependencies>
拦截器的代码
/**
* 1. 继承flume的拦截器接口
* 2. 重写4个抽象方法
* 3. 编写静态内部类 builder
*
*/
public class MyInterceptor implements Interceptor {
// 初始化
@Override
public void initalize() {
}
// 处理单个event
@Override
public Event intercept(Event event) {
// 需求:在event的头信息中添加标记
// 提供给channel selector 选择发送到不同的channel中
Map<String, String> header4s = event.getHeaders();
String log = new String(event.getBody());
// 判断log的开头第一个字符 如果是字母发送到channel 1
// 如果是数字发送到channel 2
char c = log.CharAt(0);
if (c >= '0' && c <= '9'){
// c为数字
headers.put("type", "number");
}else if (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z'){
headers.put("type", "letter");
}
return event;
}
// 处理多个event 系统使用这个方法
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return null;
}
// 关闭
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new MyInterceptor;
}
// 配置方法
@Override
public void configure(Context context){
}
}
}
注: 最后打包将jar包上传到flume/lib目录下
更多推荐

所有评论(0)