【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

这篇具有很好参考价值的文章主要介绍了【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】,Spark,intellij-idea,spark,scala

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.11"

lazy val root = (project in file("."))
  .settings(
    name := "SparkLearning",
    idePackagePrefix := Some("cn.lh.spark"),
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",
    libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",
    libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  
  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  
  
object NetworkWordCountStatefultoMysql {  
  
  def main(args: Array[String]): Unit = {  
    //    定义状态更新函数  
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {  
      val currentCount = values.foldLeft(0)(_ + _)  
      val previousCount = state.getOrElse(0)  
      Some(currentCount + previousCount)  
    }  
  
    //    设置log4j日志级别  
    StreamingExamples.setStreamingLogLevels()  
  
    val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  
    val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  
  
    //    设置检查点,具有容错机制  
    scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  
  
    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  
    val words: DStream[String] = lines.flatMap(_.split(" "))  
    val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  
    val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  
    // 打印出状态  
    stateDstream.print()  
    // 将统计结果保存到MySQL中  
    stateDstream.foreachRDD(rdd =>{  
      val repartitionedRDD = rdd.repartition(3)  
      repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  
    })  
  
    scc.start()  
    scc.awaitTermination()  
  
    scc.stop()  
  }  
  
  
}

StreamingSaveMySQL8.scala

package cn.lh.spark  
  
import java.sql.DriverManager  
  
object StreamingSaveMySQL8 {  
  
  // 定义写入 MySQL 的函数  
  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  
    // 保存到MySQL  
    val ip = "192.168.137.110"  
    val port = "3306"  
    val db = "sparklearning"  
    val username = "lh"  
    val pwd = "Lh123456!"  
    val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  
    val conn = DriverManager.getConnection(jdbcurl, username, pwd)  
    val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  
  
    try {  
      // 写入数据  
      iter.foreach { wc =>  
        statement.setString(1, wc._1.trim)  
        statement.setInt(2, wc._2.toInt)  
        statement.executeUpdate()  
      }  
    } catch {  
      case e:Exception => e.printStackTrace()  
    } finally {  
      if(statement != null){  
        statement.close()  
      }  
      if(conn!=null){  
        conn.close()  
      }  
    }  
  }  
  
}

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】,Spark,intellij-idea,spark,scala

  2. 启动nc -lk 9999
    【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】,Spark,intellij-idea,spark,scala

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】,Spark,intellij-idea,spark,scala

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】,Spark,intellij-idea,spark,scala


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!文章来源地址https://www.toymoban.com/news/detail-632876.html

到了这里,关于【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • idea的datagrip报错[08S01] 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。

    今天第一次使用idea中的datagrip就报了这个问题,有说要导入证书的有说要关闭验证的但都没什么作用。 [08S01] 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid cer

    2024年02月08日
    浏览(49)
  • idea 中无法连接 sql server 数据库,报错:驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接

    上面的代码报错如下: 在dbURL中把;trustServerCertificate=true加上后就没有报错了 无报错 因为sql server在jdbc连接的时候需要一定的安全验证,只需要在dbURL中把;trustServerCertificate=true加上后令其跳过就行了

    2024年02月12日
    浏览(55)
  • 网络编程套接字 | UDP套接字

    前面的文章中我们叙述了网络编程套接字的一些预备知识点,从本文开始我们就将开始UDP套接字的编写。本文中的服务端与客户端都是在阿里云的云服务器进行编写与测试的。 在v1的版本中我们先来使用一下前面讲过得一些接口,简单的构建一个udp服务器: 然后运行上述的程

    2024年02月09日
    浏览(100)
  • 套接字通信(附带单线程TCP套接字通信代码)

    1. 概念 1.1 局域网和广域网 局域网(LAN)和广域网(WAN)是两种不同范围的计算机网络,它们用于连接多台计算机以实现数据共享和通信。 局域网(LAN): 定义: 局域网是一个较小范围内的网络,通常限定在某个地理区域,比如一个办公室、学校或者家庭。 范围: LAN 的范

    2024年01月21日
    浏览(47)
  • 网络编程之 Socket 套接字(使用数据报套接字和流套接字分别实现一个小程序(附源码))

    网络编程是指网络上的主机,通过不同的进程,以编程的方式实现 网络通信(或称为网络数据传输) 只要满足不同的进程就可以进行通信,所以即便是在同一个主机,只要不同的进程,基于网络传输数据,也属于网络编程 在一次网络传输中: 发送端: 数据的 发送方进程

    2024年02月03日
    浏览(63)
  • 【JavaEE】网络编程之TCP套接字、UDP套接字

    目录 1.网络编程的基本概念 1.1为什么需要网络编程  1.2服务端与用户端 1.3网络编程五元组  1.4套接字的概念 2.UDP套接字编程 2.1UDP套接字的特点  2.2UDP套接字API 2.2.1DatagramSocket类 2.2.2DatagramPacket类  2.2.3基于UDP的回显程序 2.2.4基于UDP的单词查询  3.TCP套接字编程 3.1TCP套接字的特

    2023年04月20日
    浏览(75)
  • 【JaveEE】网络编程之TCP套接字、UDP套接字

    目录 1.网络编程的基本概念 1.1为什么需要网络编程  1.2服务端与用户端 1.3网络编程五元组  1.4套接字的概念 2.UDP套接字编程 2.1UDP套接字的特点  2.2UDP套接字API 2.2.1DatagramSocket类 2.2.2DatagramPacket类  2.2.3基于UDP的回显程序 2.2.4基于UDP的单词查询  3.TCP套接字编程 3.1TCP套接字的特

    2023年04月13日
    浏览(169)
  • 【C/C++套接字编程】套接字的基本概念与基础语法

    TCP/UDP实验为牵引,学习套接字编程的相关知识,再进一步深化对TCP/UDP的理解 目录 前言 Socket编程语法 1. 套接字及创建 什么是套接字? 创建套接字 2. 端口绑定 3. 收发信息 与recv()函数的比较: 与send()函数的比较: 编程实例  总结 系列博客 【C/C++套接字编程】TCP协议通信的

    2024年02月09日
    浏览(47)
  • Python进阶篇(三)-- TCP套接字与UDP套接字编程

    1.1 介绍         本文将首先利用 Python 实现面向TCP连接的套接字编程基础知识:如何创建套接字,将其绑定到特定的地址和端口,以及发送和接收数据包。其次还将学习 HTTP 协议格式的相关知识。在此基础上,本篇将用 Python 语言开发一个简单的 Web 服务器,它仅能处理一

    2023年04月23日
    浏览(43)
  • 网络编程套接字(3)——Java数据报套接字(UDP协议)

    目录 一、Java数据报套接字通信模型 二、UDP数据报套接字编程 1、DatagramSocket         (1)DatagramSocket构造方法         (2)DatagramSocket方法 2、DatagramPacket         (1)DatagramPacket构造方法         (2)DatagramPacket方法 3、InetSocketAddress 三、代码示例:回显服务

    2024年03月12日
    浏览(99)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包