引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sand</groupId>
    <artifactId>flinkcdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Flink core and runtime. Since Flink 1.15 the Java artifacts are
             Scala-free, hence no _2.12 suffix on streaming-java/clients. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors. The externalized connectors version their artifacts
             as <connector-version>-<flink-version>. -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>3.0.1-1.17</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
            <version>2.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
            <scope>provided</scope>
        </dependency>

        <!-- JDBC driver. The POM previously declared this artifact twice
             (5.1.49 and 8.0.30); Maven resolved the later declaration, so
             only 8.0.30 is kept. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>

        <!-- JSON. fastjson was also declared twice (1.2.75 and 2.0.32);
             the effective, later declaration (2.0.32) is kept. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>

        <!-- Logging.
             NOTE(review): this mixes the log4j 1.x binding (slf4j-log4j12 +
             log4j) with log4j 2.x core; consider consolidating on log4j2
             (log4j-slf4j-impl + log4j-core, version ${log4j.version}). -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>

        <!-- Utilities (FileUtil etc. used by DataStreamJob). -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.10</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Strip signature files that would break the fat jar. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.sand.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
数据库配置类
package com.sand;
import org.apache.commons.collections.CollectionUtils;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;
/**
 * Static configuration holder for the MySQL CDC source connection:
 * host, port, credentials, database and the list of captured tables.
 *
 * NOTE(review): credentials and the host IP are hard-coded; move them to an
 * external configuration file or environment variables before production use.
 *
 * @author zdd
 */
public class CDCKit {
    /**
     * Manual sanity check: prints the JVM temp directory, which
     * DataStreamJob uses as the checkpoint root.
     */
    public static void main(String[] args) {
        String tempDir = System.getProperty("java.io.tmpdir");
        System.out.println("tempDir = " + tempDir);
    }
    /**
     * 数据库 (database/schema name monitored by the CDC source)
     */
    private static final String database = "byyy_iowtb_wms_test";
    /**
     * 表名 (tables to capture, without the schema prefix)
     */
    private static final List<String> tableList = Arrays.asList(
            "inv_tt_stock_info",
            "base_tm_sku",
            "base_tm_third_sku_certificate",
            "base_tm_sku_gsp"
    );
    /**
     * ip (MySQL host)
     */
    private static final String hostname = "192.168.111.107";
    /**
     * 端口 (MySQL port)
     */
    private static final int port = 3306;
    /**
     * 用户名 (MySQL user)
     */
    private static final String username = "test_cdc";
    /**
     * 密码 (MySQL password)
     */
    private static final String password = "Test_cdc@123";

    public static String getDatabase() {
        return database;
    }

    /**
     * Builds the fully-qualified table list expected by the CDC builder,
     * e.g. {@code "db.table1,db.table2"}.
     *
     * @return comma-separated {@code database.table} names, or {@code null}
     *         when no tables are configured
     */
    public static String getTableList() {
        // Plain null/empty check instead of commons-collections
        // CollectionUtils.isEmpty: same behavior, one fewer third-party
        // dependency for this class.
        if (tableList == null || tableList.isEmpty()) {
            return null;
        }
        // Join as "db.table" entries separated by commas.
        StringJoiner stringJoiner = new StringJoiner(",");
        for (String tableName : tableList) {
            stringJoiner.add(getDatabase() + "." + tableName);
        }
        return stringJoiner.toString();
    }

    public static String getHostname() {
        return hostname;
    }

    public static int getPort() {
        return port;
    }

    public static String getUsername() {
        return username;
    }

    public static String getPassword() {
        return password;
    }
}
监控类
package com.sand;
import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.File;
import java.util.Objects;
import java.util.Properties;
/**
 * Local Flink job that streams MySQL binlog changes (Flink CDC / Debezium)
 * into {@link MysqlSink}, resuming from the newest local checkpoint when
 * one exists under {@code <java.io.tmpdir>/ck}.
 */
