Flink旁路输出OutputTag

这篇具有很好参考价值的文章主要介绍了Flink旁路输出OutputTag。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

除了由 DataStream 操作产生的主要流之外,还可以产生任意数量的旁路输出结果流。结果流中的数据类型不必与主要流中的数据类型相匹配,并且不同旁路输出的类型也可以不同。当你需要拆分数据流时,通常必须复制该数据流,然后从每个流中过滤掉不需要的数据。

使用旁路输出时,首先需要定义用于标识旁路输出流的 OutputTag:

//需要使用匿名内部类,其中T是泛型
OutputTag<T> outputTag = new OutputTag<T>("side-output") {};

可以通过以下方法将数据发送到旁路输出:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

代码示例

1.流复制

将流复制两份 发到测输出流stream1 和stream2,代码如下(示例):


import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputTest {
    public static final String TYPE = "type";
    public static void main(String[] args) throws Exception {
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        String hostName = params.get("hostname", "10.68.8.59");
        int port = params.getInt("port", 9999);
     // nc -l 9999
        DataStream<String> sourceStream = env.socketTextStream(hostName, port, "\n");
        SingleOutputStreamOperator<JSONObject> jsonObjectStream = sourceStream.map(s -> JSONObject.parseObject(s));
        //定义OutputTag
        OutputTag<JSONObject> outputTag1 = new OutputTag<JSONObject>("stream1") {
        };
        OutputTag<JSONObject> outputTag2 = new OutputTag<JSONObject>("stream2") {
        };

        //将流复制两份 发到测输出流stream1 和stream2
        SingleOutputStreamOperator<JSONObject> outputStream = jsonObjectStream.process(new ProcessFunction<JSONObject, JSONObject>() {

            @Override
            public void processElement(JSONObject jsonObject, Context context, Collector<JSONObject> collector)
                    throws Exception {
                context.output(outputTag1, jsonObject);
                context.output(outputTag2, jsonObject);
            }
        });
        DataStream<JSONObject> stream1 = outputStream.getSideOutput(outputTag1);
        DataStream<JSONObject> stream2 = outputStream.getSideOutput(outputTag2);
        //数据去向
        //stream1
        stream1.map(e -> {
            e.put("stream", "stream1");
            return e;
        }).print();
        //stream2
        stream2.map(e -> {
            e.put("stream", "stream2");
            return e;
        }).print();
        env.execute("SocketStreamTest");
    }
}

2.条件分流

可以根据自定义条件将数据分流。文章来源地址https://www.toymoban.com/news/detail-806308.html

public class SplitDemo {
    public static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};
    public static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = executionEnvironment.fromElements(1, 2, 3, 4, 5);

        SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    // 这里不使用out.collect,而是使用ctx.output
                    // 这个方法多了一个参数,可以指定output tag,从而实现数据分流
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        // 依赖OutputTag获取对应的旁路输出
        DataStream<Integer> evenStream = process.getSideOutput(evenTag);
        DataStream<Integer> oddStream = process.getSideOutput(oddTag);

        // 分别打印两个旁路输出流中的数据
        evenStream.process(new ProcessFunction<Integer, String>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect("Even: " + value);
            }
        }).print();

        oddStream.process(new ProcessFunction<Integer, String>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect("Odd: " + value);
            }
        }).print();

        executionEnvironment.execute();
    }
}

3.迟到数据分流

public class OutOfOrderDemo {
    // 创建tag
    public static final OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 示例数据,其中D乱序,I来迟(H到来的时候认为15000ms之前的数据已经到齐)
        SingleOutputStreamOperator<Tuple2<String, Integer>> source = executionEnvironment.fromElements(
                new Tuple2<>("A", 0),
                new Tuple2<>("B", 1000),
                new Tuple2<>("C", 2000),
                new Tuple2<>("D", 7000),
                new Tuple2<>("E", 3000),
                new Tuple2<>("F", 4000),
                new Tuple2<>("G", 5000),
                new Tuple2<>("H", 20000),
                new Tuple2<>("I", 8000)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Integer>>() {
            // 这里自定义WatermarkGenerator的原因是Flink按照运行时间周期发送watermark,但我们的例子是单次执行的,可以认为数据是一瞬间到来
            // 因此我们改写为每到来一条数据发送一次watermark,watermark的时间戳为数据的事件事件减去5000毫秒,意思是最多容忍数据来迟5000毫秒
            @Override
            public WatermarkGenerator<Tuple2<String, Integer>> createWatermarkGenerator(Context context) {
                return new WatermarkGenerator<Tuple2<String, Integer>>() {
                    @Override
                    public void onEvent(Tuple2<String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                        long watermark = eventTimestamp - 5000L < 0 ? 0L : eventTimestamp - 5000L;
                        output.emitWatermark(new Watermark(watermark));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {

                    }
                };
            }
        // 取第二个字段为watermark
        }).withTimestampAssigner((element, timestamp) -> element.f1));

