Kafka/Spark-01消费topic到写出到topic

这篇具有很好参考价值的文章主要介绍了Kafka/Spark-01消费topic到写出到topic。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 Kafka的工具类

1.1 从kafka消费数据的方法

  1. 消费者代码
  def getKafkaDStream(ssc : StreamingContext , topic: String  , groupId:String  ) ={
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
    kafkaDStream
  }
  1. 注意点
  • consumerConfigs是定义的可变的map的类型的,具体如下
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
    // kafka集群位置

    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),

    // kv反序列化器
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    // groupId
    // offset提交  自动 手动
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    //自动提交的时间间隔
    //ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
    // offset重置  "latest"  "earliest"
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
    // .....
  )
  • consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参

  • 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下

在`KafkaUtils.createDirectStream`方法中,后续传递的参数的含义如下:

1. `ssc`:这是一个`StreamingContext`对象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:这是一个位置策略,用于指定Kafka消费者的位置策略。`PreferConsistent`表示优先选择分区分布均匀的消费者。
3. `ConsumerStrategies.Subscribe[String, String]`:这是一个消费者策略,用于指定Kafka消费者的订阅策略。`Subscribe[String, String]`表示按照指定的泛型主题字符串数组订阅消息,键和值的类型都为`String`。
4. `Array(topic)`:这是一个字符串数组,用于指定要订阅的Kafka主题。
5. `consumerConfigs`:这是一个`java.util.Properties`类型的对象,其中配置了一些Kafka消费者的属性。

总之,在`KafkaUtils.createDirectStream`方法中,这些参数组合被用于创建一个Kafka直连流(Direct Stream),该流可以直接从Kafka主题中消费消息,并将其转换为`InputDStream[ConsumerRecord[String, String]]`类型的DStream。

Kafka/Spark-01消费topic到写出到topic,kafka,spark,大数据

  • Subscribe传参需要指定泛型,这边指定string,表示指定主题的键和值的类型,即Array(topic), consumerConfigs传参是string

Kafka/Spark-01消费topic到写出到topic,kafka,spark,大数据

  • 最后方法返回一个kafkaDStream

1.2 kafka的生产数据的方法

  1. 生产者代码
  • 创建与配置
/**
    * 生产者对象
    */
  val producer : KafkaProducer[String,String] = createProducer()

  /**
    * 创建生产者对象
    */
  def createProducer():KafkaProducer[String,String] = {
    val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
    //生产者配置类 ProducerConfig
    //kafka集群位置
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
    //kv序列化器
    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    //acks
    producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
    //batch.size  16kb
    //linger.ms   0
    //retries
    //幂等配置
    producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
    producer
  }
  • 生产方法
  /**
    * 生产(按照默认的黏性分区策略)
    */
  def send(topic : String  , msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , msg ))
  }

  /**或者!
    * 生产(按照key进行分区)
    */
  def send(topic : String  , key : String ,  msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , key ,  msg ))
  }
  • 关闭生产
/**
    * 关闭生产者对象
    */
  def close():Unit = {
    if(producer != null ) producer.close()
  }

  /**
    * 刷写 ,将缓冲区的数据刷写到磁盘
    *
    */
  def flush(): Unit ={
    producer.flush()
  }

2 消费数据

2.1 消费到数据

单纯的使用返回的ConsumerRecord不支持序列化,没有实现序列化接口

Kafka/Spark-01消费topic到写出到topic,kafka,spark,大数据

因此需要转换成通用的jsonobject对象

//3. 处理数据
    //3.1 转换数据结构
    val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
      consumerRecord => {
        //获取ConsumerRecord中的value,value就是日志数据
        val log: String = consumerRecord.value()
        //转换成Json对象
        val jsonObj: JSONObject = JSON.parseObject(log)
        //返回
        jsonObj
      }
    )

2.2 数据分流发送到对应topic

  1. 提取错误数据并发送到对应的topic中
