flink的常见的任务提交方式

这篇具有很好参考价值的文章主要介绍了flink的常见的任务提交方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、以flinksql的方式直接提交任务

此方式使用起来相对比较简单,但是无法满足需要设置savepoint暂存点的流式任务需求。

使用此方式需要先创建Flink远方的执行环境,然后按序执行FlinkSql,流程如下:

flink任务提交方式,开发,flink,大数据

java示例如下:

package com.xw.flink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("192.168.1.88",18082);
        TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任务名称设定
        configuration.setString("pipeline.name","sqlserver");
        String sourceDDL = "CREATE TABLE Orders (f1 STRING,f2 STRING,f3 STRING) WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver',  " +
                " 'url'='jdbc:sqlserver://192.168.1.40:1433;databaseName=test;useLOBs=false',  " +
                " 'table-name'='test_czd1',  " +
                " 'username'='root',  " +
                " 'password'='root'" +
                ")";
        tableEnv.executeSql(sourceDDL);
        String rtLog = "CREATE TABLE logs (node_id STRING, send_count BIGINT,PRIMARY KEY(node_id) NOT ENFORCED) WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver'='com.mysql.cj.jdbc.Driver',  " +
                " 'url'='jdbc:mysql://192.168.0.68:3306/testDB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8',  " +
                " 'table-name'='rt_log',  " +
                " 'username'='root',  " +
                " 'password'='root'," +
                " 'sink.buffer-flush.max-rows' = '20000'," +
                " 'sink.buffer-flush.interval' = '3000'" +
                ")";
        tableEnv.executeSql(rtLog);
        String sql = "insert into logs(node_id) select f1 from Orders limit 5";
        tableEnv.executeSql(sql);
    }
}

2、以任务jar的方式上传任务

此方式主要通过用java编写一个任务,然后打成jar的形式上传到flink集群。此方式比较灵活,可以精确控制任务的算子。但是对于现场的运维来说是一个比较困难的问题,因为要求运维人员需要有代码开发的能力。

java实现示例:

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任务名称设定
        configuration.setString("pipeline.name","sqlserver");
        //todo 此部分可以是Flink-table  也可以是Flinksql
      
    }
}

然后将其打成jar包,然后上传到flink

flink任务提交方式,开发,flink,大数据

填写class的全路径和并行度即可执行。

3、以Rest API方式进行提交

此方式综合了flinksql和flinkjar的两种形式,你也以在远方编写flinksql,然后通过调用API的形式将FlinkSql和参数发送到flink集群上可执行的jar。jar拿到参数组装flink任务并提交。

Rest API官网可参考REST API | Apache Flink

java编写一个接受参数的jar,模版可参考文章来源地址https://www.toymoban.com/news/detail-858215.html

