基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

这篇具有很好参考价值的文章主要介绍了基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

04:数据源

  • 目标了解数据源的格式及实现模拟数据的生成

  • 路径

    • step1:数据格式
    • step2:数据生成
  • 实施

    • 数据格式

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

      消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号 收件人网络制式 收件人GPS 收件人性别 消息类型 双方距离 消息
      msg_time sender_nickyname sender_account sender_sex sender_ip sender_os sender_phone_type sender_network sender_gps receiver_nickyname receiver_ip receiver_account receiver_os receiver_phone_type receiver_network receiver_gps receiver_sex msg_type distance message
      2020/05/08 15:11:33 古博易 14747877194 48.147.134.255 Android 8.0 小米 Redmi K30 4G 94.704577,36.247553 莱优 97.61.25.52 17832829395 IOS 10.0 Apple iPhone 10 4G 84.034145,41.423804 TEXT 77.82KM 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。
    • 数据生成

      • 创建原始文件目录

        mkdir /export/data/momo_init
        
      • 上传模拟数据程序

        cd /export/data/momo_init
        rz
        

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

      • 创建模拟数据目录

        mkdir /export/data/momo_data
        
      • 运行程序生成数据

        • 语法

          java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
          
        • 测试:每500ms生成一条数据

          java -jar /export/data/momo_init/MoMo_DataGen.jar \
          /export/data/momo_init/MoMo_Data.xlsx \
          /export/data/momo_data/ \
          500
          
        • 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

  • 小结文章来源地址https://www.toymoban.com/news/detail-765674.html

    • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型

  • 路径

    • step1:需求分析
    • step2:技术选型
    • step3:技术架构
  • 实施

    • 需求分析

      • 离线存储计算
        • 提供离线T + 1的统计分析
        • 提供离线数据的即时查询
      • 实时存储计算
        • 提供实时统计分析
    • 技术选型

      • 离线
        • 数据采集:Flume
        • 离线存储:Hbase
        • 离线分析:Hive:复杂计算
        • 即时查询:Phoenix:高效查询
      • 实时
        • 数据采集:Flume
        • 实时存储:Kafka
        • 实时计算:Flink
        • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
    • 技术架构

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

      • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
        • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
        • 保证数据一致性
  • 小结

    • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

  • 目标回顾Flume基本使用及实现Flume的安装测试

  • 路径

    • step1:Flume回顾
    • step2:Flume的安装
    • step3:Flume的测试
  • 实施

    • Flume的回顾

      • 功能:实时对文件或者网络端口进行数据流监听采集
      • 场景:文件实时采集
      • 开发
        • step1:先开发一个配置文件:properties【K=V】
        • step2:运行这个文件即可
      • 组成
        • Agent:一个Agent就是一个Flume程序
        • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
        • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
        • Sink:负责从Channel拉取数据写入目标地
        • Event:代表一条数据对象
          • head:Map集合[KV]
          • body:byte[]
    • Flume的安装

      • 上传安装包

        cd /export/software/
        rz
        

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

      • 解压安装

        tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
        cd /export/server
        mv apache-flume-1.9.0-bin flume-1.9.0-bin
        
      • 修改配置

        #集成HDFS,拷贝HDFS配置文件
        cd /export/server/flume-1.9.0-bin
        cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
        #修改Flume环境变量
        cd /export/server/flume-1.9.0-bin/conf/
        mv flume-env.sh.template flume-env.sh
        vim flume-env.sh 
        
        #修改22行
        export JAVA_HOME=/export/server/jdk1.8.0_65
        #修改34行
        export HADOOP_HOME=/export/server/hadoop-3.3.0
        
      • 删除Flume自带的guava包,替换成Hadoop的

        cd /export/server/flume-1.9.0-bin 
        rm -rf lib/guava-11.0.2.jar
        cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
        
      • 创建目录

        cd /export/server/flume-1.9.0-bin
        #程序配置文件存储目录
        mkdir usercase
        #Taildir元数据存储目录
        mkdir position
        
    • Flume的测试

      • 需求:采集聊天数据,写入HDFS

      • 分析

        • Source:taildir:动态监听多个文件实现实时数据采集
        • Channel:mem:将数据缓存在内存
        • Sink:hdfs
      • 开发

        vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
        
        # define a1
        a1.sources = s1 
        a1.channels = c1
        a1.sinks = k1
        
        #define s1
        a1.sources.s1.type = TAILDIR
        #指定一个元数据记录文件
        a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
        #将所有需要监控的数据源变成一个组
        a1.sources.s1.filegroups = f1
        #指定了f1是谁:监控目录下所有文件
        a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
        #指定f1采集到的数据的header中包含一个KV对
        a1.sources.s1.headers.f1.type = momo
        a1.sources.s1.fileHeader = true
        
        #define c1
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 1000
        
        #define k1
        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
        a1.sinks.k1.hdfs.fileType = DataStream
        #指定按照时间生成文件,一般关闭
        a1.sinks.k1.hdfs.rollInterval = 0
        #指定文件大小生成文件,一般120 ~ 125M对应的字节数
        a1.sinks.k1.hdfs.rollSize = 102400
        #指定event个数生成文件,一般关闭
        a1.sinks.k1.hdfs.rollCount = 0
        a1.sinks.k1.hdfs.filePrefix = momo
        a1.sinks.k1.hdfs.fileSuffix = .log
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        
        #bound
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      • 启动HDFS

        start-dfs.sh
        
      • 运行Flume

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
        
      • 运行模拟数据

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        100
        
      • 查看结果

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

  • 小结

    • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

  • 目标实现案例Flume采集程序的开发

  • 路径

    • step1:需求分析
    • step2:程序开发
    • step3:测试实现
  • 实施

    • 需求分析

      • 需求:采集聊天数据,实时写入Kafka

      • Source:taildir

      • Channel:mem

      • Sink:Kafka sink

        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
        a1.sinks.k1.kafka.producer.acks = 1
        a1.sinks.k1.kafka.topic = mytopic
        a1.sinks.k1.kafka.flumeBatchSize = 20
        a1.sinks.k1.kafka.producer.linger.ms = 1
        a1.sinks.k1.kafka.producer.compression.type = snappy
        
    • 程序开发

      vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
      
      # define a1
      a1.sources = s1 
      a1.channels = c1
      a1.sinks = k1
      
      #define s1
      a1.sources.s1.type = TAILDIR
      #指定一个元数据记录文件
      a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
      #将所有需要监控的数据源变成一个组
      a1.sources.s1.filegroups = f1
      #指定了f1是谁:监控目录下所有文件
      a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
      #指定f1采集到的数据的header中包含一个KV对
      a1.sources.s1.headers.f1.type = momo
      a1.sources.s1.fileHeader = true
      
      #define c1
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 1000
      
      #define k1
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.kafka.topic = MOMO_MSG
      a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
      a1.sinks.k1.kafka.flumeBatchSize = 10
      a1.sinks.k1.kafka.producer.acks = 1
      a1.sinks.k1.kafka.producer.linger.ms = 100
      
      #bound
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    • 测试实现

      • 启动Kafka

        start-zk-all.sh
        start-kafka.sh 
        
      • 创建Topic

        kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
        

        注意:Kafka2.11版本用–zookeeper 替代
        kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092

      • 列举

        kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • 启动消费者

        kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • 启动Flume程序

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
        
      • 启动模拟数据

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        50
        
      • 观察结果
        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源,# Flink,flume,kafka,hbase

  • 小结

    • 实现案例Flume采集程序的开发

