Scala编程 读取Kafka处理并写入Redis

这篇具有很好参考价值的文章主要介绍了Scala编程 读取Kafka处理并写入Redis。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

部分知识(可略过)

        Kafka

        Kafka是一种分布式流处理平台,它是一个高吞吐量、可扩展、持久化的消息队列系统,用于处理实时数据流。Kafka的核心概念包括生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责将数据发布到Kafka集群,消费者则从Kafka集群中订阅并消费数据。主题是数据的分类或者分区,每个主题可以有多个分区,而每个分区又可以有多个副本。这种分区和复制的机制使得Kafka具备了高可用性和容错性。同时,Kafka还提供了丰富的API和生态系统,使得开发者可以方便地构建基于Kafka的实时数据处理应用。

        Redis

        Redis是一个开源的内存数据结构存储系统,它可以用作数据库、缓存和消息中间件。Redis支持多种数据结构,包括字符串、哈希表、列表、集合、有序集合等。它以键值对的形式存储数据,并且数据存储在内存中,因此具有快速的读写性能。Redis还提供持久化功能,可以将数据保存到磁盘上,以便在重启后恢复数据。由于其高性能、灵活性和丰富的功能,Redis被广泛应用于各种场景,如缓存加速、实时计数、排行榜、消息队列等。

        Scala

        Scala是一种面向对象的编程语言,也是一种函数式编程语言,它结合了面向对象编程和函数式编程的特性。Scala运行在Java虚拟机上,因此可以与Java代码无缝地集成。Scala具有静态类型系统,支持类型推断,可以提高代码的可读性和可维护性。Scala还提供了许多高级特性,如高阶函数、模式匹配、类型类等,使得编写高效、简洁、可重用的代码变得更加容易。Scala在大数据处理、Web应用程序、分布式系统等领域得到了广泛应用。

        Spark Streaming(DStream)

        Spark Streaming是Apache Spark的一个组件,它提供了实时数据处理和流式计算的功能。它允许开发人员使用Spark的强大的批处理引擎来处理实时数据流。Spark Streaming可以从多种数据源(如Kafka、Flume、HDFS等)接收实时数据,并将其分成小的批次进行处理。每个批次都可以像处理静态数据一样使用Spark的高级API进行处理,包括使用SQL查询、机器学习算法和图处理等。

        RDD

        RDD是Apache Spark中的一个核心概念。RDD是一个可分区、可并行计算的数据集合,它可以在分布式计算环境中进行高效的数据处理和分析。RDD具有容错性,即在计算过程中可以自动恢复失败的节点,并且可以在内存中缓存数据,以提高计算性能。RDD提供了一系列的转换操作(如map、filter、reduce等)和行动操作(如count、collect、save等),可以对数据集进行各种复杂的计算和操作。通过使用RDD,可以方便地进行大规模数据处理和分析。

        Jedis

        Jedis是一个Java语言编写的用于操作Redis数据库的客户端库。Jedis客户端库可以让Java开发者通过简单的API调用来连接、操作和管理Redis数据库。它提供了丰富的功能和灵活的接口,使得开发者可以方便地在Java应用程序中使用Redis。

