Flink从入门到放弃—Stream API—常用算子(map和flatMap)

这篇具有很好参考价值的文章主要介绍了Flink从入门到放弃—Stream API—常用算子(map和flatMap)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

导航

观看本文章之前,请先看之前文章
(十)Flink Datastream API 编程指南 算子-1 (转换算子、物理分区、任务链、资源组 、算子和作业)等基本介绍

常用算子基本使用

效果

本次主要是阅读几个常用算子的源码。

阅读算子列表

  • map
  • flatmap
  • keyby
  • filter
  • reduce
  • window
  • windowAll
  • 其它

Map()

首先说一下这个算子是one to one的,通俗的讲就是进一条数据 经过逻辑处理后 必出一条数据。

用户代码

package com.stream.samples;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author DeveoperZJQ
 * @since 2022/11/12
 */
public class MapOperator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 消费socket数据源
        DataStreamSource<String> dataStream = env.socketTextStream("192.168.112.147", 7777);

// map => String::length 是传入的虚函数map的逻辑
        SingleOutputStreamOperator<Integer> map = dataStream.map(String::length);

// print输出
        map.print();

        env.execute(MapOperator.class.getSimpleName());
    }
}

上面的逻辑非常简单,目的只是为了追踪map的入口,你可以使用debug模式,并且在map上打上断点,可以一层一层的往下看。

两种变形

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 根据传入函数获取返回值类型
       TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
       // 调用下面的map方法
       return this.map(mapper, outType);
   }

// 带返回的输出类型参数,可以看到传入的operator默认名字就叫Map
   public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
       return this.transform("Map", outputType, (OneInputStreamOperator)(new StreamMap((MapFunction)this.clean(mapper))));
   }

Flink从入门到放弃—Stream API—常用算子(map和flatMap)
上面两个重载方法刚好对应上面截图中的两个方法。

 @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    // 抽象工厂
        return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

上面代码有一个抽象工厂的简单 算子工厂类,传入的是OneInputStreamOperator类型的算子,然后把用户代码的operator转换成了StreamOperatorFactory operatorFactory, 工厂类是真的多啊,如果设计模式关于工厂方法和抽象工厂没学牢固的同学,可以来这里细品。

// protected 只能包能调用
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
// 我理解这里调用这个就是为了抛出异常的返回类型,如果可用,typeUsed则标识为true
        this.transformation.getOutputType();
        // 构建OneInputTransformation
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operatorFactory, outTypeInfo, this.environment.getParallelism());
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
        // 这里讲用户逻辑封装到resultTransform,全部装填到List<Transformation<?>> 为构建流图StreamGraph做准备
        this.getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }

// 添加算子到Transformations<>集合中
@Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

env.execute(MapOperator.class.getSimpleName());这里开始执行,物理执行图部署成功,这个就开始了消费数据。

当然通过上面说的,你可以尝试着自定义map算子或者其它算子,只要使用transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator<T, R> operator)得当,就会实现你的功能,但是一般不建议入门选手这样做。

这里面,咱们还看到很多使用clean() 方法的地方,后面章节会拿出一章说明这个。

flatMap()

可以看出,flatMap的源码和map非常类似。有一个地方值得咱们看一下,就是传入的函数和map是不同 ,flatmap可以一进多出,一进不出,但是map做不到,单纯从算子性能上说flatMap的开销是要比map大的。

来看一下下面两者的区别:new StreamMap((MapFunction) vs new StreamFlatMap((FlatMapFunction)

StreamMap

public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

单从这个各自的构造器上看不出什么,传入的函数不一样,可以点击进去

@Public
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
    O map(T value) throws Exception;
}

map方法是有返回值的。

再来看下具体处理方法:

public void processElement(StreamRecord<IN> element) throws Exception {
        this.output.collect(element.replace(((MapFunction)this.userFunction).map(element.getValue())));
    }

从上面可以看出map直接把数据通过collect回收了,等于没有对用户暴露,那么flatMatp呢?看下面的源码。

StreamFlatMap

public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

点击进去,是下面的接口类,里面有一个flatMap方法,继承的类也是一样的。

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    void flatMap(T value, Collector<O> out) throws Exception;
}

