目录
前言
题目:
一、读题分析
二、处理过程
1.数据处理部分:
2.HBaseSink(未经测试,不能证明其正确性,仅供参考!)
三、重难点分析
总结
什么是HBase?
前言
本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理
注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示
题目:
使用Flink消费Kafka中topic为ods_mall_log的数据,根据数据中不同的表前缀区分,将数据分别分发至kafka的DWD层的dim_customer_login_log的Topic中,其他的表则无需处理;
提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)
一、读题分析
涉及组件:Scala,Flink,Kafka,HBase
涉及知识点:
- Flink函数的使用
- 了解HBase,基本使用HBase
二、处理过程
1.数据处理部分:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import java.util.Properties
object answer2 {
def main(args: Array[String]): Unit = {
import moduleC.test.HBaseSink2
import org.apache.flink.streaming.api.scala.DataStream
// 创建flink流环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建Kafka的配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "bigdata1:9092")
// 创建Kafka的消费者
val kafkaConsumer = new FlinkKafkaConsumer[String]("ods_mall_log", new SimpleStringSchema(), properties)
// 读取消费的数据
val kafkaStream = env.addSource(kafkaConsumer)
val newStream: DataStream[(String, String)] = kafkaStream
.map(
line => {
val tablename = line.split(":")(0)
val data = line.split(":")(1).stripPrefix("(").stripSuffix(");")
// 12115|7611|0|0|'20230407111600'
(tablename, data)
}
).filter(_._1 == "customer_login_log")
newStream.print()
// 创建Kafka的生产者
val dwdKafkaProduce = new FlinkKafkaProducer[String]("dim_customer_login_log", new SimpleStringSchema(), properties)
// val dwdKafkaProduce2 = new FlinkKafkaProducer[String]("dwd-topic1", new SimpleStringSchema(), properties)
// 向Kafka发送数据
// newStream.addSink(dwdKafkaProduce) 不调用map方法会报错因为需要一个string而不是(string,string)
val result = newStream.map(line => line._2)
result.addSink(dwdKafkaProduce)
// 发到HBase上
result.addSink(new HBaseSink2)
// execute
env.execute("flinkKafkaToKafka")
}
}
2.HBaseSink(未经测试,不能证明其正确性,仅供参考!)
package moduleC.test
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
class HBaseSink(tableName: String, columnFamily: String) extends RichSinkFunction[String] {
private var connection: client.Connection = _
private var table: client.Table = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 创建 HBase 连接
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "bigdata1")
conf.set("hbase.zookeeper.property.clientPort", "2181")
connection = ConnectionFactory.createConnection(conf)
// HBase 表信息
table = connection.getTable(TableName.valueOf(tableName))
}
override def invoke(value: String, context: SinkFunction.Context): Unit = {
val put = new Put(Bytes.toBytes(value))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("data"), Bytes.toBytes(value))
table.put(put)
}
override def close(): Unit = {
if (table != null) table.close()
if (connection != null) connection.close()
}
}
三、重难点分析
本题的难点主要在以下几个方面:
-
Flink对Kafka的集成:需要了解如何使用Flink作为Kafka消费者来消费Kafka中的数据,并掌握Flink如何处理流式数据。
-
数据表的前缀区分:需要能够识别数据中不同表的前缀,并根据前缀将数据分别分发到不同的DWD层Topic中。
-
Kafka的使用:需要了解Kafka的基本原理和API使用,能够编写代码将数据发送到指定的DWD层Topic中。
-
数据处理:需要针对数据进行必要的处理和转换,以便将其发送到正确的目标Topic中。
综上,本题需要具备Flink、Kafka和数据处理的基本知识和编程能力,同时需要具备一定的调试能力,因此可能对初学者而言会有一定的难度。
文章来源地址https://www.toymoban.com/news/detail-768256.html
- 在根据前缀区分那块,需要了解Flink对与数据拆解的方法和一些使用的技巧
- HBaseSink的编写,这是一个难点,需要了解HBaseSink的原理,相对于其他数据仓库来说算比较复杂的了。使用前需要开启Hadoop,它是基于Hadoop hdfs的。
总结
什么是HBase?
HBase是一个Apache Hadoop生态系统中的分布式NoSQL数据库。它是一个面向列的数据库,旨在提供高度可扩展性和可靠性,以便处理大规模数据集。HBase的设计灵感来自于Google的Bigtable论文,并且提供了类似于Bigtable的数据模型和API,但是开源和可扩展。
HBase的数据存储在Hadoop HDFS(Hadoop分布式文件系统)上,并且可以在多个节点上分布式存储和处理。它支持高并发读写操作,具有快速的随机读/写访问速度,并且能够处理PB级别的数据集。
HBase的应用场景包括互联网应用、日志处理、社交网络、金融服务、电信和游戏等领域,为这些领域提供了一种高效处理大量数据的解决方案。
以上来自网络
文章来源:https://www.toymoban.com/news/detail-768256.html
请关注我的大数据技术专栏大数据技术 作者: Eternity.Arrebol
请关注我获取更多与大数据相关的文章Eternity.Arrebol的博客
Q-欢迎在评论区进行交流-Q
到了这里,关于大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!