目录
前言
题目:
一、读题分析
二、处理过程
三、重难点分析
总结
前言
本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理
注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示
题目:
文章来源地址https://www.toymoban.com/news/detail-440815.html
提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)
一、读题分析
涉及组件:Scala,Flink,Kafka,json
涉及知识点:
- Flink处理数据
- Flink1.14新特性
- json文件的处理
二、处理过程
--代码仅供参考--
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties
object task1 {
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置使用处理时间
import org.apache.flink.streaming.api.TimeCharacteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// 设置并行度
env.setParallelism(1)
// 启用检查点
env.enableCheckpointing(5000)
// kafka source
val kafkaSource = KafkaSource.builder[String]
.setBootstrapServers("bigdata1:9092")
.setTopics("ods_mall_data")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build()
// kafka sink
val properties = new Properties()
properties.setProperty("trans.timeout.ms", "7200000") // 2 hours
// KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
val kafkaSink = KafkaSink.builder[String]
.setBootstrapServers("bigdata1:9092")
.setKafkaProducerConfig(properties)
.setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder()
.setTopicSelector(new TopicSelector[String] {
override def apply(t: String): String = {
if (t.contains("order_id")) "fact_order_master"
else if (t.contains("order_detail")) "fact_order_detail"
else null
}
})
.setValueSerializationSchema(new SimpleStringSchema)
.build()).build()
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")
.filter(line => line.contains("order_master") || line.contains("order_detail"))
.map(line => {
import com.google.gson.JsonParser
val jsonobj = new JsonParser().parse(line).getAsJsonObject
jsonobj.getAsJsonObject("data").toString
})
// .print
.sinkTo(kafkaSink)
/* .map(line => {
val jsonObj = JsonParser.parseString(line).getAsJsonObject.getAsJsonObject("data")
jsonObj.toString
})
// .print
.sinkTo(kafkaSink)*/
env.execute("Task1")
}
}
三、重难点分析
本题的难点主要集中在以下几个方面:
-
Flink对Kafka的集成:需要了解如何使用Flink作为Kafka消费者消费Kafka中的数据,并对数据进行处理和转换。
-
数据表的识别和分发:需要辨别Kafka中topic1中的不同数据表,并将它们分别发送到Kafka的DWD层Topic中。
-
Kafka消费者的使用:需要了解如何使用Kafka自带的消费者消费Kafka中的数据,并掌握Kafka消费者的一些基本配置和参数设置。
综上,本题需要较全面的Kafka和Flink技能,并具备一定的编程能力和调试经验,因此可能对初学者而言会有一定难度。
Flink1.14版本抛弃了FlinkKafkaConsumer和FlinkKafkaProduce的方法(但依旧能用),采用了KafkaSource和KafkaSink作为Flink读取和写入Kafka的方法类
- 因为采用最新所以难点在于是否能够熟练使用KafkaSource和KafkaSink。
- 对每一条从Kafka读取的数据,我们可以直接使用contains方法直接判断数据中是否包含我们想要的,这个方法最直接,不过消耗更多的时间。如果自己去实现对数据分析是否包含相应的数据格式,难度较大,时间较长。不建议这样。
- 对于json数据的解析,可能会存在版本较低,一些方法不能使用需要用替代的方法,建议查询json解析类的版本,找到其对应符合版本操作的方法在使用。
总结
将本题的难度解决好,总结一下Flink版本的特点,因为Flink到目前能够普及下来的并不成熟,网上的方法教程也很少,并不像Spark那样。
综上,本题需要较全面的Kafka和Flink技能,并具备一定的编程能力和调试经验,因此可能对初学者而言会有一定难度。
在这里给大家写出Flink 1.14版本的新特性包括:
- 动态表格功能支持:Flink 1.14引入了动态表格功能,可以在不修改代码的情况下对流和批处理程序进行修改,使得它们可以支持更加灵活的工作负载。
- 集成Kettle:Flink 1.14集成了Kettle,可以通过使用Kettle插件来实现ETL工作。
- SQL扩展:Flink 1.14引入了多种新的SQL内置函数和聚合函数,包括采样、全窗口聚合函数等。
- 新的状态后端:Flink 1.14引入了多种新的状态后端选项,包括RocksDB、Filesystem等。
- 优化与稳定性改进:Flink 1.14对任务调度、内存管理、并发度等方面均进行了优化和改进,同时提高了系统的稳定性和容错性。 总之,Flink 1.14版本的新特性提高了系统的灵活性、ETL功能和SQL处理能力,并且对系统的性能和稳定性均有提升。
Flink和Spark都是大数据处理框架,但它们有以下几点不同之处:
- 数据流和数据集:Flink是基于数据流的处理框架,可以无限制地处理数据流。而Spark是基于数据集的处理框架,需要将数据加载到内存中进行处理,因此对数据规模和处理速度有一定的限制。
- 运行模式:Flink支持流处理和批处理,且可以实现流批一体的处理。而Spark目前主要支持批处理,虽然Spark Streaming提供了流处理的功能,但是不支持真正的流式处理。
- API:Flink和Spark都提供了API供用户进行编程,但是Flink的API更加完整和统一,同时自带了大量的运算算子。相比之下,Spark的API较为分散并且需要使用lambda表达式进行功能组合。
- 状态管理:Flink提供了状态管理功能,可以自动追踪流处理中的状态。而Spark不支持状态管理,需要用户自己维护状态。
- 容错性:Flink具有高容错性,支持在出现故障时重新启动任务并继续处理。而Spark的容错机制较弱,出现故障时需要重新启动整个应用。 综上所述,Flink和Spark都是强大的大数据处理框架。Flink的优点包括流式处理、运行模式灵活、API完整和状态管理功能强大等,而Spark的优点则体现在语法简洁且易于上手、支持大量的数据源和数据处理工具等方面。
!!以上两个均出自于AI!!
请关注我的大数据技术专栏大数据技术 作者: Eternity.Arrebol
请关注我获取更多与大数据相关的文章Eternity.Arrebol的博客
Q-欢迎在评论区进行交流-Q文章来源:https://www.toymoban.com/news/detail-440815.html
到了这里,关于大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!