Spark Streaming + Kafka构建实时数据流

这篇具有很好参考价值的文章主要介绍了Spark Streaming + Kafka构建实时数据流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 使用Apache Kafka构建实时数据流

参考文档链接:https://cloud.tencent.com/developer/article/1814030

2. 数据见UserBehavior.csv

数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集

根据这一csv文档运用Kafka模拟实时数据流,作为Spark Streaming的输入源,两条记录实际的间隔时间如果是1分钟,那么Java应用在发送消息时也可以间隔一分钟再发送。

kafka实时流处理,spark,kafka,json,hadoop,sqlkafka实时流处理,spark,kafka,json,hadoop,sql

3. 处理要求

• 找出订单数量最多的日期。

• 找出最受欢迎的前三名商品ID

        这个是老师根据某个比赛修改了赛题给大伙布置的任务,数据在上面方式可见,想着用java写实在是太麻烦了,改用了spark读取并模拟数据的实时性上传到Kafka,然后用sparkStreaming接收并处理数据。

代码如下:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject

import java.util.Properties

object KafkaProducer {
  case class UserBehavior(User_ID: String, Item_ID: String, Category_ID: String, Behavior: String,Timestamp: String,Date: String)
//定义了一个样例类 UserBehavior,用于处理用户行为数据

  def main(args:Array[String])={

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
//设置日志级别。
    val spark:SparkSession = SparkSession.builder()
      .appName("KafkaProducer")
      .master("local[2]")
      .getOrCreate()
//创建SparkSession对象,设置应用程序名和运行模式
    val props = new Properties
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
//设置kafka的生产者属性并创建kafka的生产者实
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("ERROR")

    val path=""
    val lineRDD: RDD[Array[String]] = sc.textFile(path).map(_.split(","))

    val UserBehaviorRDD: RDD[UserBehavior] = lineRDD.map(x => UserBehavior(x(0), x(1), x(2), x(3),x(4),x(5)))

    import spark.implicits._
    val UserBehaviorDF: DataFrame = UserBehaviorRDD.toDF
    val jsonStringDF = UserBehaviorDF.toJSON.toDF("value") // 转换为JSON格式的DataFrame
    val jsonStringArr = jsonStringDF.collect.map(_.getString(0))  // 获取JSON格式的DataFrame中的JSON字符串数组

    val topic = "UserBehavior"

  //或者你也可以直接这样发送数据更简单
  //val path=""
  //val df = spark.read.csv(path)
  //val JsonDF = df.toJSON
  //val data = JsonDF.collect()
  //data.foreach{x=>
  //   val record = new ProducerRecord[String, String](topic,x)           //    producer.send(record)
  //}

    var lastTimestamp = 10000000000L

    for (jsonString <- jsonStringArr) {
      val jsonObject = new JSONObject(jsonString)
      val timestamp = jsonObject.getString("Timestamp")
      var currentTimestamp = timestamp.toLong
      if (currentTimestamp - lastTimestamp >= 60000) {  //模拟数据实时发送,如果当此时的时间与上一条的时间相隔超过60秒
        Thread.sleep(60000)   //等待1分钟发送
        lastTimestamp=currentTimestamp
        println(jsonString)
        val record = new ProducerRecord[String, String](topic,jsonString)
        producer.send(record)
      } else {
        lastTimestamp=currentTimestamp
        println(jsonString)
        val record = new ProducerRecord[String, String](topic,jsonString)
        producer.send(record)
      }
    }
    producer.close()
    sc.stop()
    spark.stop()
  }
}

下面是SparkStreaming读取的代码:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object SparkStreaming {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder.appName("SparkStreaming").master("local[2]").getOrCreate()

    val schema = StructType(Seq(
      StructField("User_ID", StringType),
      StructField("Item_ID", StringType),
      StructField("Category_ID", StringType),
      StructField("Behavior", StringType),
      StructField("Timestamp", StringType),
      StructField("Date", StringType),
    ))//定义数据模式

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe","UserBehavior")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"),schema).as("data"))
      .select("data.User_ID","data.Item_ID","data.Category_ID","data.Behavior","data.Timestamp","data.Date")
//选择value列,并映射成DataFrame,解析JSON格式的数据成可读的列。
    val newDF = df.withColumn("Timestamp", from_unixtime(df("Timestamp"), "yyyy-MM-dd"))
//将时间戳改成时间格式
    val result = newDF.filter(col("Behavior") === "buy")
      .groupBy(col("Timestamp"))
      .agg(count(col("User_ID")).as("buy_count"))
      .orderBy(col("buy_count").desc)
      .limit(1)
//      .cache()

    val result2 = newDF.groupBy("Item_ID")
      .agg(count("*").as("count"))
      .orderBy(col("count").desc)
      .limit(3)
