4.1、Flink任务怎样读取集合中的数据

这篇具有很好参考价值的文章主要介绍了4.1、Flink任务怎样读取集合中的数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、API说明

非并行数据源:
        def fromElements[T: TypeInformation](data: T*): DataStream[T]
        def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] 
        def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] 

并行数据源:
        def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T])

使用场景:

        常用来调试代码使用


2、这是一个完整的入门案例

开发语言:Java1.8

Flink版本:flink1.17.0文章来源地址https://www.toymoban.com/news/detail-638143.html

package com.baidu.datastream.source;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

import java.util.Arrays;
import java.util.List;

// --------------------------------------------------------------------------------------------
//  TODO 从集合中读取数据
// --------------------------------------------------------------------------------------------

/*
 *  TODO 通过`读取Java集合中数据`来创建 DataStreamSource
 *
 *  方法1:fromCollection
 *        Collection、Iterator -> DataStreamSource
 *  方法2:fromElements
 *        OUT... data -> DataStreamSource
 *  方法3:fromParallelCollection
 *        SplittableIterator -> DataStreamSource
 *  重要提示:
 *       fromCollection、fromElements 创建的是非并行source算子(并行度只能为1)
 *       fromParallelCollection 创建的是并行算子(并行度>=1)
 * */

public class ReadCollection {
    public static void main(String[] args) throws Exception {
        fromCollection();
        //fromElements();
        //fromParallelCollection();
    }

    public static void fromCollection() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.读取Java集合数据
        List<String> list = Arrays.asList("刘备", "张飞", "关羽", "赵云", "马超", "黄忠");
        env.fromCollection(list).print();

        // 3.触发程序执行
        env.execute();
    }

    public static void fromElements() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.读取给定的对象序列
        env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠").print();

        // 3.触发程序执行
        env.execute();
    }

    public static void fromParallelCollection() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.读取给定的对象序列
        NumberSequenceIterator numberSequenceIterator = new NumberSequenceIterator(1, 10);

        env.fromParallelCollection(numberSequenceIterator, Long.class).print();
        /*
         * 注意: fromParallelCollection生成的source为并行算子
         *       集合中的数据会被平均分配到并行子任务中去
         * */

        // 3.触发程序执行
        env.execute();
    }
}

到了这里,关于4.1、Flink任务怎样读取集合中的数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink的API分层、架构与组件原理、并行度、任务执行计划

            Apache Flink的API分为四个层次,每个层次都提供不同的抽象和功能,以满足不同场景下的数据处理需求。下面是这四个层次的具体介绍: CEP API: Flink API 最底层的抽象为有状态实时流处理。其抽象实现是Process Function,并且Process Function被  框架集成到了DataStream API中

    2024年02月05日
    浏览(34)
  • springboot读取yml文件中的list列表、数组、map集合和对象方法实例

    目录前言application.yml定义list集合application.yml定义数组类型总结 前言 springboot配置文件yml类型简单的风格,十分受大家的欢迎,支持字符string类型,支持列表list类型,支持集合map类型,支持数组array类型,支持类对象类型,下面我们来实战下这些形式的配置如何取值 applicatio

    2024年02月09日
    浏览(34)
  • Flink(三)flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月16日
    浏览(37)
  • mongotop跟踪Mongodb集合读取和写入数据

    从 MongoDB 4.4 开始,MongoDB 数据库工具现在与 MongoDB 服务器分开发布,并使用自己的版本控制,初始版本为100.0.0. 此前,这些工具与 MongoDB 服务器一起发布,并使用匹配的版本控制。 mongotop 版本100.7.3支持以下版本的 MongoDB Server: MongoDB 6.0 MongoDB 5.0 MongoDB 4.4 MongoDB 4.2 行为 mongo

    2024年02月15日
    浏览(29)
  • 【Flink】 Flink实时读取mysql数据

    准备 你需要将这两个依赖添加到 pom.xml 中 mysql mysql-connector-java 8.0.0 读取 kafka 数据 这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。 package com.zhisheng.connectors.mysql.utils; impo

    2024年02月03日
    浏览(35)
  • (二)Easyexcel 的使用(读取数据到map集合中)

            前面讲述了使用实体类的方式绑定excel表头的方式进行读取和写入操作,是比较简单的,那么由于表头可能会变,那么就不能使用绑定实体类的方式进行了,于是搜索百度一番,借鉴别人的博客,使用map集合的方式进行读取和写入操作。 目录 1、导入相关依赖 2、

    2024年02月09日
    浏览(18)
  • 如何解决Flink任务的数据倾斜

    如何解决flink任务的数据倾斜问题 Flink 任务的数据倾斜问题可以通过以下几种方法来解决: 使用滑动窗口:滑动窗口可以将窗口划分成多个子窗口,从而使数据更加均衡地分配到不同的计算节点中。同时,滑动窗口还可以使窗口内的数据更加连续,从而减少数据倾斜的情况。

    2024年02月14日
    浏览(35)
  • Flink读取kafka数据报错

    报错如下: 解决办法: 修改/usr/local/wyh/kafka/kafka_2.12-2.8.1/config下面的server.properties,默认该配置是被注释掉的额,所以需要放开注释并且配置Host:

    2024年02月13日
    浏览(39)
  • 3、flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月12日
    浏览(33)
  • flink 从kafka读取数据报错

    报错: Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandl

    2024年02月02日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包