概念与官方文档

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。

Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。
一个agent内部有三个组件:

  1. Source:采集源,用于跟数据源对接,以获取数据;
  2. Channel:agent内部的数据传输通道,用于从source将数据传递到sink;
  3. Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据;

在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。
event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

event可以是日志记录、 avro 对象等

flume传输数据时会使用事务,source推送数据到channel以及sink从channel拉取数据时都是以事务方式进行的。就比如sink输出数据完毕后应该将channel中缓存的数据删除,sink输出数据和channel删除数据是一个事务里面的,要么都成功要么都失败

在这里插入图片描述
常见的Source,Channel,Sink有

Source类型 描述
Avro Source Avro Source监听Avro端口,接收从外部Avro客户端发送来的数据流
Exec Source 这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据
Spooling Directory Source 监控指定目录内的数据变更
Netcat Source 监控某个端口,将流经端口的每一个文本行数据作为event输入
Http Source 基于Http Post或者Get方式的数据源,支持JSON,BLOB表示形式
Channel类型 描述
Memory Channe Event数据存储在内存中
File Channel Event数据存储在磁盘文件中

至于Sink的类型,写入到hdfs,hive,hbase都是可以的

下面来说一下怎么使用flume

要想实现flume,需要编写conf文件,conf文件里面应该包含source,channel,sink的相关配置信息
这些信息可以在flume官网看到
flume官网文档,里面有source,channel,sink的相关配置信息

官网的文档内容是纯英文的,如果感觉阅读吃力,可以去已经翻译成中文的这个网站
Flume 1.9用户手册中文版

下面来举几个例子

监听文件目录数据变更,输出到控制台
# 首先先给agent起一个名字 叫a1
# 分别给source channel sink取名字
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 分别对source、channel、sink进行配置
# 配置source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/data/test
a1.sources.r1.fileSuffix = .ok
a1.sources.r1.fileHeader = true


# 配置sink
# 使用logger作为sink组件,可以将收集到数据直接打印到控制台
a1.sinks.k1.type = logger

# 配置channel
# 将channel的类型设置为memory,表示将event缓存在内存中
a1.channels.c1.type = memory

# 组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

首先给source,channel,sink起一个别名,然后分别对其进行相关参数的配置
具体应该配置哪些参数,这些参数的含义是什么,可以从官方文档上获取
在这里插入图片描述
首先确定自己要使用的source类型,这里打算监听某个目录下的文件数据变更信息,因此选择Spooling Directory Source。在官方文档找到这一source,再看Property Name这一栏,加粗的是必选部分,除此之外都是可写可不写。

这里其实推荐使用中文文档,感谢翻译成中文的大佬
在这里插入图片描述
这样看起来就容易了很多

编写完相应的conf文件后,保存退出,开始执行flume

flume-ng agent -n a1 -f spoolTolog.conf 

-n 的含义是agent的名字,与上面conf文件里面的agent名字保持一致
-f 的含义是conf文件位置,这里的spoolTolog.conf 就是我上面编写的conf文件名称

这里是在conf文件夹下面执行,如果在其他的文件夹下面执行吗,就需要
flume-ng agent -n a1 -f /usr/local/soft/flume-1.9.0/data/spoolTolog.conf

在这里插入图片描述
conf文件里面指定了监听目录
a1.sources.r1.spoolDir = /usr/local/data/test
到这个目录下面创建一个 a.txt 文件,在里面随便写一些数据,然后再来看flume这边
在这里插入图片描述
在控制台打印了相关的信息

回到a.txt 文件所在的目录下面
在这里插入图片描述
发现a.txt 的后缀名被加上了 .ok 这是因为配置source的时候,a1.sources.r1.fileSuffix = .ok
被加上后缀的文件说明已经被flume读取,之后flume启动后,不会对已经打上指定后缀的文件进行扫描监听。这里可以通过 mv a.txt.ok a.txt 的方式来删去后缀,让flume继续监听这个文件

另外在上面图中,打印监听文件目录的信息到控制台时,

Event: { headers:{file=/usr/local/data/test/a.txt} body: 31 32 33         123 }

这是sink输出在控制台的信息,可以发现是以 event 的形式传输的数据。

event里面分为两个部分,分别是 header 和 body
header信息,event的头信息,这里显示的内容是file路径,是因为conf文件里面加上了a1.sources.r1.fileHeader = true参数
body信息,body信息又分为两个部分,后半部分的123是 a.txt 文件里面的内容,而前面的31,32,33则分别是 1,2,3的十六进制ASCII码值

拦截器

flume监听指定目录下的文件数据变更信息,对于这些信息可以全盘保留,然后以指定的sink方式输出。但是也可以通过拦截器的方式,根据需要过滤或者保留一些信息,然后只会把拦截器拦截过后的这一部分信息传输到sink

# 首先先给agent起一个名字 叫a1
# 分别给source channel sink取名字
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 分别对source、channel、sink进行配置
# 配置source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/data/test
a1.sources.r1.fileSuffix = .ok
a1.sources.r1.fileHeader = true