public class SqlTemplate {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);//获取传递的参数
        String arg = parameters.get("arg",null);
        if(arg == null){
            return ;
        }
        arg = URLDecoder.decode(arg, StandardCharsets.UTF_8.toString());//URLDecoder解码
        String[] programArgs =  arg.split("\\|\\|");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置重启策略,最多重启三次,每次间隔5秒钟
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,                    Time.seconds(5)
        ));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任务名称设定
        configuration.setString("pipeline.name",programArgs[0]);
        // 任务并行度设定
        env.setParallelism(Integer.parseInt(programArgs[1]));
        //任务类型,流式任务强制开启checkpoint
        if("stream".equals(programArgs[2])){
            //检查点设定
            if(!StringUtils.isNullOrWhitespaceOnly(programArgs[3])){
                CheckPoint cp = JSON.parseObject(programArgs[3],CheckPoint.class);
                //开启检查点
                if(cp.getEnable()){
                    //开启检查点,1S一次
                    env.enableCheckpointing(cp.getCheckpointInterval());
                    //检查点策略 EXACTLY_ONCE 精准一次  AT_LEAST_ONCE至少一次
                    env.getCheckpointConfig().setCheckpointingMode(cp.getCheckPointingMode()==1?CheckpointingMode.EXACTLY_ONCE:CheckpointingMode.AT_LEAST_ONCE);
                     Checkpoint 必须在一分钟内完成,否则就会被抛弃
                    env.getCheckpointConfig().setCheckpointTimeout(cp.getCheckpointTimeout());
                     同一时间只允许一个 checkpoint 进行
                    env.getCheckpointConfig().setMaxConcurrentCheckpoints(cp.getMaxConcurrentCheckpoints());
                    //设置检查点保存位置
                    env.getCheckpointConfig().setCheckpointStorage(cp.getCheckpointDirectory());
                    //开启实验性的 unaligned checkpoints
                    if(cp.getUnalignedCheckpointsEnabled()){
                        env.getCheckpointConfig().enableUnalignedCheckpoints();
                    }
                }
            }else{//开启默认配置
                //开启检查点,5S一次
                env.enableCheckpointing(5000);
                //检查点策略 EXACTLY_ONCE 精准一次  AT_LEAST_ONCE至少一次
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                 Checkpoint 必须在五分钟内完成,否则就会被抛弃
                env.getCheckpointConfig().setCheckpointTimeout(300000);
                 同一时间只允许一个 checkpoint 进行
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                //开启实验性的 unaligned checkpoints
                env.getCheckpointConfig().enableUnalignedCheckpoints();
            }
        }
        //可执行的SQL单节点执行
        String sql = programArgs[4];
        //特殊符号在连接器里都会被使用,采用双特殊符号进行分割
        String[] sqlExecu = sql.split(";;");
        List<String> create = new ArrayList<>();
        List<String> insert = new ArrayList<>();
        for (String script : sqlExecu) {
            if(!script.startsWith("insert") && !script.startsWith("INSERT")){
                create.add(script);
            }else{
                insert.add(script);
            }
        }
        //可执行的SQL单节点执行
        create.forEach(tableEnv::executeSql);
        // 运行多条 INSERT 语句,将原表数据输出到多个结果表中
        StatementSet stmtSet = tableEnv.createStatementSet();
        insert.forEach(stmtSet::addInsertSql);
        //开始执行任务
        TableResult execute = stmtSet.execute();
     }

到了这里,关于flink的常见的任务提交方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Java代码远程提交flink任务

    导入依赖 参数格式参考: {     \\\"jarPath\\\":\\\"C:\\\\flink-1.13.5\\\\examples\\\\streaming\\\\WordCount.jar\\\",     \\\"parallelism\\\":1,     \\\"entryPointClassName\\\":\\\"org.apache.flink.streaming.examples.wordcount.WordCount\\\" }

    2024年02月11日
    浏览(44)
  • flink客户端提交任务报错

    { “errors”: [ “org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.ntat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest KaTeX parse error: Undefined control sequence: n at position 26: …ndler.java:110)̲n̲tat java.util.… UniHandle.tryFire(CompletableFuture.java:797)ntat j

    2024年02月15日
    浏览(51)
  • 采用seatunnel提交Flink和Spark任务

    seatunnel 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。 seatunnel 让Spark和Flink的使用更简单,更高效。 注:当前版本用的是2.1.3版本  如果在github下载自己编译有问题 可在此地址下载编译好的文件seatunnel-2.1.3-b

    2024年02月15日
    浏览(47)
  • 关于flink重新提交任务,重复消费kafka的坑

    按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据 我以为checkpoint和fsbackend要同时设置,其实,1.14.3版本,setCheckpointStorage和stateBackend改成了分着设置 我上边代码这样设置,相当于首先指定了以下checkpoint按照默认的backend存储,然后又指定了按

    2024年02月03日
    浏览(55)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • Flink1.14提交任务报错classloader.check-leaked-classloader问题解决

    我的hadoop版本是3.1.3,Flink版本是1.14。不知道是hadoop版本的原因还是Flink版本更新的原因。当我运行一个简单的Flink测试时,虽然结果出来了但是后面还跟着一段报错信息。 测试命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 报错信息: Trying to acce

    2024年02月11日
    浏览(41)
  • 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(48)
  • Flink(三)flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月16日
    浏览(46)
  • 3、flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月12日
    浏览(44)
  • 《Flink学习笔记》——第二章 Flink的安装和启动、以及应用开发和提交

    ​ 介绍Flink的安装、启动以及如何进行Flink程序的开发,如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备 java8或者java11(官方推荐11) 下载Flink安装包 https://flink.apache.org/zh/downloads/ hadoop(后面Flink on Yarn部署模式需要) 服务器(我是使用虚拟

    2024年02月10日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包