flink学习之广播流与合流操作demo

这篇具有很好参考价值的文章主要介绍了flink学习之广播流与合流操作demo。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Random;

public class BroadCastStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());
        DataStream<User> userDataStream = env.addSource(new CustomerSource());


        userDataStream.print("user");
        patternDataStream.print("pattern");
        //test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();
        //test2
        // 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();
        //test3
        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(Pattern.class));
        //通过描述器 更新
        BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);
        userDataStream
                .keyBy(user -> user.userId)
                .connect(broadcast)
                .process(new CustomerBroadCastProcess())
                .print();
        env.execute();
    }

    private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {

        @Override
        public void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            Integer userVip = user.getVip();
            //获取广播流的数据 不是通过map保存
            Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);
            if (pattern!=null){
                Integer patternVip = pattern.vip;
                String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;
                if (userVip>= patternVip){
                    result=result+"符合要求";
                }else {
                    result=result+"不符合要求";
                }
                collector.collect(result);
            }else {
                System.out.println("pattern is null ");
            }

        }

        @Override
        public void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,
                User, Pattern, String>.Context context, Collector<String> collector) throws Exception {
            BroadcastState<Void, Pattern> bcState = context.getBroadcastState(
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
            // 将广播状态更新为当前的pattern
            bcState.put(null, pattern);
        }


    }
    public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {
        ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的state
        HashMap<String,Integer> vipMap;
        @Override
        public void open(Configuration parameters) throws Exception {
            vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));
            vipMap=new HashMap<String,Integer>();
            super.open(parameters);
        }


        @Override
        public void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {
            Integer userVip = user.getVip();
            Integer patternVip = vipMap.getOrDefault("vip", 0);
            String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;
            if (userVip>=patternVip){
                result=result+"符合要求";
            }else {
                result=result+"不符合要求";
            }
            collector.collect(result);
        }

        @Override
        public void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {
            vipMap.put("vip",pattern.vip);
        }
    }

    public static class User {
        public Integer userId;
        public Integer vip;

        public User() {
        }

        public User(Integer userId, Integer vip) {
            this.userId = userId;
            this.vip = vip;
        }

        public Integer getUserId() {
            return userId;
        }

        public void setUserId(Integer userId) {
            this.userId = userId;
        }

        public Integer getVip() {
            return vip;
        }

        public void setVip(Integer vip) {
            this.vip = vip;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId=" + userId +
                    ", vip='" + vip + '\'' +
                    '}';
        }
    }

    // 定义行为模式POJO类,包含先后发生的两个行为
    public static class Pattern {
        public Integer vip;


        public Pattern() {
        }

        public Pattern(Integer vip) {
            this.vip = vip;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "vip='" + vip + '\'' +
                    '}';
        }
    }

    private static class CustomerSource implements SourceFunction<User> {
        boolean run = true;

        @Override
        public void run(SourceContext<User> sourceContext) throws Exception {
            while (true) {
                Integer userId = new Random().nextInt(1000);
                Integer vip = new Random().nextInt(10);
                sourceContext.collect(new User(userId, vip));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }
    }

    private static class ChangeSource implements SourceFunction<Pattern> {
        boolean run = true;

        @Override
        public void run(SourceContext<Pattern> sourceContext) throws Exception {
            int i = 1;
            while (true) {
                sourceContext.collect(new Pattern(i++));
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }
    }

}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

flink学习之广播流与合流操作demo,flink,学习,大数据

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

flink学习之广播流与合流操作demo,flink,学习,大数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。文章来源地址https://www.toymoban.com/news/detail-697669.html

到了这里,关于flink学习之广播流与合流操作demo的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第十三节 I/O流与文件操作

    目录 1.字节流 Input Stream输入流,用于从数据源读取数据 Output Stream输出流,用于将数据写入输出目标 读   再加以改进 改进  2,字符流​编辑 不管哪种语言都必须有文件或设备沟通的能力,java中引入流的概念,数据流向我们程序中 1个字节(byte) == 8个二进制位(bit) 8个二进制位

    2023年04月21日
    浏览(33)
  • Flink广播流 BroadcastStream

    Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一

    2024年03月20日
    浏览(35)
  • Flink 广播-broadcast

    一.Flink 广播-broadcast介绍 在Apache Flink中,\\\"广播\\\"是一种特殊的数据分发模式,用于将数据从一个并行操作传播到整个作业的所有并行任务中。广播操作对于将少量数据有效地分发给并行任务,以便它们能够共享这些数据而不必进行昂贵的网络通信,是非常有用的。它通常用于

    2024年02月15日
    浏览(40)
  • python学习-->opencv图像基本操作学习之灰度图转换

    好久没更新,趁今天要做核酸回不了宿舍,把今天的学习的opencv知识先记录一下! 运行环境是:pycharm 话不多说,献上代码再说: 首先我们先读取我们的图片进来! 跟着我们先尝试一下在打开我们的图片看看! 下面是实现的代码! 运行之后我的图片是这样的 我们可以看看图

    2024年02月08日
    浏览(59)
  • Flink多流处理之Broadcast(广播变量)

    写过Spark批处理的应该都知道,有一个广播变量 broadcast 这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有 broadcast ,简单来说和Spark中的类似,但是有所区别,首先Spark中的 broadcast 是静态的数据,而Flink中的 broadcast 是动态的,也就是源源不断的数据流.在Fl

    2024年02月13日
    浏览(48)
  • C# 学习笔记2-控制流与类型转换

    关于变量的简单操作 判断 循环 类型转换 异常处理 检查数字类型的溢出 一元运算符 Unary operators x++ , ++x , x-- , --x 。 这些运算符同 C++。 postfix operator 后置运算符 还有 typeof(int) , sizeof(int) 。 二元运算符 Binary arithmetic operators 无非是: + 、 - 、 * 、 / 、 % modulus 模 remaind

    2024年02月02日
    浏览(39)
  • 《Java面向对象程序设计》学习笔记——第 12 章 输入流与输出流

    ​笔记汇总: 《Java面向对象程序设计》学习笔记 File 对象主要用来获取文件本身的一些信息,例如文件所在的目录、文件的长度和文件的读/写权限等,不涉及对文件的读/写操作。 创建 File 对象的构造方法有以下 3 个: 其中, filename 是文件名字 ,directoryPath 是文件的路径,

    2024年02月11日
    浏览(37)
  • 大数据学习之Flink、搞懂Flink的恢复策略

     第一章、Flink的容错机制 第二章、Flink核心组件和工作原理 第三章、Flink的恢复策略 第四章、Flink容错机制的注意事项 第五章、Flink的容错机制与其他框架的容错机制相比较 目录 第三章、Flink的恢复策略 Ⅰ、恢复策略 1. Checkpoint: 2. Savepoint: 3. 重启策略: 4. 状态后端: 了

    2024年01月24日
    浏览(40)
  • flink学习之state

    state作用 保留当前key的历史状态。 state用法 ListStateInteger vipList = getRuntimeContext().getListState(new ListStateDescriptorInteger(\\\"vipList\\\", TypeInformation.of(Integer.class))); 有valueState listState mapstate 。冒失没有setstate state案例 比如起点的小说不能被下载。别人只能通过截屏,提取文字的方式盗版小

    2024年02月09日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包