十二、Flink自定义 FlatMap 方法

这篇具有很好参考价值的文章主要介绍了十二、Flink自定义 FlatMap 方法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、概述

1)作用

flatMap是将数据先map在打平,输入一个元素,可以输出0到多个元素

2)使用

1.匿名内部类

2.lambda表达式

3.实现FlatMapFunction接口

4.继承RichFlatMapFunction

2、代码实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyFlatmapDemo {
    public static void main(String[] args) throws Exception {
        // 创建执行环境的配置,添加webUI的端口号
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // 从端口接入数据
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // 1、匿名内部类
        SingleOutputStreamOperator<String> flatMapStream1 = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
        });

        // 2、lambda表达式
        SingleOutputStreamOperator<String> flatMapStream2 = lines.flatMap((FlatMapFunction<String, String>) (value, out) -> {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }).returns(String.class);

        // 3、继承 FlatmapFunction
        SingleOutputStreamOperator<String> flatMapStream3 = lines.flatMap(new MyFlatmapFunc());

        // 4、继承 RichFlatMapFunction
        SingleOutputStreamOperator<String> flatMapStream4 = lines.flatMap(new MyRichFlatMapFunc());

        flatMapStream1.print();
        flatMapStream2.print();
        flatMapStream3.print();
        flatMapStream4.print();

        env.execute();
    }
}

class MyFlatmapFunc implements FlatMapFunction<String,String>{
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}

class MyRichFlatMapFunc extends RichFlatMapFunction<String,String>{
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        System.out.println("可以在rich方法中创建状态和定时器");
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}
3、执行结果

1)输入测试数据

nc -lk 8888
hello world

控制台输出执行结果文章来源地址https://www.toymoban.com/news/detail-690199.html

3> hello
3> world
5> hello
5> world
8> hello
8> world
7> hello
7> world

到了这里,关于十二、Flink自定义 FlatMap 方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java中flatMap用法

        java中map是把集合每个元素重新映射,元素个数不变,但是元素值发生了变化。而flatMap从字面上来说是压平这个映射,实际作用就是将每个元素进行一个一对多的拆分,细分成更小的单元,返回一个新的Stream流,新的流元素个数增加。     java官方给出的示例如下:    

    2024年02月02日
    浏览(44)
  • map 和 flatMap 的区别

    ① map方法 ② flatMap方法 可以看到,不论是 map 还是 flatMap 方法,都是对以流的形式数据的处理,返回值同样都是流形式数据的泛型。本质一样,都是 map 操作,但是不同点在于,flatMap 操作会比 map 多一个 flat 操作。  \\\"flat\\\"单词本意有平的、扁平的含义,在源码中,我们对于

    2024年02月10日
    浏览(41)
  • kotlin map 与 flatmap

    kotlin map 与 flatmap 是2个不同的概念的 map 是一种数据结构,flatmap 是一个高阶函数,处理集合用的 Map 是一种数据结构,它由一系列的键值对组成,每个键都是唯一的,并且与一个特定的值相关联。你可以通过键来查找对应的值 下面定义一个map 并往里面填写值 或者使用mapOf 跟

    2024年01月22日
    浏览(50)
  • kotlin flatten 与 flatMap

    kotln中 flatten 和 flatMap 在 Kotlin 中虽然都用于扁平化处理集合,但它们的用法和效果并不完全一样 flatten 函数主要应用于嵌套集合(如 List of List 或 Set of Set 等),它会将嵌套集合中的所有元素合并到一个单一层次的集合中。 flatMap 不仅可以将嵌套集合扁平化,更重要的是它允

    2024年01月25日
    浏览(36)
  • Java8 - Streams flatMap()

    https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html 它由一个 2 级 Stream 或一个二维数组组成 。 在 Java 8 中,我们可以使用 flatMap 将上述 2 级 Stream 转换为 一级 Stream 或将 二维数组转换为 一维数组。 简言之, flatmap 方法让你把一个流中的每个值都换成另一个流,然后把所有

    2024年02月03日
    浏览(34)
  • java stream flatMap的使用及个人理解

    1.我认为用简单朴素的理解和使用,是对工具最好的诠释。java jdk8开始提供了stream流,方便我更高效的操作集合和编写代码。其中flatmap流中间操作api,我认为简单来说是对“集合中的集合的操作和展开”。比如说,一个对象集合里面的每个对象还有个集合对象。这时,我们如

    2024年01月18日
    浏览(35)
  • 17 | Spark中的map、flatMap、mapToPair mapvalues 的区别

    在Apache Spark中, map 、 flatMap 、 mapToPair 和 mapValues 是用于对RDD(Resilient Distributed Dataset)进行转换的不同操作。这些操作可以用来处理分布式数据集中的元素,但它们的用途和行为略有不同。 以下是它们的主要区别以及相应的Java代码示例: map : map 操作用于对RDD中的每个元

    2024年02月09日
    浏览(33)
  • spark中使用flatmap报错:TypeError: ‘int‘ object is not subscriptable

    菜鸟笔者在运行下面代码时发生了报错: 报错描述如下:  显然这是传入的数据类型发生了错误: 因为我们试图对整数对象执行下标操作,而这是不允许的。 原来flatMap底层通过取下标来展开元素 如果rdd集合里面有非可迭代对象(如int元素)则会报错TypeError: \\\'int\\\' object is n

    2024年02月04日
    浏览(41)
  • Flink(十二)【容错机制】

            最近已经放假了,但是一直在忙一个很重要的自己的一个项目,用 JavaFX 和一个大数据组件联合开发一个功能,也算不枉我学了一次 JavaFX,收获很大,JavaFX 它作为一个 GUI 开发语言,本质还是 Java,所以很好的锻炼了我的 Java 水平、抽象能力 ... 平常看似简单的一些概

    2024年01月17日
    浏览(43)
  • 大数据Flink(五十二):Flink中的批和流以及性能比较

    文章目录 Flink中的批和流以及性能比较 ​​​​​​​​​​​​​​一、Flink中的批和流

    2024年02月15日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包