ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

这篇具有很好参考价值的文章主要介绍了ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、数据同步

1.1、什么是数据同步

1.2、解决数据同步面临的问题

1.3、解决办法

1.3.1、同步调用

1.3.2、异步通知(推荐)

1.3.3、监听 binlog

1.3、基于 RabbitMQ 实现数据同步

1.3.1、需求

1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听

1.3.3、在“酒店管理服务”中发布消息

1.3.4、启动微服务并测试


一、数据同步


1.1、什么是数据同步

我们知道 elasticsearch 的数据是来源于 数据库(比如 mysql).  当我们在写了代码将 mysql 中的数据导入 es 中,那么这次导入之后 mysql 的数据并不会一成不变,将来我们的业务中会有 crud,数据库中的数据就和有新增、修改、删除,那么 mysql 数据一定那发生变化, es 如果不跟着变化,就会出现问题.

比如在一个商城系统中,到了 双11 ,数据库中的商品的价格下降,而 es 还是老价格,那么用户搜索时看到的商品的价格还是没有变化,用户可能就得考虑换软件了~  

因此,我们要保证 mysql 数据变化的时候 es 也能跟着同步变化,这就是数据同步.

Ps:实际上不光是 es 存在数据同步问题,凡是涉及到数据库双写的情况,比如 redis 和 mysql,都会存在数据同步问题.

1.2、解决数据同步面临的问题

如果你现在是一个单体式的项目,所有业务都写在一个项目中,那就比较好办,无非就是在及逆行新增、修改、删除业务的时候,同时把 es 也一起更新就  ok.

但是如果我们是一个 微服务 架构的项目上,不同的业务往往会在不同的微服务上,比如 “商品数据管理业务” 和 “商品数据搜索业务” 肯定会在两个不同的微服务上,那么跨微服务的项目就没办法直接操作了.

1.3、解决办法

1.3.1、同步调用

假设我们现在有两个微服务,一个是 酒店数据管理服务,另一个是酒店数据搜索服务. 假设这两个服务之间互相不能访问对方的数据库,也就是说,酒店管理服务只能访问 mysql,而 酒店搜索服务 只能访问 es,这也符合 微服务 里的标准和规范.

这里有一种办法是同步调用,步骤如下:

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据

1.比如用户做新增操作时,首先把数据写到数据库里.

2. 数据库写完了以后紧接着调用 酒店搜索服务中的 “更新索引库” 的接口.

3. 更新 es.

最后更新完了 es 就把响应反馈给搜索服务,然后搜索服务才会把把响应反馈给  管理服务,然后再反馈给用户.  这整个过程依次执行,因此也叫同步调用.

缺陷:

1. 数据耦合,业务耦合:原本只是写数据库,写完就结束了,然后现在还需要再写完数据库的代码后面再加上 调用 “更新索引库” 接口的代码,而且这调用 这个接口的业务跟我新增业务显然没有关系啊,现在业务耦合在一起,将来必然也会影响性能.

2. 影响性能(耦合带来的问题):原本数据库写完了,比如耗时 50ms,但是现在写完数据库,你还得等待后台调用这个接口返回的响应,而这个接口又要等待 es 这里的响应,假如这里也耗时 50ms,那么总的耗时不就是这个三个步骤相加的,达到 100 ms.

3. 牵一发而动全身(耦合带来的问题):如果 步骤 2 和 步骤 3 任意一个位置出现了异常,就会导致整个业务也出现崩溃.

同步调用这么多问题,那么就需要考虑别的方案了.

1.3.2、异步通知(推荐)

这里就需要使用到 mq 来实现了,步骤如下:

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据

1. 当有人做新增操作时,先去写数据库.

2. 写完之后,不调用任何服务的接口,而是向 mq 发送一个消息,通知一下 其他服务:“我这里数据新增了啊~”,整个步骤到这里结束.

那么至于谁来监听这个消息,监听了以后做什么,跟我有关系吗?没关系,这样一来,业务的耦合就解除了;  至于其他服务耗时多少秒,跟我也没关系,我写完数据库,发完消息就结束了,因此性能也提升了;再者,就算其他服务出现异常了,跟我这里也没关系.

