数据同步MySQL -> Elasticsearch

这篇具有很好参考价值的文章主要介绍了数据同步MySQL -> Elasticsearch。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好我是苏麟,今天聊聊数据同步 .

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

MySQL =>ES(单向)

同步方式

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本 4 种方式 , 全量同步(首次)+增量同步(新数据)

1.定时任务 (推荐 : 简单)

定时任务 : 比如1分钟1次,找到 MySQL 中过去几分钟内(至少是定时周期的2 倍)发生改变的数据,然后更新到 ES.

  • 优点:简单易懂、占用资源少、不用引入第三方中间件
  • 缺点:有时间差
  • 应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
2.双写 

双写 : 写数据的时候,必须也去写 ES;更新删除数据库同理。(事务:建议先保证 MySQL写成功,如果ES 写失败了,可以通过定时任务 + 日志 +告警进行检测和修复(补偿))

  • 优点 : 不知道
  • 缺点 : 繁琐
3. Logstash

用 Logstash 数据同步管道 (一般要配合 kafka 消息队列 + beats 采集器)

  • 优点 : 用起来方便,插件多
  • 缺点 : 成本更大 : 一般要配合其他组件使用 (比如 kafka) , 维护成本 : 多维护一个组件 , 学习成本 : 学习使用
4.Canal (推荐 : 简单 , 实时性非常强)

Canal 监听 MySQL Binlog,实时同步

  • 优点 : 实时同步 , 实时性非常强
  • 缺点 :  忽略不计   (MySQL8版本可能连接失败)

定时任务

找到 MySQL 中过去几分钟内发生改变的数据,然后更新到 ES.

双向写入

写数据的时候,也去写 ES .

这个不推荐!

Logstash

传输 处理 数据的管道

文章 : Getting Started with Logstash | Logstash Reference [7.17] | Elastic

下载 : https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-windows-x86_64.zip

数据同步MySQL -> Elasticsearch,Elasticsearch,SpringCloud篇,MySQL,mysql,elasticsearch,数据库

快速开始 : Running Logstash on Windows | Logstash Reference [7.17] | Elastic 

这里需要学习成本 , 有兴趣的小伙伴自己了解 , 这里不过多赘述 .

订阅数据库流水的同步方式 Canal

地址 : GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

原理 : 数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理canal : 帮你监听 binlog,并解析 binlog 为你可以理解的内容。 

它伪装成了 MySQL 的从节点,获取主节点给的 binlog,如图:

数据同步MySQL -> Elasticsearch,Elasticsearch,SpringCloud篇,MySQL,mysql,elasticsearch,数据库

快速开始 : QuickStart · alibaba/canal Wiki · GitHub

windows 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.ini 文件:

Linux 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.cnf 文件:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MSQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

这里有个报错 java 找不到,修改 startup.bat 脚本为你自己的 java home

set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302 (自己MySQL路径)

set PATH=%JAVA_HOME%\bin;%PATH%

Java 中引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Demo  

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}

在启动之后 修改MySQL中数据 , java控制台就能实时输出 .

这期就到这里 , 下期见 !文章来源地址https://www.toymoban.com/news/detail-841963.html

到了这里,关于数据同步MySQL -> Elasticsearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Logstash同步MySQL数据到ElasticSearch

    当MySQL数据到一定的数量级,而且索引不能实现时,查询就会变得非常缓慢,所以使用ElasticSearch来查询数据。本篇博客介绍使用Logstash同步MySQL数据到ElasticSearch,再进行查询。 测试环境 Windows系统 MySQL 5.7 Logstash 7.0.1 ElasticSearch 7.0.1 Kibana 7.0.1 ELK工具下载可访问:https://www.elastic

    2024年02月01日
    浏览(42)
  • Enterprise:使用 MySQL connector 同步 MySQL 数据到 Elasticsearch

    Elastic MySQL 连接器是 MySQL 数据源的连接器。它可以帮我们把 MySQL 里的数据同步到 Elasticsearch 中去。在今天的文章里,我来详细地描述如何一步一步地实现。 在下面的展示中,我将使用 Elastic Stack 8.8.2 来进行展示。 无缝集成:将 Elasticsearch 连接到 MongoDB Enterprise:使用 MySQL c

    2024年02月16日
    浏览(46)
  • 基于Canal同步MySQL数据到Elasticsearch

    基于 canal 同步 mysql 的数据到 elasticsearch 中。 相关软件的安装请参考:《Canal实现数据同步》 1.1 pom依赖 1.2 SimpleCanalClientExample编写 注意当后面 canal-adapter 也连接上 canal-server 后,程序就监听不到数据变化了。 这个类只是测试,下面不使用。 由于目前 canal-adapter 没有官方dock

    2024年02月07日
    浏览(66)
  • Elasticsearch和MySQL之间的数据同步问题

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章参考网上的课程,介绍Elasticsearch和MySQL之间的数据同步问题。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人

    2023年04月19日
    浏览(39)
  • 利用MQ实现mysql与elasticsearch数据同步

    1.声明exchange、queue、RoutingKey 2. 在hotel-admin中进行增删改(SQL),完成消息发送 3. 在hotel-demo中完成消息监听,并更新elasticsearch数据 4. 测试同步 我这里的mq是挂在了docker上,虚拟机地址是192.168.116.128。到时候这个根据自己的项目改就行 在hotel-demo中,定义配置类,声明队列、交

    2024年02月09日
    浏览(34)
  • DataX实现Mysql与ElasticSearch(ES)数据同步

    jdk1.8及以上 python2 查看是否安装成功 查看python版本号,判断是否安装成功 在datax/job下,json格式,具体内容及主要配置含义如下 mysqlreader为读取mysql数据部分,配置mysql相关信息 username,password为数据库账号密码 querySql:需要查询数据的sql,也可通过colums指定需要查找的字段(

    2024年02月05日
    浏览(60)
  • 【elasticsearch专题】:Logstash从入门到同步MySQL数据

      Elasticsearch是在数据处理生态系统中担任一个开源的分布式 搜索 和 分析 引擎的角色,专为 存储 、 检索 和分析大量的数据而打造。与此相伴的是Kibana,一个开源数据可视化平台,用于以优雅的方式展示Elasticsearch中的数据,并赋予用户创建仪表盘、图表和报告的能力。然

    2024年02月04日
    浏览(49)
  • docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

    🚀 本文提供的指令完全可以按顺序逐一执行,已进行了多次测试。因此如果你是直接按照我本文写的指令一条条执行的,而非自定义修改过,执行应当是没有任何问题的。 🚀 本文讲述:使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elas

    2024年02月02日
    浏览(58)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(44)
  • 使用Logstash同步mysql数据到Elasticsearch(亲自踩坑)

    这篇文章主要介绍了如何使用Logstash同步mysql数据到Elasticsearch(亲自踩坑),如果帮助到了大家,希望用你毛茸茸的小手点个赞🤗;如有错误或未考虑周全的地方,希望在评论区留言🫡 Logstash官方文档提供了解决方案 一. 安装Logstash Logstash下载地址 下载版本一定要和Elastics

    2024年04月09日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包