利用MQ实现mysql与elasticsearch数据同步

这篇具有很好参考价值的文章主要介绍了利用MQ实现mysql与elasticsearch数据同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

流程

1.声明exchange、queue、RoutingKey
2. 在hotel-admin中进行增删改(SQL),完成消息发送
3. 在hotel-demo中完成消息监听,并更新elasticsearch数据
4. 测试同步

利用MQ实现mysql与elasticsearch数据同步,JAVA微服务,mysql,elasticsearch,java

1.引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

我这里的mq是挂在了docker上,虚拟机地址是192.168.116.128。到时候这个根据自己的项目改就行

 spring: 
  rabbitmq:
    host: 192.168.116.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

2.声明交换机、队列和绑定关系

package cn.itcast.hotel.constants;

public class MqConstants {

    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

在hotel-demo中,定义配置类,声明队列、交换机:

package cn.itcast.hotel.config;


import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
    }
    // 定义队列
    @Bean
    public Queue insertQueue()
    {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
    }
    @Bean
    public Queue deleteQueue()
    {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
    }

    // 定义绑定关系
    @Bean
    public Binding insertQueueBinding()
    {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }
    @Bean
    public Binding deleteQueueBinding()
    {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

3.发送MQ消息

在hotel-admin中的增、删、改业务中分别发送MQ消息,具体怎么添加根据:

    // 新增
    @PostMapping
    public void saveHotel(){
        //数据库新增操作
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    // 更新
    @PutMapping()
    public void updateById(){
            //数据库修改操作
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());

    }
    // 删除
    @DeleteMapping("/{id}")
    public void deleteById() {
		// 数据库删除操作
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);

    }

4.监听MQ消息

接收MQ消息

hotel-demo接收到MQ消息要做的事情包括:

  • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
  • 删除消息:根据传递的hotel的id删除索引库中的一条数据

1)首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、删除业务

void deleteById(Long id);

void insertById(Long id);

2)给hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中实现业务:文章来源地址https://www.toymoban.com/news/detail-707020.html

@Override
public void deleteById(Long id) {
    try {
        // 1.准备Request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2.发送请求
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0.根据id查询酒店数据
        Hotel hotel = getById(id);
        // 转换为文档类型
        HotelDoc hotelDoc = new HotelDoc(hotel);

        // 1.准备Request对象
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2.准备Json文档
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3.发送请求
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

编写监听类

package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {
    // 专门用于消息监听的类

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id)
    {
        hotelService.insertById(id);
    }

    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id)
    {
        hotelService.deleteById(id);
    }
}

到了这里,关于利用MQ实现mysql与elasticsearch数据同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • elasticsearch实现mysql数据同步

    当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。 常见的数据同步方案有三种: 同步调用 异步通知 监听binlog 以下使用异步通知同步elasticsearch的数据  constatnts 包下新建一个类 MqConstants,存储交换机和队列的名称 发送MQ消息   在增、删、改业务中分

    2024年02月09日
    浏览(36)
  • elasticsearch 实现与mysql 数据同步

    mysql8相关的安装可以看下另一篇博客 https://editor.csdn.net/md/?articleId=135905811 1.下载安装logstash 2.logstash 配置 logstash.yml 3.pipelines.yml 配置 同步方式: 1.logstash 2.go-mysql-elasticsearch 3.canal(阿里云) 一.logstash 1.安装mysql-connector-java 插件(需与mysql 版本一致) 2.配置logstash.conf 3.启动logsta

    2024年04月12日
    浏览(38)
  • DataX实现Mysql与ElasticSearch(ES)数据同步

    jdk1.8及以上 python2 查看是否安装成功 查看python版本号,判断是否安装成功 在datax/job下,json格式,具体内容及主要配置含义如下 mysqlreader为读取mysql数据部分,配置mysql相关信息 username,password为数据库账号密码 querySql:需要查询数据的sql,也可通过colums指定需要查找的字段(

    2024年02月05日
    浏览(59)
  • 基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)

    1、Canal简介   Canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。   Canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向My

    2024年02月10日
    浏览(48)
  • 本地部署Canal笔记-实现MySQL与ElasticSearch7数据同步

    本地搭建canal实现mysql数据到es的简单的数据同步,仅供学习参考 建议首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki 本地搭建MySQL数据库 本地搭建ElasticSearch 本地搭建canal-server 本地搭建canal-adapter 本地环境为window11,大部分组件采用docker进行部署,MySQL采用8.0.27, 推荐

    2024年02月02日
    浏览(56)
  • DolphinScheduler 调度 DataX 实现 MySQL To ElasticSearch 增量数据同步实践

    基于SQL查询的 CDC(Change Data Capture): 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的

    2024年02月03日
    浏览(44)
  • 一种Mysql和Mongodb数据同步到Elasticsearch的实现办法和系统

    本文分享自天翼云开发者社区《一种Mysql和Mongodb数据同步到Elasticsearch的实现办法和系统》,作者:l****n 核心流程如下:   核心逻辑说明: MySQL Binlog解析 : 首先,从MySQL的二进制日志(Binlog)中解析出表名。这一步骤非常关键,因为我们只关注特定表的数据变更。 进一步,我

    2024年02月05日
    浏览(45)
  • mysql数据利用pipe同步至redis

    所有数据表,一条记录一个空间 ps:*4 #表示有4个参数、$4 #表示“参数”有4个字节、 所有数据表,一个表一个空间 ps:这里发现当存在空值时,CONCAT结果为空,所以对于空数据需要做处理 相关逻辑优化 数据入库,如果作为键的字段非主键,即作为键的字段,可能存在相同的

    2024年02月15日
    浏览(36)
  • 数据同步MySQL -> Elasticsearch

    大家好我是苏麟,今天聊聊数据同步 . 一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。 MySQL =ES(单向) 同步方式 首次安装完 ES,把 MySQL 数据全量同步到

    2024年03月21日
    浏览(43)
  • 利用Canal把MySQL数据同步到ES

    Canal是阿里巴巴开源的一个数据库变更数据同步工具,主要用于 MySQL 数据库的增量数据到下游的同步,例如同步到 Elasticsearch、HBase、Hive 等。下面是一个基本的步骤来导入 MySQL 数据库到 Elasticsearch。 安装和配置 Canal 首先,需要在你的机器上安装并配置Canal。具体步骤可在 C

    2024年02月16日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包