        // 窗口大小5秒,允许延迟5秒
        // watermark和allowedLateness的区别是,watermark决定了什么时候窗口数据触发计算,allowedLateness决定什么数据被认为是lateElement,从而发送到sideOutput
        // 设置side output tag
        source.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateTag).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
            @Override
            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                System.out.println("--------------------");
                while(iterator.hasNext()) {
                    System.out.println(iterator.next());
                }
            }
        // 打印sideoutput流内容
        }).getSideOutput(lateTag).process(new ProcessFunction<Tuple2<String, Integer>, Object>() {
            @Override
            public void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {
                System.out.println("Late element: " + value);
            }
        });

        executionEnvironment.execute();
    }
}

到了这里,关于Flink旁路输出OutputTag的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 输出至 Elasticsearch

    【1】引入 pom.xml 依赖 【2】 ES6 Scala 代码,自动导入的 scala 包需要修改为 scala._ 否则会出现错误。 【3】 ES6 输出展示

    2024年02月04日
    浏览(40)
  • Flink 输出至 Redis

    【1】引入第三方 Bahir 提供的 Flink-redis 相关依赖包 【2】 Flink 连接 Redis 并输出 Sink 处理结果 查看源码可知 RedisSink 是继承自 RichSinkFunctionIN 类 【3】查看 Redis 输出信息

    2024年02月04日
    浏览(27)
  • Flink侧输出流解析

    在实时数据处理领域,Apache Flink 已成为一个不可或缺的工具。它以其高吞吐量和低延迟处理能力而闻名。而在 Flink 的众多特性中,侧输出流(Side Outputs)提供了一种灵活的方式来处理复杂的数据流。本文将探讨如何在 Flink 的 Scala API 中有效使用侧输出流。 侧输出流是一种特

    2024年02月04日
    浏览(40)
  • flink处理函数--副输出功能

    在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出

    2024年02月07日
    浏览(44)
  • Flink Kafka[输入/输出] Connector

    本章重点介绍生产环境中最常用到的 Flink kafka connector 。使用 Flink 的同学,一定会很熟悉 kafka ,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟 kafka 进行一些数据的交换,比如利用 kafka consumer 读取数据,然后进行一系

    2024年02月04日
    浏览(41)
  • flink的副输出sideoutput单元测试

    处理函数中处理输出主输出的数据流数据外,也可以输出多个其他的副输出的数据流数据,当我们的处理函数有副输出时,我们需要测试他们功能的正确性,本文就提供一个测试flink副输出单元测试的例子 首先看一下处理函数,其中包含副输出逻辑 其次,看下对应的单元测试

    2024年02月03日
    浏览(45)
  • Flink源算子、转换算子和输出算子(DataSet)

    Flink是一种一站式处理的框架,既可以进行批处理(DataSet),也可以进行流处理(DataStream) 将Flink的算子分为两大类:DataSet 和 DataStream 1.1 fromCollection 从本地集合读取数据 1.2 readTextFile 从文件中读取 1.3 readTextFile 遍历目录 对一个文件目录内的所有文件,包括所有子目录中的

    2024年04月23日
    浏览(39)
  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(43)
  • Flink实现同时消费多个kafka topic,并输出到多个topic

    1)代码使用的 flink版本为1.16.1 ,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换; 2)本次编写的两个方案,均只适用于 数据源topic来自同一个集群 ,且kafka消费组相同,暂未研究flink的connect算子join多条流 代码涉及 Hadoop相关环境 ,若无该环境

    2023年04月20日
    浏览(90)
  • 一种基于动态水位值的Flink调度优化算法(flink1.5以前),等同于实现flink的Credit-based反压原理

    首先说明,偶然看了个论文,发现 flink优化原来比我想象中的更简单,得到了一些启发,所以写下这篇帖子,供大家共同学习。 看到的论文是《计算机科学与应用》21年11月的一篇 名字就叫做 : 一种基于动态水位值的Flink调度优化算法。感兴趣的小伙伴可以自己看一下 ,很

    2024年02月22日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包