jsonObjDStream.foreachRDD(
      rdd => {

        rdd.foreachPartition(
          jsonObjIter => {
            for (jsonObj <- jsonObjIter) {
              //分流过程
              //分流错误数据
              val errObj: JSONObject = jsonObj.getJSONObject("err")
              if(errObj != null){
                //将错误数据发送到 DWD_ERROR_LOG_TOPIC
                MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC ,  jsonObj.toJSONString )
              }else{
                  
              }
            }
          }
        }	
  1. 将公共字段和页面数据发送到DWD_PAGE_DISPLAY_TOPIC
    Kafka/Spark-01消费topic到写出到topic,kafka,spark,大数据
else{
                // 提取公共字段
                val commonObj: JSONObject = jsonObj.getJSONObject("common")
                val ar: String = commonObj.getString("ar")
                val uid: String = commonObj.getString("uid")
                val os: String = commonObj.getString("os")
                val ch: String = commonObj.getString("ch")
                val isNew: String = commonObj.getString("is_new")
                val md: String = commonObj.getString("md")
                val mid: String = commonObj.getString("mid")
                val vc: String = commonObj.getString("vc")
                val ba: String = commonObj.getString("ba")
                //提取时间戳
                val ts: Long = jsonObj.getLong("ts")
                // 页面数据
                val pageObj: JSONObject = jsonObj.getJSONObject("page")
                if(pageObj != null ){
                  //提取page字段
                  val pageId: String = pageObj.getString("page_id")
                  val pageItem: String = pageObj.getString("item")
                  val pageItemType: String = pageObj.getString("item_type")
                  val duringTime: Long = pageObj.getLong("during_time")
                  val lastPageId: String = pageObj.getString("last_page_id")
                  val sourceType: String = pageObj.getString("source_type")

                  //封装成PageLog,这边还写了bean实体类去接收
                  var pageLog =
                    PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
                  //发送到DWD_PAGE_LOG_TOPIC
                  MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean没有set和get方法,这边是直接操作字段
                }

  1. 其他曝光、事件、启动数据如下
                  //提取曝光数据
                  val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays")
                  if(displaysJsonArr != null && displaysJsonArr.size() > 0 ){
                    for(i <- 0 until displaysJsonArr.size()){
                      //循环拿到每个曝光
                      val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
                      //提取曝光字段
                      val displayType: String = displayObj.getString("display_type")
                      val displayItem: String = displayObj.getString("item")
                      val displayItemType: String = displayObj.getString("item_type")
                      val posId: String = displayObj.getString("pos_id")
                      val order: String = displayObj.getString("order")

                      //封装成PageDisplayLog
                      val pageDisplayLog =
                        PageDisplayLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts)
                      // 写到 DWD_PAGE_DISPLAY_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_DISPLAY_TOPIC , JSON.toJSONString(pageDisplayLog , new SerializeConfig(true)))
                    }
                  }
                  //提取事件数据(课下完成)
                  val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions")
                  if(actionJsonArr != null && actionJsonArr.size() > 0 ){
                    for(i <- 0 until actionJsonArr.size()){
                      val actionObj: JSONObject = actionJsonArr.getJSONObject(i)
                      //提取字段
                      val actionId: String = actionObj.getString("action_id")
                      val actionItem: String = actionObj.getString("item")
                      val actionItemType: String = actionObj.getString("item_type")
                      val actionTs: Long = actionObj.getLong("ts")

                      //封装PageActionLog
                      var pageActionLog =
                        PageActionLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts)
                      //写出到DWD_PAGE_ACTION_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_ACTION_TOPIC , JSON.toJSONString(pageActionLog , new SerializeConfig(true)))
                    }
                  }
                }
                // 启动数据(课下完成)
                val startJsonObj: JSONObject = jsonObj.getJSONObject("start")
                if(startJsonObj != null ){
                  //提取字段
                  val entry: String = startJsonObj.getString("entry")
                  val loadingTime: Long = startJsonObj.getLong("loading_time")
                  val openAdId: String = startJsonObj.getString("open_ad_id")
                  val openAdMs: Long = startJsonObj.getLong("open_ad_ms")
                  val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms")

                  //封装StartLog
                  var startLog =
                    StartLog(mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts)
                  //写出DWD_START_LOG_TOPIC
                  MyKafkaUtils.send(DWD_START_LOG_TOPIC , JSON.toJSONString(startLog ,new SerializeConfig(true)))

2.3 精确一次消费

  1. 背景

发送kafka的是自动提交,如果提交有误,会出现漏消费或者重复消费文章来源地址https://www.toymoban.com/news/detail-705632.html

  1. 相关语义
  • 至少一次消费:数据不会丢失,但存在数据重复
  • 最多一次消费:数据不会重复,但可能丢失数据
  • 精确一次消费:不多不少一次消费

到了这里,关于Kafka/Spark-01消费topic到写出到topic的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 如何查看Kafka的Topic消费情况

    进入kafka安装目录,然后执行以下命令  2.10为Scala版本,0.10.0.2.5.3.0为kafka版本  

    2024年02月10日
    浏览(38)
  • 自定义kafka客户端消费topic

    使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。 后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没

    2024年02月02日
    浏览(52)
  • Flink实现同时消费多个kafka topic,并输出到多个topic

    1)代码使用的 flink版本为1.16.1 ,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换; 2)本次编写的两个方案,均只适用于 数据源topic来自同一个集群 ,且kafka消费组相同,暂未研究flink的connect算子join多条流 代码涉及 Hadoop相关环境 ,若无该环境

    2023年04月20日
    浏览(85)
  • Kafka某Topic的部分partition无法消费问题

    今天同事反馈有个topic出现积压。于是上kfk管理平台查看该topic对应的group。发现6个分区中有2个不消费,另外4个消费也较慢,总体lag在增长。查看服务器日志,日志中有rebalance 12 retry 。。。Exception,之后改消费线程停止。 查阅相关rebalance资料:   分析Rebalance 可能是 Consu

    2024年02月12日
    浏览(42)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(53)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(46)
  • Kafka3.1部署和Topic主题数据生产与消费

    本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用: 1.安装JDK配置环境变量 2.Zookeeper(zookeeper-3.7.1) zk 部署后的目录位置:D:setupapache-zookeeper-3.7.1 3.安装Kafka3.1X 3.1 下载包(kafka_2.12-3.1.2.tgz) Kafka 3.2、 解压并进入Kafka目录: 根目录:D:setupkafka3.1.2 3、 编辑

    2024年02月09日
    浏览(36)
  • Kafka系列之:记录一次Kafka Topic分区扩容,但是下游flink消费者没有自动消费新的分区的解决方法

    生产环境Kafka集群压力大,Topic读写压力大,消费的lag比较大,因此通过扩容Topic的分区,增大Topic的读写性能 理论上下游消费者应该能够自动消费到新的分区,例如flume消费到了新的分区,但是实际情况是存在flink消费者没有消费到新的分区 出现无法消费topic新的分区这种情况

    2024年02月14日
    浏览(50)
  • Kafka - 主题Topic与消费者消息Offset日志记录机制

    可以根据业务类型,分发到不同的Topic中,对于每一个Topic,下面可以有多个分区(Partition)日志文件: kafka 下的Topic的多个分区,每一个分区实质上就是一个队列,将接收到的消息暂时存储到队列中,根据配置以及消息消费情况来对队列消息删除。 可以这么来理解Topic,Partitio

    2024年02月03日
    浏览(52)
  • kafka2.x常用命令:创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费

    原创/朱季谦 接触kafka开发已经两年多,也看过关于kafka的一些书,但一直没有怎么对它做总结,借着最近正好在看《Apache Kafka实战》一书,同时自己又搭建了三台kafka服务器,正好可以做一些总结记录。 本文主要是记录如何在kafka集群服务器上创建topic,查看topic列表、分区、

    2024年02月03日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包