Flume故障转移
本文介绍了使用Flume构建故障转移数据管道的实现方案。通过配置三个Flume实例,其中Flume1作为主节点监控44444端口,其Sink组采用Failover策略,将数据优先发送至高优先级Flume2(端口4141),当Flume2故障时自动切换至备用Flume3(端口4142)。配置要点包括:定义Sink组及成员、设置优先级数值(数值越大优先级越高)、确保通道绑定一致。测试验证了故障自动转移
案例需求
使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移功能

关于SinkGroups

为消除数据处理管道中的单点故障,Flume支持通过负载均衡或故障转移策略将事件分发至不同sink。sink组可创建逻辑上的sink集合,其具体行为由sink处理器控制,该处理器负责确定事件的路由策略。
配置示例: a1.sinkgroups=g1 表示名为"a1"的agent定义了一个名为g1的sink组。
a1.sinkgroups.g1.sinks=k1 k2 说明g1组包含k1和k2两个sink。
当processor.type设为load_balance时,默认采用round_robin轮询策略,同时支持自定义负载均衡机制。processor.backoff参数用于设置指数级阻塞时间:当sink抛出异常需重试时,若设为true,每次失败后的等待时间将呈指数增长。
配置Flume实现故障转移功能
Flume1配置文件 (flume-file-flume.conf)
该配置定义了一个NetCat源,通过内存通道将数据分发到两个Avro Sink(k1和k2),并使用Failover策略管理Sink组:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
# NetCat源配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Sink组故障转移配置
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5 # 较低优先级
a1.sinkgroups.g1.processor.priority.k2 = 10 # 较高优先级
a1.sinkgroups.g1.sinks = k1 k2
# Avro Sink配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142
# 内存通道配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Flume2配置文件 (flume-flume-console1.conf)
该配置作为高优先级Sink(端口4141)的接收端,将数据输出到日志:
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Avro源配置(接收Flume1的k1数据)
a2.sources.r1.type = avro
a2.sources.r1.bind = master
a2.sources.r1.port = 4141
# 日志Sink配置
a2.sinks.k1.type = logger
# 内存通道配置
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 绑定组件
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Flume3配置文件 (flume-flume-console2.conf)
该配置作为备用Sink(端口4142)的接收端:
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Avro源配置(接收Flume1的k2数据)
a3.sources.r1.type = avro
a3.sources.r1.bind = master
a3.sources.r1.port = 4142
# 日志Sink配置
a3.sinks.k1.type = logger
# 内存通道配置
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 绑定组件
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
执行测试步骤
启动服务端(Flume2和Flume3)
在master节点分别启动两个接收服务:
# 启动Flume2(高优先级Sink)
bin/flume-ng agent -n a2 -c conf -f conf/flume-flume-console1.conf
# 启动Flume3(备用Sink)
bin/flume-ng agent -n a3 -c conf -f conf/flume-flume-console2.conf
启动客户端(Flume1)
在数据发送节点启动NetCat客户端:
bin/flume-ng agent -n a1 -c conf -f conf/flume-file-flume.conf
验证故障转移
- 通过Telnet发送测试数据到Flume1的44444端口
- 观察Flume2的日志输出(优先级高,默认接收数据)
- 手动停止Flume2服务,Flume1会自动切换至Flume3
- 重启Flume2后,流量会自动切回高优先级Sink
关键机制说明
Failover策略特性
- Sink优先级由
processor.priority参数决定,数值越大优先级越高 - 高优先级Sink恢复后会自动重新接管流量
- 所有Sink均失败时,事务会回滚并重试
配置要点
- Sink组必须明确指定所有成员Sink(如
a1.sinkgroups.g1.sinks=k1 k2) - 同一Sink组的Sink必须绑定相同Channel
- Avro Sink/Source的端口和主机名需严格对应
更多推荐



所有评论(0)