【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)

这篇具有很好参考价值的文章主要介绍了【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 组件简介

       Sink Processors类型包括这三种:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor。

  • Default Sink Processor是默认的,不用配置Sink group,就是咱们现在使用的这种最普通的形式,一个Channel后面接一个Sink的形式;
  • Load balancing Sink Processor是负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力;
  • Failover Sink Processor是故障转移处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,按照Sink的优先级,默认先让优先级高的Sink来处理数据,如果这个Sink出现了故障,则用优先级低一点的Sink处理数据,可以保证数据不丢失。

2. 项目实践

2.1 负载均衡

使用Load balancing Sink Processor,即负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力。其参数为:

  • processor.sinks:指定这个sink groups中有哪些sink,指定sink的名称,多个的话中间使用空格隔开即可;
  • processor.type:针对负载均衡的sink处理器,这里需要指定load_balance;
  • processor.selector:此参数的值内置支持两个,round_robin和random,round_robin表示轮询,按照sink的顺序,轮流处理数据,random表示随机。
  • processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长,一直到达到最大的时间。如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率;
  • processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒。

2.1.1 需求

采集指定端口的数据,并实现两个sink通道的负载均衡,采用轮询方式发送数据,为了展现实验效果,使用avro sink,每到一个event就写一次数据(默认是积攒接收一百个再写一次数据)。

2.1.2 配置

【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)
配置bigData01上的Flume Agent:

[root@bigdata01 apache-flume-1.9.0-bin]# cat conf/load-balancing.conf 
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type=avro 
a1.sinks.k1.hostname=192.168.152.101 
a1.sinks.k1.port=41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type=avro 
a1.sinks.k2.hostname=192.168.152.102 
a1.sinks.k2.port=41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = load_balance 
a1.sinkgroups.g1.processor.backoff = true 
a1.sinkgroups.g1.processor.selector = round_robin 

# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 apache-flume-1.9.0-bin]# cat conf/load-balancing-101.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 apache-flume-1.9.0-bin]# cat conf/load-balancing-102.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.1.3 运行

先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:

