FlinkCDC快速搭建实现数据监控

这篇具有很好参考价值的文章主要介绍了FlinkCDC快速搭建实现数据监控。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sand</groupId>
    <artifactId>flinkcdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <!--        <flink.version>1.14.4</flink.version>-->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <!--            <artifactId>flink-streaming-java_2.12</artifactId>-->
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.apache.flink</groupId>-->
        <!--            <artifactId>flink-scala_2.12</artifactId>-->
        <!--            <version>${flink.version}</version>-->
        <!--        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <!--            <artifactId>flink-clients_2.12</artifactId>-->
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>3.0.1-1.17</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>
        <!-- 打印日志的jar包 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.10</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>

                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.sand.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>


            </plugins>
        </pluginManagement>
    </build>
</project>

数据库配置类

package com.sand;

import org.apache.commons.collections.CollectionUtils;

import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/**
 * @author zdd
 */
public class CDCKit {

    public static void main(String[] args) {
        String tempDir = System.getProperty("java.io.tmpdir");
        System.out.println("tempDir = " + tempDir);

    }
    /**
     * 数据库
     */
    private static final String database = "byyy_iowtb_wms_test";

    /**
     * 表名
     */
    private static final List<String> tableList = Arrays.asList(
            "inv_tt_stock_info",
            "base_tm_sku",
            "base_tm_third_sku_certificate",
            "base_tm_sku_gsp"
    );

    /**
     * ip
     */
    private static final String hostname = "192.168.111.107";

    /**
     * 端口
     */
    private static final int port = 3306;

    /**
     * 用户名
     */
    private static final String username = "test_cdc";

    /**
     * 密码
     */
    private static final String password = "Test_cdc@123";



    public static String getDatabase() {
        return database;
    }

    public static String getTableList() {
        if (CollectionUtils.isEmpty(tableList)) {
            return null;
        }
        //,分割
        StringJoiner stringJoiner = new StringJoiner(",");
        for (String tableName : tableList) {
            stringJoiner.add(getDatabase() + "." + tableName);
        }
        return stringJoiner.toString();
    }


    public static String getHostname() {
        return hostname;
    }

    public static int getPort() {
        return port;
    }

    public static String getUsername() {
        return username;
    }

    public static String getPassword() {
        return password;
    }

}

监控类



package com.sand;


import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.util.Objects;
import java.util.Properties;


public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        //获取临时文件目录
        String tempDir = System.getProperty("java.io.tmpdir");
        String latestCheckpoint = getLatestCheckpoint();
        System.out.println("latestCheckpoint = " + latestCheckpoint);

        Configuration configuration = new Configuration();
        if(StringUtils.isNotBlank(latestCheckpoint)){
            configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);
        }
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        env.setParallelism(1);
        //2.1 开启 Checkpoint,每隔 60 秒钟做一次 CK
        env.enableCheckpointing(1000L * 60);
        //2.2 指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 设置状态后端
        env.setStateBackend(new FsStateBackend("file:///" + tempDir + "ck"));


        // ck 设置
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");
        properties.setProperty("decimal.handling.mode", "string");

        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(CDCKit.getHostname())
                .port(CDCKit.getPort())
                .databaseList(CDCKit.getDatabase())
                .tableList(CDCKit.getTableList())
                .username(CDCKit.getUsername())
                .password(CDCKit.getPassword())
                .scanNewlyAddedTableEnabled(true)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .debeziumProperties(properties)
                .build();

        //4.使用 CDC Source 从 MySQL 读取数据
        env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source").addSink(new MysqlSink());
        //5.打印数据
//        mysqlStream.print();
        //6.执行任务
        env.execute();
    }

    private static String getLatestCheckpoint() {
        File ckDir = new File(System.getProperty("java.io.tmpdir") + "ck");
        File[] files = ckDir.listFiles();
        if (files == null) {
            return null;
        }

        String path = null;
        long lastModified = 0;
        for (File file : files) {
            //获取文件夹下-chk-开头文件夹-最新的文件夹
            if (file.isDirectory()) {
                File[] files1 = file.listFiles();
                if (files1 == null) {
                    continue;
                }
                for (File file1 : files1) {
                    if (!file1.isDirectory() || !file1.getName().startsWith("chk-")) {
                        continue;
                    }
                    if (file1.lastModified() > lastModified) {
                        lastModified = file1.lastModified();
                        path = file1.getAbsolutePath();
                    }
                }
            }
        }
        //删除其余目录
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        String tempPath = path.substring(0, path.lastIndexOf("\\"));
        for (File file : files) {
            if (file.isDirectory() && !Objects.equals(file.getAbsolutePath(), tempPath)) {
                FileUtil.del(file);
            }
        }
        return path;
    }
}

