flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本

这篇具有很好参考价值的文章主要介绍了flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

对应官网

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

测试数据

 * 广播流 官方案例 scala版本
 * 广播状态
 *    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/
 * 事件流:
 *    red,4side
 *    red,5side
 *    red,1side
 *    red,4side
 *
 * 规则流:
 *    rule1,4side,1side
 * 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.
 * map可存储多个规则

完整scala版本代码

package com.yy.state.operatorStateDemo

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.streaming.api.datastream.KeyedStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.util.Collector

import java.time.ZoneId
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._

/**
 * 广播流 官方案例 scala版本
 * 广播状态
 *    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/
 * 事件流:
 *    red,4side
 *    red,5side
 *    red,4side
 *    red,1side
 *    red,4side
 *
 * 规则流:
 *    rule1,4side,1side
 * 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.
 * map可存储多个规则
 */
object BroadcastStateV1 {

  case class Item(color:Color,shape: Shape){
    def getShape()={
      shape
    }

  }
  case class Rule(name:String,first:Shape,second:Shape)
  case class Color(color:String)
  case class Shape(shape:String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)


    // 指定国内时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))


    val itemStream = env.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(arr => Item(Color(arr(0)), Shape(arr(1))))

    val ruleStream = env.socketTextStream("localhost", 9998).broadcast()
      .map(s => Rule(s.split(",")(0), Shape(s.split(",")(1)), Shape(s.split(",")(2))))


    val ruleStateDescriptor = new MapStateDescriptor(
      "RulesBroadcastState",
      BasicTypeInfo.STRING_TYPE_INFO,
      TypeInformation.of(new TypeHint[Rule](){}));
    val ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor)


    val colorPartitionedStream: KeyedStream[Item, Color] = itemStream
      .keyBy(new KeySelector[Item, Color] {
        override def getKey(value: Item): Color = value.color
      })


    colorPartitionedStream
      .connect(ruleBroadcastStream)
      .process(

        // type arguments in our KeyedBroadcastProcessFunction represent:
        //   1. the key of the keyed stream
        //   2. the type of elements in the non-broadcast side
        //   3. the type of elements in the broadcast side
        //   4. the type of the result, here a string

        new KeyedBroadcastProcessFunction[Color, Item, Rule, String]() {
          val  mapStateDesc =
            new MapStateDescriptor(
              "items",
              BasicTypeInfo.STRING_TYPE_INFO,
              new ListTypeInfo(classOf[Item]))

          val ruleStateDescriptor =
            new MapStateDescriptor(
              "RulesBroadcastState",
              BasicTypeInfo.STRING_TYPE_INFO,
              TypeInformation.of(new TypeHint[Rule]() {}))

          override def processElement(value: Item, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {
            val state = getRuntimeContext().getMapState(mapStateDesc)
            val shape = value.getShape()
            // 遍历广播的 rule
            ctx.getBroadcastState(ruleStateDescriptor).immutableEntries().asScala.foreach{
              entry =>
                val ruleName = entry.getKey()
                val rule = entry.getValue()
                val stored: ListBuffer[Item] = {
                  if (state.contains(ruleName)) {
                    state.get(ruleName).asScala.to[ListBuffer]
                  } else {
                    new ListBuffer[Item]()
                  }
                }

                //
                if (shape == rule.second && stored.nonEmpty) {
                  stored.foreach { i =>
                    out.collect("MATCH: " + i + " - " + value);
                  }
                  stored.clear();
                }

                // there is no else{} to cover if rule.first == rule.second
                if (shape.equals(rule.first)) {
                  stored.append(value);
                }

                if (stored.isEmpty) {
                  // 规则已经匹配输出 清理状态
                  state.remove(ruleName)
                } else {
                  // 没输出则更新状态
                  state.put(ruleName, stored.asJava)
                }


            }
          }

          override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#Context, out: Collector[String]): Unit = {
            ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
          }
        }
      ).print("sink --> ")




    env.execute("flink-broadcast-state")
  }
}