# 给r1这个souces配置一个拦截器并取名为 i1
a1.sources.r1.interceptors = i1
# 将拦截器i1的类型设置为regex_filter 会根据正则表达式过滤数据
a1.sources.r1.interceptors.i1.type = regex_filter
# 配置正则表达式
a1.sources.r1.interceptors.i1.regex = \\d{3,6}
# excludeEvents = true 表示将匹配到的过滤,未匹配到的放行
# excludeEvents = false 表示只传输匹配成功的
a1.sources.r1.interceptors.i1.excludeEvents = true

# 配置sink
# 使用logger作为sink组件,可以将收集到数据直接打印到控制台
a1.sinks.k1.type = logger

# 配置channel
a1.channels.c1.type = memory

# 组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

同样开始flume监听,然后到监听目录下面创建b.txt文件,文件内容为

12
123
123456
12345678
ab
abcde
123abc

flume输出在控制台的信息是
在这里插入图片描述
这里只输出了 12,ab,abcde,以及最后的一个空行
这是因为拦截器这里设置a1.sources.r1.interceptors.i1.excludeEvents = true,true表明匹配正则表达式成功则被拦截,不会放行

这里的12345678,123abc同样被拦截了,但正则表达式的要求是匹配3到6个数字,这些也匹配成功了,是因为一行数据内容只要包含能与正则表达式匹配成功的部分,即会满足正则表达式拦截器的条件。

source监听文件目录,sink输出到hdfs
#给agent,source,channel,sink取名
a.sources = r1
a.sinks = k1 
a.channels = c1

#配置source
a.sources.r1.type = spooldir 
a.sources.r1.spoolDir = /usr/local/data/test
a.sources.r1.fileHeader = true 

#配置拦截器
a.sources.r1.interceptors = i1 
a.sources.r1.interceptors.i1.type = timestamp

#配置sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# 设置sink写入文件的要求
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt

#指定channel
a.channels.c1.type = memory 
#capacity,默认值为100,是内存中存储 Event 的最大数
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

注意这里的sink修改为写入到hdfs后,需要加上一个配置信息
a.sinks.k1.hdfs.rollSize = 102400
a.sinks.k1.hdfs.rollCount = 1000
这两个参数都是用来限制hdfs中写入的文件大小和数目的

每一次roll滚动都会生成一个新的文件,假设文件一共有一千行,设置rollCount = 10,每十行滚动一次,那么每十行就会在指定目录/flume/data/dir1下生成一个新的hdfs文件,总共100个文件,导致小文件太多。

除了rollIsize和rollcount之外还可以通过
hdfs.rollInterval,当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,来设置划分文件的时间

下面这两个参数用来设置最终hdfs文件的格式,如果不指定,默认以Writable形式写入,以SequenceFile格式存储
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.writeFormat = text

在/usr/local/data/test目录下创建文件,并且随便输入些内容
然后在hdfs里面创建目录/flume/data/dir1

运行flume,然后查看hdfs里面的文件
在这里插入图片描述
文件被写入成功,发现文件的后缀名还有一个 .tmp 这是因为这个文件没有被写满,还在等待新的输入,如果此时监听目录里面出现了新的数据,就会被写入到这个文件中。这个等待时间不是无限的,可以自己手动设置
等待一会后刷新页面,即可发现.tmp后缀已经消失
在这里插入图片描述

将hbase日志信息写入hdfs
# 起名字
a.sources = r1
a.sinks = k1 
a.channels = c1

# 配置source,这次选用 exec source
a.sources.r1.type = exec 
# 指定exec source通过执行什么命令来获取信息
a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log

#配置sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
a.sinks.k1.hdfs.filePrefix = hbaselog
a.sinks.k1.hdfs.rollSize = 102400
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.writeFormat = text
a.sinks.k1.hdfs.fileSuffix = .txt

#配置channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

在另一台终端启动hbase,然后来到flume终端查看信息
在这里插入图片描述
event数据最终被sink写入到hdfs
在这里插入图片描述
在这里插入图片描述
这里的信息来源是执行命令,tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log的结果,然后将tail查看到的日志文件信息写入到hdfs中

source监听netcat端口,sink写入控制台
# 起名字
a.sources = r1
a.sinks = k1 
a.channels = c1

# 配置source
a.sources.r1.type = netcat 
# 0.0.0.0表示接收任意ip的用户提交的信息
a.sources.r1.bind = 0.0.0.0 
# 设置监听端口
a.sources.r1.port = 8888 

#配置sink
a.sinks.k1.type = logger

# 配置channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

在这里插入图片描述
如果a.sources.r1.bind = 0.0.0.0 这里设置的bind是127.0.0.1表示,只接受这个ip发来的信息,其余ip的信息不会被被flume接收。而设置为0.0.0.0则是任意ip的信息都会被flume监听接收

先启动flume,然后通过telnet在8888端口发送信息
在这里插入图片描述
回到flume的终端,发现数据已经被接收并打印在控制台
在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