Flume(二)【Flume 进阶使用】

这篇具有很好参考价值的文章主要介绍了Flume(二)【Flume 进阶使用】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

        学数仓的时候发现 flume 落了一点,赶紧补齐。

1、Flume 事务

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

Source 在往 Channel 发送数据之前会开启一个 Put 事务:

  1. doPut:将批量数据写入临时缓冲区 putList(当 source 中的数据达到 batchsize 或者 超过特定的时间就会发送数据)
  2. doCommit:检查 channel 内存队列是否足够合并
  3. doRollback:如果 channel 内存队列空间不足没救回滚数据

同样 Sink 在从 Channel 主动拉取数据的时候也会开启一个 Take 事务:

  1. doTake:将数据读取到临时缓冲区 takeList,并将数据发送到 HDFS
  2. doCommit:如果数据全部发送成功,就会清除临时缓冲区 taskList
  3. dooRollback:数据发送过程如果出现异常,rollback 将临时缓冲区的数据归还给 channel 内存队列

2、Flume Agent 内部原理

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

注意:只有 source 和 channel 之间可以存在拦截器,channel 和 sink 之间不可以!  

  1. source 接收数据,把数据封装成 Event 
  2. 传给 channel processor 也就是 channel 处理器
  3. 把事件传给拦截器(interceptor),在拦截器这里可以对数据进行一些处理(我们在上一节中说过,当我们的路径信息中包含时间的时候,需要从 Event Header 中读取时间信息,如果没有就需要我们指定从本地读取 timestamp,所以这里我们就可以在拦截器这里给我们的 event 添加头部信息);而且,拦截器可以设置多个
  4. 经过拦截器处理的事件又返回给了 channel processor ,然后 channel processor 把事件传给 channel 选择器(channel selector 有两种类型:Replicating 和 Multiplexing ,Replicating 会把source 发送来的 events 发往所有 channel,而 multiplexing 可以配置指定发往哪些 channel)
  5. 经过 channel 选择器处理后的事件仍然返回给 channel processor
  6. channel processor 会根据 channel 选择器的结果,发送给相应的 channel(也就是这个时候才会真正的开启 put 事务,之前都是对 event 进行简单的处理)
  7. SinkProcessor 负责协调拉取 channel 中的数据,它有三种类型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(负载均衡,也就是多个 Sink 轮询的方式去读取 channel 中的数据)、FailoverSinkProcessor(故障转移,每个 sink 有自己的优先级,优先级高的去读取 channel 中的事件,只有当它挂掉的时候,才会轮到下一个优先级的 sink 去读)。其中 DefaultSinkProcessor 一个 channel 只能绑定一个 Sink,所以它也就没有 sink 组的概念。

注意:一个 sink 只可以绑定一个 channel ,但是一个 channel 可以绑定多个 sink!

3、Flume 拓扑结构

3.1、简单串联

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

官网这段话翻译过来就是:为了将数据跨越多个代理或跃点进行传输,前一个代理的接收器(sink)和当前跃点的源(source)需要是avro类型,接收器指向源的主机名(或IP地址)和端口。

这种模式的缺点很好理解,就像串联电路,一个节点坏了会影响整个系统。

3.2、复制和多路复用

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

从官网翻译过来就是:上述示例显示了一个名为“foo”的代理源将流程分散到三个不同的通道。这种分散可以是复制或多路复用。在复制流程的情况下,每个事件都会发送到这三个通道。对于多路复用的情况,当事件的属性与预配置的值匹配时,事件将被发送到可用通道的子集。例如,如果事件属性名为“txnType”设置为“customer”,则应发送到channel1和channel3,如果为“vendor”,则应发送到channel2,否则发送到channel3。映射可以在代理的配置文件中设置。

这种模式相比上面的串联模式的优点无非就是可以发送过多个目的地。

3.3、负载均衡和故障转移

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

Flume 支持多个 Sink 逻辑上分到一个 Sink 组,sink 组配合不同的 SinkProcessor ,可以实现负载均衡和错误恢复的功能。

3.4、聚合

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

这种模式在实际开发中是经常会用到的,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的 flume,再由此flume上传到hdfshivehbase等,进行日志分析。

4、Flume 企业开发实例

4.1、复制和多路复用

注意:多路复用必须配合拦截器使用,因为需要在 Event Header 中添加一些信息

1)案例需求

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

