Flink多流转换(1)—— 分流&合流

这篇具有很好参考价值的文章主要介绍了Flink多流转换(1)—— 分流&合流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

分流

代码示例

使用侧输出流

合流

联合(Union)

连接(Connect)


简单划分的话,多流转换可以分为“分流”和“合流”两大类

目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream

Flink多流转换(1)—— 分流&合流,Flink,大数据,flink,java,python,大数据

代码示例

调用.filter()方法进行筛选,将符合条件的数据拣选出来放到对应的流里

public class SplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
        
        // 筛选Mary的浏览行为放入MaryStream流中
        DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        });
        
        // 筛选Bob的购买行为放入BobStream流中
        DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Bob");
            }
        });
        
        // 筛选其他人的浏览行为放入elseStream流中
        DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.user.equals("Mary") && !value.user.equals("Bob") ;
            }
        });

        MaryStream.print("Mary pv");
        BobStream.print("Bob pv");
        elseStream.print("else pv");
        
        env.execute();
    }
}

缺点:上述操作将原始流复制了三份,对每一份分别进行筛选,因此代码冗余,不够高效

解决:①.split()方法(但限制了数据类型转换,已经废弃)

②测输出流

使用侧输出流

改进后的代码如下:

public class SplitStreamByOutputTag {
    // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")){
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")){
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");

        env.execute();
    }
}

①定义OutputTag作为标签

②使用ctx.output方法将符合筛选条件的数据写入侧输出流

③使用getSideOutput方法从侧输出流中获得数据

合流

对于来源不同的多条流中的数据进行联合处理(与分流相比,合流操作更为普遍)

联合(Union)

直接将多条流合在一起(要求必须流中的数据类型必须相同),合并之后的新流会包括所有流中的元素,数据类型不变

Flink多流转换(1)—— 分流&合流,Flink,大数据,flink,java,python,大数据

操作:基于 DataStream 直接调用.union()方法

参数:其他 DataStream

返回值:一个 DataStream

stream1.union(stream2, stream3, ...)

水位线时效性:多流合并时处理的时效性是以最慢的那个流为准的(多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程)

代码示例:

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);



        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream1.print("stream1");

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream2.print("stream2");

        // 合并两条流
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("水位线:" + ctx.timerService().currentWatermark());
                    }
                })
                .print();


        env.execute();
    }
}

测试:

分别在两台机器上输入以下数据:

hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000

水位线的推进如下:

Flink多流转换(1)—— 分流&合流,Flink,大数据,flink,java,python,大数据

连接(Connect)

连接操作允许流的数据类型不同

连接流(ConnectedStreams)

连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的

要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型

Flink多流转换(1)—— 分流&合流,Flink,大数据,flink,java,python,大数据

代码实现

①基于一条 DataStream 调用.connect()方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams

②调用同处理方法得到 DataStream(可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法)

public class ConnectTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> stream1 = env.fromElements(1,2,3);
        DataStream<Long> stream2 = env.fromElements(1L,2L,3L);

        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
            @Override
            public String map1(Integer value) {
                return "Integer: " + value;
            }

            @Override
            public String map2(Long value) {
                return "Long: " + value;
            }
        });

        result.print();

        env.execute();
    }
}

①ConnectedStreams有两个类型参数,分别是stream1和stream2的类型;

②map方法中实现了一个CoMapFunction,表示分别对两条流中的数据执行 map 操作

类型参数<IN1, IN2, OUT>,分别表示第一条流、第二条流,以及合并后的流中的数据类型

这里我们将一条 Integer 流和一条 Long 流合并,转换成 String 输出。所以当遇到第一条流输入的整型值时,调用.map1();而遇到第二条流输入的长整型数据时,调用.map2():最终都转换为字符串输出,合并成了一条字符串流

③补充:ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

传入的参数是两条流中各自的键选择器

这样的操作就是把两条流中key相同的数据放到了一起,然后针对来源的流各自进行处理;

同样也可以在合并之前先使用KeyBy进行分区,然后基于两条KeyedStream进行连接操作;

要注意两条流定义的键的类型必须相同,否则会抛出异常


CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)

例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction

CoProcessFunction源码如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

public abstract class Context {...}
...
}

简单示例:实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息

// 实时对账
public class BillCheckExample {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 来自app的支付日志
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        // 来自第三方支付平台的支付日志
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                        return element.f3;
                    }
                })
        );

        // 检测同一支付单在两条流中是否匹配,不匹配就报警
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();

        env.execute();
    }

    // 自定义实现CoProcessFunction
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{
        // 定义状态变量,用来保存已经到达的事件
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
            );

            thirdPartyEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
            );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            // 看另一条流中事件是否来过
            if (thirdPartyEventState.value() != null){
                out.collect("对账成功:" + value + "  " + thirdPartyEventState.value());
                // 清空状态
                thirdPartyEventState.clear();
            } else {
                // 更新状态
                appEventState.update(value);
                // 注册一个5秒后的定时器,开始等待另一条流的事件
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null){
                out.collect("对账成功:" + appEventState.value() + "  " + value);
                // 清空状态
                appEventState.clear();
            } else {
                // 更新状态
                thirdPartyEventState.update(value);
                // 注册一个5秒后的定时器,开始等待另一条流的事件
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
            if (appEventState.value() != null) {
                out.collect("对账失败:" + appEventState.value() + "  " + "第三方支付平台信息未到");
            }
            if (thirdPartyEventState.value() != null) {
                out.collect("对账失败:" + thirdPartyEventState.value() + "  " + "app信息未到");
            }
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }}

