Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中

这篇具有很好参考价值的文章主要介绍了Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Flume监听多个文件目录

1. flume的环境搭建和基础配置参考

https://blog.csdn.net/qinqinde123/article/details/128130131

2. 修改配置文件flume-conf.properties

#定义两个是数据源source1、source2
agent.sources = source1 source2
agent.channels = channel1
agent.sinks = sink1

#数据源source1:监听/home/sxvbd/bigdata/flumeTestDir目录
agent.sources.source1.type = spooldir
agent.sources.source1.spoolDir = /home/sxvbd/bigdata/flumeTestDir
# 文件名带路径,header中key=filePath
agent.sources.source1.fileHeader = true
agent.sources.source1.fileHeaderKey = filePath
# 文件名不带路径,header中key=fileName
agent.sources.source1.basenameHeader = true
agent.sources.source1.basenameHeaderKey = fileName

#数据源source2:监听/home/sxvbd/bigdata/flumeTestDir/temp目录·
agent.sources.source2.type = spooldir
agent.sources.source2.spoolDir = /home/sxvbd/bigdata/flumeTestDir/temp
# 文件名带路径,header中key=filePaht
agent.sources.source2.fileHeader = true
agent.sources.source2.fileHeaderKey = filePath
# 文件名不带路径,header中key=fileName
agent.sources.source2.basenameHeader = true
agent.sources.source2.basenameHeaderKey = fileName

#定义一个channel
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000000
agent.channels.channel1.transactionCapacity = 10000
agent.channels.channel1.keep-alive = 60

#重写sink,根据文件名称不同,推送到不同topic中
agent.sinks.sink1.type = com.demo.flume.LogToDiffentKafkaTopic
agent.sinks.sink1.kafka.bootstrap.servers = node24:9092,node25:9092,node26:9092
agent.sinks.sink1.parseAsFlumeEvent = false

#定义source channel  sink的关系
agent.sources.source1.channels = channel1
agent.sources.source2.channels = channel1
agent.sinks.sink1.channel = channel1

二、重写Sink,根据文件名称不同,消息发送到不同的topic中

flume监听到有新文件出现的时候,会将文件内容推送到kakfa的topic中,但是如果文件夹中有不同类型的文件,直接推送到kafka的同一个topic中,如果根据内容无法区分不同类型的文件,那就需要根据文件名称来区分。flume本身根据配置无法实现,只能通过重写Sink,根据文件名称,将内容推送到kafka的不同topic。

Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中
看了一下官网的开发文档,要想自定义一个Sink也很简单,只需要继承一个抽象类 AbstractSink 和一个用于接收配置参数的接口 Configurable 即可.然后呢就需要实现两个方法一个就是public Status process() throws EventDeliveryException {}这个方法会被多次调用,反复执行,也就是通过它来实时的获取Channel流出来的数据;第二个就是public void configure(Context context) {} 这个方法主要是通过传入的这个Contex上下文对象.来个获取配置文件中的参数,一些初始化的工作可以写在这个方法里面.

1.创建springboot项目LogToDiffentKafkaTopic

2.pom.xml中引入flume相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo</groupId>
    <artifactId>flume</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--Flume 依赖-->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!--Kafka 依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.30</version>
        </dependency>

    </dependencies>
	<!--构建-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3. 创建一个类LogToDiffentKafkaTopic.java,继承自AbstractSink

public class LogToDiffentKafkaTopic extends AbstractSink implements Configurable {

    private MessageClassifier messageClassifier;