[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-101.conf -Dflume.root.logger=INFO,console
[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-102.conf -Dflume.root.logger=INFO,console
apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing.conf -Dflume.root.logger=INFO,console

向指定端口发送数据,模拟输入:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hehe
OK
haha
OK

查看HDFS中的保存的运行结果:

[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -ls -R / 
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 00:47 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data101.1687366028115.log.tmp
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data102.1687366024769.log.tmp
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data101.1687366028115.log.tmp 
haha
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data102.1687366024769.log.tmp
hehe

2.2 故障转移

使用Failover Sink Processor,即故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,按照sink的优先级,默认先让优先级高的sink来处理数据,如果这个sink出现了故障,则用优先级低一点的sink处理数据,可以保证数据不丢失。其参数为:

  • processor.type:针对故障转移的sink处理器,使用failover;
  • processor.priority.:指定sink group中每一个sink组件的优先级,默认情况下channel中的数据会被优先级比较高的sink取走;
  • processor.maxpenalty:sink发生故障之后,最大等待时间。

2.2.1 需求

实现两个sink的故障转移。

2.2.2 配置

【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)
配置bigData01上的Flume Agent:

[root@bigdata01 conf]# cat failover.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = 192.168.152.101 
a1.sinks.k1.port = 41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type = avro 
a1.sinks.k2.hostname = 192.168.152.102 
a1.sinks.k2.port = 41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = failover 
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 conf]# cat failover-101.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 conf]# cat failover-102.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.2.3 运行

  1. 先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-101.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-102.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover.conf -Dflume.root.logger=INFO,console
  1. 向指定端口发送数据,模拟输入两个数据test1test2
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
  1. 查看HDFS中的保存的运行结果:

因为bigdata03的优先级高,可以看到两个数据都是由其写入。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:51 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:51 /failover/data102.1687398676525.log.tmp
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data102.1687398676525.log.tmp
test1
test2
  1. 关闭bigdata03,再输入测试数据test3
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
test3
OK
  1. 查看HDFS中的保存的运行结果:

关闭bigdata03后,数据就由优先度较低的bigdata02写入,保证数据不丢失,达到故障转移的目的,此时若再次开启bigdata03,则数据就又会由优限度更高的bigdata03传输。文章来源地址https://www.toymoban.com/news/detail-496581.html

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:54 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:54 /failover/data101.1687398846336.log.tmp
-rw-r--r--   2 root supergroup         14 2023-06-22 09:53 /failover/data102.1687398676525.log
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data101.1687398846336.log.tmp
test3

到了这里,关于【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloud系列:负载均衡组件-Ribbon

    作者平台: | CSDN:blog.csdn.net/qq_41153943 | 掘金:juejin.cn/user/651387… | 知乎:www.zhihu.com/people/1024… | GitHub:github.com/JiangXia-10… 本文一共4529字,预计阅读12分钟 前面几篇文章介绍了微服务相关的内容,比如什么是微服务,常见的一些服务注册中心组件,以及微服务之间是如何进

    2024年02月17日
    浏览(34)
  • webshell实践,在nginx上实现负载均衡

    我采用了三台虚拟机进行服务器设置:192.168.240.11、192.168.240.12、192.168.240.13           

    2024年02月12日
    浏览(24)
  • 搭建TiDB负载均衡环境-HAproxy+KeepAlived实践

    作者: 我是咖啡哥 原文来源: https://tidb.net/blog/8e8cca1d HAProxy 提供 TCP 协议下的负载均衡能力,TiDB 客户端通过连接 HAProxy 提供的浮动 IP 即可对数据进行操作,实现 TiDB Server 层的负载均衡。同时,HAproxy部署2个节点,使用KeepAlived来实现高可用。 TiDB版本:V7.1.0 haproxy版本:2.

    2024年02月09日
    浏览(29)
  • 探索Nginx的奥秘--从代理到负载均衡的艺术实践

    ⭐在分布式微服务架构中,服务是分布在不同主机、服务器上的。我们希望访问不同的服务,就需要一个代理服务器来为我们做请求转发,这个时候我们就引入了Nginx。⭐ 我们在请求传统的单体项目时,常常是在低并发的情况下进行的。一个公司项目刚刚上线的时候,并发量

    2024年02月04日
    浏览(21)
  • 《企业级Linux高可用负载均衡集群实践真传》目录

    第1章 关于负载均衡... 2 1.1        负载均衡定义... 2 1.2        负载均衡在生产环境中的基本要求... 3 1.2.1 在线可扩展性... 3 1.2.2 高可用性... 3 1.2.3 多服务性... 4 1.3        负载均衡基本功能... 4 1.3.1      负载均衡... 4 1.3.2      健康检查... 5 1.3.3      负载均

    2024年02月02日
    浏览(44)
  • springCloudAlibaba组件-Nacos-服务发现与负载均衡(三)

    如果项目使用微服务架构,如果A微服务需要访问B微服务,需要http请求进行调用,当然需要B微服务的地址与端口号,微服务可以向之前提到的服务中心进行获取B服务的ip地址和端口号,这就是服务发现 1.客户端主动获取 客户端: 流程: 1.先是故障转移机制判断是否去本地文

    2024年02月10日
    浏览(34)
  • Ribbon:Spring Cloud负载均衡与服务调用组件

    负载均衡? Ribbon实现服务调用? Ribbon实现负载均衡? 切换负载均衡策略? 定制负载均衡策略? 负载均衡 负载均衡(Load Balance),将用户的请求平分到多个服务器上运行,以扩展服务器带宽、增强数据处理能力、增加吞吐量、提高网络的可用性和灵活性的目的。 服务端负载

    2024年02月03日
    浏览(36)
  • 第五次作业 运维高级 构建 LVS-DR 集群和配置nginx负载均衡

    1、基于 CentOS 7 构建 LVS-DR 群集。 LVS-DR模式工作原理 首先,来自客户端计算机CIP的请求被发送到Director的VIP。然后Director使用相同的VIP目的IP地址将请求发送到集群节点或真实服务器。然后,集群某个节点将回复该数据包,并将该数据包直接发送到客户端计算机(不经过direct

    2024年02月14日
    浏览(37)
  • 干货 | 携程客服机器人ASR引擎的负载均衡实践

    作者简介 玉修,携程技术专家,专注于电话音视频通信、智能客服机器人等领域。 一、前言 携程拥有庞大的呼叫中心,涉及上万客服人员,覆盖机票、酒店、火车票、度假等产线的售前售后业务,每天的电话业务量超百万通。近年来,通信技术、人工智能技术和智能终端等

    2024年02月03日
    浏览(30)
  • Flume学习---3、自定义Interceptor、自定义Source、自定义Sink

    1、案例需求 使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。 2、需求分析 在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的Multi

    2024年02月09日
    浏览(15)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包