Flink 广播-broadcast

这篇具有很好参考价值的文章主要介绍了Flink 广播-broadcast。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.Flink 广播-broadcast介绍

在Apache Flink中,"广播"是一种特殊的数据分发模式,用于将数据从一个并行操作传播到整个作业的所有并行任务中。广播操作对于将少量数据有效地分发给并行任务,以便它们能够共享这些数据而不必进行昂贵的网络通信,是非常有用的。它通常用于将配置信息、静态数据集或参考数据传播给Flink作业中的所有并行任务。

广播的主要优势在于,它避免了将数据通过网络发送到所有并行任务的开销,而是直接将数据复制到每个任务的本地内存中。这样,任务可以直接从本地内存访问数据,无需进行远程通信。这对于那些需要频繁访问同一份数据的场景非常有利,可以显著降低作业的整体延迟。

二.Flink 广播-broadcast使用场景

  1. 配置参数传播: 广播非常适合将配置参数传播到所有并行任务。例如,如果您的Flink作业需要根据特定的配置进行处理,您可以将这些配置参数广播到所有任务中,避免了每个任务单独获取配置的开销。

  2. 规则和字典传递: 在一些数据处理场景中,需要使用一组静态的规则或字典进行数据转换或处理。通过广播这些规则或字典,可以避免每个任务都去加载这些数据,从而减少处理延迟和资源消耗。

  3. 静态数据集共享: 如果您有一个静态数据集(不随时间变化)需要在整个作业的所有任务之间共享,广播可以提供一种高效的方式来共享这些数据,而不需要进行重复传输。

  4. 机器学习模型参数共享: 在机器学习任务中,有时候需要将训练好的模型参数广播给所有任务,以便它们在进行推断或预测时能够共享这些参数,避免重复计算。

  5. 数据分片信息: 在一些任务中,需要根据数据的分片信息来进行特定处理。通过广播数据分片信息,所有任务可以了解到数据的分片情况,从而更有效地进行处理。

  6. 数据缓存: 当需要频繁访问同一份数据时,通过广播将数据复制到每个任务的本地内存中,可以避免重复从磁盘或网络读取数据,从而提高处理性能。

需要注意的是,广播适用于较小的数据集。如果广播的数据集非常大,超过了内存容量,反而可能导致性能问题,因为它会消耗过多的内存资源。

三.Flink 广播-broadcast代码使用

在Flink中,要实现广播操作,需要遵循以下步骤:

  1. 创建要广播的数据集:首先,您需要准备要广播的数据集。通常,这是一个小型的静态数据集,例如配置参数、字典、规则等。这些数据将被广播到整个作业的所有并行任务中。

  2. 广播数据集:一旦您有了要广播的数据集,您需要使用broadcast()方法将其标记为广播数据集。这将会在作业的整个执行过程中将数据复制到所有任务的本地内存中。

  3. 使用广播数据集:在作业的其他部分,您可以通过调用withBroadcastSet()方法将广播数据集传递给算子函数。这样,每个并行任务都可以访问广播数据集,而不必重新复制或通过网络通信。文章来源地址https://www.toymoban.com/news/detail-616089.html

    package com.fwmagic.flink.batch;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Broadcast操作
     */
    public class Broadcast {
    
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4,Time.of(10, TimeUnit.SECONDS)));
            //1、准备要广播的数据
            ArrayList<Tuple2<String,Integer>> broadData = new ArrayList<>();
            broadData.add(new Tuple2<>("zs", 23));
            broadData.add(new Tuple2<>("ls", 34));
            broadData.add(new Tuple2<>("ww", 45));
    
            //1.1 处理需要广播的数据,把数据集转成map,key为name,value为age
            DataSet<HashMap<String, Integer>> dateSet = env.fromCollection(broadData).map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
                @Override
                public HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {
                    HashMap<String,Integer> hashMap = new HashMap<>();
                    hashMap.put(tuple.f0, tuple.f1);
                    return hashMap;
                }
            });
    
            //源数据
            DataSource<String> dataSource = env.fromElements("zs", "ls", "ww");
    
            //注意:在这里需要使用到RichMapFunction获取广播变量
            DataSet<String> result = dataSource.map(new RichMapFunction<String, String>() {
    
                List<HashMap<String, Integer>> broadcastMap = new ArrayList<>();
                HashMap<String, Integer> allMap = new HashMap<>();
    
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    broadcastMap = getRuntimeContext().getBroadcastVariable("mybroadcastdata");
                    for (HashMap<String, Integer> map : broadcastMap) {
                        allMap.putAll(map);
                    }
                }
    
                @Override
                public String map(String name) throws Exception {
                    Integer age = allMap.get(name);
                    return name + "," + age;
                }
    
            }).withBroadcastSet(dateSet, "mybroadcastdata");
    
            result.print();
        }
    }
    

到了这里,关于Flink 广播-broadcast的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink番外篇】13、Broadcast State 模式示例-简单模式匹配(1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月19日
    浏览(48)
  • 大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

    在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

    2024年02月15日
    浏览(39)
  • Flink广播流 BroadcastStream

    Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一

    2024年03月20日
    浏览(28)
  • flink学习之广播流与合流操作demo

    广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现

    2024年02月09日
    浏览(28)
  • 大数据系列——Flink理论

    Flink是一个对有界和无界数据流进行有状态计算的分布式处理引擎和框架,既可以处理有界的批量数据集,也可以处理无界的实时流数据,为批处理和流处理提供了统一编程模型,其代码主要由 Java 实现,部分代码由 Scala实现。 Flink以REST资源的形式和外部进行交互,所以可以

    2024年02月03日
    浏览(31)
  • 大数据:【学习笔记系列】Flink基础架构

    Apache Flink 是一个开源的流处理框架,用于处理 有界 和 无界 的 数据流 。Flink 设计用于 运行在所有常见的集群环境 中,并且能够以 高性能 和 可扩展 的方式进行实时数据处理和分析。下面将详细介绍 Flink 的基础架构组件和其工作原理。 1. Flink 架构概览 Flink 的架构主要包括

    2024年04月23日
    浏览(28)
  • 大数据:【学习笔记系列】 Flink 学习路线

    Apache Flink 是一种高效、可扩展的 实时流处理框架 ,它允许开发者以 实时方式处理连续的数据流 。学习 Flink 要求你具备一定的编程基础(尤其是 Java 或 Scala),同时对大数据处理的基本概念有所了解。下面是一个详细的 Flink 学习路线,包括各阶段的学习目标和推荐资源。

    2024年04月23日
    浏览(25)
  • [Pytorch]Broadcasting广播机制

    Broadcasting机制用于在不同维度的张量进行运算时进行维度的自动增加与扩展,Broadcasting机制使用的前提是两个参与运算的张量是可broadcastable的。 怎样的两个向量是Broadcastable的,也就是可使用Broadcasting机制的? 规定右边的维度为小维度,即shape(2,32,13,13)中右边的13为最小的维

    2024年02月11日
    浏览(22)
  • Python NumPy 广播(Broadcast)

    张量(Tensor)、标量(scalar)、向量(vector)、矩阵(matrix) Python Numpy 切片和索引(高级索引、布尔索引、花式索引) Python NumPy 广播(Broadcast) 广播(Broadcast)是 numpy 对不同形状(shape)的数组进行数值计算的方式, 对数组的算术运算通常在相应的元素上进行。 如果两个数组 a

    2024年02月04日
    浏览(34)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包