    @Override
    public Status process() throws EventDeliveryException {
        System.out.println("========>process");
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try{
            Event event = channel.take();
            if (event == null){
                transaction.rollback();
                status = Status.BACKOFF;
                return status;
            }
            System.out.println("========>event:" + event.toString());
            //根据配置文件中定义的agent.sources.source1.basenameHeader = true和agent.sources.source1.basenameHeaderKey = fileName获取文件名称
            String fileName = event.getHeaders().get("fileName");
            byte[] body = event.getBody();
            final String msg = new String(body);
            System.out.println("========>msg:" + msg.toString());
            status = messageClassifier.startClassifier(msg, fileName) ;
            // 提交事务
            transaction.commit();
        }catch (Exception e){
            transaction.rollback();
            e.printStackTrace();
            status = Status.BACKOFF;
        }finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        ImmutableMap<String, String> parameters = context.getParameters();
        //启动的时候,从配置文件flume-conf.properties中读取的配置信息
        System.out.println("========>parameters: " + parameters.toString());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", context.getString("kafka.bootstrap.servers", "localhost:9092"));
        properties.put("acks", context.getString("acks", "all"));
        properties.put("retries", Integer.parseInt(context.getString("retries", "0")));
        properties.put("batch.size", Integer.parseInt(context.getString("batch.size", "16384")));
        properties.put("linger.ms", Integer.parseInt(context.getString("linger.ms", "1")));
        properties.put("buffer.memory", Integer.parseInt(context.getString("buffer.memory", "33554432")));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        messageClassifier = new MessageClassifier(properties);
    }

4. 创建一个类MessageClassifier.java,继承自AbstractSink

public class MessageClassifier {

    /*文件名称中包含_CDSS_,则消息推送到data-ncm-hljk-cdss-topic*/
    private static final String HJSJ_SSSJ_CDSS = ".*_CDSS_.*";
    private static final String HJSJ_SSSJ_CDSS_TOPIC = "data-ncm-hljk-cdss-topic";

     /*文件名称中包含_FZSS_,则消息推送到data-ncm-hljk-fzss-topic*/
    private static final String HJSJ_SSSJ_FZSS = ".*_FZSS_.*";
    private static final String HJSJ_SSSJ_FZSS_TOPIC = "data-ncm-hljk-fzss-topic";

    private final KafkaProducer<String, String> producer;

    public MessageClassifier(Properties kafkaConf) {
        producer = new KafkaProducer<>(kafkaConf);
    }

    public Sink.Status startClassifier(String msg, String fileName) {
        System.out.println("===========>msg: " + msg);
        System.out.println("===========>fileName: " + fileName);
        try {
            if (Pattern.matches(HJSJ_SSSJ_CDSS, fileName)) {
                System.out.println("===========>HJSJ_SSSJ_CDSS");
                producer.send(new ProducerRecord<>(HJSJ_SSSJ_CDSS_TOPIC, msg));
            } else if (Pattern.matches(HJSJ_SSSJ_FZSS, fileName)) {
                System.out.println("===========>HJSJ_SSSJ_FZSS");
                producer.send(new ProducerRecord<>(HJSJ_SSSJ_FZSS_TOPIC, msg));
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("===========>exception: " + e.getMessage());
            return Sink.Status.BACKOFF;
        }
        return Sink.Status.READY;
    }
}

5. 打jar包: flume-1.0.jar

mvn clean install -DskipTests

6. 在flume的安装目录下创建plugins.d目录

mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d

7. 在plugins.d目录下创建一个目录(名字任意,例如demo)

mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d/demo

8. 在demo目录下创建两个目录:lib和libext

mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d/demo/lib
mkdir -p /home/sxvbd/bigdata/flume-1.9.0/plugins.d/demo/libext

9. 将jar包上传到lib目录下(libext不用管)

10. 在配置文件flume-conf.properties中配置自定义sink

#Each channel's type is defined.
agent.sinks.sink1.type = com.demo.flume.LogToDiffentKafkaTopic
agent.sinks.sink1.kafka.bootstrap.servers = node24:9092,node25:9092,node26:9092
agent.sinks.sink1.parseAsFlumeEvent = false

11.启动

nohup ../bin/flume-ng agent --conf conf -f /home/sxvbd/bigdata/flume-1.9.0/conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console > flume.log 2>&1 &

12.在对应的目录下拖入文件

目录/home/sxvbd/bigdata/flumeTestDir/和目录/home/sxvbd/bigdata/flumeTestDir/temp

13.监听kafka的topic

Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中文章来源地址https://www.toymoban.com/news/detail-401700.html

到了这里,关于Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 文件夹路径保存不同,什么批量修改名称

    在日常工作中不知道大家有没有遇到过,需要批量修改文件夹名称,并且文件夹保存路径不同呢,像这种情况到底不能批量修改呢。我也问了很多身边的朋友,他们有的说,他一般都修改保存路径是同一个,还很少遇到像我这样情况,他们给的建议是,分次修改,一次修改同

    2024年02月05日
    浏览(34)
  • win11 搭建Apache webdav 设置用户名密码 加密授权访问以及多个不同目录访问