//      .cache()

    // 启动流处理并等待处理结束
    val query = result.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()

    val query2 = result2.writeStream
      .outputMode("complete")
//      .outputMode("update")
      .format("console")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()

    query.awaitTermination()
    query2.awaitTermination()
    spark.stop()
  }
}
    这里可见我用过cache()将数据缓存到内存中,但是cache()对于这两个查询任务的性能提升不太明显。因为在这个例子中,数据是实时流式处理的,而不是一次处理一个批次的静态数据。对于流处理程序而言,常规的缓存方法对于提升性能的作用是非常有限的。流式数据的实时特性意味着数据不断更新,因此很难保持缓存的数据与最新的数据的一致性。所以在流处理中,更有效的性能优化方法是使用更高效的算法,并通过对流数据的精细控制来调整计算中的批大小和触发机制,而不是简单地使用缓存方法。

但是用dataframe格式输出的太慢了,所以下面试用rdd的形式:


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.json.JSONObject

import java.util.Properties

object RDDStreaming {
  case class UserBehavior(User_ID: String, Item_ID: String, Category_ID: String, Behavior: String,Timestamp: String,Date: String)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("KafkaSparkStreaming").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    val kafkaParams = Map[String, Object](
      //Kafka的broker列表,格式为host:port,host:port
      "bootstrap.servers" -> "127.0.0.1:9092",
      //key的反序列化方式
      "key.deserializer" -> classOf[StringDeserializer],
      //value的反序列化方式
      "value.deserializer" -> classOf[StringDeserializer],
      //消费者组ID
      "group.id" -> "test-group",
      //从最早的记录开始处理消息
      "auto.offset.reset" -> "earliest",
      //不自动提交偏移量
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    def writeToMySQL(df: DataFrame) = {
      val properties: Properties = new Properties()
      properties.setProperty("user", "账户")
      properties.setProperty("password", "密码")
      properties.setProperty("driver", "com.mysql.jdbc.Driver")

      df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/Order", "Order.userbehavior", properties)
    }
   
 val topics = Array("UserBehavior")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    println("--------------------------------------")
    stream.foreachRDD(rdd => {
      val count = rdd.filter(row => row.value.contains("buy")).count()
      val order = rdd.filter(row => row.value.contains("buy")).map(row => {
        val json = new JSONObject(row.value())
        UserBehavior(
          json.getString("User_ID"),
          json.getString("Item_ID"),
          json.getString("Category_ID"),
          json.getString("Behavior"),
          json.getString("Timestamp"),
          json.getString("Date")
        )
      })
//      val current = order.map(x => (x.User_ID, x.Item_ID))
//      current.foreach(x => println("用户ID:" + x._1 + " 商品ID: " + x._2))
      val MostOrderCount = order.map(x=>(x.Date.split(" ")(0),1)).reduceByKey(_+_).sortBy(_._2,false)

      if (!MostOrderCount.isEmpty()) {
        println("订单数量最多的日期:"+MostOrderCount.first()._1+" 数量:"+MostOrderCount.first()._2)
      } else {
        print(" ")
      }

      val order1 = rdd.map(row => {
        val json = new JSONObject(row.value())
        UserBehavior(
          json.getString("User_ID"),
          json.getString("Item_ID"),
          json.getString("Category_ID"),
          json.getString("Behavior"),
          json.getString("Timestamp"),
          json.getString("Date")
        )
      })
      val popular = order1.map(x=>(x.Item_ID,1)).reduceByKey(_+_).sortBy(_._2,false).take(3)
      popular.foreach(x=>println("最受欢迎的商品id:"+x._1+" 用户操作数量:"+x._2))

      println("订单总数为:"+count)
//      order1.foreachPartition(partition => {
//        val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/order", "root", "123456")
//
//        // 获取数据库连接,可以使用连接池技术来管理数据库连接
//        partition.foreach(record => {
//          val insertStatement = connection.prepareStatement(
//            "INSERT INTO user_behavior (User_ID, Item_ID, Category_ID, Behavior, Timestamp, Date) " +
//              "VALUES (?, ?, ?, ?, ?, ?)")
//          insertStatement.setString(1, record.User_ID)
//          insertStatement.setString(2, record.Item_ID)
//          insertStatement.setString(3, record.Category_ID)
//          insertStatement.setString(4, record.Behavior)
//          insertStatement.setString(5, record.Timestamp)
//          insertStatement.setString(6, record.Date)
//          insertStatement.executeUpdate()
//          insertStatement.close()
//        })
//        connection.close()
//      })
//      println("数据写入成功")
//插入数据速度较慢,用批处理
      import spark.implicits._
      if(!order1.isEmpty()) {
        writeToMySQL(order1.toDF)
        println("数据写入成功")
      }
      else println("无数据传入")
      println("--------------------------------------")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

        很多人遇到个问题就是sparksession和sparkcontext不能一起创建,那是因为只能启动一个sparkcontext,在启动sparksession时会默认启动sparkContext,启动StreamingContext也一样会启动sparkContext,所以这时候只需要设置用一开始创建的那个sparkContext即可,然后对Stream中每一个rdd统计‘buy’的数量然后将所有数据写入到MYSQL中。

        下面附带maven依赖(可能这个代码里有些没用上,挑选其中即可):文章来源地址https://www.toymoban.com/news/detail-517094.html

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.30</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

到了这里,关于Spark Streaming + Kafka构建实时数据流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • Spark实时数据流分析与可视化:实战指南【上进小菜猪大数据系列】

    上进小菜猪,沈工大软件工程专业,爱好敲代码,持续输出干货。 本文介绍了如何利用Apache Spark技术栈进行实时数据流分析,并通过可视化技术将分析结果实时展示。我们将使用Spark Streaming进行数据流处理,结合常见的数据处理和可视化库,实现实时的数据流分析和可视化展

    2024年02月07日
    浏览(51)
  • ClickHouse 与 Kafka 整合: 实时数据流处理与分析解决方案

    随着数据量的不断增长,实时数据处理和分析变得越来越重要。ClickHouse 和 Kafka 都是在现代数据技术中发挥着重要作用的工具。ClickHouse 是一个高性能的列式数据库,专为 OLAP 和实时数据分析而设计。Kafka 是一个分布式流处理平台,用于构建实时数据流管道和流处理应用程序

    2024年02月22日
    浏览(50)
  • 推荐系统架构设计实践:Spark Streaming+Kafka构建实时推荐系统架构

    作者:禅与计算机程序设计艺术 推荐系统(Recommendation System)一直都是互联网领域一个非常火热的话题。其主要目标是在用户多样化的信息环境中,通过分析用户的偏好、消费习惯等数据,提供个性化的信息推送、商品推荐、购物指导等服务。如何设计一个推荐系统的架构及

    2024年02月08日
    浏览(53)
  • 实时数据处理:数据流的安全与隐私

    实时数据处理在现代大数据环境中具有重要意义。随着互联网的普及和人们对数据的需求不断增加,实时数据处理技术已经成为了企业和组织的核心技术之一。然而,随着数据处理技术的不断发展,数据流的安全与隐私也成为了一个重要的问题。在这篇文章中,我们将深入探

    2024年02月20日
    浏览(49)
  • 实时Flink数据流与ApacheHadoop集成

    在大数据时代,实时数据处理和批处理数据分析都是非常重要的。Apache Flink 和 Apache Hadoop 是两个非常受欢迎的大数据处理框架。Flink 是一个流处理框架,专注于实时数据处理,而 Hadoop 是一个批处理框架,专注于大规模数据存储和分析。在某些场景下,我们需要将 Flink 和 H

    2024年02月19日
    浏览(49)
  • 实时Flink数据流与ApacheHive集成

    在大数据时代,实时数据处理和批处理数据处理都是非常重要的。Apache Flink 是一个流处理框架,可以处理大规模的实时数据流,而 Apache Hive 是一个基于 Hadoop 的数据仓库工具,主要用于批处理数据处理。在实际应用中,我们可能需要将 Flink 与 Hive 集成,以实现流处理和批处

    2024年02月22日
    浏览(65)
  • 云计算与大数据处理:实时计算与数据流

    云计算和大数据处理是当今信息技术领域的两个热门话题。随着互联网的普及和人们生活中的各种设备的不断增多,我们生活中的数据量不断增加,这些数据需要存储和处理。云计算是一种基于互联网的计算资源共享和分配模式,可以让用户在需要时轻松获取计算资源,从而

    2024年04月13日
    浏览(45)
  • 数据流处理框架Flink与Kafka

    在大数据时代,数据流处理技术已经成为了一种重要的技术手段,用于处理和分析大量实时数据。Apache Flink和Apache Kafka是两个非常重要的开源项目,它们在数据流处理领域具有广泛的应用。本文将深入探讨Flink和Kafka的关系以及它们在数据流处理中的应用,并提供一些最佳实践

    2024年04月23日
    浏览(41)
  • 大数据之使用Flume监听端口采集数据流到Kafka

    前言 题目: 一、读题分析 二、处理过程   1.先在Kafka中创建符合题意的Kafka的topic  创建符合题意的Kafka的topic 2.写出Flume所需要的配置文件 3.启动脚本然后启动Flume监听端口数据并传到Kafka 启动flume指令 启动脚本,观察Flume和Kafka的变化 三、重难点分析 总结          本题

    2024年02月08日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包