public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        // Checkpoint root under the JVM temp dir. File(parent, child) inserts
        // the platform separator; the original tempDir + "ck" concatenation
        // dropped it on Linux (java.io.tmpdir has no trailing slash there),
        // yielding e.g. "/tmpck".
        File ckRoot = new File(System.getProperty("java.io.tmpdir"), "ck");
        String latestCheckpoint = getLatestCheckpoint();
        System.out.println("latestCheckpoint = " + latestCheckpoint);
        Configuration configuration = new Configuration();
        if (StringUtils.isNotBlank(latestCheckpoint)) {
            // File.toURI() produces a well-formed file: URI on every platform;
            // "file:///" + path broke with Windows backslashes.
            configuration.setString("execution.savepoint.path",
                    new File(latestCheckpoint).toURI().toString());
        }
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);
        //2.1 开启 Checkpoint,每隔 60 秒钟做一次 CK
        env.enableCheckpointing(1000L * 60);
        //2.2 指定 CK 的一致性语义 (exactly-once)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 设置任务关闭的时候保留最后一次 CK 数据
        // (was configured twice in the original; once is sufficient)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 指定从 CK 自动重启策略 (3 retries, 2 s apart)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 设置状态后端 (local filesystem backend rooted at <tmpdir>/ck)
        env.setStateBackend(new FsStateBackend(ckRoot.toURI().toString()));
        // Debezium tuning: no snapshot locks; decimals serialized as strings.
        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");
        properties.setProperty("decimal.handling.mode", "string");
        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(CDCKit.getHostname())
                .port(CDCKit.getPort())
                .databaseList(CDCKit.getDatabase())
                .tableList(CDCKit.getTableList())
                .username(CDCKit.getUsername())
                .password(CDCKit.getPassword())
                .scanNewlyAddedTableEnabled(true)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .debeziumProperties(properties)
                .build();
        //4.使用 CDC Source 从 MySQL 读取数据
        env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source")
                .addSink(new MysqlSink());
        //6.执行任务
        env.execute();
    }

    /**
     * Scans {@code <tmpdir>/ck/<jobId>/chk-*} for the most recently modified
     * checkpoint directory, deletes every other job directory, and returns
     * the winner's absolute path ({@code null} when none exists).
     */
    private static String getLatestCheckpoint() {
        // Same portable path construction as main() (was tmpdir + "ck").
        File ckDir = new File(System.getProperty("java.io.tmpdir"), "ck");
        File[] jobDirs = ckDir.listFiles();
        if (jobDirs == null) {
            return null;
        }
        String path = null;
        long lastModified = 0;
        for (File jobDir : jobDirs) {
            //获取文件夹下-chk-开头文件夹-最新的文件夹
            if (!jobDir.isDirectory()) {
                continue;
            }
            File[] children = jobDir.listFiles();
            if (children == null) {
                continue;
            }
            for (File child : children) {
                if (!child.isDirectory() || !child.getName().startsWith("chk-")) {
                    continue;
                }
                if (child.lastModified() > lastModified) {
                    lastModified = child.lastModified();
                    path = child.getAbsolutePath();
                }
            }
        }
        //删除其余目录 (keep only the job dir containing the newest chk-*)
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        // File.getParent() is separator-agnostic; the original
        // path.substring(0, path.lastIndexOf("\\")) only worked on Windows.
        String keepDir = new File(path).getParent();
        for (File jobDir : jobDirs) {
            if (jobDir.isDirectory() && !Objects.equals(jobDir.getAbsolutePath(), keepDir)) {
                FileUtil.del(jobDir);
            }
        }
        return path;
    }
}
数据处理类
package com.sand;
/**
* @author zdd
*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {
    /**
     * Writes one change-event JSON record to stdout, prefixed for readability.
     */
    @Override
    public void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {
        final String line = "value = " + value;
        System.out.println(line);
    }
}
文章来源:https://www.toymoban.com/news/detail-837479.html
到了这里,关于FlinkCDC快速搭建实现数据监控的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!