缺陷:

1. 这样一来,比较依赖 mq 消息的可靠性.

2. 引入新的中间件,实现的复杂度也会有一定的上升.

不过这些缺陷,跟同步调用比起来,算不了什么,因此这也是比较推荐的方案.

1.3.3、监听 binlog

mysql 默认情况下 binlog 时关闭的,一旦开启,那么每次 mysql 在做增删改的时候,都会记录相应的操作到 binlog 中.

那么可以使用类似于 canal 这样的中间件来监听 binlog,一旦发现变化,立马通知对应的微服务,这个时候就知道数据发生变更,就可以进行更新了.

优势:

这种方案他既不给任何中间件发消息,也不去调用任何接口,因此耦合度是最低的.

劣势:

1.开启 binlog,对 mysql 的压力就增加了.

2.引入新的中间件.

1.3、基于 RabbitMQ 实现数据同步

1.3.1、需求

现在有两个微服务:“酒店管理服务” 和 "酒店搜索服务"

用户操作 “酒店管理服务” 进行增删改数据、要求对 "酒店搜索服务" 中的 es 的数据也要完成相同的数据更改操作.

这里使用 MQ 的异步方式实现数据同步.

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据

1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听

在 “酒店搜索服务” 中去声明一个 exchange,用来接收 增删改 的消息,接着声明两个队列即可,一个队列用来增改(这两用一个队列是因为在 es 中,增改可以使用同一个 DSL 语句实现),另一个用来删除(这里我是以 Bean 的方式注入到容器中了).

public class MqConstants {

    //主题交换机
    public static final String EXCHANGE_TOPIC = "hotel.topic";
    //增加 or 修改酒店 队列
    public static final String INSERT_QUEUE = "hotel.insert.queue";
    //删除酒店 队列
    public static final String DELETE_QUEUE = "hotel.delete.queue";
    //增加 or 修改酒店 routingKey
    public static final String INSERT_KEY = "hotel.insert.key";
    //删除酒店 routingKey
    public static final String DELETE_KEY = "hotel.delete.key";

}
@Configuration
public class MqConfig {

    @Bean
    public TopicExchange hotelTopicExchange() {
        return new TopicExchange(MqConstants.EXCHANGE_TOPIC, true, false);
    }

    @Bean
    public Queue hotelInsertQueue() {
        return new Queue(MqConstants.INSERT_QUEUE, true);
    }

    @Bean
    public Queue hotelDeleteQueue() {
        return new Queue(MqConstants.DELETE_QUEUE, true);
    }

    @Bean
    public Binding hotelInsertBinding() {
        return BindingBuilder.bind(hotelInsertQueue()).to(hotelTopicExchange()).with(MqConstants.INSERT_KEY);
    }

    @Bean
    public Binding hotelDeleteBinding() {
        return BindingBuilder.bind(hotelDeleteQueue()).to(hotelTopicExchange()).with(MqConstants.DELETE_KEY);
    }

}

Ps:此类(MqConfig)的包必须与启动类同级,否则声明交换机和队列失败.

最后就可以使用 @RabbitListener 监听 队列 了.

@Component
public class MqListener {

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(queues = MqConstants.INSERT_QUEUE)
    public void HotelInsertOrUpdateListener(Long id) {
        hotelService.insertHotelById(id);
    }

    @RabbitListener(queues = MqConstants.DELETE_QUEUE)
    public void HotelDeleteListener(Long id) {
        hotelService.deleteHotelById(id);
    }

}

Ps:此类(MyListener,监听者类)上必须要有 @Component 注解(交由给 Spring 来管理),否则声明的交换机和队列无效.

1.3.3、在“酒店管理服务”中发布消息

酒店管理服务中,一旦用户进行酒店的 增删改,就会对数据库信息进行修改,然后将增删改的消息发布到 MQ 中.

Ps:这里不要发送 hotel 整体数据,太大可能会导致占满队列(Mq 是基于内存存储的,因此会设定队列上限),因此这里发送 id 即可.  es 这边拿到 id,就可进行相应的增删改.

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        // 新增酒店
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        //修改酒店信息
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        //删除酒店信息
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.DELETE_KEY, id);
    }

