大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

这篇具有很好参考价值的文章主要介绍了大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、broadcast广播

大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache,Scala,BigData,Spark,大数据,spark,缓存

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。文章来源地址https://www.toymoban.com/news/detail-617102.html

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject

// 定义全局缓存单例对象
object GlobalCache extends Serializable {

  // 广播变量,用于存储缓存数据
  private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _

  // 设置 SparkSession 和广播变量
  def setSparkSession(spark: SparkSession): Unit = {
    cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])
  }


  // 按订单ID和用户ID缓存JSONObject对象
  def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {
    // 获取广播变量的值并进行修改
    val data = cacheData.value
    data.synchronized {
      data.put(generateKey(orderId, userId), jsonObject)
    }
  }

  // 根据订单ID和用户ID删除缓存的JSONObject对象
  def removeJSONObject(orderId: String, userId: String): Unit = {
    // 获取广播变量的值并进行修改
    val data = cacheData.value
    data.synchronized {
      data.remove(generateKey(orderId, userId))
    }
  }

  // 根据订单ID和用户ID获取缓存的JSONObject对象
  def getJSONObjet(orderId: String, userId: String): JSONObject = {
    // 获取广播变量的值并进行访问
    val data = cacheData.value
    data.synchronized {
      data.get(generateKey(orderId, userId)).orNull
    }
  }

  // 生成缓存键,使用订单ID和用户ID拼接
  private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}

object CacheTest {
  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别


  def addItem(orderId:String, userId:String, name:String): Unit = {
    val jsonObject = new JSONObject()
    jsonObject.put("name", name)

    // 缓存JSONObject对象
    GlobalCache.cacheJSONObject(orderId, userId, jsonObject)
  }


  def getCache(orderId: String, userId: String): JSONObject = {
    // 获取缓存的JSONObject对象
    GlobalCache.getJSONObjet(orderId, userId)
  }

  def delItem(orderId:String, userId:String): Unit = {
    // 删除缓存的JSONObject对象
    GlobalCache.removeJSONObject(orderId, userId)
  }


  def getSparkSession(appName: String, localType: Int): SparkSession = {
    val builder: SparkSession.Builder = SparkSession.builder().appName(appName)
    if (localType == 1) {
      builder.master("local[8]") // 本地模式,启用8个核心
    }

    val spark = builder.getOrCreate() // 获取或创建一个新的SparkSession
    spark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别
    spark
  }

  def main(args: Array[String]): Unit = {
    println("Start CacheTest")
    val spark: SparkSession = getSparkSession("CacheTest", 1)

    GlobalCache.setSparkSession(spark)  // 构造全局缓存

    addItem("001", "456", "苹果")      // 添加元素
    addItem("002", "789", "香蕉")      // 添加元素
    var cachedObject = getCache("001", "456")
    println(s"Cached Object: $cachedObject")

    delItem("001", "456")      // 删除元素
    cachedObject = getCache("001", "456")
    println(s"Cached Object: $cachedObject")
    spark.stop()
  }
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: null

Process finished with exit code 0

到了这里,关于大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python批处理(一)提取txt中数据存入excel

    现从冠层分析软件中保存了叶面积指数分析的结果,然而软件保存格式为txt,且在不同的文件夹中,每个文件夹的txt文件数量不固定,但是txt文件格式固定。现需要批量处理这些txt文件,获取头三行的数据,并存入excel中。 1、file = open(file_name, ‘r’)。使用open()函数打开名为

    2024年02月09日
    浏览(46)
  • mysql数据备份批处理文件正式版已测试通过

    中心思想找到源文件(.ibd)备份到目的地成.sql文件 1.比如备份 test1和test2 表 2.加载部分表 后续更新 说明: 备份的文件并非一定要求后缀名为.sql,例如后缀名为.txt的文件也是可以的。 示范

    2024年02月11日
    浏览(64)
  • 批处理命令大全 | Windows批处理教程 - ChatGPT

    批处理以.bat或.cmd文件的形式存在,在Windows命令提示符下运行,也可以通过双击批处理文件来运行。批处理文件由一系列命令组成,可以按照顺序执行,也可以根据条件或循环控制选择性地执行。 在Windows上创建一个批处理文件非常简单,在编辑器中输入一系列命令并保存为

    2024年02月04日
    浏览(80)
  • Flink多流处理之Broadcast(广播变量)

    写过Spark批处理的应该都知道,有一个广播变量 broadcast 这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有 broadcast ,简单来说和Spark中的类似,但是有所区别,首先Spark中的 broadcast 是静态的数据,而Flink中的 broadcast 是动态的,也就是源源不断的数据流.在Fl

    2024年02月13日
    浏览(46)
  • redis批处理优化

    一个命令在网络传输的时间往往是远大于在redis中执行命令的时间的,如果每条命令都要逐条经历网络传输,耗时将会大大增加,我们不妨将命令多量少次的传输给redis,这样就大大减少了因为网络传输时间,大大提高的效率 2.1.单机模式下的批处理 2.2.集群模式下的批处理 这

    2024年01月19日
    浏览(41)
  • Windows批处理

    @ echo off :关闭命令的回显功能,这样在执行脚本时不会显示每条命令的具体执行过程。建议将此行放在批处理脚本的首行。 rem :用于添加注释,后面可以跟上注释内容。注释的作用是对脚本进行说明或提醒,不会被执行。 pause :暂停批处理的运行,直到用户按下任意键才

    2024年02月07日
    浏览(48)
  • 【Sql】sql server数据库提示:执行Transact-SQL语句或批处理时发生了异常。 无法打开数据库msdb,错误:926。

    【问题描述】 打开sql server2008r2数据库的时候, 系统提示执行Transact-SQL语句或批处理时发生了异常。 无法打开数据库msdb,错误:926。 【概念理解】 首先MSDB数据库是的作用: 用于给SQL Server代理提供必要的信息来运行调度警报、作业及记录操作。同时也会记录数据库的备份和

    2024年02月04日
    浏览(63)
  • BAT 批处理脚本教程

    第一节 常用批处理内部命令简介 批处理定义:顾名思义,批处理文件是将一系列命令按一定的顺序集合为一个可执行的文本文件,其扩展名为BAT或者CMD。这些命令统称批处理命令。 小知识:可以在键盘上按下Ctrl+C组合键来强行终止一个批处理的执行过程。 了解了大概意思后

    2024年02月02日
    浏览(47)
  • JDBC p4 批处理

    基本介绍: 当需要成批插入或者更新记录时。可以采用Java的批量更新机制,这一机制允许多条语句一次性提交给数据库批量处理。通常情况下比单独提交处理更有效率。 JDBC的批量处理语句包括下面方法: addBatch():添加需要批量处理的SQL语句或参数; executeBatch():执行批量

    2024年02月15日
    浏览(39)
  • 【bat】批处理脚本大全

    目录 1.概述 2.变量 3.运算符 3.2.重定向运算符 3.3.多命名运算符 3.4.管道运算符 4.命令 4.1.基本命令 4.2.参数传递 4.3.查看脚本内容 4.4.注释 4.5.日期和时间 4.6.启动脚本 4.7.调用其他bat 4.8.任务管理 4.8.1.任务列表查看 4.8.2.任务终止 4.9.文件夹 4.10.关机 4.11.环境变量 4.12.目录 4.12.1

    2024年02月04日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包