Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch

这篇具有很好参考价值的文章主要介绍了Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink CDC 系列文章:
《Flink CDC 系列(1)—— 什么是 Flink CDC》
《Flink CDC 系列(2)—— Flink CDC 源码编译》
《Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo》
《Flink CDC 系列(4)—— Flink CDC MySQL Connector 常用参数表》
《Flink CDC 系列(5)—— Flink CDC MySQL Connector 启动模式》
《Flink CDC 系列(6)—— Flink CDC MySQL Connector 工作机制之 Incremental Snapshot Reading》
《Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch》

简介

本文介绍了通过 Flink CDC + Flink SQL 同步 MySQL 数据到 ElasticSearch 的案例。案例包含了 Insert/Update/Delete 的操作。

系统环境和软件版本

Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
ElasticSearch 7.16.2

MySQL 测试数据准备


mysql> CREATE DATABASE mydb;

mysql> USE mydb;

mysql> CREATE TABLE products (
       id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
       name VARCHAR(255) NOT NULL,
       description VARCHAR(512)
     );

mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

ElasticSearch 安装

1. 安装包选择和下载

官网下载地址:
https://www.elastic.co/cn/downloads/past-releases/elasticsearch-7-16-2
Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch
根据自己的操作系统(和芯片)选择一个合适的安装包。苹果M1芯片或者在苹果M1芯片安装的虚拟机都是选择后缀ARRCH64的安装包。
笔者当前的系统环境是基于苹果M1芯片安装Ubuntu 20.04操作系统,因此选择了 LINUX ARRCH64 的安装包。

下载命令

axel -n 20 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.2-linux-aarch64.tar.gz
2. 解压
tar xvf elasticsearch-7.16.2-linux-aarch64.tar.gz
3. 启动 ElasticeSearch
cd elasticsearch-7.16.2
bin/elasticsearch
4. 验证是否启动成功
curl http://localhost:9200

如下图所示,说明启动成功了
Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch

Flink 集群准备

1. 下载 flink 1.13.6 的二进制安装包

axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz

2. 解压

tar xvf flink-1.13.6-bin-scala_2.11.tgz

3. 将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 拷贝到 flink lib 目录下,该文件由 Flink CDC 源码编译得到

cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib

如何通过 Flink CDC 源码编译得到 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar,请参考:
《Flink CDC 系列(2)—— Flink CDC 源码编译》

4. 修改 /opt/flink-1.13.6/conf/workers

vi /opt/flink-1.13.6/conf/workers

workers文件内容:

localhost
localhost

意思是要在本机启动两个work进程

5. 修改 /opt/flink-1.13.6/conf/flink-conf.yaml

vi  /opt/flink-1.13.6/conf/flink-conf.yaml

设置参数: taskmanager.numberOfTaskSlots: 2

6. 下载 flink hadoop uber jar 文件
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar, 文件拷贝到 /opt/flink-1.13.6/lib 目录下

Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch

7. 下载 flink elasticsearch connector jar 文件
flink-sql-connector-elasticsearch7_2.11-1.13.6.jar
,文件拷贝到 /opt/flink-1.13.6/lib 目录下
Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch

8. 启动单机集群

cd /opt/flink-1.13.6
bin/start-cluster.sh

9. 查看 jobmanager 和 taskmanager 的进程是否存活

$ jps -m
9824 Jps -m
9143 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
ubuntu@ubuntu:/opt/flink-1.13.6$

演示开始

1. 启动 Flink SQL Client

cd /opt/flink-1.13.6
bin/sql-client.sh

2. 在 Flink SQL Client 中执行 DDL 和 查询

-- 创建 mysql-cdc source 
Flink SQL> CREATE TABLE products (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.64.6',
     'port' = '3306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id                 name                 description
1                 scooter1          Small 1-wheel scooter

Flink SQL> CREATE TABLE products_es_sink (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'products'
   );
[INFO] Execute statement succeed.

Flink SQL> insert into products_es_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b962baa7f6a8890cc45e43a7c95765d2

3. 查看 Elasticearch Index 的数据

curl http://localhost:9200/products/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter1",
          "description" : "Small 1-wheel scooter"
        }
      }
    ]
  }
}

4. 在Mysql客户端插入新的数据

mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");

5. 查看 Elasticearch Index 的数据
在命令行执行:

curl http://localhost:9200/products/_search?pretty
{
  "took" : 433,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter1",
          "description" : "Small 1-wheel scooter"
        }
      },
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      }
    ]
  }
}
-- 新数据写到了elasticsearch

6. 在Mysql客户端更新的数据

mysql> update products set name = 'scooter----1' where id = 1;

7. 查看 Elasticearch Index 的数据
在命令行执行:

curl http://localhost:9200/products/_search?pretty
{
  "took" : 154,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      },
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter----1",
          "description" : "Small 1-wheel scooter"
        }
      }
    ]
  }
}
-- id=1的数据被更新到了elasticsearch

7. 在Mysql客户端删除的数据

mysql> delete from products where id  = 1;

8. 查看 Elasticearch Index 的数据

在命令行执行:

curl http://localhost:9200/products/_search?pretty
{
  "took" : 347,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      }
    ]
  }
}

-- id=1的数据被删除

总结

通过 Flink CDC 可以捕获到 MySQL 的 insert/update/delete 操作日志,并通过 Flink SQL 可对 ElasticSearch 的索引数据进行 insert/update/delete。文章来源地址https://www.toymoban.com/news/detail-408130.html

到了这里,关于Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 最新版Flink CDC MySQL同步Elasticsearch(一)

    首先我们要基于Flink CDC MySQL同步MySQL的环境基础上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。笔者已经搭建好环境,这里不做具体演示了,如果需要Es的搭建教程情况笔者其他博客 注意: 建议生产环境统一使用稳定版本Flink1.16.*。笔者这里只是作为教程编写

    2024年02月13日
    浏览(38)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(44)
  • Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下 授权链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 基于jdk1.8 + springboot2.7.x + elasticsearch7.x 到此就大功告成啦!代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/

    2024年02月16日
    浏览(44)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(46)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • Flink CDC系列之:Oracle CDC Connector

    2023年08月23日
    浏览(51)
  • Flink CDC 实时mysql到mysql

    CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。 mysqlcdc需要mysql开启binlog,找到my.cnf,在 [mysqld] 中加入如下信息 [mysqld]

    2024年02月12日
    浏览(47)
  • CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi

    继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,

    2024年02月22日
    浏览(48)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(53)
  • flink mysql cdc调试问题记录

    最近需要用到flink cdc作为数据流处理框架,在demo运行中发现一些问题,特此记录问题和解决过程。 Caused by: java.lang.IllegalArgumentException: Can\\\'t find any matched tables, please check your configured database-name: [localdb] and table-name: [flink_cdc_message] at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.

    2023年04月17日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包