[flink 实时流基础]源算子和转换算子

这篇具有很好参考价值的文章主要介绍了[flink 实时流基础]源算子和转换算子。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
[flink 实时流基础]源算子和转换算子,Java探索者之路,flink,linq,大数据
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
[flink 实时流基础]源算子和转换算子,Java探索者之路,flink,linq,大数据

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
[flink 实时流基础]源算子和转换算子,Java探索者之路,flink,linq,大数据
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
[flink 实时流基础]源算子和转换算子,Java探索者之路,flink,linq,大数据
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
[flink 实时流基础]源算子和转换算子,Java探索者之路,flink,linq,大数据文章来源地址https://www.toymoban.com/news/detail-847286.html

到了这里,关于[flink 实时流基础]源算子和转换算子的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • stm32f407探索者开发板(二)——新建工程(基于固件库)

    说实话,我非常不想这篇文章,因为太长太长了,我看视频写都写了一个下午(虽然我下午一直在摸鱼,啊啊啊啊啊)害,不管了,赶紧开始写吧,不然今晚是写不完了,呜呜呜…… 把这个里面的文件放先给下好,我这里是没有光盘的,所以需要从百度网盘上下载好,这里面

    2023年04月08日
    浏览(82)
  • 阿里云天池 天池实验室DSW探索者版 免费GPU 天池notebook教程

    1、DSW教程 点击天池notebook,进入我的实验室 选择一个私有项目,点击编辑 集成机器学习 PAI DSW (DataScienceWorkshop)探索者版开发环境 左边文件管理,中间工作区,右边是计算资源。 在文件资源管理区的顶部还有4个按钮,从左到右分别对应的是:打开DSW Launcher启动器,新建文

    2024年02月01日
    浏览(59)
  • stm32f407探索者开发板(十四)——IO引脚复用和映射

    端口复用和重映射都是和单片机的I/O口有关系,端口复用是将一个I/O赋予多个功能,通过设置I/O的工作模式来切换不同的功能。重映射是将某些I/O口上面的功能映射到其他I/O口上面去。但是注意一点:重映射的I/O都是厂家设置好的,不能自己更改。 端口复用 什么是端口复用

    2024年02月16日
    浏览(45)
  • stm32f407探索者开发板(二十二)——通用定时器基本原理讲解

    STM32F40x系列总共最多有14个定时器 三种(4)STM32定时器区别 STM3 F4 的通用 TIMx (TIM2、TIM3、TIM4 和 TIM5)定时器功能特点包括: 16 /32 位向上、向下、向上/向下(中心对齐)计数模式,自动装载计数器(TIMx_CNT)。 16 位可编程(可以实时修改)预分频器(TIMx_PSC),计数器时钟频率的分频系

    2024年02月12日
    浏览(43)
  • 看野火的视频,用正点原子的板子(STM32F4探索者)做ADC读取电压实验

    使用STM32F4的ADC1通道5(PA5)来采样外部电压值(这里采样两个电压值TPAD(3.3v),GND(0v)),最后通过串口打印电压值。 由上图可以看到, ADC1 的通道 5是对应着引脚PA5的。 板子右边是GND,ADC,TPAD引脚,做实验时,把两者相连即可读电压值。 ADC时钟: 这里是用于模拟电路的时

    2024年02月09日
    浏览(45)
  • STM32F407ZGT6正点原子F4探索者开发板 -- 跑马灯例程

    LED0 - PF9 LED1 - PF10 PF9 = 0, LED0 亮,PF9 = 1,LED0 灭 PF10 = 0, LED1 亮,PF10 = 1,LED1 灭

    2024年02月15日
    浏览(56)
  • 应届生谈薪技巧和注意事项,怎么为自己多争取1~2k(FPGA,芯片谈薪,数字IC,嵌入式,模拟IC,FPGA探索者)

      找工作的终极目标:谈薪!谈高薪!今天【FPGA探索者】给大家分享一下谈薪的技巧和注意事项,别被HR轻易压价。   本文适用人群: 应届毕业生 。 FPGA探索者 FPGA+数字IC笔试面试,无线通信物理层及数字信号处理,半导体芯片行业求职,校招社招实习,职场趣事,行业动

    2024年01月25日
    浏览(58)
  • 大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

    目录 Transformation转换算子(基础篇三) 三、转换算子(Transformation) 1.基本转换算子 1.1 映射(Map) 1.2 过滤(filter) 1.3 扁平映射(flatmap) 1.4基本转换算子的例子 2.聚合算子(Aggregation) 2.1 按键分区(keyBy) 2.2 简单聚合 2.3 归约聚合(reduce) 3.用户自定义函数(UDF) 3.1 函

    2024年02月20日
    浏览(41)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。 map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个 “一 一映射”,消费一个元素就产出一个元素 。 我们只

    2024年01月23日
    浏览(49)
  • Flink源算子、转换算子和输出算子(DataSet)

    Flink是一种一站式处理的框架,既可以进行批处理(DataSet),也可以进行流处理(DataStream) 将Flink的算子分为两大类:DataSet 和 DataStream 1.1 fromCollection 从本地集合读取数据 1.2 readTextFile 从文件中读取 1.3 readTextFile 遍历目录 对一个文件目录内的所有文件,包括所有子目录中的

    2024年04月23日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包