案例需求

    使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移功能

关于SinkGroups

为消除数据处理管道中的单点故障,Flume支持通过负载均衡或故障转移策略将事件分发至不同sink。sink组可创建逻辑上的sink集合,其具体行为由sink处理器控制,该处理器负责确定事件的路由策略。

配置示例: a1.sinkgroups=g1 表示名为"a1"的agent定义了一个名为g1的sink组。

a1.sinkgroups.g1.sinks=k1 k2 说明g1组包含k1和k2两个sink。

当processor.type设为load_balance时,默认采用round_robin轮询策略,同时支持自定义负载均衡机制。processor.backoff参数用于设置指数级阻塞时间:当sink抛出异常需重试时,若设为true,每次失败后的等待时间将呈指数增长。

配置Flume实现故障转移功能

Flume1配置文件 (flume-file-flume.conf)
该配置定义了一个NetCat源,通过内存通道将数据分发到两个Avro Sink(k1和k2),并使用Failover策略管理Sink组:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

# NetCat源配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Sink组故障转移配置
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5  # 较低优先级
a1.sinkgroups.g1.processor.priority.k2 = 10 # 较高优先级
a1.sinkgroups.g1.sinks = k1 k2

# Avro Sink配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142

# 内存通道配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

 

Flume2配置文件 (flume-flume-console1.conf)
该配置作为高优先级Sink(端口4141)的接收端,将数据输出到日志:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Avro源配置(接收Flume1的k1数据)
a2.sources.r1.type = avro
a2.sources.r1.bind = master
a2.sources.r1.port = 4141

# 日志Sink配置
a2.sinks.k1.type = logger

# 内存通道配置
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 绑定组件
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

 

Flume3配置文件 (flume-flume-console2.conf)
该配置作为备用Sink(端口4142)的接收端:

a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Avro源配置(接收Flume1的k2数据)
a3.sources.r1.type = avro
a3.sources.r1.bind = master
a3.sources.r1.port = 4142

# 日志Sink配置
a3.sinks.k1.type = logger

# 内存通道配置
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 绑定组件
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

 

执行测试步骤

启动服务端(Flume2和Flume3)
在master节点分别启动两个接收服务:

# 启动Flume2(高优先级Sink)
bin/flume-ng agent -n a2 -c conf -f conf/flume-flume-console1.conf

# 启动Flume3(备用Sink)
bin/flume-ng agent -n a3 -c conf -f conf/flume-flume-console2.conf

 

启动客户端(Flume1)
在数据发送节点启动NetCat客户端:

bin/flume-ng agent -n a1 -c conf -f conf/flume-file-flume.conf

 

验证故障转移

  1. 通过Telnet发送测试数据到Flume1的44444端口
  2. 观察Flume2的日志输出(优先级高,默认接收数据)
  3. 手动停止Flume2服务,Flume1会自动切换至Flume3
  4. 重启Flume2后,流量会自动切回高优先级Sink

关键机制说明

Failover策略特性

  • Sink优先级由processor.priority参数决定,数值越大优先级越高
  • 高优先级Sink恢复后会自动重新接管流量
  • 所有Sink均失败时,事务会回滚并重试

配置要点

  • Sink组必须明确指定所有成员Sink(如a1.sinkgroups.g1.sinks=k1 k2
  • 同一Sink组的Sink必须绑定相同Channel
  • Avro Sink/Source的端口和主机名需严格对应

 

 

 

 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