Flume实战篇-采集Kafka到hdfs

这篇具有很好参考价值的文章主要介绍了Flume实战篇-采集Kafka到hdfs。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

记录Flume采集kafka数据到Hdfs。

配置文件

#  vim job/kafka_to_hdfs_db.conf 
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#每一批有5000条的时候写入channel
a1.sources.r1.batchSize = 5000
#2秒钟写入channel(也就是如果没有达到5000条那么时间过了2秒拉去一次)
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = ip地址:9092,ip地址:9092,ip地址:9092
#指定对应的主题
# a1.sources.r1.kafka.topics.regex = ^topic[0-9]$  ---这里如果主题都是 数据库_表的情况那么就可是使用正则 ^数据库名称_,然后写拦截器的时候在header添加获取database的信息和表的信息
#写到头部,用%{database}_%{tablename}_inc 的形式进行写入到hdfs,达到好的扩展性。
a1.sources.r1.kafka.topics = cart_info,comment_info
#消费者主相同的的多个flume能够提高消费的吞吐量
a1.sources.r1.kafka.consumer.group.id = abs_flume
#在event头部添加一个topic的变量,/origin_data/gmall/db/%{topic}_inc/%Y-%m-%d,%{topic},key 为topic,value为消费的主题信息。
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
#这个拦截器里面可以在头部设置变量用来读取。%{头部的key}
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.db.TimestampInterceptor$Builder
#在最早的地方进行消费,如果有对应的消费者组了,那么就从最新的地方进行消费。
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
#滚动的时间,这里是5分钟,如果不设置那么文件大小没有达到128m就不会回滚文件,不回滚文件就读取不到值。
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

拦截器(零点漂移问题)

主要是自定义下Flume读取event头部的时间。

TimestampInterceptor

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");

        //Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
        String timeMills = String.valueOf(ts * 1000);

        headers.put("timestamp", timeMills);

        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

将打好的包放入/opt/module/flume/lib文件夹下
[root@ lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

Flume启停文件 

 bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console


#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

生产实践 

配置下flume的jvm

vi flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx2048m"
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 3000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = ip:9092,ip:9092
a1.sources.r1.kafka.topics = 表名,表名
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bigdata.bigdatautil.TimestampInterceptor$Builder
#如果是相同的消费者组那么首次是最开始的时候,后面在启动是末尾消费
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/bigdata/module/apache-flume-1.9.0-bin/checkpoint
a1.channels.c1.dataDirs = /home/bigdata/module/apache-flume-1.9.0-bin/data
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/hive/warehouse/abs/ods/ods_%{table_name}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.round = false

#这里设置5分钟滚动一次文件,如果数据量不太大,建议增大数据,减少小文件的生成
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

拦截器

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 *  "database": "数据库",
 *     "table": "表明",
 *     "type": "bootstrap-start",
 *     "ts": 时间戳秒,
 *     "data": {}
 */
public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");
        String database_name = jsonObject.getString("database");
        String table_name = jsonObject.getString("table");

        //Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
        String timeMills = String.valueOf(ts * 1000);


        headers.put("timestamp", timeMills);
        headers.put("database_name",database_name);
        headers.put("table_name",table_name);
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

集群启停文件 

#!/bin/bash

case $1 in
"start")
        echo " --------启动 abs3 flume-------"
        ssh master2 "nohup /home/bigdata/module/apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c /home/bigdata/module/apache-flume-1.9.0-bin/conf -f /home/bigdata/module/apache-flume-1.9.0-bin/conf/kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------启动 关闭 flume-------"
        ssh master2 "ps -ef | grep kafka_to_hdfs.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

   上面配置消费的形式是earliest,如果重新启动以后,他会从最新的位置开始消费,如果消费者组的名称有改变,那么就会从最开始的地方开始消费。

   上面的集群脚本可以在要修改配置前,停止flume服务,停止以后,因为使用的是file的channel并且flume消费kafka,到hdfs,本身有事物保证,所以不会有数据丢失的问题,可能会有重复信息的问题。

处理异常情况问题 

#查看所有的消费者组
bin/kafka-consumer-groups.sh  --bootstrap-server ip:9092,ip:9092,ip:9092  --list
#查看对应消费者组的情况
bin/kafka-consumer-groups.sh  --bootstrap-server ip:9092,ip:9092,ip:9092  --describe  --group abs3_flume
#删除对应的消费者组
bin/kafka-consumer-groups.sh --bootstrap-server ip:9092,ip:9092,ip:9092 --delete --group abs_flume
#然后删除flume的checkoutpoint,和data数据重新消费。
#由于商城信息数据量不是特别大,这里每20分钟滚动一次文件。避免过多的小文件。(测试消费kafka数据1000多万秒消费)
a1.sinks.k1.hdfs.rollInterval = 1200
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