文章来源地址https://www.toymoban.com/news/detail-817277.html

到了这里,关于flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink1.18.0 flink维表join新思路

    弊端:         虽然缓存可以减轻维表负担,但是如果事实表数据量很大,每秒千万条,维度表只有百万条,也就是说 你会看到大量的无法关联的数据仍然需要查询维度表.  cache缓存千万数据量内存压力又比较大, 那么怎么减轻维表数据库压力,还能做到低延迟. 以往双流join ; a joi

    2024年01月24日
    浏览(31)
  • Flink1.18.1在CentOS7下的安装和部署

    本文是Flink1.18.1在CenOS7下的安装和部署的学习笔记,记录了基本的安装步骤及参数配置,给初学者避坑用。 一,安装JDK11 Flink在1.13及其之前的版本,推荐用JDK8。从Flink 1.14开始,官方推荐使用的Java版本是JDK 11,并且从Flink 1.17开始,部分依赖于Flink的第三方库已经弃用了对JDK

    2024年04月16日
    浏览(23)
  • flink1.18.0 macos sql-client.sh启动报错

    2024年01月23日
    浏览(35)
  • Python NumPy 广播(Broadcast)

    张量(Tensor)、标量(scalar)、向量(vector)、矩阵(matrix) Python Numpy 切片和索引(高级索引、布尔索引、花式索引) Python NumPy 广播(Broadcast) 广播(Broadcast)是 numpy 对不同形状(shape)的数组进行数值计算的方式, 对数组的算术运算通常在相应的元素上进行。 如果两个数组 a

    2024年02月04日
    浏览(34)
  • Android 广播使用详解(Broadcast Receivers)

    广播接收器用于响应来自其他应用程序或者系统的广播消息。这些消息有时被称为事件或者意图。例如,应用程序可以初始化广播来让其他的应用程序知道一些数据已经被下载到设备,并可以为他们所用。这样广播接收器可以定义适当的动作来拦截这些通信。 有以下两个重要

    2024年02月05日
    浏览(29)
  • 墨西哥小区广播CELL BROADCAST MEXICO 2023

    CB MEXICO 2023 GC Emergency broadcast is requested by Movistar MEXICO in a regulated standard named CBMexico. The implementation of this standar is MANDATORY to get approval. RULE 1: The title of the message must change depending on the channel. (check complete table) Function Primary channels: Title of the messages. Can be turned OFF Level 1 4370, 4383 Mens

    2023年04月21日
    浏览(25)
  • 大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

    在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

    2024年02月15日
    浏览(38)
  • Vue.js组件精讲 第4章 组件的通信2:派发与广播——自行实现dispatch和broadcast方法

    上一讲的 provide / inject API 主要解决了跨级组件间的通信问题,不过它的使用场景,主要是子组件获取上级组件的状态,跨级组件间建立了一种主动提供与依赖注入的关系。然后有两种场景它不能很好的解决: 父组件向子组件(支持跨级)传递数据; 子组件向父组件(支持跨

    2024年04月13日
    浏览(38)
  • Android发送广播时报错:Sending non-protected broadcast xxxxxxx from system xxxxxxxxxx

    带android:sharedUserId=“android.uid.system” 发送广播时,会出现 Sending non-protected broadcast 异常提醒; 原因: Ams在发送广播时,对于systemApp(系统应用),会要求发送广播必须是声明在frameworksbasecoreresAndroidManifest.xml里面的protected-broadcast。这是为了提醒 系统应用开发者要将 broadca

    2023年04月09日
    浏览(31)
  • 设计模式——状态模式(State Pattern)

    对象的行为依赖于它的状态(属性),并且可以根据它的状态改变而改变它的相关行为。 1.1、定义状态接口 1.2、定义开始状态实现类 1.3、定义停止状态实现类 1.4、创建 Context 类 1.5、使用 Context 来查看当状态 State 改变时的行为变化。 创建型模式 结构型模式 1、设计模式——

    2024年02月06日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包