    Apache webdav 的搭建应该比较简单,但是搭建后还遇到了一些问题,也就是设置了访问用户名密码,咋就不生效呢,苦苦思索两日,终于发现了问题,本文就是分两个方面来编写 官网下载: https://www.apachehaus.com/cgi-bin/download.plx 打开文件 “/conf/httpd.conf“, 取消加载和包含语句的

    2024年01月21日
    浏览(39)
  • 【前端】根据后端返回的url进行下载并设置文件下载名称

            在我们项目当中存储文件是存储到厂商的服务器上的,然后厂商返回一个可以直接下载url地址,但是前端使用这个url下载的时候永远都是保存一个名字,这时候我们就需要设置文件保存的名称,         那么如何实现呢?使用了fetch将url转换成了blob即可。 代码

    2024年02月04日
    浏览(43)
  • visual studio 生成dll文件以及修改输出dll文件名称操作

    Windows系统下Visual Studio可以通过.def文件创建dll。 1.确定需要导出的函数,test.cpp文件中定义如下 2. 添加 .def文件,一般添加到源文件下面。 * 在代码栏下面有一个“模块定义文件”,即我们的.def文件 3.编写test.def文件 文件添加完成,下一步即可设置一些导出规则。 4.在我们的

    2024年02月14日
    浏览(33)
  • cp命令 复制多个目录/文件夹下文件到指定目录

    可以使用cp命令的通配符和递归选项来复制多个目录下多个文件夹下的文件到指定目录。 如果目标目录不存在,可以使用 mkdir -p命令来创建目录。 -p 选项表示递归创建目录,如果目录已经存在,则不会报错。 例如,以下命令会复制 /path/to/dir1和 /path/to/dir2 下的所有子目录中的

    2024年02月12日
    浏览(44)
  • Linux - 借助 inotifywait,轻松实现 Linux 文件/目录事件监听

    常用选项包括: -m​:以持续监视模式运行,即持续监视文件并输出事件。 ​-r​:递归监视指定目录及其子目录中的文件。 ​-e event ​:指定要监视的特定事件类型。可以使用多个 -e​ 选项来指定多个事件类型。 ​-q​:静默模式,只输出事件信息。 ​-s seconds ​:设置

    2024年02月12日
    浏览(38)
  • postcss-pxtorem适配插件动态配置rootValue(根据文件路径名称,动态改变vue.config里配置的值)

    项目背景:一个项目里有两个分辨率的设计稿(1920和2400),不能拆开来打包 参考: 是参考vant插件:移动端Vant组件库rem适配下大小异常的解决方案:https://github.com/youzan/vant/issues/1181 说明: 因为 vue.config.js 文件无法获取window对象,所以任何外部参数都加不进来,甚至无法打印

    2024年02月14日
    浏览(35)
  • nohup 输出到指定文件 Linux nohup 实现命令后台运行并输出或记录到指定日志文件 设置日志结果文件名称 重定向到某个文件 标准误 标准错误输出定向 输入报错信息保留

    # yourcommand:启动对象命令。可以跟该命令需要的各种参数。 # 是指在后台运行,但当用户推出(挂起)的时候,命令自动也跟着退出. nohup与结合起来,可以实现不挂断的后台运行。 实现屏幕输出记录到日志文件 # 0 – stdin (standard input),1 – stdout (standard output),2 – stderr (standa

    2024年02月05日
    浏览(49)
  • java通过FTP跨服务器动态监听读取指定目录下文件数据

    1、文件数据在A服务器(windows)(不定期在指定目录下生成),项目应用部署在B服务器(Linux); 2、项目应用在B服务器,监听A服务器指定目录,有新生成文件,进行读取文件信息,持久化数据; 3、提供两块内容,第一安装windows FTP服务;第二项目源码,希望可以帮助到你

    2024年02月03日
    浏览(43)
  • Python 合并多个 PDF 文件并建立书签目录

    今天在用 WPS 的 PDF 工具合并多个文件的时候,非常不给力,居然卡死了好几次,什么毛病?! 心里想,就这么点儿功能,居然收了我会员费都实现不了?不是吧…… 只能自己来了,主要用了 pypdf 库,因为 PyPDF2 版本更新原因,一些类和函数已经过时,截止发文时以下是最新

    2024年02月11日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包