大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中

这篇具有很好参考价值的文章主要介绍了大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言

题目:

一、读题分析

二、处理过程

  1.数据处理部分:

2.HBaseSink(未经测试,不能证明其正确性,仅供参考!)

三、重难点分析

总结 

什么是HBase?


前言

本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理

注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示

题目:

        使用Flink消费Kafka中topic为ods_mall_log的数据,根据数据中不同的表前缀区分,将数据分别分发至kafka的DWD层的dim_customer_login_log的Topic中,其他的表则无需处理;


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写) 

一、读题分析

涉及组件:Scala,Flink,Kafka,HBase

涉及知识点:

  1. Flink函数的使用
  2. 了解HBase,基本使用HBase

二、处理过程

  1.数据处理部分:


import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

import java.util.Properties

object answer2 {
  def main(args: Array[String]): Unit = {
    import moduleC.test.HBaseSink2
    import org.apache.flink.streaming.api.scala.DataStream

    //    创建flink流环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //    创建Kafka的配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "bigdata1:9092")

    //    创建Kafka的消费者
    val kafkaConsumer = new FlinkKafkaConsumer[String]("ods_mall_log", new SimpleStringSchema(), properties)

    //    读取消费的数据
    val kafkaStream = env.addSource(kafkaConsumer)

    val newStream: DataStream[(String, String)] = kafkaStream
      .map(
        line => {
          val tablename = line.split(":")(0)
          val data = line.split(":")(1).stripPrefix("(").stripSuffix(");")
          //        12115|7611|0|0|'20230407111600'
          (tablename, data)
        }
      ).filter(_._1 == "customer_login_log")

    newStream.print()
    //    创建Kafka的生产者
    val dwdKafkaProduce = new FlinkKafkaProducer[String]("dim_customer_login_log", new SimpleStringSchema(), properties)
    //    val dwdKafkaProduce2 = new FlinkKafkaProducer[String]("dwd-topic1", new SimpleStringSchema(), properties)

    //    向Kafka发送数据
    //    newStream.addSink(dwdKafkaProduce)    不调用map方法会报错因为需要一个string而不是(string,string)
    val result = newStream.map(line => line._2)
    result.addSink(dwdKafkaProduce)

    //    发到HBase上
    result.addSink(new HBaseSink2)

    //    execute
    env.execute("flinkKafkaToKafka")
  }
}

2.HBaseSink(未经测试,不能证明其正确性,仅供参考!)

package moduleC.test

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

class HBaseSink(tableName: String, columnFamily: String) extends RichSinkFunction[String] {

   private var connection: client.Connection = _
   private var table: client.Table = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // 创建 HBase 连接
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "bigdata1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    connection = ConnectionFactory.createConnection(conf)
    // HBase 表信息
    table = connection.getTable(TableName.valueOf(tableName))
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    val put = new Put(Bytes.toBytes(value))
    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("data"), Bytes.toBytes(value))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }

}

三、重难点分析

本题的难点主要在以下几个方面:

  1. Flink对Kafka的集成:需要了解如何使用Flink作为Kafka消费者来消费Kafka中的数据,并掌握Flink如何处理流式数据。

  2. 数据表的前缀区分:需要能够识别数据中不同表的前缀,并根据前缀将数据分别分发到不同的DWD层Topic中。

  3. Kafka的使用:需要了解Kafka的基本原理和API使用,能够编写代码将数据发送到指定的DWD层Topic中。

  4. 数据处理:需要针对数据进行必要的处理和转换,以便将其发送到正确的目标Topic中。

综上,本题需要具备Flink、Kafka和数据处理的基本知识和编程能力,同时需要具备一定的调试能力,因此可能对初学者而言会有一定的难度。

 文章来源地址https://www.toymoban.com/news/detail-768256.html

  1. 在根据前缀区分那块,需要了解Flink对与数据拆解的方法和一些使用的技巧
  2. HBaseSink的编写,这是一个难点,需要了解HBaseSink的原理,相对于其他数据仓库来说算比较复杂的了。使用前需要开启Hadoop,它是基于Hadoop hdfs的。

总结 

什么是HBase?

        HBase是一个Apache Hadoop生态系统中的分布式NoSQL数据库。它是一个面向列的数据库,旨在提供高度可扩展性和可靠性,以便处理大规模数据集。HBase的设计灵感来自于Google的Bigtable论文,并且提供了类似于Bigtable的数据模型和API,但是开源和可扩展。

        HBase的数据存储在Hadoop HDFS(Hadoop分布式文件系统)上,并且可以在多个节点上分布式存储和处理。它支持高并发读写操作,具有快速的随机读/写访问速度,并且能够处理PB级别的数据集。

        HBase的应用场景包括互联网应用、日志处理、社交网络、金融服务、电信和游戏等领域,为这些领域提供了一种高效处理大量数据的解决方案。

        以上来自网络

 

        请关注我的大数据技术专栏大数据技术 作者: Eternity.Arrebol

        请关注我获取更多与大数据相关的文章Eternity.Arrebol的博客

Q-欢迎在评论区进行交流-Q

 

 

到了这里,关于大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka消费多个topic的使用

    我们在业务中难免遇到一个kafka消费多个topic的消息,本文帮助大家如何在业务中用一个类消费多个topic消息 配置类1

    2024年02月11日
    浏览(63)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • Kafka3.1部署和Topic主题数据生产与消费

    本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用: 1.安装JDK配置环境变量 2.Zookeeper(zookeeper-3.7.1) zk 部署后的目录位置:D:setupapache-zookeeper-3.7.1 3.安装Kafka3.1X 3.1 下载包(kafka_2.12-3.1.2.tgz) Kafka 3.2、 解压并进入Kafka目录: 根目录:D:setupkafka3.1.2 3、 编辑

    2024年02月09日
    浏览(40)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • 多个消费者订阅一个Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    记录 :466 场景 :一个KafkaProducer在一个Topic发布消息,多个消费者KafkaConsumer订阅Kafka的Topic。每个KafkaConsumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zha

    2024年02月16日
    浏览(46)
  • 多个消费者订阅一个Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    记录 :465 场景 :一个Producer在一个Topic发布消息,多个消费者Consumer订阅Kafka的Topic。每个Consumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    浏览(48)
  • Kafka/Spark-01消费topic到写出到topic

    消费者代码 注意点 consumerConfigs是定义的可变的map的类型的,具体如下 consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下 Subscribe传参需要指定泛型,这边指定string,

    2024年02月09日
    浏览(35)
  • Kafka - Topic 消费状态常用命令

    replication-factor:指定副本数量 partitions:指定分区 查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数 这里同样需要根据新、旧版本的consumer,分别指

    2024年01月25日
    浏览(54)
  • Kafka某个Topic无法消费问题

    12月28日,公司测试环境Kafka的task.build.metadata.flow这个topic突然无法消费。 其他topic都正常使用,这个topic只有一个分区,并且只有一个消费者 首先登录服务器,运行kafka的cli命令,查看消费者组的详情。 由上图可以发现,task.build.metadata.flow这个topic,最新offset是2,但是当前o

    2024年02月03日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包