2)需求分析

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

  • 监控文件变动我们可以考虑使用 taildir 或者 exec 这两种 source
  • flume-1 sink 需要使用 avro sink 才能传输到下一个 flume-2 和 flume-3 的 source
  • flume-2 需要上传数据到 HDFS 所以 sink 为 hdfs
  • flume-3 需要把数据输出到本地,所以 sink 为 file_roll sink(要保存到本地目录,这个目录就必须提前创建好,它不像 HDFS Sink 会自动帮我们创建)

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

我们需要实现三个 flume 作业:

  1. flume-1 把监听到的新日志读取到 flume-2 和 flume-3 的 source
  2. flume-2 把日志上传到 hdfs
  3. flume-3 把日志写到本地

3)需求实现

flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 将数据流复制给所有 channel 默认就是 replicating 所以也可以不用配置
a1.sources.r1.selector.type = replicating 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
# 一个 sink 只可以指定一个 channel,但是一个 channel 可以指定多个 sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

4)测试

bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flumc.conf
bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-hdfs.conf

查看结果:

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

注意:写入本地文件时,当一段时间没有新的日志时,它仍然会创建一个新的文件,而不像 hdfs sink 即使达到了设置的间隔时间但是没有新日志产生,那么它也不会创建一个新的文件。

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

这个需要注意的就是 hdfs 的端口不要写错,比如我的就不是 9870 而是 8020.

4.2、负载均衡和故障转移

1)案例需求

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用 FailoverSinkProcessor,实现故障转移的功能。

2)需求分析

  • 开启一个端口 88888 来发送数据
  • 使用 flume-1 监听该端口,并发送到 flume-2 和 flume-3 (需要 flume-1 的 sink 为 avro sink,flume-2 和 flume-3 的 source 为 avro source),flume-2 和 flume-3 发送日志到控制台(flume-2 和 flume-3 的 sink 为 logger sink)

3)需求实现

flume-nc-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-console2.conf 
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

 4)案例测试

bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-nc-flume.conf

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

关闭 flume-flume-console1.conf 作业 

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据 我们发现,一开始我们开启三个 flume 作业,当向 netcat 输入数据时,只有 flume-flume-console1.conf 作业的控制台有日志输出,这是因为它的优先级更高,当把作业 flume-flume-console1.conf 关闭时,再次向端口 44444 发送数据,发现 flume-flume-console2.conf 作业开始输出。

如果要使用负载均衡,只需要替换上面 flume-nc-flume.conf 中:

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

替换为:

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.maxTimeOut = 30000

其中,backoff 代表退避,默认为 false, 如果当前 sink 没有拉到数据,那么接下来一段时间就不用这个 sink 。maxTimeOut 代表最大的退避时间,因为退避默认是指数增长的(比如一个 sink 第一次没有拉到数据,需要等 1 s,第二次还没拉到,等 2s,第三次等 4s ...),默认最大值为 30 s。

4.3、聚合

1)案例需求

  • hadoop102 上的 Flume-1 监控文件/opt/module/group.log,
  • hadoop103 上的 Flume-2 监控某一个端口的数据流,
  • Flume-1 与 Flume-2 将数据发 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。

注意:主机只能在 hadoop104 上配,因为 avro source 在 hadoop104 上,客户端(hadoop02 和 hadoop103 的 sink)可以远程连接,但是服务端(hadoop104 的 source)只能绑定自己的端口号。

2)需求实现

flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3)测试

向 group.log 文件中追加文本:

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

注意:hadoop103 这里不能写 nc localhost 44444 而要写 nc hadoop103 44444! 否则报错:Ncat: Connection refused.

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

5、自定义 Interceptor

前面我们的多路复用还没有实现,因为我们说多路复用必须配合拦截器来使用,因为我们必须知道每个 Channel 发往哪些 Sink,这需要拦截器往 Event Header 中写一些内容。

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”lyh”模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含”lyh”,将其分别发往不同的分析系统(Channel)。

 3)需求实现

自定义拦截器

引入 flume 依赖

