Course review:
Recap of the introductory cases:
1. Receive data from a port; it flows through a channel and a sink and finally ends up in the log (console output, configured via log4j.properties).
nc -l localhost port, TCP (netcat) source
2. Monitor the Hive log file and store the data on HDFS.
2.1 Monitor appends to a single file and read them; the sink outputs to the log.
tail -f <file path>
exec source; everything else stays the same. echo > overwrites, echo >> appends; the append form is used here.
2.2 Monitor data added to a file; the sink writes to HDFS.
spooling directory (spooldir) source
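For reference, a minimal netcat-to-logger configuration along the lines of the first case could look like this (agent name a1, port 6666 and the memory-channel sizes are placeholder values):

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1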
2. Enterprise case: load balancing
Flume1 monitors a port; the sinks in its sink group connect to Flume2 and Flume3 respectively.
Flume1 forwards the data it receives on the port to Flume2 and Flume3 in a round-robin or random fashion.
Flume2 prints the content to the console.
Flume3 prints the content to the console.
2.1 Use LoadBalancingSinkProcessor to implement load balancing. Write the configuration files.
#Flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# round_robin: poll the sinks in turn
a1.sinkgroups.g1.processor.selector = round_robin
# random: pick a sink at random
#a1.sinkgroups.g1.processor.selector = random
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
#Flume2.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
# Describe the sink
# sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#Flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
# Describe the sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
2.2 Start commands
Start the downstream (server) agents first:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group2/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group2/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Then start the upstream (client) agent:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group2/flume1.conf -n a1 -Dflume.root.logger=INFO,console
2.3 Test
nc localhost 6666
3. Enterprise case 3: failover
Flume1 monitors the port data and sends what it receives to the currently active sink.
Flume2 prints the content to the console.
Flume3 prints the content to the console.
3.1 Write the configuration
#Flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# Priorities: the sink with the higher value is preferred, so k2 (priority 10) is the active sink
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
#Flume2.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
# Describe the sink
# sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#Flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
# Describe the sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3.2 Run the commands:
Start the downstream (server) agents first, then the client.
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group3/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group3/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Then start the upstream (client) agent:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group3/flume1.conf -n a1 -Dflume.root.logger=INFO,console
3.3 Test
nc localhost 6666
4. Enterprise case: aggregation
Flume1 (node2) monitors the contents of a file.
Flume2 (node3) monitors port data; Flume1 and Flume2 both send the collected data to Flume3.
Flume3 (node4) prints the content to the console.
4.1 Write the configuration
Go to /opt/module/flume-1.9.0/jobs and create a group4 directory:
mkdir group4
cd group4
mkdir position
vim flume1.conf
vim flume2.conf
vim flume3.conf
flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/group4/.*\.txt
# file in which the taildir source records the read positions of the monitored files
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/group4/position/position.json
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node4
a1.sinks.k1.port = 8888
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node4
a2.sinks.k1.port = 8888
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = node4
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
4.2 Distribute the Flume files
4.2.1 Send the program to node3 and node4
Go to the /opt/module/ directory and distribute the Flume directory:
[itwise@node2 module]$ my_rsync.sh flume-1.9.0/
Then go to the bin directory and send the environment configuration file:
[itwise@node2 bin]$ sudo ./my_rsync.sh /etc/profile.d/my_env.sh
Confirm with yes and enter the password: 123456
Log in to node3/node4 to verify, and reload the environment variables:
/opt/module/flume-1.9.0/bin/flume-ng
[itwise@node4 ~]$ source /etc/profile
4.3 Start commands
Start the downstream (server) agent first.
On node4:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group4/flume3.conf -n a3 -Dflume.root.logger=INFO,console
On node3:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group4/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Then start the upstream (client) agent.
On node2:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group4/flume1.conf -n a1 -Dflume.root.logger=INFO,console
4.4 Test
4.4.1 On node2, go to the monitored directory:
cd /opt/module/flume-1.9.0/jobs/group4
Create a text file:
[itwise@node2 group4]$ touch test4.txt
Write some data:
[itwise@node2 group4]$ echo '我爱fulme' >> test4.txt
Check the output on node4:
2024-03-10 17:11:03,812 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E6 88 91 E7 88 B1 66 75 6C 6D 65 ......fulme }
4.4.2 On node3, test the monitored port
Connect to the port and type some content:
[itwise@node3 ~]$ nc localhost 6666
wo^H^H
OK
123456
OK
我爱flume
OK
Check the output on node4:
2024-03-10 17:16:06,261 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 32 33 34 35 36 123456 }
2024-03-10 17:16:28,278 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E6 88 91 E7 88 B1 66 6C 75 6D 65 .....flume }
5. Writing custom components
5.1 Custom interceptor
Business scenario:
Requirement (simple): Flume1 listens on a client port. Depending on the input data:
lines starting with "java" are sent to Flume2, lines starting with "hadoop" are sent to Flume3, and everything else goes to the default channel (Flume4).
5.1.1 A custom interceptor is needed:
1. First create a Maven project and add the Flume dependency:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
2. Write the custom interceptor: com.itwise.flume.interceptor.MyInterceptor
package com.itwise.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class MyInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Inspect the event body and tag the header accordingly: "java" or "hadoop"
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        if (body.startsWith("java")) {
            event.getHeaders().put("type", "java");
        } else if (body.startsWith("hadoop")) {
            event.getHeaders().put("type", "hadoop");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
3. Package the code above into a jar and place it in Flume's lib directory.
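One possible way to build and install it (the jar name flume-interceptor-1.0-SNAPSHOT.jar is only an assumed example; use whatever artifact your Maven project actually produces):

mvn clean package
cp target/flume-interceptor-1.0-SNAPSHOT.jar $FLUME_HOME/lib/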
5.2 Multiplexing configuration
Create the job directory on node2:
flume1.conf
#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555
#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.java = c1
a1.sources.r1.selector.mapping.hadoop = c2
a1.sources.r1.selector.default = c3
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=com.itwise.flume.interceptor.MyInterceptor$Builder
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888
#Bind
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume4.conf
#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888
#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
#Sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
5.3 Run the agents
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY1/flume4.conf -n a4 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY1/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY1/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY1/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Test:
nc localhost 5555
5.4 Custom source
Requirement: Flume1 uses a custom source that generates random data and outputs it to the log (printed to the console).
The code of the custom MySource is as follows:
package com.itwise.flume.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * A custom source must extend AbstractSource and implement the
 * Configurable and PollableSource interfaces.
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;

    /**
     * Core processing method of the source.
     *
     * Flume calls this method in a loop.
     * @return the processing status
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        // Sleep for one second
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Status status = null;
        try {
            // Receive new data: collect it and wrap it in an Event object
            Event e = getSomeData();
            // Store the Event into this source's associated channel(s)
            // by handing it to the ChannelProcessor
            getChannelProcessor().processEvent(e);
            // Processed successfully, return Status.READY
            status = Status.READY;
        } catch (Throwable t) {
            // Log exception, handle individual exceptions as needed
            // Processing failed, return Status.BACKOFF
            status = Status.BACKOFF;
        }
        return status;
    }

    /**
     * Generate a random string as the collected data.
     * @return the event
     */
    private Event getSomeData() {
        String data = UUID.randomUUID().toString();
        String resultData = prefix + data;
        SimpleEvent event = new SimpleEvent();
        event.setBody(resultData.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("author", "zdy");
        return event;
    }

    /**
     * Increment added to the backoff time after each failed poll.
     * @return the backoff increment
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 1;
    }

    /**
     * Maximum backoff time.
     * @return the maximum backoff interval
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10;
    }

    /**
     * Read this source's settings from the flume configuration file (xxx.conf).
     * @param context the source context
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "log-");
    }
}
flume-mysource-logger.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = com.itwise.flume.source.MySource
a1.sources.r1.prefix = log--
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start and run (from the DIY2_MySource job directory):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY2_MySource/flume-mysource-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.5 Custom sink
Requirement: use the custom source to collect data and a custom sink to print it to the console.
package com.itwise.flume.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * A custom sink must extend Flume's AbstractSink and implement the Configurable interface.
 */
public class MySink extends AbstractSink implements Configurable {

    Logger logger = LoggerFactory.getLogger(MySink.class);

    /**
     * Core processing method.
     *
     * Flume calls this method in a loop.
     * @return the processing status
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Start transaction
        // Get the channel
        Channel ch = getChannel();
        // Get the transaction object
        Transaction txn = ch.getTransaction();
        // Begin the transaction
        txn.begin();
        try {
            // Take an event from the channel
            Event event = ch.take();
            // Process the event
            storeSomeData(event);
            // Processed successfully, commit the transaction
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            // Processing failed, roll back the transaction
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            // Close the transaction whether it succeeded or not
            txn.close();
        }
        return status;
    }

    private void storeSomeData(Event event) {
        String printData = event.getHeaders() + " ::: " + new String(event.getBody(), StandardCharsets.UTF_8);
        logger.info(printData);
    }

    @Override
    public void configure(Context context) {
    }
}
Configuration file:
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = com.itwise.flume.source.MySource
a1.sources.r1.prefix = log--
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = com.itwise.flume.sink.MySink
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the command:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY3_MySink/mysource-flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console
An example of starting an agent with Ganglia monitoring enabled (here for the introductory netcat-to-logger job):
flume-ng agent \
-c $FLUME_HOME/conf \
-n a1 \
-f $FLUME_HOME/jobs/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=node2:8649
6. Briefly describe Flume's transaction control mechanism (with a diagram).
Put transaction flow
• doPut: first write the batch of data into the temporary putList buffer.
• doCommit: check whether the channel's in-memory queue has enough room to merge the data.
• doRollback: if the channel's in-memory queue does not have enough space, roll back the data.
Take transaction
• doTake: copy the data into the temporary takeList buffer and send it to the destination (e.g. HDFS).
• doCommit: if all the data was sent successfully, clear the temporary takeList buffer.
• doRollback: if an exception occurs while sending, roll back and return the data in the temporary takeList buffer to the channel's in-memory queue.
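The following minimal Java sketch shows how both sides of a transaction use the Channel/Transaction API (the class name TransactionSketch and its helper methods are illustrative only, and error handling is simplified):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;

public class TransactionSketch {

    // Put side: wrap ch.put(...) in a transaction (a source's ChannelProcessor does this internally).
    public static void putWithTransaction(Channel ch, byte[] body) {
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            ch.put(EventBuilder.withBody(body)); // doPut: event goes into the putList buffer
            txn.commit();                        // doCommit: putList is merged into the channel queue
        } catch (Throwable t) {
            txn.rollback();                      // doRollback: buffered events are discarded
            // in a real component this is where Status.BACKOFF would be reported
        } finally {
            txn.close();
        }
    }

    // Take side: a sink takes events inside a transaction and commits only after they are sent.
    public static Event takeWithTransaction(Channel ch) {
        Transaction txn = ch.getTransaction();
        txn.begin();
        Event event = null;
        try {
            event = ch.take();                   // doTake: event is copied into the takeList buffer
            // ... send the event to the destination (e.g. HDFS) here ...
            txn.commit();                        // doCommit: takeList is cleared
        } catch (Throwable t) {
            txn.rollback();                      // doRollback: takeList events go back to the channel queue
        } finally {
            txn.close();
        }
        return event;
    }
}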
7. Flume Agent internals
ChannelSelector
The ChannelSelector decides which channel(s) an Event is sent to. There are two types: Replicating and Multiplexing.
The ReplicatingSelector sends the same Event to every channel, while the Multiplexing selector routes different Events to different channels according to the configured rules.
SinkProcessor
There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor and FailoverSinkProcessor.
DefaultSinkProcessor works with a single sink; LoadBalancingSinkProcessor and FailoverSinkProcessor work with a sink group. LoadBalancingSinkProcessor provides load balancing, and FailoverSinkProcessor provides failover (error recovery).
8. Flume topologies
8.1 Point-to-point (chained agents)
This is the simplest arrangement: two Flume agents, where the sink of one is the source of the other, forming a chain that can be extended with more agents.
Too many agents not only reduces throughput; if any agent in the chain goes down, the whole pipeline is affected.
8.2 Replication and multiplexing
In this structure one source feeds multiple channels; with replication every channel carries the same content, and only the sinks behind them differ.
8.3 Load balancing and failover
This structure is widely used in big data systems: multiple sinks share the load. Flume lets you group several sinks logically into a sink group; combined with the appropriate SinkProcessor, a sink group provides load balancing and failover.
8.4 Aggregation
This is the most common and most practical pattern. A typical web application is spread across hundreds, sometimes thousands or tens of thousands, of servers, and the logs they produce are painful to process. With this Flume topology, each server runs a Flume agent that collects its logs and forwards them to a central collecting Flume agent, which then uploads them to HDFS, Hive, HBase, etc. for log analysis.