使用Flink CDC将Mysql中的数据实时同步到ES

这篇具有很好参考价值的文章主要介绍了使用Flink CDC将Mysql中的数据实时同步到ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……

我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。

代码

直接上代码:文章来源地址https://www.toymoban.com/news/detail-516097.html

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);
	    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = 'localhost',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '123456',\n" +
                "   'database-name' = 'mydb',\n" +
                "   'table-name' = 'orders'\n" +
                " );").await();

        tableEnv.executeSql("CREATE TABLE products (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'localhost',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'database-name' = 'mydb',\n" +
                "    'table-name' = 'products'\n" +
                "  );").await();

        tableEnv.executeSql("CREATE TABLE enriched_orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   product_name STRING,\n" +
                "   product_description STRING,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "     'connector' = 'elasticsearch-7',\n" +
                "     'hosts' = 'http://localhost:9200',\n" +
                "     'index' = 'enriched_orders_lhc'\n" +
                " );");

        tableEnv.executeSql("INSERT INTO enriched_orders\n" +
                " SELECT o.*, p.name, p.description\n" +
                " FROM orders AS o\n" +
                " LEFT JOIN products AS p ON o.product_id = p.id");

        env.execute("Mysql to ES");
    }

到了这里,关于使用Flink CDC将Mysql中的数据实时同步到ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC2.4 整库实时同步MySql 到Doris

            Flink 1.15.4          目前有很多工具都支持无代码实现Mysql - Doris 的实时同步         如:SlectDB 已发布的功能包                 Dinky SeaTunnel TIS 等等          不过好多要么不支持表结构变动,要不不支持多sink,我们的业务必须支持对表结构的实时级变动

    2024年02月11日
    浏览(57)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(55)
  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(69)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 开启 log archiving (1).启用 log archiving         a:以DBA用户连接数据库    

    2024年02月11日
    浏览(47)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql(无主键)

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 具体环境设置和maven依赖请看上篇:Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 现在操作的是源表和目标表都无主键数

    2024年02月15日
    浏览(40)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(56)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(44)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(44)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(77)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包