mysql 5.7同步数据到es 7.6.2(集群)

这篇具有很好参考价值的文章主要介绍了mysql 5.7同步数据到es 7.6.2(集群)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

小伙伴们,你们好,我是老寇

mysql驱动、canal安装包:https://pan.baidu.com/s/1swrV9ffJnmz4S0mfkuBbIw 
提取码:1111

目录

一、前提条件

二、可选方案

三、es api同步数据

四、logstash同步数据到es

五、mysql通过binlog同步数据到es

一、前提条件

  • 安装mycat 1.6,点点我
  • 安装es 7.6.2集群,点点我
  • 安装logstash 7.6.2,点点我

二、可选方案

在实际项目中,业务数据主流存储在mysql,但是mysql处理海量数据的搜索能力较差,推荐mysql搭配es,为业务提供强大的搜索能力成为业界主流方案,难点在于如何将mysql导入到es。

mysql数据同步到es,有以下几种方案:

  • 更新mysql数据库的同时通过es api实现数据写入es(同步)
  • 通过logstash同步数据到es(异步)
  • 通过mysql binlog,将数据同步到es(异步)

三、es api同步数据

直接调用es api即可,代码详情,请点点我

    @PutMapping("/sync")
    @ApiOperation("同步修改ES")
    @CrossOrigin
    public void updateData(@RequestBody final ElasticsearchModel model) {
        String id = model.getId();
        String indexName = model.getIndexName();
        String paramJson = model.getData();
        elasticsearchUtil.updateData(indexName,id,paramJson);
    }

四、logstash同步数据到es

参考博客:同步MYSQL数据到Elasticsearch

logstash通过使用jdbc-input实现定时同步mysql数据,因此需要提前下载mysql的jdbc驱动

1.配置logstash.mysql.conf

input {
  # 配置JDBC数据源
  jdbc {
    # mysql jdbc驱动路径
    jdbc_driver_library => "D:/xxxxx/mysql-connector-java-5.1.35.jar"
    # mysql jdbc驱动
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql 连接地址
    jdbc_connection_string => "jdbc:mysql://localhost:8066/TESTDB"
    # mysql 账号
    jdbc_user => "root"
    # mysql 密码
    jdbc_password => "123456"
    # 定时任务配置,一分钟执行一次
    schedule => "* * * * *"
    # sql查询语句
    # 增量更新数据
    statement => "select * from boot_resource where id > :sql_last_value"
    # 允许sql_last_value的值来源于结果的某个字段值
    use_column_value => true
    # sql_last_value的值来自查询结果的最后一个id值
    tracking_column => "id"
    # 开启分页
    jdbc_paging_enabled => true
    # 分页大小
    jdbc_page_size => 50
  }
}

output {
  # 配置数据导入es
  elasticsearch {
    # 索引名称
    index => "laokou-resource"
    # es地址
    hosts => ["192.168.1.1:9200","192.168.1.2:9200","192.168.1.3:9200"]
    # document_id设置mysql表主键
    document_id => "%{id}"
  }
  stdout {
    codec => rubydebug
  }
}

 2.jdbc schedule 配置说明

jdbc schedule的配置规则

*   *   *   *   *
分  时  天  月  星期

各字段取值范围

  • 分 - 0 ~ 59
  • 时 - 0 ~ 23
  • 天 - 1 ~ 31
  • 月 - 1 ~ 12
  • 星期 - 0 ~ 6

常见特殊字符含义

  • 星号(*):代表所有值,如第一个星号,表示每分钟
  • 斜线(/):指定时间的间隔频率,如*/5,用在第一个分钟字段,表示每5分钟执行一次

举个栗子

# 每分钟执行一次
* * * * *

# 每5分钟执行一次
*/5 * * * *

# 每小时执行一次
* */1 * * *

# 每天八点执行一次
0 8 * * *

3.增量同步数据

同步数据的sql语句,直接扫描全表的数据,如果数据量较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂掉,因此需要增量同步数据,并且每次仅同步新增数据

logstash提供了sql_last_value字段值,可以实现增量同步数据

实现思路:logstash每次执行sql时,会将sql查询结果的最后一条记录某个字段的值保存到sql_last_value字段中,下一次执行sql时,就以sql_last_value的值做条件,从这个值往后查询数据

statement => "select * from boot_resource where id > :sql_last_value"

4.数据分页

增量同步的数据太多,会导致logstash卡死,尤其是首次增量同步时(首次同步数据,其实是全表扫描),为避免一次查询太多数据,可以配置分页

    # 开启分页
    jdbc_paging_enabled => true
    # 分页大小
    jdbc_page_size => 50

5.启动logstash

logstash -f logstash.mysql.conf

五、mysql通过binlog同步数据到es

阿里的开源神器canal

mysql 5.7同步数据到es 7.6.2(集群)

1. canal简介

canal主要用于对mysql的增量日志进行解析(请注意,只支持增量解析,不支持全量解析),提供增量数据的订阅消费,对mysql增量数据进行实时同步,支持同步到mysql、elasticsearch、hbase等数据源