到了这里,关于基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在hadoop或docker环境下基于kafka和flink的实时计算大屏展示

    第一章 总体需求 1.1.课题背景 某股票交易机构已上线一个在线交易平台,平台注册用户量近千万,每日均 接受来自全国各地的分支机构用户提交的交易请求。鉴于公司发展及平台管理要 求,拟委托开发一个在线实时大数据系统,可实时观测股票交易大数据信息,展 示部分重

    2024年02月03日
    浏览(42)
  • 【flume实时采集mysql数据库的数据到kafka】

    最近做了flume实时采集mysql数据到kafka的实验,做个笔记,防止忘记 !!!建议从头看到尾,因为一些简单的东西我在前面提了,后面没提。 Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013 flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502 编写配置

    2024年02月03日
    浏览(60)
  • 基于华为MRS实时消费Kafka通过Flink落盘至HDFS的Hive外部表的调度方案

    该需求为实时接收对手Topic,并进行消费落盘至Hive。 在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。 本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。 华为官方文档:

    2024年01月18日
    浏览(46)
  • 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ 新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示

    2024年02月06日
    浏览(52)
  • 【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ 1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消

    2024年02月03日
    浏览(53)
  • 基于华为MRS3.2.0实时Flink消费Kafka落盘至HDFS的Hive外部表的调度方案

    该需求为实时接收对手Topic,并进行消费落盘至Hive。 在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。 本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。 华为官方文档:

    2024年01月21日
    浏览(39)
  • (二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

    本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的

    2024年02月09日
    浏览(54)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建 下载 https://archive.apache.org/dist/  Mysql下载地址 Index of /MySQL/Downloads/ 我最终选择 Zookeeper3.7.1 +Hadoop3.3.5 + Spark-3.2.4 + Flink-1.16.1 + Kafka2.12-3.4.0 + HBase2.4.17 + Hive3.1.3  +JDK1.8.0_391  IP规划 IP hostname 192.168.1.5 node1 192.168.1.6 node

    2024年01月23日
    浏览(52)
  • Flink流处理案例:实时数据聚合

    Apache Flink是一个流处理框架,可以处理大规模数据流,实现实时数据处理和分析。Flink支持各种数据源和接口,如Kafka、HDFS、TCP流等,可以实现高吞吐量、低延迟的流处理。 在本文中,我们将通过一个实际的Flink流处理案例来讲解Flink的核心概念、算法原理和最佳实践。我们将

    2024年02月19日
    浏览(46)
  • Flink CDC+Kafka 加速业务实时化

    摘要: 本文整理自阿里巴巴开发工程师,Apache Flink Committer 任庆盛,在 9 月 24 日 Apache Flink Meetup 的分享。主要内容包括: Flink CDC 技术对比与分析 Flink + Kafka 实时数据集成方案 Demo:Flink+Kafka 实现 CDC 数据的实时集成和实时分析 1.1. 变更数据捕获(CDC)技术 广义概念上,能够

    2024年02月15日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包