数据处理类

package com.sand;

/**
 * @author zdd
 */
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {
    @Override
    public void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {
        System.out.println("value = " + value);
    }
}

文章来源地址https://www.toymoban.com/news/detail-837479.html

到了这里,关于FlinkCDC快速搭建实现数据监控的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Metrics监控 pushgateway搭建

    Flink Metrics 简介 Flink Metrics 是 Flink 集群运行中的各项指标,包含机器系统指标,比如:CPU、内存、线程、JVM、网络、IO、GC 以及任务运行组件(JM、TM、Slot、作业、算子)等相关指标。 Flink 一共提供了四种监控指标:分别为 Counter、Gauge、Histogram、Meter。 Flink 主动方式共提供了

    2024年02月16日
    浏览(35)
  • FlinkCDC 实时监控 MySQL

    通过 FlinkCDC 实现 MySQL 数据库、表的实时变化监控,这里只把变化打印了出来,后面会实现如何再写入其他 MySQL 库中; 在 my.cnf 中开启 binlog,我这里指定了 test 库,然后重启 MySQL 在 IDEA 中新建工程 flinkcdc pom.xml resources/log4j.properties 反序列化类: com/zsoft/flinkcdc/MyDeserialization

    2024年02月06日
    浏览(33)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(51)
  • 使用FlinkCDC从mysql同步数据到ES,并实现数据检索

    随着公司的业务量越来越大,查询需求越来越复杂,mysql已经不支持变化多样的复杂查询了。 于是,使用cdc捕获MySQL的数据变化,同步到ES中,进行数据的检索。 springboot集成elasticSearch(附带工具类)

    2024年04月13日
    浏览(33)
  • 【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)

    简介:     FlinkCDC读取Mysql数据源,程序中使用了自定义反序列化器,完整的Flink结构,开箱即用。 本工程提供 1、项目源码及详细注释,简单修改即可用在实际生产代码 2、成功编译截图 3、自己编译过程中可能出现的问题 4、mysql建表语句及测试数据 5、修复FlinkCDC读取Mysql数

    2024年02月07日
    浏览(39)
  • FlinkCDC实现主数据与各业务系统数据的一致性(瀚高、TIDB)

             文章末尾附有flinkcdc对应瀚高数据库flink-cdc-connector代码下载地址         目前项目有主数据系统和N个业务系统,为保障“一数一源”,各业务系统表涉及到主数据系统的字段都需用主数据系统表中的字段进行实时覆盖,这里以某个业务系统的一张表举例说明:

    2024年02月03日
    浏览(36)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • Linux服务器监控的神器:Netdata(小白教程)快速搭建

    发现了一个单机监控的Netdata,眼前着实为之一亮。 令人印象非常之深刻的个主要特性: 界面酷炫,实时监控 零配置,即装即用 官网地址在这里: https://my-netdata.io/ netdata的主要功能,主要有几点(详细的可查看github上的说明): interactive bootstrap dashboards, 酷炫(主要是dark主

    2024年02月04日
    浏览(42)
  • 如何使用Java Websocket实现实时数据监控功能?

    随着互联网应用的不断发展,实时数据监控功能成为了许多应用的必备功能之一。本文将介绍如何使用Java WebSocket实现实时数据监控功能,并提供具体的代码示例。 :Java WebSocket、实时数据监控、代码示例 一、什么是WebSocket? WebSocket是一种网络协议,可以在实现了We

    2024年02月03日
    浏览(44)
  • 在k8s中快速搭建基于Prometheus监控系统

    公众号「架构成长指南」,专注于生产实践、云原生、分布式系统、大数据技术分享 K8s本身不包含内置的监控工具,所以市场上有不少这样监控工具来填补这一空白,但是没有一个监控工具有prometheus全家桶使用率高,因为它由 CNCF维护,已经成为了监控 k8s 集群的事实上的行

    2024年02月04日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包