<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>
package com.lyh.gmall.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class TypeInterceptor implements Interceptor {

    // 存放事件集合
    private List<Event> addHeaderEvents;


    @Override
    public void initialize() {
        // 初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }

    // 单个事件拦截
    @Override
    public Event intercept(Event event) {

        // 1. 获取事件中的 header 信息
        Map<String, String> headers = event.getHeaders();

        // 2. 获取事件中的 body 信息
        String body = new String(event.getBody());

        // 3. 根据 body 中是否包含 'lyh' 来决定发往哪个 sink
        if (body.contains("lyh"))
            headers.put("type","first");
        else
            headers.put("type","second");

        return event;
    }

    // 批量事件拦截
    @Override
    public List<Event> intercept(List<Event> events) {

        // 1. 清空集合
        addHeaderEvents.clear();

        // 2. 遍历 events
        for (Event event : events) {
            // 3. 给每个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }

        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

打包放到 flume 安装目录的 lib 目录下:
 

flume 作业配置

hadoop102:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
hadoop103:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
hadoop104:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4)需求实现

#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console

#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console

#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444

hadoop102:

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

hadoop103:

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

hadoop104: 

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

可以看到,从 hadoop102 发送的日志中,包含 "lyh" 的都被发往 hadoop103 的 4141 端口,其它日志则被发往 hadoop104 的 4242端口。

6、自定义 Source

自定义 source 用的还是比较少的,毕竟 flume 已经提供了很多常用的了。

1)介绍

        Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
  • getBackOffSleepIncrement() //backoff 步长,当从数据源拉取数据时,拉取不到数据的话它不会一直再去拉取,而是等待,之后每一次再=如果还拉取不到,就会比上一次多等待步长单位个时间。
  • getMaxBackOffSleepInterval()  //backoff 最长时间,如果不设置最长等待时间,它最终会无限等待,所以需要指定。
  • configure(Context context)  //初始化 context(读取配置文件内容)
  • process()  //获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。

2)需求

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文
件中配置。

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

3)分析

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

4)需求实现

代码

package com.lyh.source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;
import java.util.Map;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // 定义配置文件将来要读取的字段
    private Long delay;
    private String field;

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 创建事件头信息
            Map<String,String> headerMap = new HashMap<>();
            // 创建事件
            SimpleEvent event = new SimpleEvent();
            // 循环封装事件
            for (int i = 0; i < 5; i++) {
                // 给事件设置头信息
                event.setHeaders(headerMap);
                // 给事件设置内容
                event.setBody((field + i).getBytes());
                // 将事件写入 channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Status.READY;
    }

    // 步长
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    // 最大间隔时间
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    // 初始化配置信息
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field","Hello");
    }
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console

运行结果: 

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

7、自定义 Sink

1)介绍

        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
        Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
        Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
        官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法:
  • configure(Context context)//初始化 context(读取配置文件内容)
  • process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

2)需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
流程分析:
Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

 3)需求实现

package com.lyh.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable{

    private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {

        // 声明返回值状态信息
        Status status;

        // 获取当前 sink 绑定的 channel
        Channel channel = getChannel();

        // 获取事务
        Transaction txn = channel.getTransaction();

        // 声明事件
        Event event;

        // 开启事务
        txn.begin();

        // 读取 channel 中的事件、直到读取事件结束循环
        while (true){
            event = channel.take();
            if (event!=null) break;
        }

        try {
            // 打印事件
            LOG.info(prefix + new String(event.getBody()) + suffix);

            // 事务提交
            txn.commit();
            status = Status.READY;
        }catch (Exception e){
            // 遇到异常回滚事务
            txn.rollback();
            status = Status.BACKOFF;
        }finally {
            // 关闭事务
            txn.close();
        }

        return null;
    }

    // 初始化配置信息
    @Override
    public void configure(Context context) {
        // 带默认值
        prefix = context.getString("prefix","hello");
        // 不带默认值
        suffix = context.getString("suffix");
    }
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4)测试

bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console

运行结果:

Flume(二)【Flume 进阶使用】,大数据开发工具,flume,大数据

总结

        自此,flume 的学习基本也完了,这一篇虽然不多但也用了大概3天时间。相比较 kafka、flink,flume 这个框架还是非常简单的,比如我们自己实现一些 source、sink,都是很简单的,没有太多复杂的理解的东西。

        总之 flume 这个工具还是多看官网。文章来源地址https://www.toymoban.com/news/detail-832016.html

