十八、Flink自定义多并行Source

这篇具有很好参考价值的文章主要介绍了十八、Flink自定义多并行Source。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、概述

1)作用

自定义多并行的Source,即Source的并行度可以是1到多个。

2)实现

1.继承RichParallelSourceFunction,重写run()方法。文章来源地址https://www.toymoban.com/news/detail-716217.html

2、代码实现
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;

public class CustomerParallelSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> nums = env.addSource(new ParallelSourceFunc());

        System.out.println("自定义ParallelSourceFunc得到的DataStream的并行度为:" + nums.getParallelism());
        nums.print();

        env.execute();
    }

    private static class ParallelSourceFunc extends RichParallelSourceFunction<String> {
        private boolean flag = true;

        public ParallelSourceFunc() {
            System.out.println("构造方法执行了!!!!!!!!!!!");
        }

        /**
         * 先调用Open方法
         *
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println(indexOfThisSubtask + ": Open方法被调用了");
        }

        /**
         * open方法调用完后再调用run方法
         * run方法task启动后会执行一次
         * 如果run方法一直不退出,就是一个无限的数据流
         * 如果数据读取完了,run方法退出,就是一个有限的数据流,Source退出,job也停止了
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println(indexOfThisSubtask + " : Run方法被调用了");

            Random random = new Random();

            //获取当前SubTask的Index
            while (flag) {
                int i = random.nextInt(100);
                ctx.collect(indexOfThisSubtask + " --> " + i);

                Thread.sleep(1000);
            }
        }


        /**
         * task cancel会执行一次
         */
        @Override
        public void cancel() {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println(indexOfThisSubtask + " : Cancel方法被调用了~~~~~");
            flag = false;
        }


        /**
         * 如果人为将job cancel先调用cancel方法再调用close方法
         * 如果没有将job人为的cancel,任务停掉前一定会调用close方法
         *
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println(indexOfThisSubtask + " : Close方法被调用了");
        }
    }
}

到了这里,关于十八、Flink自定义多并行Source的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink实战】玩转Flink里面核心的Source Operator实战

    🚀 作者 :“大数据小禅” 🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战 🚀 欢迎小伙伴们 点赞 👍、 收藏 ⭐、 留言 💬 Flink 的API层级介绍Source Operator速览 Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象 第一层是最底层的抽象为有

    2024年02月08日
    浏览(35)
  • 56、Flink 的Data Source 原理介绍

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月21日
    浏览(25)
  • 【Flink】DataStream API使用之源算子(Source)

    创建环境之后,就可以构建数据的业务处理逻辑了,Flink可以从各种来源获取数据,然后构建DataStream进项转换。一般将数据的输入来源称为数据源(data source),而读取数据的算子就叫做源算子(source operator)。所以,Source就是整个程序的输入端。 Flink中添加source的方式,是

    2024年02月10日
    浏览(29)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(1)源算子(Source)

    DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成: Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会

    2024年01月22日
    浏览(46)
  • 10、Flink的source、transformations、sink的详细示例(二)-source和transformation示例【补充示例】

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月10日
    浏览(64)
  • flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作

    前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。 Tips:我觉得学习 Flink 还是挺有意思的

    2024年02月19日
    浏览(32)
  • Vue:使用Promise.all()方法并行执行多个请求

    在Vue中,可以使用Promise.all()方法来并行执行多个请求。当需要同时执行多个异步请求时,可以将这些请求封装为Promise对象并使用Promise.all()方法来执行它们。 示例1: 以下是一个示例代码,展示了如何通过Promise.all()方法并行执行多个请求: 在上述示例中,定义了三个请求:

    2024年02月12日
    浏览(28)
  • 【Flink精讲】Flink性能调优:CPU核数与并行度

    提交任务命令: bin/flink run -t yarn-per-job -d -p 5 指定并行度 -Dyarn.application.queue=test 指定 yarn 队列 -Djobmanager.memory.process.size=2048mb JM2~4G 足够 -Dtaskmanager.memory.process.size=4096mb 单个 TM2~8G 足够 -Dtaskmanager.numberOfTaskSlots=2 与容器核数 1core: 1slot 或 2core: 1slot -c com.atguigu.flin

    2024年04月11日
    浏览(32)
  • flink算子的并行度设置方法

    #flink算子的并行度设置方法 并行度(Parallelism)是flink中一个非常重要的概念,它主要是指一个算子可以被分的子任务数,通常越高就意味着算子计算速度越快。 如上图所示,map()算子的并行度为2,window()算子的并行度也为2,也可以说整个数据流的并行度就是2。并行度的设置

    2024年03月17日
    浏览(40)
  • Flink学习笔记(七)并行度详解

    一个Flink程序由多个任务(Source、Transformation和Sink)组成。一个任务由多个并行实例(线程)来执行,一个任务的并行实例(线程)数目被称为该任务的并行度。 Flink是一个分布式流处理框架,它基于TaskManager和Slot来实现任务的执行。TaskManager是Flink中负责运行任务的工作进程

    2024年02月09日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包