大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层

这篇具有很好参考价值的文章主要介绍了大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言

题目:

一、读题分析

二、处理过程

三、重难点分析

总结 


前言

本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理

注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示

题目:

 文章来源地址https://www.toymoban.com/news/detail-440815.html

大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层

 


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写) 

一、读题分析

涉及组件:Scala,Flink,Kafka,json

涉及知识点:

  1. Flink处理数据
  2. Flink1.14新特性
  3. json文件的处理

二、处理过程

  --代码仅供参考--

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.util.Properties

object task1 {

  def main(args: Array[String]): Unit = {

    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置使用处理时间
    import org.apache.flink.streaming.api.TimeCharacteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // 设置并行度
    env.setParallelism(1)

    // 启用检查点
    env.enableCheckpointing(5000)

    // kafka source
    val kafkaSource = KafkaSource.builder[String]
      .setBootstrapServers("bigdata1:9092")
      .setTopics("ods_mall_data")
      .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build()

    // kafka sink
    val properties = new Properties()
    properties.setProperty("trans.timeout.ms", "7200000") // 2 hours

    // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
    val kafkaSink = KafkaSink.builder[String]
      .setBootstrapServers("bigdata1:9092")
      .setKafkaProducerConfig(properties)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder()
        .setTopicSelector(new TopicSelector[String] {
          override def apply(t: String): String = {
            if (t.contains("order_id")) "fact_order_master"
            else if (t.contains("order_detail")) "fact_order_detail"
            else null
          }
        })
        .setValueSerializationSchema(new SimpleStringSchema)
        .build()).build()

    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
      .filter(line => line.contains("order_master") || line.contains("order_detail"))
      .map(line => {
        import com.google.gson.JsonParser
        val jsonobj = new JsonParser().parse(line).getAsJsonObject
        jsonobj.getAsJsonObject("data").toString
      })
      //      .print
      .sinkTo(kafkaSink)
    /* .map(line => {
       val jsonObj = JsonParser.parseString(line).getAsJsonObject.getAsJsonObject("data")
       jsonObj.toString
     })
     // .print
     .sinkTo(kafkaSink)*/
    env.execute("Task1")
  }
}

 


三、重难点分析

本题的难点主要集中在以下几个方面:

  1. Flink对Kafka的集成:需要了解如何使用Flink作为Kafka消费者消费Kafka中的数据,并对数据进行处理和转换。

  2. 数据表的识别和分发:需要辨别Kafka中topic1中的不同数据表,并将它们分别发送到Kafka的DWD层Topic中。

  3. Kafka消费者的使用:需要了解如何使用Kafka自带的消费者消费Kafka中的数据,并掌握Kafka消费者的一些基本配置和参数设置。

综上,本题需要较全面的Kafka和Flink技能,并具备一定的编程能力和调试经验,因此可能对初学者而言会有一定难度。

 

        Flink1.14版本抛弃了FlinkKafkaConsumer和FlinkKafkaProduce的方法(但依旧能用),采用了KafkaSource和KafkaSink作为Flink读取和写入Kafka的方法类

  1.  因为采用最新所以难点在于是否能够熟练使用KafkaSource和KafkaSink。
  2. 对每一条从Kafka读取的数据,我们可以直接使用contains方法直接判断数据中是否包含我们想要的,这个方法最直接,不过消耗更多的时间。如果自己去实现对数据分析是否包含相应的数据格式,难度较大,时间较长。不建议这样。
  3. 对于json数据的解析,可能会存在版本较低,一些方法不能使用需要用替代的方法,建议查询json解析类的版本,找到其对应符合版本操作的方法在使用。

总结 

 

将本题的难度解决好,总结一下Flink版本的特点,因为Flink到目前能够普及下来的并不成熟,网上的方法教程也很少,并不像Spark那样。

综上,本题需要较全面的Kafka和Flink技能,并具备一定的编程能力和调试经验,因此可能对初学者而言会有一定难度。

 

