【Flink实战】Flink中的分流

这篇具有很好参考价值的文章主要介绍了【Flink实战】Flink中的分流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink中的分流

在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。

拆分流数据的方式

  • Split,已经废弃,不推荐使用
  • Fliter
  • SideOut,推荐使用

Fliter分流的Java实现

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 指标明细
        DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env)
                .map((MapFunction<String, String>) kafkaMessage -> {
                    JSONObject jsonobject = null;
                    try {
                        jsonobject = JSONObject.parseObject(kafkaMessage);
                    } catch (Exception e) {
                        LOG.warn("报文格式错误:{}", kafkaMessage);
                    }
                    if (null == jsonobject || jsonobject.isEmpty()) {
                        LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));
                    } else {
                        if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
                                && !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
                            LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));
                        }
                    }
                    return JSONObject.toJSONString(jsonobject);
                });
        // 将原始流中包含demo的数据筛选出来
        DataStream<String> diagnosisMessages = detailMessage
                .filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo")))
                .map((MapFunction<String, String>) sparkMessage -> {
                    // 为达到实验效果,进行日志输出
                    LOG.info("[is demo message]:{}", sparkMessage);
                    return sparkMessage;
                });

        env.execute("Flink Streaming Java API Skeleton");
    }

SideOut分流的Java实现

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("【SideOutputDemo】");
        
        // 指标明细
        DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env)
                .map((MapFunction<String, String>) kafkaMessage -> {
                    JSONObject jsonobject = null;
                    try {
                        jsonobject = JSONObject.parseObject(kafkaMessage);
                    } catch (Exception e) {
                        LOG.warn("报文格式错误:{}", kafkaMessage);
                    }
                    if (null == jsonobject || jsonobject.isEmpty()) {
                        LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));
                    } else {
                        if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
                                && !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
                            LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));
                        }
                    }
                    return JSONObject.toJSONString(jsonobject);
                });

        // 定义一个切分(旁路输出)
        final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {
        };

        SingleOutputStreamOperator<String> sp = mainMessage
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(
                            String s
                            , Context context
                            , Collector<String> collector) throws Exception {
                        // 向常规流(主流)中添加数据
                        collector.collect(s);
                        // 向旁路输出流中添加数据
                        if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {
                            context.output(outputTag, s);
                        }
                    }
                });
        sp.map((MapFunction<String, String>) sparkMessage -> {
            LOG.info("主流的数据: {}", sparkMessage);
            return sparkMessage;
        });

        DataStream<String> tag = sp.getSideOutput(outputTag);
        tag.map((MapFunction<String, String>) sparkMessage -> {
            LOG.info("旁路[{}]的数据: {}", outputTag.getId(), sparkMessage);
            return sparkMessage;
        });

        env.execute("Flink Streaming Java API Skeleton");
    }

SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:

  1. 为每个分支流定义一个 SideOutPut。

  2. 为定义好的 SideOutPut发出数据。只有以下特定的函数才能通过Context上下文对象,向旁路输出的SideOutPut发送数据。

    1. ProcessFunction:处理函数,单流输入函数
    2. KeyedProcessFunction:处理函数,单流输入函数
    3. CoProcessFunction:处理函数,双流流输入函数
    4. KeyedCoProcessFunction:处理函数,双流流输入函数
    5. ProcessWindowFunction:窗口函数,全量计算函数
    6. ProcessAllWindowFunction:窗口函数,全量计算函数,它与 ProcessWindowFunction 类似,但是它会对窗口中的所有数据进行处理,而不是仅处理触发窗口计算的数据。

    例子中使用ProcessFunction实现流拆分。

  3. 根据SideOutPut 的ID标识获取旁路输出流,进行数据继续处理。文章来源地址https://www.toymoban.com/news/detail-687699.html

拆分方式 对比
Split 不支持链式拆分,切分得到的流,是不能进行再次切分的
Fliter 多分支流,需要多次遍历原始流进行筛选。浪费集群的资源
SideOut 以多次进行拆分的,支持链式拆分。

到了这里,关于【Flink实战】Flink中的分流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

    所谓 “分流” ,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream ,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选

    2024年01月24日
    浏览(49)
  • Flink分流,合流,状态,checkpoint和精准一次笔记

    第8章 分流 1.使用侧输出流 2.合流 2.1 union :使用 ProcessFunction 处理合流后的数据 2.2 Connect : 两条流的格式可以不一样, map操作使用CoMapFunction,process 传入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction; 如果没有按键分

    2024年02月12日
    浏览(36)
  • Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

                           星光下的赶路人star的个人主页                        欲买桂花同载酒,终不似,少年游 计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是

    2024年02月07日
    浏览(46)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(51)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(90)
  • 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月09日
    浏览(48)
  • 【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

    🚀 作者 :“大数据小禅” 🚀 文章简介 :Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X 🚀 欢迎小伙伴们 点赞 👍、 收藏 ⭐、 留言 💬 Flink怎么操作Redis Flink怎么操作redis? 方式一:自定义sink 方式二:使用connector Redis Sink 核心是RedisMapper 是一个接口,使用时要

    2024年02月06日
    浏览(42)
  • 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月11日
    浏览(41)
  • 尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月16日
    浏览(45)
  • 大数据Flink(五十二):Flink中的批和流以及性能比较

    文章目录 Flink中的批和流以及性能比较 ​​​​​​​​​​​​​​一、Flink中的批和流

    2024年02月15日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包