import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.io.File;
import java.util.concurrent.CompletableFuture;

public class FlinkTask {

    private String jobManagerAddress = "xxxx";

    public JobID runTask(String jarPath, int parallelism, String entryPointClassName) {
        RestClusterClient<StandaloneClusterId> client = null;
        JobID jobId = null;
        try {
            // Cluster connection info: JobManager address, RPC port and REST port
            Configuration configuration = new Configuration();
            configuration.setString(JobManagerOptions.ADDRESS, jobManagerAddress);
            configuration.setInteger(JobManagerOptions.PORT, 6123);
            configuration.setInteger(RestOptions.PORT, 8081);
            client = new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
            // Path of the user jar to submit
            File jarFile = new File(jarPath);
            SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
            // Build the packaged program that describes the job
            PackagedProgram program = PackagedProgram
                    .newBuilder()
                    .setConfiguration(configuration)
                    .setEntryPointClassName(entryPointClassName)
                    .setJarFile(jarFile)
                    .setSavepointRestoreSettings(savepointRestoreSettings)
                    .build();
            // Create the job graph from the program
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
            // Submit the job and wait for the assigned JobID
            CompletableFuture<JobID> result = client.submitJob(jobGraph);
            jobId = result.get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
        return jobId;
    }
}
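For reference, a minimal usage sketch is shown below. It is not part of the original code: it simply calls runTask with the sample WordCount jar and entry class from the parameter example at the end of this article, and assumes jobManagerAddress has been set to a reachable JobManager.

import org.apache.flink.api.common.JobID;

public class FlinkTaskDemo {
    public static void main(String[] args) {
        FlinkTask task = new FlinkTask();
        // Jar path, parallelism and entry class follow the parameter example below
        JobID jobId = task.runTask(
                "C:\\flink-1.13.5\\examples\\streaming\\WordCount.jar",
                1,
                "org.apache.flink.streaming.examples.wordcount.WordCount");
        System.out.println("Submitted job with id: " + jobId);
    }
}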
Maven dependencies (add these to the project's pom.xml):
<properties>
    <java.version>1.8</java.version>
    <flink.version>1.13.5</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Parameter format reference:
{
    "jarPath": "C:\\flink-1.13.5\\examples\\streaming\\WordCount.jar",
    "parallelism": 1,
    "entryPointClassName": "org.apache.flink.streaming.examples.wordcount.WordCount"
}
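The article does not show how this JSON is consumed. As one illustration, a minimal sketch that parses such a request body with Jackson (com.fasterxml.jackson.databind, an extra dependency not in the list above) and forwards the fields to FlinkTask#runTask might look like this; the class and method names are hypothetical.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.JobID;

public class SubmitRequestHandler {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper: maps the JSON parameters onto FlinkTask#runTask
    public static JobID handle(String requestBody) throws Exception {
        JsonNode node = MAPPER.readTree(requestBody);
        return new FlinkTask().runTask(
                node.get("jarPath").asText(),
                node.get("parallelism").asInt(),
                node.get("entryPointClassName").asText());
    }
}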
That concludes this walkthrough of submitting Flink jobs remotely from Java code.