相关参考文章

Flume之 各种 Channel 的介绍及参数解析_flume的channel类型有哪些_阿浩_的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-739557.html

到了这里,关于Flume实战篇-采集Kafka到hdfs的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flume采集数据到Kafka操作详解

    目录 一、创建一个Kafka主题 二、配置Flume 三、开启Flume 四、开启Kafka消费者 五、复制文件到Flume监控的source目录下 六、查看Flume是否能够成功采集 七、采集后查看Kafka消费者主题 八、采集数据错误解决办法 1.Ctrl+C关闭flume 2.删除出错的topic并重新创建 3.删除对应Flume文件中指定

    2024年02月09日
    浏览(62)
  • Flume学习-采集端口数据存入kafka

    启动zookeeper、kafka并创建kafka主题 2、创建flume-kafka.conf配置文件 用于采集socket数据后存入kafka 在flume文件夹中的conf下新建flume-kafka.conf配置文件 设置监听本地端口10050 netcat发送的socket数据,讲采集到的数据存入kafka的hunter主题中 3、启动flume ./bin/flume-ng :启动Flume-ng二进制文件。

    2024年02月03日
    浏览(46)
  • Flume实现Kafka数据持久化存储到HDFS

    写在前面:博主是一只经过实战开发历练后投身培训事业的“小山猪”,昵称取自动画片《狮子王》中的“彭彭”,总是以乐观、积极的心态对待周边的事物。本人的技术路线从Java全栈工程师一路奔向大数据开发、数据挖掘领域,如今终有小成,愿将昔日所获与大家交流一二

    2024年02月06日
    浏览(44)
  • 【flume实时采集mysql数据库的数据到kafka】

    最近做了flume实时采集mysql数据到kafka的实验,做个笔记,防止忘记 !!!建议从头看到尾,因为一些简单的东西我在前面提了,后面没提。 Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013 flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502 编写配置

    2024年02月03日
    浏览(60)
  • 【数仓】通过Flume+kafka采集日志数据存储到Hadoop

    【数仓】基本概念、知识普及、核心技术 【数仓】数据分层概念以及相关逻辑 【数仓】Hadoop软件安装及使用(集群配置) 【数仓】Hadoop集群配置常用参数说明 【数仓】zookeeper软件安装及集群配置 【数仓】kafka软件安装及集群配置 【数仓】flume软件安装及配置 【数仓】flum

    2024年03月17日
    浏览(59)
  • 大数据之使用Flume监听端口采集数据流到Kafka

    前言 题目: 一、读题分析 二、处理过程   1.先在Kafka中创建符合题意的Kafka的topic  创建符合题意的Kafka的topic 2.写出Flume所需要的配置文件 3.启动脚本然后启动Flume监听端口数据并传到Kafka 启动flume指令 启动脚本,观察Flume和Kafka的变化 三、重难点分析 总结          本题

    2024年02月08日
    浏览(59)
  • 二百一十一、Flume——Flume实时采集Linux中的Hive日志写入到HDFS中(亲测、附截图)

    为了实现用Flume实时采集Hive的操作日志到HDFS中,于是进行了一场实验 [root@hurys23 conf]# find / -name hive.log /home/log/hive312/hive.log [root@hurys23 conf]# vi  flume-file-hdfs.conf # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources

    2024年02月04日
    浏览(65)
  • 日志采集传输框架之 Flume,将监听端口数据发送至Kafka

    1、简介                 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,主要有以下几个部分组成。  主要组件介绍: 1)、 Flume Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。Agent 主

    2024年01月22日
    浏览(58)
  • 大数据开发之电商数仓(hadoop、flume、hive、hdfs、zookeeper、kafka)

    1.1.1 数据仓库概念 1、数据仓库概念: 为企业制定决策,提供数据支持的集合。通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本,提高产品质量。 数据仓库并不是数据的最终目的地,而是为数据最终的目的地做好准备,这些准备包括对数据的:清洗、

    2024年01月22日
    浏览(64)
  • Flume多路复用模式把接收数据注入kafka 的同时,将数据备份到HDFS目录

    启动hadoop、在hdfs中创建需要访问的目录 配置Hadoop的核心配置文件 core-site.xml :设置Hadoop的核心配置参数,例如NameNode的地址、数据块大小、副本数量等。示例配置如下: hdfs-site.xml :设置HDFS(Hadoop分布式文件系统)的参数,例如数据块复制因子、NameNode的存储路径等。示例配

    2024年02月16日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包