Flink之SideOutput(数据分流)

这篇具有很好参考价值的文章主要介绍了Flink之SideOutput(数据分流)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink在早期版本有一个split算子用来做数据分流使用的,但是在flink-1.12开始这个API就已经被删除了,在1.12版本以后我们是通过process算子来做数据分流的,这里就介绍一下如何使用prodess进行数据分流.

  • 代码
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/7
 * @Description: 测流输出
 **/
public class FlinkSideOutput {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(3);
        // 这里使用的是自定义数据源为了方便测试,具体数据源根据自己的实际情况进行更换
        DataStreamSource<CustomizeBean> customizeSourceStream = env.addSource(new CustomizeSource());
        /**
         * 需求
         * 1. 将性别为M且爱好为'羽毛球运动爱好者'分到一个流
         * 2. 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'分到一个流
         * 3. 其他保留到主流
        **/
        SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {
            @Override
            public void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {
                String gender = value.getGender(); // 性别
                String hobbit = value.getHobbit(); // 爱好
                if (gender.equals("M") && hobbit.equals("羽毛球运动爱好者")) {
                    // 将性别为M且爱好为'羽毛球运动爱好者'进行分流, 注意这里要声明类型,Java无法自行推断
                    ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);
                } else if (gender.equals("W") && (hobbit.equals("篮球运动爱好者") || hobbit.equals("钓鱼爱好者"))) {
                    // 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'进行分流, 注意这里要声明类型,Java无法自行推断
                    ctx.output(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)), value);
                } else {
                    // 将剩下的数据保留在主流中
                    out.collect(value);
                }
            }
        });
        // 获取'M-羽毛球'分流数据,这里也要加上类型声明
        DataStream<CustomizeBean> mSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)));
        // 打印'M-羽毛球'结果
        mSideOutput.print("M-羽毛球");
        // 获取'W-篮球/钓鱼'分流数据,这里也要加上类型声明
        DataStream<CustomizeBean> wSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)));
        // 打印结果
        wSideOutput.print("W-篮球/钓鱼");
        // 主流数据打印结果
        processedStream.print("主数据流");

        env.execute("Side Output");
        
    }
}
  • 结果数据
主数据流:2> CustomizeBean(name=AAA-641, age=44, gender=W, hobbit=非遗文化爱好者)
主数据流:3> CustomizeBean(name=AAA-17, age=62, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-429, age=25, gender=W, hobbit=非遗文化爱好者)
主数据流:2> CustomizeBean(name=AAA-218, age=33, gender=M, hobbit=旅游爱好者)
主数据流:3> CustomizeBean(name=AAA-826, age=39, gender=M, hobbit=篮球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-190, age=31, gender=M, hobbit=旅游爱好者)
主数据流:2> CustomizeBean(name=AAA-266, age=32, gender=W, hobbit=网吧战神)
主数据流:3> CustomizeBean(name=AAA-106, age=70, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-911, age=50, gender=M, hobbit=网吧战神)
M-羽毛球:2> CustomizeBean(name=AAA-925, age=65, gender=M, hobbit=羽毛球运动爱好者)
主数据流:3> CustomizeBean(name=AAA-20, age=59, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-409, age=79, gender=W, hobbit=天文知识爱好者)
主数据流:2> CustomizeBean(name=AAA-865, age=58, gender=W, hobbit=天文知识爱好者)
主数据流:3> CustomizeBean(name=AAA-898, age=33, gender=M, hobbit=天文知识爱好者)
主数据流:1> CustomizeBean(name=AAA-85, age=38, gender=W, hobbit=非遗文化爱好者)
主数据流:2> CustomizeBean(name=AAA-883, age=51, gender=M, hobbit=美食爱好者)
主数据流:3> CustomizeBean(name=AAA-243, age=37, gender=M, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-430, age=28, gender=W, hobbit=旅游爱好者)
主数据流:2> CustomizeBean(name=AAA-127, age=65, gender=W, hobbit=网吧战神)
W-篮球/钓鱼:3> CustomizeBean(name=AAA-986, age=52, gender=W, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-840, age=50, gender=W, hobbit=旅游爱好者)
M-羽毛球:2> CustomizeBean(name=AAA-196, age=34, gender=M, hobbit=羽毛球运动爱好者)
主数据流:3> CustomizeBean(name=AAA-142, age=46, gender=W, hobbit=乒乓球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-985, age=78, gender=W, hobbit=美食爱好者)
W-篮球/钓鱼:2> CustomizeBean(name=AAA-490, age=50, gender=W, hobbit=钓鱼爱好者)
主数据流:3> CustomizeBean(name=AAA-295, age=77, gender=M, hobbit=篮球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-754, age=50, gender=M, hobbit=天文知识爱好者)
主数据流:2> CustomizeBean(name=AAA-249, age=35, gender=W, hobbit=羽毛球运动爱好者)
W-篮球/钓鱼:3> CustomizeBean(name=AAA-908, age=27, gender=W, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-674, age=73, gender=M, hobbit=非遗文化爱好者)