2.canal常用组件

  • canal-deployer(canal-server):监听mysql的binlog,把自己伪装成mysql slave,只负责接收数据,不做数据处理
  • canal-adapter:canal客户端,从canal-server中获取数据,对数据进行同步,可以同步到mysql、elasticsearch、hbase等数据源
  • canal-admin:提供整体配置管理、节点运维等功能,web界面方便用户快速和安全的操作

3.canal工作原理

mysql 5.7同步数据到es 7.6.2(集群)

  •  canal模拟mysql master slave的交互协议,把自己伪装mysql slave,向mysql master发送dump协议(dump用于备份
  • mysql master接收到dump请求,向canal推送binlog
  • canal通过解析binlog,将数据同步到其他的数据源

4.MySQL主从复制原理

mysql 5.7同步数据到es 7.6.2(集群)

  •  mysql master在每个事务更新数据完成后,将该操作记录串行写入binlog(二进制文件)文件中
  • mysql slave 开启一个I/O线程,该线程在master打开一个普通连接,主要工作的是binlog dump process,如果读取的进度已经跟上master,就会进入休眠状态并等待master产生新的事件,I/O线程的最终目的就是将这些事件写入到replay log(中继日志
  • sql线程会读取中继日志,并按顺序执行该日志的sql事件,从而与mysql master的数据保持一致

5.下载canal 1.1.5并安装(win11教程,下载完解压文件

百度网盘:https://pan.baidu.com/s/1swrV9ffJnmz4S0mfkuBbIw 
提取码:1111

6.mysql配置

master 配置

log-bin=mysql-bin ##开启二进制日志
binlog-format=row ##二进制日志格式(row\mixed\statement)

查看binlog是否启用

show variables like '%log_bin%';

mysql 5.7同步数据到es 7.6.2(集群)

 查看binlog模式

show variables like '%binlog_format%';

mysql 5.7同步数据到es 7.6.2(集群)

 7.创建账号,用于订阅binlog

CREATE USER canal IDENTIFIED BY '123456';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

8.创建数据库

DROP TABLE IF EXISTS `boot_city`;
CREATE TABLE `boot_city` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `parent_id` bigint(20) NOT NULL COMMENT '上级编号',
  `region` varchar(100) NOT NULL COMMENT '地区名称',
  `region_id` int(11) NOT NULL COMMENT '地区编号',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1324914279466823682 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='城市';

9.配置canal-server

1.配置conf/example/instance.properties

# 需要同步数据的MySQL地址
canal.instance.master.address=192.168.1.1:3306 
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=123456
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*

2.启动服务只需要双击bin/startup.bat

10.配置canal-adapter

1.修改conf/application.yml

#修改 es\mysql配置 
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.1:3306/kcloud?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 192.168.1.1:9300,192.168.1.2:9300,192.168.1.3:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: laokou-elasticsearch

2.修改conf/es7/mytest_user.yml

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: laokou-city
  _id: id
  sql: "select a.id,a.parent_id,a.region,a.region_id from boot_city a"
  etlCondition: "where a.id >= {}"
  commitBatch: 3000

3.启动服务只需要双击bin/startup.bat

11.启动kibana,创建索引

PUT /laokou-city
{
  "mappings": {
    "properties": {
      "id":{
        "type": "long"
      },
      "parent_id":{
        "type": "long"
      },
      "region":{
        "type": "keyword"
      },
      "region_id":{
        "type": "integer"
      }
    }
  }
}

12.数据库插入数据

INSERT INTO `boot_city` VALUES ('1324912501966925826', '227', '东安县', '2093');
INSERT INTO `boot_city` VALUES ('1324912504676446210', '227', '双牌县', '2094');
INSERT INTO `boot_city` VALUES ('1324912506161229825', '227', '道县', '2095');
INSERT INTO `boot_city` VALUES ('1324912507595681794', '227', '江永县', '2096');
INSERT INTO `boot_city` VALUES ('1324912509306957825', '227', '宁远县', '2097');
INSERT INTO `boot_city` VALUES ('1324912511995506689', '227', '蓝山县', '2098');
INSERT INTO `boot_city` VALUES ('1324912514667278338', '227', '新田县', '2099');
INSERT INTO `boot_city` VALUES ('1324912516038815746', '227', '江华瑶族自治县', '2100');
INSERT INTO `boot_city` VALUES ('1324912517494239234', '228', '鹤城区', '2101');
INSERT INTO `boot_city` VALUES ('1324912520161816578', '228', '中方县', '2102');
INSERT INTO `boot_city` VALUES ('1324912521667571714', '228', '沅陵县', '2103');
INSERT INTO `boot_city` VALUES ('1324912523072663553', '228', '辰溪县', '2104');
INSERT INTO `boot_city` VALUES ('1324912524578418690', '228', '溆浦县', '2105');
INSERT INTO `boot_city` VALUES ('1324912526096756737', '228', '会同县', '2106');
INSERT INTO `boot_city` VALUES ('1324912527514431490', '228', '麻阳苗族自治县', '2107');
INSERT INTO `boot_city` VALUES ('1324912529036963841', '228', '新晃侗族自治县', '2108');
INSERT INTO `boot_city` VALUES ('1324912530588856321', '228', '芷江侗族自治县', '2109');
INSERT INTO `boot_city` VALUES ('1324912531956199426', '228', '靖州苗族侗族自治县', '2110');
INSERT INTO `boot_city` VALUES ('1324912533394845697', '228', '通道侗族自治县', '2111');

canal-server会监听binlog日志数据是否发生更改

13.效果截图

mysql 5.7同步数据到es 7.6.2(集群)

14.问题解决

注:如果能获取canal-sever数据,不能写入es

请替换canal-adapter的plugin目录下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar

百度网盘:https://pan.baidu.com/s/1swrV9ffJnmz4S0mfkuBbIw 
提取码:1111

大功告成文章来源地址https://www.toymoban.com/news/detail-431120.html

到了这里,关于mysql 5.7同步数据到es 7.6.2(集群)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • elasticsearch(ES)分布式搜索引擎04——(数据聚合,自动补全,数据同步,ES集群)

    **聚合(aggregations)**可以让我们极其方便的实现对数据的统计、分析、运算。例如: 什么品牌的手机最受欢迎? 这些手机的平均价格、最高价格、最低价格? 这些手机每月的销售情况如何? 实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近

    2024年02月08日
    浏览(51)
  • Mysql和ES、Redis数据同步方案汇总_redis同步数据从mysql到es

    文章目录 前言 一、数据同步方案 1.同步双写 [2.异步双写(MQ方式)](#2MQhttpssocsdnnetsosearchqMQspm1001210130017020_53) 3.基于Mysql表定时扫描同步 [4.基于Binlog实时同步](#4BinloghttpssocsdnnetsosearchqBinlogspm1001210130017020_119) [二、数据迁移同步工具选型](#httpssocsdnnetsosearchqspm1001210130017020_141) 总结

    2024年04月23日
    浏览(36)
  • MySQL:5.6同步到5.7 GTID报错

    问题描述和处理 同步到的版本为5.7.35,按理说在5.7种还是一个比较新的版本了,报错大概如下: 从报错来看肯定属于逻辑时钟并发的问题,因为在5.6中gtid event没有last commit和seq number,因此可能并发的时候遇到了问题,因此简单的关闭了MTS并发回放就可以了继续了。 分析原因

    2024年02月05日
    浏览(36)
  • es与mysql的数据同步问题

    方案一:同步调用  缺点:1.业务耦合             2.降低整体性能             3.当一个业务出现问题则直接卡死 方案二:异步通知(解除了两个服务之间的耦合)  缺点:比较依赖与mq 方案三:监听MySQL的binlog日志  三种方式优缺点对比  方案二: 1.引入依赖 2.编写配置

    2023年04月17日
    浏览(44)
  • Mysql和ES数据同步方案汇总

    在实际项目开发中,我们经常将Mysql作为业务数据库,ES作为查询数据库,用来实现读写分离,缓解Mysql数据库的查询压力,应对海量数据的复杂查询。这其中有一个很重要的问题,就是如何实现Mysql数据库和ES的数据同步,今天和大家聊聊Mysql和ES数据同步的各种方案。 为什么

    2024年01月20日
    浏览(46)
  • ELK增量同步数据【MySql->ES】

            1.  linux,已经搭建好的logstash+es+kibana【系列版本7.0X】,es 的plugs中安装ik分词器 ES版本:  Logstash版本:  (以上部署,都是运维同事搞的,我不会部署,同事给力) 1、在Logstash安装目录下【/usr/share/logstash】,新建XX.sh,内容如下: 2. 在Logstash安装目录下【/usr/shar

    2024年02月11日
    浏览(38)
  • Logstash同步Mysql数据至ES

    官方文档 注意版本要和自己的es版本一致 下载地址:logstash 上传至服务器并进行解压。 1、通过官网下载mysql连接jar包 下载地址:mysql 连接jar包 根据自己mysql版本和系统进行选择 ​  ​   ​   ​   ​ 2、在IDEA中复制msyql连接jar包 ​   ​ 将jar包移动至/logstash/logstash-core/lib

    2024年02月10日
    浏览(37)
  • logstash同步mysql数据到es(三、es模板问题)

     相关问题汇总: logstash同步mysql数据到es(一、es模板问题,请求返回400) logstash同步mysql数据到es(二、jdbc_driver_library问题)_(please check user and group permissions for the p-CSDN博客 logstash同步mysql数据到es(三、es模板问题)-CSDN博客 使用docker实现logstash同步mysql到es-CSDN博客 [INFO ] 2023-12-11 09

    2024年01月17日
    浏览(79)
  • FlinkCDC数据实时同步Mysql到ES

    考大家一个问题,如果想要把数据库的数据同步到别的地方,比如es,mongodb,大家会采用哪些方案呢? ::: 定时扫描同步? 实时日志同步? 定时同步是一个很好的方案,比较简单,但是如果对实时要求比较高的话,定时同步就有点不合适了。今天给大家介绍一种实时同步方案,就

    2024年02月03日
    浏览(42)
  • canal同步mysql数据到es中

    项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。 而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。 jdk1.8(依赖jdk环境,需要先

    2023年04月08日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包