到了这里,关于Flume(二)【Flume 进阶使用】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2、Flume进阶

    目录 1、Flume事务 1.1 Flume事务 1.2 Flume Agent内部原理 1.3 重要组件: 2、 Flume拓扑结构 2.1 简单串联 2.2 复制和多路复用 2.3 负载均衡和故障转移 2.4 聚合 3、开发案例 3.1 复制和多路复用 3.4.2 负载均衡和故障转移 3.3 聚合 1.1 Flume事务 1.2 Flume Agent内部原理 1.3 重要组件: ChannelSelec

    2024年02月05日
    浏览(26)
  • Flume基本使用--mysql数据输出

    在MySQL中建立数据库school,在数据库中建立表student。SQL语句如下: 请使用Flume实时捕捉MySQL数据库中的记录更新,一旦有新的记录生成,就捕获该记录并显示到控制台。可以使用如下SQL语句模拟MySQL数据库中的记录生成操作。 要求: 安装好flume-ng-sql-source-1.5.2.jar以及mysql-conn

    2024年02月04日
    浏览(31)
  • 数据同步工具调研选型:SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

    Apache SeaTunnel 是一个非常易用的超高性能分布式数据集成产品,支持海量数据的离线及实时同步。每天可稳定高效同步万亿级数据,已应用于数百家企业生产,也是首个由国人主导贡献到 Apache 基金会的数据集成顶级项目。 SeaTunnel 主要解决数据集成领域的常见问题: * 数据源

    2024年02月04日
    浏览(49)
  • 大数据技术——Flume简介&安装配置&使用案例

        Flume是一种 可配置、高可用 的 数据采集 工具,主要用于采集来自各种流媒体的数据(Web服务器的日志数据等)并传输到集中式数据存储区域。     Flume 支持在日志系统中定制 各种数据发送方 ,用于收集数据;并且可以对数据进行简单处理,将其写到可定制的各种数

    2024年02月08日
    浏览(38)
  • 大数据开发之电商数仓(hadoop、flume、hive、hdfs、zookeeper、kafka)

    1.1.1 数据仓库概念 1、数据仓库概念: 为企业制定决策,提供数据支持的集合。通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本,提高产品质量。 数据仓库并不是数据的最终目的地,而是为数据最终的目的地做好准备,这些准备包括对数据的:清洗、

    2024年01月22日
    浏览(60)
  • Kafka进阶篇-消费者详解&Flume消费Kafka原理

    由于挺多时候如果不太熟系kafka消费者详细的话,很容易产生问题,所有剖析一定的原理很重要。 消费方式 消费者总体工作流程 消费者组初始化流程   消费者详细消费流程   消费者重要参数  bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。 key.deserializervalu

    2024年02月15日
    浏览(46)
  • 使用Flume-KafkaSource实时采集Avro格式数据

    Flume是一个可靠、可扩展且具有高可用性的分布式系统,用于在大规模数据集群中进行高效的日志聚合、收集和传输。Kafka是一个分布式流处理平台,用于处理高容量的实时数据流。本文将介绍如何使用Flume的KafkaSource来实时采集Avro格式的数据,并提供相应的源代码。 首先,确

    2024年02月07日
    浏览(42)
  • 大数据之使用Flume监听端口采集数据流到Kafka

    前言 题目: 一、读题分析 二、处理过程   1.先在Kafka中创建符合题意的Kafka的topic  创建符合题意的Kafka的topic 2.写出Flume所需要的配置文件 3.启动脚本然后启动Flume监听端口数据并传到Kafka 启动flume指令 启动脚本,观察Flume和Kafka的变化 三、重难点分析 总结          本题

    2024年02月08日
    浏览(58)
  • 二百一十七、Flume——Flume拓扑结构之聚合的开发案例(亲测,附截图)

    对于Flume的聚合拓扑结构,进行一个开发测试 这种模式是我们最常见的 ,也非常实用。日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器产生的日志,处理起来也非常麻烦。 用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日

    2024年02月04日
    浏览(44)
  • 【ETL工具】本地环境IDEA远程DEBUG调试Flume代码

    🪁🍁🪁🍁🪁🍁🪁🍁 感谢点赞和关注 ,每天进步一点点!加油! 🪁🍁🪁🍁🪁🍁🪁🍁 目录 🦄 个人主页——🎐个人主页 🎐✨🍁 一、问题说明 二、操作步骤 2.1 idea创建远程调试 Flume 自定义 HDFS-Sink,远程服务器进行部署flume打包好后的代码,本地监控远程flume程序

    2024年02月08日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包