在这里给大家写出Flink 1.14版本的新特性包括:

  1.  动态表格功能支持:Flink 1.14引入了动态表格功能,可以在不修改代码的情况下对流和批处理程序进行修改,使得它们可以支持更加灵活的工作负载。
  2. 集成Kettle:Flink 1.14集成了Kettle,可以通过使用Kettle插件来实现ETL工作。
  3. SQL扩展:Flink 1.14引入了多种新的SQL内置函数和聚合函数,包括采样、全窗口聚合函数等。
  4. 新的状态后端:Flink 1.14引入了多种新的状态后端选项,包括RocksDB、Filesystem等。
  5. 优化与稳定性改进:Flink 1.14对任务调度、内存管理、并发度等方面均进行了优化和改进,同时提高了系统的稳定性和容错性。 总之,Flink 1.14版本的新特性提高了系统的灵活性、ETL功能和SQL处理能力,并且对系统的性能和稳定性均有提升。

Flink和Spark都是大数据处理框架,但它们有以下几点不同之处:

  1. 数据流和数据集:Flink是基于数据流的处理框架,可以无限制地处理数据流。而Spark是基于数据集的处理框架,需要将数据加载到内存中进行处理,因此对数据规模和处理速度有一定的限制。
  2. 运行模式:Flink支持流处理和批处理,且可以实现流批一体的处理。而Spark目前主要支持批处理,虽然Spark Streaming提供了流处理的功能,但是不支持真正的流式处理。
  3. API:Flink和Spark都提供了API供用户进行编程,但是Flink的API更加完整和统一,同时自带了大量的运算算子。相比之下,Spark的API较为分散并且需要使用lambda表达式进行功能组合。
  4. 状态管理:Flink提供了状态管理功能,可以自动追踪流处理中的状态。而Spark不支持状态管理,需要用户自己维护状态。
  5. 容错性:Flink具有高容错性,支持在出现故障时重新启动任务并继续处理。而Spark的容错机制较弱,出现故障时需要重新启动整个应用。 综上所述,Flink和Spark都是强大的大数据处理框架。Flink的优点包括流式处理、运行模式灵活、API完整和状态管理功能强大等,而Spark的优点则体现在语法简洁且易于上手、支持大量的数据源和数据处理工具等方面。

!!以上两个均出自于AI!!

 

请关注我的大数据技术专栏大数据技术 作者: Eternity.Arrebol

        请关注我获取更多与大数据相关的文章Eternity.Arrebol的博客

Q-欢迎在评论区进行交流-Q

 

到了这里,关于大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka消费多个topic的使用

    我们在业务中难免遇到一个kafka消费多个topic的消息,本文帮助大家如何在业务中用一个类消费多个topic消息 配置类1

    2024年02月11日
    浏览(63)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • Kafka3.1部署和Topic主题数据生产与消费

    本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用: 1.安装JDK配置环境变量 2.Zookeeper(zookeeper-3.7.1) zk 部署后的目录位置:D:setupapache-zookeeper-3.7.1 3.安装Kafka3.1X 3.1 下载包(kafka_2.12-3.1.2.tgz) Kafka 3.2、 解压并进入Kafka目录: 根目录:D:setupkafka3.1.2 3、 编辑

    2024年02月09日
    浏览(40)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • 多个消费者订阅一个Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    记录 :466 场景 :一个KafkaProducer在一个Topic发布消息,多个消费者KafkaConsumer订阅Kafka的Topic。每个KafkaConsumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zha

    2024年02月16日
    浏览(46)
  • 多个消费者订阅一个Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    记录 :465 场景 :一个Producer在一个Topic发布消息,多个消费者Consumer订阅Kafka的Topic。每个Consumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    浏览(48)
  • Kafka/Spark-01消费topic到写出到topic

    消费者代码 注意点 consumerConfigs是定义的可变的map的类型的,具体如下 consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下 Subscribe传参需要指定泛型,这边指定string,

    2024年02月09日
    浏览(35)
  • Kafka - Topic 消费状态常用命令

    replication-factor:指定副本数量 partitions:指定分区 查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数 这里同样需要根据新、旧版本的consumer,分别指

    2024年01月25日
    浏览(54)
  • Kafka某个Topic无法消费问题

    12月28日,公司测试环境Kafka的task.build.metadata.flow这个topic突然无法消费。 其他topic都正常使用,这个topic只有一个分区,并且只有一个消费者 首先登录服务器,运行kafka的cli命令,查看消费者组的详情。 由上图可以发现,task.build.metadata.flow这个topic,最新offset是2,但是当前o

    2024年02月03日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包