Scala编程

        目的

        使用Scala编程,用Spark Streaming采集Kafka消费者端口接收到的信息,对信息进行处理求出每个电影ID对应的平均分数并写入到Redis数据库中。

         导入Maven依赖

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
    </dependencies>

         创建Scala文件导入依赖

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

         创建Spark相关对象

    // 创建SparkConf对象,名称为"KafkaToRedis",并将运行模式设置为本地模式
    val sparkConf = new SparkConf().setAppName("KafkaToRedis").setMaster("local")

    // 创建SparkContext对象
    val sc = new SparkContext(sparkConf)

    // 创建StreamingContext对象, 设置每个批次的时间间隔为1秒
    val ssc = new StreamingContext(sc, Seconds(1))

        创建Kafka数据流

  • bootstrap.servers:Kafka消费者的IP地址以及端口号
  • auto.offset.reset:对偏移量的采集设置,latest:最新偏移量,即程序运行后会从Kafka消费者接收到的最新的消息开始采集。earliest:最早偏移量,即程序运行后会从Kafka消费者接收的第一条消息开始采集。
  • group.id:消费者组ID,需要到kafka/config/consumer.properties中去查看Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言
    // 定义了Kafka的相关参数,包括Kafka的地址、键和值的反序列化器、偏移量重置方式和消费者组ID
    val kafkaParams = Map("bootstrap.servers" -> "192.168.181.128:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",//latest:最新偏移量,earliest:最早偏移量
      "group.id" -> "test-consumer-group")

    // 定义了要订阅的主题集合
    val topics = Set("order")

    // 创建了一个直接流,将Kafka的消息流转换为DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

        处理数据流

        (通过kafka的API调用使用JAVA编写kafka生产者实现每秒向消费者发送一条消息,该程序本文未展出)

         消费者接收端数据如下

Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言

        处理方法:

        1.因为采集到的一行data数据为{"movie_id":"84","movie_rank":8,"user_id":"365"},需要对数据进行分割提取处理。

        2.以data.split(",")(0).split(":")(1).replaceAll("\"", "").trim()为例

                split(",")(0)将data按逗号分割提取第一个,得到{"movie_id":"84"

               split(":")(1)将上一步得到的数据按‘:’经行分割提取第二个,得到"84"

                replaceAll("\"", "")将上一步得到的数据中的所有双引号替换为空字符串,得到 84  

                .trim()去除上一步得到的数据的两端的空格,得到84

            按照这个方法将提取到的数据存入movieId,movieRank,userId中

        3. 计算平均值

                读取redis数据库中键为movie_rank,字段为movieId的值。

                如果数据库中查不到,则将这条数据存入数据库中,格式为(电影平均分,评论次数)。

                如果找到了,则读取电影平均分和评论次数设置为旧参数

                计算公式:新的平均分=((旧平均分*旧评论次数)+  新的评论分数)/(旧评论次数+1)

                存入计算结果,(新电影平均分,旧评论次数+1)

    // 对每行数据进行处理
    kafkaStream.foreachRDD { rdd =>
      rdd.foreach { record =>

        //设置jedis客户端连接redis数据库
        val jedis = new Jedis("192.168.181.128", 6379)

        //创建data对象储存采集到的value值
        val data = record.value()

        //处理数据提取出movie_id,movie_rank,user_id的值
        val movieId = data.split(",")(0).split(":")(1).replaceAll("\"", "").trim()
        val movieRank = data.split(",")(1).split(":")(1).trim()
        val userId = data.split(",")(2).split(":")(1).replaceAll("\"", "").replaceAll("}", "").trim()

        // 读取redis中哈希表键为movie_rank,字段为movieId的值(读取对应电影ID的平均分)
        val movieRankInfo = jedis.hget("movie_rank", movieId)

        // 如果Redis中没有该电影的评分信息,则将该电影的评分信息存入Redis
        if (movieRankInfo == null) {
          jedis.hset("movie_rank", movieId, s"$movieRank,1")
        } else {
          // 如果Redis中已经有该电影的评分信息,则更新该电影的评分信息
          val oldRank = movieRankInfo.split(",")(0).toDouble
          val oldCount = movieRankInfo.split(",")(1).toInt
          val newRank = (oldRank * oldCount + movieRank.toDouble) / (oldCount + 1)
          jedis.hset("movie_rank", movieId, s"$newRank,${oldCount + 1}")
        }
      }
    }

        启动Spark Streaming

ssc.start()
ssc.awaitTermination()

         完整代码

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

object KafkaToRedis {
  def main(args: Array[String]): Unit = {

    // 创建SparkConf对象,名称为"KafkaToRedis",并将运行模式设置为本地模式
    val sparkConf = new SparkConf().setAppName("KafkaToRedis").setMaster("local")

    // 创建SparkContext对象
    val sc = new SparkContext(sparkConf)

    // 创建StreamingContext对象, 设置每个批次的时间间隔为1秒
    val ssc = new StreamingContext(sc, Seconds(1))

    // 定义了Kafka的相关参数,包括Kafka的地址、键和值的反序列化器、偏移量重置方式和消费者组ID
    val kafkaParams = Map("bootstrap.servers" -> "192.168.181.128:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",//latest:最新偏移量,earliest:最早偏移量
      "group.id" -> "test-consumer-group")

    // 定义了要订阅的主题集合
    val topics = Set("order")

    // 创建了一个直接流,将Kafka的消息流转换为DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // 对每行数据进行处理
    kafkaStream.foreachRDD { rdd =>
      rdd.foreach { record =>

        val jedis = new Jedis("192.168.181.128", 6379)
        val data = record.value()
        val movieId = data.split(",")(0).split(":")(1).replaceAll("\"", "").trim()
        val movieRank = data.split(",")(1).split(":")(1).trim()
        val userId = data.split(",")(2).split(":")(1).replaceAll("\"", "").replaceAll("}", "").trim()

        val movieRankInfo = jedis.hget("movie_rank", movieId)

        // 如果Redis中没有该电影的评分信息,则将该电影的评分信息存入Redis
        if (movieRankInfo == null) {
          jedis.hset("movie_rank", movieId, s"$movieRank,1")
        } else {
          // 如果Redis中已经有该电影的评分信息,则更新该电影的评分信息
          val oldRank = movieRankInfo.split(",")(0).toDouble
          val oldCount = movieRankInfo.split(",")(1).toInt
          val newRank = (oldRank * oldCount + movieRank.toDouble) / (oldCount + 1)
          jedis.hset("movie_rank", movieId, s"$newRank,${oldCount + 1}")
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

 测试结果

        代码运行结果

Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言

        Scala程序正常运行,每秒都在抓取数据计算并写入Redis

        Redis 查询结果

        进入redis/bin目录,登录redis数据库

./redis-cli -h 192.168.181.128

         选择0号数据库,查看该数据库中的键

Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言

        查看movie_rank哈希表的所有值

192.168.181.128:6379> hgetall movie_rank 

Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言

 让Scala程序运行一段时间再次查询

Scala编程 读取Kafka处理并写入Redis,大数据,scala,kafka,开发语言

数据库的内容正在实时改变,说明程序正常运行 文章来源地址https://www.toymoban.com/news/detail-827415.html

到了这里,关于Scala编程 读取Kafka处理并写入Redis的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

    目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】        a、使用场景:测试        b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理     1. 自定义采集器     2. 什么情况下需要自定采集器呢?          比

    2024年02月07日
    浏览(40)
  • FlinkSql写入/读取Kafka

    创建写入kafka的sink表 创建catalog 插入数据 发现kafka中已有数据 创建连接Kafka的Source表 创建iceberg表 3.插入数据 问题: 报错如下:org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId=‘null’, inTransaction=false, closed=false} Caused by: org.ap

    2024年02月15日
    浏览(39)
  • NCDC气象数据的提取与处理(四):python批量读取、写入nc数据经纬度格点数值

    1.问题描述: 2.思路: 3.实现过程: 3.1格点位置匹配 3.2写入表格 4.运行效果 4.1打包站点信息 4.2读取nc文件列表 4.3提取对应格点的nc数据 4.4数据写入 NCDC的站点数据处理在之前三节里已经介绍过了,但是NCDC的就那么几种数据可能不能满足日常使用,比如说辐射数据他就没有。

    2024年02月05日
    浏览(35)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(38)
  • 2.1、如何在FlinkSQL中读取&写入到Kafka

    目录 1、环境设置 方式1:在Maven工程中添加pom依赖 方式2:在 sql-client.sh 中添加 jar包依赖 2、读取Kafka 2.1 创建 kafka表 2.2 读取 kafka消息体(Value) 使用 \\\'format\\\' = \\\'json\\\' 解析json格式的消息 使用 \\\'format\\\' = \\\'csv\\\' 解析csv格式的消息 使用 \\\'format\\\' = \\\'raw\\\' 解析kafka消息为单个字符串字段

    2024年02月08日
    浏览(40)
  • Linux下文件的创建写入读取编程

            在linux下操作一个文件,首先要保证文件的存在(不存在就创建),接着打开文件( 打开成功 )并得到 文件描述符 ,接着在进行读写操作,最后还需要关闭文件。如果我们对文件进行读写之后不关闭文件,而直接关闭我们的编译器,可能会造成文件损坏。      

    2024年01月18日
    浏览(27)
  • Python 文件处理指南:打开、读取、写入、追加、创建和删除文件

    文件处理是任何Web应用程序的重要部分。Python有多个用于创建、读取、更新和删除文件的函数。 在Python中处理文件的关键函数是open()函数。open()函数接受两个参数:文件名和模式。 有四种不同的方法(模式)可以打开文件: \\\"r\\\" - 读取 - 默认值。打开一个文件以进行读取,如

    2024年02月05日
    浏览(50)
  • Java 文件处理完全指南:创建、读取、写入和删除文件详细解析

    文件处理是任何应用程序的重要部分。Java 提供了许多用于创建、读取、更新和删除文件的方法。 Java 文件处理 Java 中的文件处理主要通过 java.io 包中的 File 类完成。该类允许我们处理文件,包括创建、读取、写入和删除文件。 要使用 File 类,我们首先需要创建该类的对象,

    2024年03月18日
    浏览(50)
  • java中pdfbox处理pdf常用方法(读取、写入、合并、拆分、写文字、写图片)

    方法代码: 测试用例: 2.1写文字 方法代码: 测试用例: A.pdf: A2.pdf: 2.2写图片 方法代码: 测试用例: A.pdf: pic.jpg: A2.pdf: 方法代码: 测试用例: 方法代码: 测试用例: 引用链接: (17条消息) 使用Apache PDFBox实现拆分、合并PDF_似有风中泣的博客-CSDN博客 (17条消息) Java使用P

    2024年02月11日
    浏览(110)
  • spark stream入门案例:netcat准实时处理wordCount(scala 编程)

    目录 案例需求 代码 结果 解析          案例需求:         使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数         -- 1. Spark从socket中获取数据:一行一行的获取         -- 2. Driver程序执行时,streaming处理过程

    2024年02月07日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包