通过结果内容可以看到数据完全按照我们分流的逻辑进行输出的,如果想在主数据流中讲所有数据保留下来,Collector<Object> out单独拎出来即可,也就是不加到判断逻辑中,代码如下,这里就只展示部分代码了

SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {
            @Override
            public void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {
                String gender = value.getGender(); // 性别
                String hobbit = value.getHobbit(); // 爱好
                // 将所有数据保留在主流中
                out.collect(value);
                // 开始进行分流处理
                if (gender.equals("M") && hobbit.equals("羽毛球运动爱好者")) {
                    // 将性别为M且爱好为'羽毛球运动爱好者'进行分流, 注意这里要声明类型,Java无法自行推断
                    ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);
                } else if ((gender.equals("W") && (hobbit.equals("篮球运动爱好者")) || (gender.equals("W") && hobbit.equals("钓鱼爱好者")))) {
                    // 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'进行分流, 注意这里要声明类型,Java无法自行推断
                    ctx.output(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)), value);
                }
            }
        });

所有的内容到这里就结束了.文章来源地址https://www.toymoban.com/news/detail-634597.html

到了这里,关于Flink之SideOutput(数据分流)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink DataStream之使用filter实现分流

    新建类 启动netcat和程序  可以看到输入的\\\"World\\\"由于不满足两个filter中的任何一个,所以数据被舍弃。\\\"Monday\\\"和\\\"Hello\\\"分别打印在两个不同的流中。  

    2024年02月13日
    浏览(32)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

    所谓 “分流” ,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream ,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选

    2024年01月24日
    浏览(49)
  • Flink分流,合流,状态,checkpoint和精准一次笔记

    第8章 分流 1.使用侧输出流 2.合流 2.1 union :使用 ProcessFunction 处理合流后的数据 2.2 Connect : 两条流的格式可以不一样, map操作使用CoMapFunction,process 传入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction; 如果没有按键分

    2024年02月12日
    浏览(36)
  • Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

                           星光下的赶路人star的个人主页                        欲买桂花同载酒,终不似,少年游 计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是

    2024年02月07日
    浏览(46)
  • 一种基于动态水位值的Flink调度优化算法(flink1.5以前),等同于实现flink的Credit-based反压原理

    首先说明,偶然看了个论文,发现 flink优化原来比我想象中的更简单,得到了一些启发,所以写下这篇帖子,供大家共同学习。 看到的论文是《计算机科学与应用》21年11月的一篇 名字就叫做 : 一种基于动态水位值的Flink调度优化算法。感兴趣的小伙伴可以自己看一下 ,很

    2024年02月22日
    浏览(50)
  • 大数据Flink(五十一):Flink的引入和Flink的简介

    文章目录 Flink的引入和Flink的简介 一、Flink的引入 1、第1代——Hadoop MapReduce

    2024年02月15日
    浏览(46)
  • 【Flink】 Flink实时读取mysql数据

    准备 你需要将这两个依赖添加到 pom.xml 中 mysql mysql-connector-java 8.0.0 读取 kafka 数据 这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。 package com.zhisheng.connectors.mysql.utils; impo

    2024年02月03日
    浏览(44)
  • 大数据Flink学习圣经:一本书实现大数据Flink自由

    本文是《大数据Flink学习圣经》 V1版本,是 《尼恩 大数据 面试宝典》姊妹篇。 这里特别说明一下:《尼恩 大数据 面试宝典》5个专题 PDF 自首次发布以来, 已经汇集了 好几百题,大量的大厂面试 干货、正货 。 《尼恩 大数据 面试宝典》面试题集合, 将变成大数据学习和面

    2024年02月12日
    浏览(48)
  • 大数据Flink(五十五):Flink架构体系

    文章目录 Flink架构体系 一、 Flink中的重要角色 二、Flink数据流编程模型

    2024年02月14日
    浏览(49)
  • Flink CDC 2.0 主要是借鉴 DBLog 算法

    DBLog 算法原理 DBLog 这个算法的原理分成两个部分,第一部分是分 chunk,第二部分是读 chunk。分 chunk 就是把一张表分为多个 chunk(桶/片) 。我可以把这些 chunk 分发给不同的并发的 task 去做。例如:有 reader1 和 reader2,不同的 reader 负责读不同的 chunk。其实只要保证每个 reade

    2024年02月08日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包