1.3.4、启动微服务并测试

a)在酒店管理页面中,修改 “7天连锁酒店(上海莘庄地铁站店)” 为 “7天连锁酒店(此处正在施工,请谨慎前往)”,如下.

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据

b)在酒店搜索页面中,搜索 “施工” 关键词,就可以看到在 “酒店管理服务” 中更新的信息,已经同步到了 “酒店搜索服务” 中.

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),ElasticSearch,elasticsearch,mysql,大数据文章来源地址https://www.toymoban.com/news/detail-718204.html

到了这里,关于ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理

    项目中需要建立客户端与服务端之间的长链接,首先就考虑用WebSocket,再来SpringBoot原来整合WebSocket方式并不高效,因此找到了netty-websocket-spring-boot-starter 这款脚手架,它能让我们在SpringBoot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单 2.1.1 @ServerEndpo

    2024年02月12日
    浏览(41)
  • 计算机毕业设计 基于SpringBoot大学生就业服务平台的设计与实现 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

    🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不

    2024年02月08日
    浏览(39)
  • 服务器部署整合了elasticsearch的springboot项目后报错

            今天在服务器上面更新自己的项目的时候报错了 报错太长了,我提炼了一下,主要是说bean注入失败,各种service和controller全都寄了,后来看到里面有个elasticsearchRepository,又因为刚整合了elasticsearch,所以基本上可以确定问题就是出在elasticsearch上。         这

    2024年02月05日
    浏览(79)
  • 企业级开发项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步

    1、商品上架时:search-service新增商品到elasticsearch 2、商品下架时:search-service删除elasticsearch中的商品 数据同步是希望,当我们商品修改了数据库中的商品信息,索引库中的信息也会跟着改。在微服务中数据库和索引库是在两个不同的服务中。如果,商品的服务,向es的服务中

    2024年02月12日
    浏览(45)
  • Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发

    前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费; 1 为什么要使用Rabbitmq: RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点: 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和

    2024年02月07日
    浏览(43)
  • 开源.NET8.0小项目伪微服务框架(分布式、EFCore、Redis、RabbitMQ、Mysql等)

    为什么说是伪微服务框架,常见微服务框架可能还包括服务容错、服务间的通信、服务追踪和监控、服务注册和发现等等,而我这里为了在使用中的更简单,将很多东西进行了简化或者省略了。 年前到现在在开发一个新的小项目,刚好项目最初的很多功能是比较通用的,所以

    2024年03月09日
    浏览(45)
  • 基于 Hertz 和 Kitex 的 Go 微服务项目 | 开源项目推荐

    FreeCar 是一个基于 Hertz 与 Kitex 的全栈微服务项目,欢迎 Star。 项目地址:CyanAsterisk/FreeCar Hertz 是一个超大规模的企业级微服务 HTTP 框架,具有高易用性、易扩展、低时延等特点。 Hertz 默认使用自研的高性能网络库 Netpoll,在一些特殊场景中,相较于 go net,Hertz 在 QPS、时延上

    2024年02月07日
    浏览(25)
  • 【项目实战】基于高并发服务器的搜索引擎

    作者:爱写代码的刚子 时间:2024.4.24 前言:基于高并发服务器的搜索引擎,引用了第三方库cpp-httplib,cppjieba,项目的要点在代码注释中了 index.html index.hpp log.hpp parser.cc(用于对网页的html文件切分且存储索引关系) searcher.hpp util.hpp http_server.cc(用于启动服务器和搜索引擎)

    2024年04月28日
    浏览(35)
  • 微服务 & 云原生:基于 Gogs + Drone 进行项目 CI/CD

    以一个简单的前后端项目来说,分别编写前后端的 Dockerfile 文件并构建镜像,然后编写 docker-compose.yml 构建部署,启动运行。每次代码变更后都需重新手动打包、构建、推送。 一个简单的例子: 前端: 项目名:kubemanagement-web 技术栈:Vue 后端: 项目名:kubemanagement 技术栈:

    2024年02月14日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包