运行结果如下:

Flink多流转换(1)—— 分流&合流,Flink,大数据,flink,java,python,大数据

运行结果解析:
①在CoProcessFunction的实现中,声明了两个状态变量用来保存App的支付信息和第三方的支付信息

②App支付信息到达之后,触发processElement1中的操作,检查第三方的支付信息是否已经到达(如果先到达会保存在相应的状态变量中);如果已经到达,则对账成功;如果没有到达,则等待5s,仍未到达则对账失败;

③第三方支付信息到达后,流程同②

④对于order-1,时间戳为1000的数据(App)到达后,第三方支付信息未到达,等待5s,接着时间戳未3000的数据(第三方)到达后,发现App支付信息已经到达,因此对账成功

⑤对于order-2和order-3,均是等待5s后没有检测到App(第三方)数据到达而发出报警信息

广播连接流(BroadcastConnectedStream)

DataStream 调用.connect()方法时可以传入一个广播流(BroadcastConnectedStream)

这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)

如何创建广播流


基于DataStream调用.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor),说明状态的名称和类型;

因为广播状态底层是用一个“映射”(map)结构来保存的

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
数据流和广播流的连接

得到“广播连接流”(BroadcastConnectedStream),然后基于广播连接流调用.process()方法,就可以同时获取规则和数据,进行动态处理

DataStream<String> output = stream
 .connect(ruleBroadcastStream)
 .process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction

BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个方法,针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据,而另一条流则是要用新规则来更新广播状态,所以对应的两个方法叫作.processElement().processBroadcastElement()

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili 文章来源地址https://www.toymoban.com/news/detail-821336.html

到了这里,关于Flink多流转换(1)—— 分流&合流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《Flink学习笔记》——第九章 多流转换

    无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的。而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多条流进行处理的场景 简单划分(两大类): 分流——把一

    2024年02月11日
    浏览(35)
  • Flink学习——处理函数ProcessFunction及多流转换

            在DataStream的更底层,我们可以不定义任何具体的算子(如map(),filter()等)二只提炼出一个统一的“处理”(process)操作 。它是所有转换算子的概括性的表达。可以自定义处理逻辑。         所以这一层接口就被叫做“ 处理函数 ”( process function )         处理

    2024年02月14日
    浏览(40)
  • Flink-多流转换(Union、Connect、Join)

    无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的。而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多条流进行处理的场景。 简单划分的话,多流转换可以分为“

    2024年02月14日
    浏览(41)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

    所谓 “分流” ,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream ,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选

    2024年01月24日
    浏览(39)
  • Flink之SideOutput(数据分流)

    Flink在早期版本有一个 split 算子用来做 数据分流 使用的,但是在 flink-1.12 开始这个 API 就已经被删除了,在 1.12 版本以后我们是通过 process 算子来做数据分流的,这里就介绍一下如何使用 prodess 进行数据分流. 代码 结果数据 通过结果内容可以看到数据完全按照我们分流的逻辑进

    2024年02月14日
    浏览(31)
  • Spring SpEL在Flink中的应用-与Filter结合实现数据动态分流

    SpEL表达式与Flink fiter结合可以实现基于表达式的灵活动态过滤。有关SpEL表达式的使用请参考 Spring SpEL在Flink中的应用-SpEL详解 。 可以将过滤规则放入数据库,根据不同的数据设置不同的过滤表达式,从而实现只需修改过滤表达式不用修改Flink代码的功能。对于基于Flink进行数

    2024年01月25日
    浏览(28)
  • flink学习之广播流与合流操作demo

    广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现

    2024年02月09日
    浏览(28)
  • 【Flink实战】Flink中的分流

    Flink中的分流 在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。 拆分流数据的方式 Split,已经废弃,不推荐使用 Fliter SideOut,推荐使用 Fliter分流的Java实现 SideOut分流的Java实现 SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下

    2024年02月10日
    浏览(28)
  • Flink多流处理之join(关联)

    Flink的 API 中只提供了 join 的算子,并没有 left join 或者 right join ,这里我们就介绍一下 join 算子的使用,其实 join 算子底层调用的就是 coGroup ,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源 代码 结果 这个 API 使用起来还是比较简单

    2024年02月13日
    浏览(26)
  • Flink多流处理之Broadcast(广播变量)

    写过Spark批处理的应该都知道,有一个广播变量 broadcast 这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有 broadcast ,简单来说和Spark中的类似,但是有所区别,首先Spark中的 broadcast 是静态的数据,而Flink中的 broadcast 是动态的,也就是源源不断的数据流.在Fl

    2024年02月13日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包