使用Java代码远程提交flink任务

这篇具有很好参考价值的文章主要介绍了使用Java代码远程提交flink任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

public class FlinkTask {

    private String JobManagerAddress = "xxxx";

    public JobID runTask(String jarPath, int parallelism, String entryPointClassName) {
        RestClusterClient<StandaloneClusterId> client = null;
        JobID jobId = null;
        try {
            // 集群信息
            Configuration configuration = new Configuration();
            configuration.setString(JobManagerOptions.ADDRESS, JobManagerAddress);
            configuration.setInteger(JobManagerOptions.PORT, 6123);
            configuration.setInteger(RestOptions.PORT, 8081);
            client = new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
            //jar包存放路径
            File jarFile = new File(jarPath);
            SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
            //构建提交任务参数
            PackagedProgram program = PackagedProgram
                    .newBuilder()
                    .setConfiguration(configuration)
                    .setEntryPointClassName(entryPointClassName)
                    .setJarFile(jarFile)
                    .setSavepointRestoreSettings(savepointRestoreSettings).build();
            //创建任务
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
            //提交任务
            CompletableFuture<JobID> result = client.submitJob(jobGraph);
            jobId = result.get();

        } catch (Exception e) {
            e.printStackTrace();
        }
        return jobId;
    }

导入依赖

<properties>
        <java.version>1.8</java.version>
        <flink.version>1.13.5</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
</dependencies>

参数格式参考:

{

    "jarPath":"C:\\flink-1.13.5\\examples\\streaming\\WordCount.jar",

    "parallelism":1,

    "entryPointClassName":"org.apache.flink.streaming.examples.wordcount.WordCount"

}文章来源地址https://www.toymoban.com/news/detail-516084.html

到了这里,关于使用Java代码远程提交flink任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【JAVA】提交任务时,线程池队列已满,这时会发生什么

    🍎 个人博客: 个人主页 🏆 个人专栏: JAVA ⛳️   功不唐捐,玉汝于成 目录 前言 正文 抛出异常: 阻塞等待: 丢弃任务: 调整线程池参数: 使用拒绝策略: 结语  我的其他博客   在并发编程中,线程池作为一种重要的资源管理工具,被广泛应用于提高系统性能和响应

    2024年02月20日
    浏览(35)
  • flink的常见的任务提交方式

    此方式使用起来相对比较简单,但是无法满足需要设置savepoint暂存点的流式任务需求。 使用此方式需要先创建Flink远方的执行环境,然后按序执行FlinkSql,流程如下: java示例如下: 此方式主要通过用java编写一个任务,然后打成jar的形式上传到flink集群。此方式比较灵活,可

    2024年04月26日
    浏览(35)
  • flink客户端提交任务报错

    { “errors”: [ “org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.ntat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest KaTeX parse error: Undefined control sequence: n at position 26: …ndler.java:110)̲n̲tat java.util.… UniHandle.tryFire(CompletableFuture.java:797)ntat j

    2024年02月15日
    浏览(51)
  • Flink通过Java API提交作业到keberos认证的Yarn

    背景 作为数据中台与大数据底座交互层,系统需要要提供一个rest api,供上层应用提交到flink作业到kerberos认证yarn上,网上资料多是通过flink run命令,记录下怎么通过api的方式把一个任务以application的方法提交到yarn集群,最重要的是可以通过springboot 就可以提交程序到yarn上面

    2024年03月11日
    浏览(55)
  • 采用seatunnel提交Flink和Spark任务

    seatunnel 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。 seatunnel 让Spark和Flink的使用更简单,更高效。 注:当前版本用的是2.1.3版本  如果在github下载自己编译有问题 可在此地址下载编译好的文件seatunnel-2.1.3-b

    2024年02月15日
    浏览(46)
  • 关于flink重新提交任务,重复消费kafka的坑

    按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据 我以为checkpoint和fsbackend要同时设置,其实,1.14.3版本,setCheckpointStorage和stateBackend改成了分着设置 我上边代码这样设置,相当于首先指定了以下checkpoint按照默认的backend存储,然后又指定了按

    2024年02月03日
    浏览(55)
  • Git使用教程,本地与远程(Github)仓库提交代码

    学会如何利用git来管理代码 Git简单来说就是代码版本控制系统,通过他可以进行多人开发同一个项目然后讲每个人的代码块合并完成一个大项目,还能控制代码版本记录等。Git四个区域 工作区:处理工作的区域(即做项目打代码的区域) 暂存区:已完成的工作临时存放区域

    2024年02月03日
    浏览(73)
  • flink on yarn 远程提交

    2024年02月10日
    浏览(54)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • Flink1.14提交任务报错classloader.check-leaked-classloader问题解决

    我的hadoop版本是3.1.3,Flink版本是1.14。不知道是hadoop版本的原因还是Flink版本更新的原因。当我运行一个简单的Flink测试时,虽然结果出来了但是后面还跟着一段报错信息。 测试命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 报错信息: Trying to acce

    2024年02月11日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包