flatMap没有返回值,它是利用Collector out返回数据的。

再来看下具体处理方法:

public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
    }

 public void processElement(StreamRecord<IN> element) throws Exception {
        this.collector.setTimestamp(element);
        ((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);
    }

把this.collector作为参数封装到flatMap里了,用户可以通过collector进行操作。

那么现在是不是明白了,为啥map是一对一,flatMap是一对多和一对0了吧。

结语

本文章是系列文章中的一篇,如果有错误的地方,欢迎批评指正!欢迎同行交流!文章来源地址https://www.toymoban.com/news/detail-401873.html

到了这里,关于Flink从入门到放弃—Stream API—常用算子(map和flatMap)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink的常用算子以及实例

    特性:接收一个数据,经过处理之后,就返回一个数据 1.1. 源码分析 我们来看看map的源码 map需要接收一个MapFunctionT,R的对象,其中泛型T表示传入的数据类型,R表示经过处理之后输出的数据类型 我们继续往下点,看看MapFunctionT,R的源码 这是一个接口,那么在代码中,我们就需

    2024年02月12日
    浏览(35)
  • 【Flink】DataStream API使用之源算子(Source)

    创建环境之后,就可以构建数据的业务处理逻辑了,Flink可以从各种来源获取数据,然后构建DataStream进项转换。一般将数据的输入来源称为数据源(data source),而读取数据的算子就叫做源算子(source operator)。所以,Source就是整个程序的输入端。 Flink中添加source的方式,是

    2024年02月10日
    浏览(29)
  • Apache Flink从入门到放弃——Flink简介(一)

       随着大数据的发展,大数据的存储、计算、运用百花齐放;而大数据的计算中最重要的就是计算引擎,时至今日,很多人将大数据引擎分为四代,分别是: 第一代,Hadoop承载的MapReduce,将计算分为Map和Reduce两个阶段,同时采用Hadoop集群的分布式计算原理来实现数据的计

    2024年02月05日
    浏览(26)
  • flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作

    前言:今天是学习 flink 第三天啦,学习了高级 api 开发中11 中重要算子,查找了好多资料理解其中的原理,以及敲了好几个小时代码抓紧理解原理。 Tips:虽然学习进度有点慢,希望自己继续努力,不断猜想 api 原理,通过敲代码不断印证自己的想法,转码大数据之路一定会越

    2024年02月19日
    浏览(33)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(1)源算子(Source)

    DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成: Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会

    2024年01月22日
    浏览(46)
  • Java 8 中的 Stream API - map() 方法详解

    摘要: Java 8 中的 Stream API 提供了一种新的处理集合和数组的方式,可以使代码更加简洁、易读,同时还可以提高性能。其中 map() 方法是比较常用的方法之一,它可以将 Stream 对象中的每个元素映射为另一个元素。本文将对 Java 8 中的 Stream API 和 map() 方法进行详细介绍,并通

    2024年04月09日
    浏览(42)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

    所谓 “分流” ,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream ,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选

    2024年01月24日
    浏览(38)
  • 大数据学习之Flink算子、了解DataStream API(基础篇一)

    注: 本文只涉及DataStream 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSet成为了主流的数据处理方式。 目录 DataStream API (基础篇) 前摘: 一、执行环境 1. 创建执行环境 2. 执

    2024年01月23日
    浏览(43)
  • 【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月05日
    浏览(40)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

    用户自定义函数( user-defined function , UDF ),即用户可以根据自身需求,重新实现算子的逻辑。 用户自定义函数分为: 函数类 、 匿名函数 、 富函数类 。 Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction 、 FilterFunction 、 ReduceFunction 等。所

    2024年01月23日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包