离线数仓同步数据1

这篇具有很好参考价值的文章主要介绍了离线数仓同步数据1。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf

com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder文章来源地址https://www.toymoban.com/news/detail-695973.html

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 日志消费Flume配置实操
1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 
2)配置文件内容如下

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置优化
1)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2HDFS Sink优化
(1HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
	(2HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件

3)编写Flume拦截器
(1)数据漂移问题

(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类
package com.atguigu.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {
    

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

		//1、获取header和body的数据
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

		//2、将body的数据类型转成jsonObject类型(方便获取数据)
        JSONObject jsonObject = JSONObject.parseObject(log);

		//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}3)重新打包

(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。

2.1.4 日志消费Flume测试

1)启动Zookeeper、Kafka集群
2)启动日志采集Flume
[atguigu@hadoop102 ~]$ f1.sh start
3)启动hadoop104的日志消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh 
5)观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh
	在脚本中填写如下内容
#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3)f2启动
[atguigu@hadoop102 module]$ f2.sh start
4)f2停止
[atguigu@hadoop102 module]$ f2.sh stop

到了这里,关于离线数仓同步数据1的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 离线数仓建设之数据导出

    为了方便报表应用使用数据,需将ADS各项指标统计结果导出到MySQL,方便熟悉 SQL 人员使用。 创建car_data_report数据库: 1.1.2 创建表 ① 里程相关统计 创建ads_mileage_stat_last_month表,存储里程相关统计数据。 ② 告警相关统计 创建ads_alarm_stat_last_month表,存储告警相关的统计数据。

    2024年03月16日
    浏览(49)
  • 看这篇就明白大数据实时数仓、离线数仓、数据湖之间的关系

      20世纪70年代,MIT(麻省理工)的研究员致力于研究一种优化的技术架构,该架构试图将业务处理系统和分析系统分开,即将业务处理和分析处理分为不同层次,针对各自的特点采取不同的架构设计原则,MIT的研究员认为这两种信息处理的方式具有显著差别,以至于必须采取完

    2024年02月08日
    浏览(47)
  • 离线数仓(一)【数仓概念、需求架构】

            今天开始学习数仓的内容,之前花费一年半的时间已经学完了 Hadoop、Hive、Zookeeper、Spark、HBase、Flume、Sqoop、Kafka、Flink 等基础组件。把学过的内容用到实践这是最重要的,相信会有很大的收获。         数据仓库( Data Warehouse ),是 为企业制定决策,提供数

    2024年02月20日
    浏览(41)
  • 离线数仓分层

    1、清晰数据结构 :数仓每一层都有对应的作用,方便在使用时更好定位与了解 2、数据血缘追踪 :清晰知道表/任务上下游,方便排查问题,知道下游哪个模块在使用,提升开发效率及后期管理维护 3、减少重复开发 :完善数仓好中间层,减少后期不必要的开发,从而减少资

    2024年02月06日
    浏览(39)
  • 阿里云生态离线数仓

            功能齐全:10多年大数据建设沉淀完整的平台,覆盖数据开发治理的全生命周期         简单易用:全图形化界面,SQL为主的数据开发方式         安全稳定:双11日千万级任务稳定调度,金融级数据安全保障         开放兼容: 支持多种大数据引擎绑定,开放

    2024年02月05日
    浏览(33)
  • 一百八十六、大数据离线数仓完整流程——步骤五、在Hive的DWS层建动态分区表并动态加载数据

    经过6个月的奋斗,项目的离线数仓部分终于可以上线了,因此整理一下离线数仓的整个流程,既是大家提供一个案例经验,也是对自己近半年的工作进行一个总结。 1、Hive的DWS层建库建表语句 --如果不存在则创建hurys_dc_dws数据库 create database if not exists hurys_dc_dws; --使用hurys_

    2024年02月07日
    浏览(48)
  • 【从0开始离线数仓项目】——新能源汽车数仓项目介绍

    目录 1、数据仓库概念 2、项目需求及架构设计 3、集群资源规划设计  4、车辆日志字段说明 数据仓库(Data Warehouse)是为企业提供数据支持,用以协助企业制定决策、改进业务流程和提高产品质量等方面的工具。它可以接收多种类型的输入数据,如业务数据、日志数据和爬虫

    2024年02月13日
    浏览(40)
  • 一百八十二、大数据离线数仓完整流程——步骤一、用Kettle从Kafka、MySQL等数据源采集数据然后写入HDFS

    经过6个月的奋斗,项目的离线数仓部分终于可以上线了,因此整理一下离线数仓的整个流程,既是大家提供一个案例经验,也是对自己近半年的工作进行一个总结。 项目行业属于交通行业,因此数据具有很多交通行业的特征,比如转向比数据就是统计车辆左转、右转、直行

    2024年02月07日
    浏览(53)
  • 离线数仓中,为什么用两个flume,一个kafka

    实时数仓中,为什么没有零点漂移问题? 因为flink直接取的事件时间 用kafka是为了速度快,并且数据不丢,那为什么既用了kafkachannel,也用了kafka,而不只用kafkachannel呢? 因为需要削峰填谷 离线数仓中,为什么用两个flume,一个kafka,直接用taildirsource,kafkachannel,hdfssink不行吗?

    2024年02月14日
    浏览(48)
  • 尚硅谷大数据项目《在线教育之离线数仓》笔记006

    视频地址:尚硅谷大数据项目《在线教育之离线数仓》_哔哩哔哩_bilibili 目录 第11章 数仓开发之ADS层 P087 P088 P089 P090 P091 P092 P093 P094 P095 P096 P097 P098 P099 P100 P101 P102 P103 P104 P105 P106 P107 P108 P109 P110 P111 P087 第11章 数仓开发之ADS层 11.1 流量主题 11.1.1 各来源流量统计 [atguigu@node001 ~]

    2024年02月09日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包