Flume基础知识(十一):Flume自定义接口

这篇具有很好参考价值的文章主要介绍了Flume基础知识(十一):Flume自定义接口。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予 不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”atguigu”模拟不同类型的日志, 我们需要自定义 interceptor 区分数据中是否包含”atguigu”,将其分别发往不同的分析 系统(Channel)。

Flume基础知识(十一):Flume自定义接口,大数据,flume,大数据

3)实现步骤

(1)创建一个 maven 项目,并引入以下依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口。

package com.atguigu.interceptor;
​
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
​
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
​
/**
 * @author:左泽林
 * @date:日期:2022-01-17-时间:14:47
 * @message:
 */
public class TypeInterceptor implements Interceptor {
​
    /*声明一个集合,用于存放拦截器处理后的事件*/
    private List<Event> addHeaderEvents ;
​
    //初始化
    public void initialize() {
        /*初始化*/
        addHeaderEvents = new ArrayList<Event>();
    }
​
    //处理单个事件
    public Event intercept(Event event) {
​
        //1.获取header & body
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
​
        //2.根据body中是否包含“atguigu”添加不同的头信息
        if (body.contains("atguigu")){
            headers.put("type","atguigu");
        }else{
            headers.put("type","other");
        }
​
        /*返回数据*/
        return event;
    }
​
    /*批处理事件*/
    public List<Event> intercept(List<Event> events) {
​
        //1. 清空集合
        addHeaderEvents.clear();
​
        /*遍历events*/
        for (Event event : events) {
            addHeaderEvents.add(intercept(event));
        }
​
        /*返回数据*/
        return events;
    }
    
    public void close() {
​
    }
    
    public static class Builder implements Interceptor.Builder{
​
        public Interceptor build() {
            return new TypeInterceptor();
        }
​
        public void configure(Context context) {
            
        }
    }
}
打包,上传到服务器Flume的lib下,Flume会在启动时调用

(3)编辑 flume 配置文件

为 hadoop100 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink), 并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
#拦截器
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
#映射需要对应代码中的拦截类型,这里就是atguigu、other
a1.sources.r1.selector.mapping.atguigu = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop102 上的 Flume3 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(4)分别在 hadoop100,hadoop101,hadoop102 上启动 flume 进程,注意先后顺序。

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console
[root@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console
[root@hadoop100 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf

(5)在 hadoop100 使用 netcat 向 localhost:44444 发送字母和数字。

(6)观察 hadoop101 和 hadoop102 打印的日志。文章来源地址https://www.toymoban.com/news/detail-780173.html

到了这里,关于Flume基础知识(十一):Flume自定义接口的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【前端知识】React 基础巩固(三十一)——Redux的简介

    概念 纯函数(确定的输入一定产生确定的输出,函数在执行过程中不产生副作用): 在程序设计中,若一个函数符合以下条件,那么这个函数就被称为纯函数 此函数在相同的输入值时,需产生相同的输出 函数的输出和输入值以外的其他隐藏信息或状态无关,也和由I/O设备产

    2024年02月16日
    浏览(35)
  • vue 3 第三十一章:状态管理(Pinia基础知识)

    状态管理是现代 Web 应用开发中的一个重要概念。Vue 3 中的状态管理库 Pinia ,是一个基于 Vue 3 Composition API 的状态管理库,它提供了一种 简单 、 灵活 的方式来管理应用程序的状态,同时还具有 高性能 和 可扩展性 。 Pinia 在某种程度上来说,也可以被叫做 Vuex5 ,因为它结合

    2024年02月07日
    浏览(38)
  • css基础知识十一:CSS3新增了哪些新特性?

    一、是什么 css,即层叠样式表(Cascading Style Sheets)的简称,是一种标记语言,由浏览器解释执行用来使页面变得更为美观 css3是css的最新标准,是向后兼容的,CSS1/2的特性在CSS3 里都是可以使用的 CSS3 也增加了很多新特性,为开发者带来了更佳的开发体验 从几个维度列举一些

    2024年02月11日
    浏览(27)
  • Unity | Shader基础知识(第十一集:什么是Normal Map法线贴图)

    目录 前言 一、图片是否有法线贴图的视觉区别 二、有视觉区别的原因 三、法线贴图的作用 四、信息是如何存进去的 五、自己写一个Shader用到法线贴图 六、注意事项 七、作者的话 前言 本小节会给大家解释,什么是法线贴图?为什么法线贴图会产生深度?我们怎么自己写一

    2024年04月15日
    浏览(35)
  • Linux 驱动开发基础知识——LED 模板驱动程序的改造:设备树(十一)

     个人名片: 🦁作者简介:学生 🐯个人主页:妄北y 🐧个人QQ:2061314755 🐻个人邮箱:2061314755@qq.com 🦉个人WeChat:Vir2021GKBS 🐼 本文由妄北y原创,首发CSDN 🎊🎊🎊 🐨座右铭:大多数人想要改造这个世界,但却罕有人想改造自己。 专栏导航: 妄北y系列专栏导航: C/C++的基

    2024年02月21日
    浏览(29)
  • 【前端知识】React 基础巩固(四十一)——手动路由跳转、参数传递及路由配置

    利用 useNavigate 封装一个 withRouter(hoc/with_router.js) 添加到hoc/index.js文件中 利用withRouter,拦截Home组件,实现手动跳转路由 路由参数传递包括:1.动态路由传参;2.查询字符串传参 改造withRouter,通过 useParams() 和 useSearchParams() 来接收两种参数传递: 在界面中,通过params来接收

    2024年02月14日
    浏览(37)
  • stm32串口自定义协议接收一串十六进制数据(将其中两个字节转化为十进制数据)+部分串口基础知识

    位(bit): 二进制数中的一个数位,可以是0或者1,是计算机中数据的最小单位。 字节(Byte): 计算机中数据的基本单位,每8位组成一个字节。各种信息在计算机中存储、处理至少需要一个字节。 例如,一个ASCII码用一个字节表示,一个汉字用两个字节表示。 字(Word):

    2023年04月08日
    浏览(45)
  • C语言基础知识:宏定义

    目录 一.预处理 二.宏定义用法 ①宏常量 ②宏语句 ③宏函数 ④其它 1.#undef 是用来撤销宏定义的,用法如下: 2.使用ifndef防止头文件被重复包含和编译 三.宏定义相关作用符 ①换行符 \\\"\\\" ②字符串化符 \\\"#\\\" ③片段连接符\\\"##\\\" 四.宏函数的巧用 ①类型传递 ②传递数组 五.注意事项

    2024年02月06日
    浏览(30)
  • FPGA基础知识-编程语言接口

    目录 学习目标: 学习内容: 1.PLI的使用 2.PLI任务的连接和调用 3.内部数据的获取 4.PLI库子程序 学习时间: 学习产出: 解释在Verilog仿真中如何使用PLI子程序。 描述PLI的用途。 定义用户自定义系统任务和函数以及用户自定义C子程序。 理解用户自定义系统任务的连接和调用。

    2024年02月11日
    浏览(25)
  • FPGA基础知识-用户自定义原语

    目录 学习目标 学习内容 1.UDP的组成 2.UDP定义规则 3.表示组合逻辑的UDP 4.表示时序逻辑的UDP 5.UDP表中的缩写符号 6.UDP设计指南  学习时间 学习总结 提示:这里可以添加学习目标 理解编写UDP的规则,明白UDP的各个组成部分。 学会编写表示时序和表示组合逻辑的两种不同的UDP, 理

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包