Flink多流转换(2)—— 双流连结

这篇具有很好参考价值的文章主要介绍了Flink多流转换(2)—— 双流连结。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

双流连结(Join):根据某个字段的值将数据联结起来,“配对”去做处理

窗口联结(Window Join)

可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理

代码逻辑

首先需要调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams;接着通过.where().equalTo()方法指定两条流中联结的 key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算

stream1.join(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(<WindowAssigner>)
 .apply(<JoinFunction>)

对于JoinFunction

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 OUT join(IN1 first, IN2 second) throws Exception;
}

使用时需要实现内部的join方法,有两个参数,分别表示两条流中成对匹配的数据

JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑

实现流程

Flink多流转换(2)—— 双流连结,大数据,Flink,flink,java,服务器,大数据

两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入 JoinFunction 的.join()方法进行计算处理。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果

实例分析

// 基于窗口的join
public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + "=>" + right;
                    }
                })
                .print();

        env.execute();
    }
}

运行结果如下:

Flink多流转换(2)—— 双流连结,大数据,Flink,flink,java,服务器,大数据

间隔联结(Interval Join)

统计一段时间内的数据匹配情况,不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧

因此需要采用”间隔联结“的操作,针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);

于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围

所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果

匹配条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

举例分析如下:

Flink多流转换(2)—— 双流连结,大数据,Flink,flink,java,服务器,大数据

下方的流 A 去间隔联结上方的流 B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒,上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为 3 的元素,可匹配区间为[1, 4],B 中只有时间戳为 1 的一个数据可以匹配,于是得到匹配数据对(3, 1)

代码逻辑

基于KeyedStream进行联结操作:


①DataStream 通过 keyBy 得到KeyedStream

②调用.intervalJoin()来合并两条流,得到的是一个 IntervalJoin 类型

③通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作

④调用.process()需要传入一个处理函数:ProcessJoinFunction

stream1
 .keyBy(<KeySelector>)
 .intervalJoin(stream2.keyBy(<KeySelector>))
 .between(Time.milliseconds(-2), Time.milliseconds(1))
 .process (new ProcessJoinFunction<Integer, Integer, String(){
 	@Override
 	public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
 	out.collect(left + "," + right);
 }
 });

实例分析

// 基于间隔的join
public class IntervalJoinTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                Tuple3.of("Mary", "order-1", 5000L),
                Tuple3.of("Alice", "order-2", 5000L),
                Tuple3.of("Bob", "order-3", 20000L),
                Tuple3.of("Alice", "order-4", 20000L),
                Tuple3.of("Cary", "order-5", 51000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        orderStream.keyBy(data -> data.f0)
                .intervalJoin(clickStream.keyBy(data -> data.user))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(right + " => " + left);
                    }
                })
                .print();

        env.execute();
    }}

运行结果如下:

Flink多流转换(2)—— 双流连结,大数据,Flink,flink,java,服务器,大数据

窗口同组联结(Window CoGroup)

用法跟 window join非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了

代码逻辑

stream1.coGroup(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .apply(<CoGroupFunction>)

与 window join 的区别在于,调用.apply()方法定义具体操作时,传入的是一个CoGroupFunction

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
 void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

coGroup方法与FlatJoinFunction.join()方法类似,其中的三个参数依旧是两条流中的数据以及用于输出的收集器;但前两个参数不再是单独的配对数据,而是可遍历的数据集合;

因此该方法中直接把收集到的所有数据一次性传入,然后自定义配对方式,不需要再计算窗口中两条流数据集的笛卡尔积;

实例分析

// 基于窗口的join
public class CoGroupTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .coGroup(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
                        collector.collect(iter1 + "=>" + iter2);
                    }
                })
                .print();

        env.execute();
    }
}

运行结果如下:

Flink多流转换(2)—— 双流连结,大数据,Flink,flink,java,服务器,大数据

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili  文章来源地址https://www.toymoban.com/news/detail-825459.html

到了这里,关于Flink多流转换(2)—— 双流连结的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink-多流转换(Union、Connect、Join)

    无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的。而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多条流进行处理的场景。 简单划分的话,多流转换可以分为“

    2024年02月14日
    浏览(41)
  • Flink双流(join)

    Join大体分类只有两种: Window Join和Interval Join Window Join有可以根据Window的类型细分出3种: Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。         🌸Window 类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作

    2024年02月22日
    浏览(34)
  • 说说Flink双流join

    Flink双流JOIN主要分为两大类 一类是基于原生State的Connect算子操作 另一类是基于窗口的JOIN操作。其中基于窗口的JOIN可细分为window join和interval join两种。 基于原生State的Connect算子操作 实现原理:底层原理依赖Flink的State状态存储,通过将数据存储到State中进行关联join, 最终输出

    2024年02月10日
    浏览(30)
  • flink重温笔记(十三): flink 高级特性和新特性(2)——ProcessFunction API 和 双流 join

    前言:今天是学习 flink 的第 13 天啦!学习了 flink 高级特性和新特性之ProcessFunction API 和 双流 join,主要是解决大数据领域数据从数据增量聚合的问题,以及快速变化中的流数据拉宽问题,即变化中多个数据源合并在一起的问题,结合自己实验猜想和代码实践,总结了很多自

    2024年03月12日
    浏览(52)
  • flink双流ioin的大状态如何解决和调优

    Flink 中的双流 ioin 操作(双流连接)通常涉及大状态的处理,这可能导致一些性能和状态管理的挑战。以下是解决和调优 Flink 中双流 ioin 大状态的一些建议: 解决方案: 增大任务管理器的堆内存: 对于处理大状态的任务,增加 Flink 任务管理器的堆内存可以提供更多的内存

    2024年01月22日
    浏览(34)
  • Flink多流处理之join(关联)

    Flink的 API 中只提供了 join 的算子,并没有 left join 或者 right join ,这里我们就介绍一下 join 算子的使用,其实 join 算子底层调用的就是 coGroup ,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源 代码 结果 这个 API 使用起来还是比较简单

    2024年02月13日
    浏览(26)
  • Flink多流处理之Broadcast(广播变量)

    写过Spark批处理的应该都知道,有一个广播变量 broadcast 这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有 broadcast ,简单来说和Spark中的类似,但是有所区别,首先Spark中的 broadcast 是静态的数据,而Flink中的 broadcast 是动态的,也就是源源不断的数据流.在Fl

    2024年02月13日
    浏览(37)
  • Flink多流处理之connect拼接流

    Flink中的拼接流 connect 的使用其实非常简单,就是 leftStream.connect(rightStream) 的方式,但是有一点我们需要清楚,使用 connect 后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理 leftStream 和 rightStream ,也可以使用不

    2024年02月13日
    浏览(31)
  • Flink+Paimon多流拼接性能优化实战

    目录 (零)本文简介 意外收获: (一)背景 (二)探索梳理过程 (三)源码改造 (四)修改效果 1、JOB状态 2、Level5的dataFile总大小 3、数据延迟 4、关联率 (五)未来展望:异步Compact Paimon多流拼接/合并性能优化;         为解决 离线T+1多流拼接数据时效性 、 Flink实时

    2024年02月09日
    浏览(35)
  • flink多流操作(connect cogroup union broadcast)

    connect 翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样 DataStream 连接成一个新 的 ConnectedStreams。需要注意的是,connect 方法与 union 方法不同,虽然调用 connect 方法将两个 流连接成一个新的 ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的

    2024年02月21日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包