最新版Flink CDC MySQL同步Elasticsearch(一)

这篇具有很好参考价值的文章主要介绍了最新版Flink CDC MySQL同步Elasticsearch(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.环境准备

首先我们要基于Flink CDC MySQL同步MySQL的环境基础上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。笔者已经搭建好环境,这里不做具体演示了,如果需要Es的搭建教程情况笔者其他博客

注意: 建议生产环境统一使用稳定版本Flink1.16.*。笔者这里只是作为教程编写采用当下最新版本,生产环境不推荐使用

2.编译flink-sql-connector-mysql-cdc

最新版本flink-1.17.1 mysql同步Es具体jar依赖版本如下所示:

注意:下载链接仅适用于稳定版本,SNAPSHOT依赖需要您自己构建。

flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar(需要自行进行构建编译,笔者构建的已经上次至次博客。需要可以进行下载,csdn需要积分下载,无法设置免费的,需要免费版可以直接联系笔者)

下载所需的JAR包并放在下面flink-1.17.1/lib/:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

3.建立mysql和Es映射关系表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
最新版Flink CDC MySQL同步Elasticsearch(一),flink,flink,mysql,elasticsearch首先,每 3 秒启用一次检查点

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products (
 id INT NOT NULL,
 name STRING,
 description STRING,
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
 'port' = '3306', #源数据库端口
 'username' = 'root',#源数据库账号
 'password' = '*****',#源数据库密码
 'database-name' = 'mydb',#源数据库
 'table-name' = 'products'#源数据库表
);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQL
Flink SQL> CREATE TABLE products (
>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '192.168.50.163',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = '****',
>     'database-name' = 'mydb',
>     'table-name' = 'products'
>   );

在es创建要同步的目标索引,具体语句如下:

PUT product1
{
  "settings": {
    "number_of_shards": 12,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "keyword"
      },
      "description": {
        "type": "text"
      }
    }
  }
}

编辑目标ES映射Flink Sql代码,如下所示:

   CREATE TABLE product1 (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'elasticsearch-7',#目标ES版本,最新目前支持7
     'hosts' = 'http://192.168.50.236:9200',#连接信息
     'index' = 'product1'#索引信息
 );

注意: 本文Es为测试版本没有配置账号密码,如果有账号密码配置即可 ‘username’ = ‘xxxx’,‘password’=‘xxxx’

建立目标索引与Flink SQL的映射关系,具体语句如下:

-- Flink SQL
 CREATE TABLE product1 (

>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>      'connector' = 'elasticsearch-7',#目标ES版本,最新目前支持7
>      'hosts' = 'http://192.168.50.236:9200',#连接信息
>      'index' = 'product1'#索引信息
>  );

使用Flink SQL添加mysql和Es映射表数据关联关系

-- Flink SQL
Flink SQL> insert into product1 select * from products;

4.时区问题处理

错误:
The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone Etc/UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决思路:

  • Flink集群开启NTP服务器 时间同步
  • 把服务器时区改成和数据库一样的时间本文为(Asia/Shanghai)
  • 配置Flink sql的时区为Asia/Shanghai,具体命令如下所示:
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';

注意:这是笔者遇到的问题,具体问题具体解决即可

5.具体实现结果

整体实现结果如下图所示:

Flink 运行任务

最新版Flink CDC MySQL同步Elasticsearch(一),flink,flink,mysql,elasticsearch

mysql 源数据表数据

最新版Flink CDC MySQL同步Elasticsearch(一),flink,flink,mysql,elasticsearch

Es目标索引已经数据查询图

最新版Flink CDC MySQL同步Elasticsearch(一),flink,flink,mysql,elasticsearch至此,笔者的Flink CDC MySQL同步Elasticsearch第一篇讲解完毕,希望能帮助到搭建文章来源地址https://www.toymoban.com/news/detail-539187.html

到了这里,关于最新版Flink CDC MySQL同步Elasticsearch(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下 授权链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 基于jdk1.8 + springboot2.7.x + elasticsearch7.x 到此就大功告成啦!代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/

    2024年02月16日
    浏览(43)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(45)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(50)
  • 【ElasticSearch】Docker安装最新版ElasticSearch 8.6.2

    在本文中,我将为您介绍如何在 Docker 中安装 ElasticSearch 8.6.2 。ElasticSearch是一个流行的开源搜索和分析引擎,可以帮助您快速、准确地搜索和分析数据。通过在 Docker 中安装 ElasticSearch ,您可以轻松地部署和管理您的 ElasticSearch 实例,并确保系统的可移植性和可靠性。 在安装

    2024年02月01日
    浏览(55)
  • WindowsServer安装mysql最新版

    目录   安装 配置 MySQL 环境变量 远程连接 MySQL 服务器 防火墙权限  配置 MySQL 服务的用户权限 测试远程连接 下载相应mysql安装包: MySQL :: Download MySQL Installer  选择不登陆下载  双击运行下载好的mysql-installer-community-*.*.*.msi 进入类型选择页面,本人需要mysql云服务就选择了s

    2024年02月02日
    浏览(46)
  • 【2023最新版】DataGrip使用MySQL教程

    目录  一、安装MySQL 二、安装DataGrip 三、DataGrip使用MySQL 1. 新建项目 2. DataGrip连接MySQL 下载驱动文件 填写root+密码 测试 成功 3. DataGrip操作MySQL 四、MySQL常用命令 1. 登录 2. 帮助 3. 查询所有数据库         MySQL是一种开源的关系型数据库管理系统(RDBMS),它是最流行和广泛

    2024年02月09日
    浏览(73)
  • MySQL最新版8.1.0安装配置教程

    目录 前言 安装流程图 1,MySQL数据库是什么? 2,下载zip压缩包 3,解压到要安装的目录 4,添加环境变量 4.1,找到环境变量 4.2,进行环境变量的添加  5.新建mysql 配置文件 6、安装mysql服务 7、初始化数据文件 8、启动mysql 9.进入mysql管理界面修改密码 10,重启MySQL即可正常使用 11,总

    2024年02月08日
    浏览(80)
  • MySql workBench客户端菜单汉化最新版

    找到Mysql Workbench安装目录 如:D:softMySQLMySQL Workbench 8.0 CE 进入data目录:D:softMySQLMySQL Workbench 8.0 CEdata 里面有main_menu.xml文件,将汉化文件覆盖即可 main_menu.xml汉化内容

    2024年01月19日
    浏览(42)
  • Google Chrome谷歌浏览器安装最新版Elasticsearch插件 图文教程 【一看就懂】

    我们在虚拟机安装了Elasticsearch后,往往还需要再安装一个可视化界面以便于使用。本文就教您如何在Google Chrome谷歌浏览器安装最新版Elasticsearch(es)插件。 | 1 如图所示,点击设置 进入如下界面,点击扩展程序 开启开发者模式后点击Chrome网上应用商店 如图所示搜索插件El

    2024年02月07日
    浏览(73)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包