Elasticsearch实战-数据同步(解决es数据增量同步)

这篇具有很好参考价值的文章主要介绍了Elasticsearch实战-数据同步(解决es数据增量同步)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、数据同步问题分析

之前测试的数据都是一次从mysql导入到es,随着时间的推移,每天都有可能发生增删改查,不可能每次都全量同步,所以需要考虑增量同步问题。
Elasticsearch实战-数据同步(解决es数据增量同步)

二、解决方案

1、同步调用

Elasticsearch实战-数据同步(解决es数据增量同步)

缺点:
耦合性高,服务之间会相互影响

2、异步通知

依赖消息队列的可靠性
Elasticsearch实战-数据同步(解决es数据增量同步)

3、监听binlog

Elasticsearch实战-数据同步(解决es数据增量同步)

4、方案对比

Elasticsearch实战-数据同步(解决es数据增量同步)

三、案例-利用MQ实现Mysql与Elasticsearch数据同步

Elasticsearch实战-数据同步(解决es数据增量同步)

1、导入hotel-admin项目

启动:端口8099
Elasticsearch实战-数据同步(解决es数据增量同步)

2、申明exchange、queue、RoutingKey

Elasticsearch实战-数据同步(解决es数据增量同步)文章来源地址https://www.toymoban.com/news/detail-510827.html

/**
 * 定义队列和exchange
 * @author edevp
 */
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue InsertQueue() {
        return new Queue(MqConstants.INSERT_QUEUE_NAME, true);
    }

    @Bean
    public Queue DeleteQueue() {
        return new Queue(MqConstants.DELETE_QUEUE_NAME, true);
    }

    @Bean
    public Binding insertBinding() {
        return BindingBuilder.bind(InsertQueue()).to(topicExchange()).with(MqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteBinding() {
        return BindingBuilder.bind(DeleteQueue()).to(topicExchange()).with(MqConstants.DELETE_KEY);
    }

}

3、在hotel-admin中完成增删改查的消息推送

 @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){

        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {

        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.DELETE_KEY,id);
    }

4、在hotel-demo中完成消息监听并更新到es中


```java
/**
 * 消息监听
 * @author edevp
 **/
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.INSERT_QUEUE_NAME),
            exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = MqConstants.INSERT_KEY
    ))
    public void listenHotelInsert(Long hotelId){
        // 新增
        hotelService.saveById(hotelId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.DELETE_QUEUE_NAME),
            exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = MqConstants.DELETE_KEY
    ))
    public void listenHotelDelete(Long hotelId){
        // 删除
        hotelService.deleteById(hotelId);
    }
}

# 四、代码仓库
hotel-admin:[https://gitee.com/edevp/hotel-admin](https://gitee.com/edevp/hotel-admin)(hotel-db-sync分支)
						
hotel-demo:[https://gitee.com/edevp/hotel-demo](https://gitee.com/edevp/hotel-demo)

到了这里,关于Elasticsearch实战-数据同步(解决es数据增量同步)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • DataX实现Mysql与ElasticSearch(ES)数据同步

    jdk1.8及以上 python2 查看是否安装成功 查看python版本号,判断是否安装成功 在datax/job下,json格式,具体内容及主要配置含义如下 mysqlreader为读取mysql数据部分,配置mysql相关信息 username,password为数据库账号密码 querySql:需要查询数据的sql,也可通过colums指定需要查找的字段(

    2024年02月05日
    浏览(42)
  • 【kafka】JDBC connector进行表数据增量同步过程中的源表与目标表时间不一致问题解决...

    〇、参考资料 时间不一致,差了8个小时 (1)source (2)sink 即sink和source都加  \\\"db.timezone\\\": \\\"Asia/Shanghai\\\", 并需要保持一直

    2024年02月11日
    浏览(21)
  • FlinkCDC从Mongodb同步数据至elasticsearch(ES) 新版

    网上挺多flinksql方式同步数据,但是遇到数据比较杂乱,会经常无缘无故报错,笔者被逼无奈,采用API方式处理数据后同步,不知为何API资料笔者找到的资料很少,还很不全,摸着石头过河总算完成任务,收获颇丰,以此分享给大家。 有个大坑,我用该程序监控mongodb只能监控

    2024年02月11日
    浏览(32)
  • elasticsearch(ES)分布式搜索引擎04——(数据聚合,自动补全,数据同步,ES集群)

    **聚合(aggregations)**可以让我们极其方便的实现对数据的统计、分析、运算。例如: 什么品牌的手机最受欢迎? 这些手机的平均价格、最高价格、最低价格? 这些手机每月的销售情况如何? 实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近

    2024年02月08日
    浏览(36)
  • 【ElasticSearch】ES与MySQL数据同步方案及Java实现

    elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。 操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用 同步调用方式

    2024年02月15日
    浏览(36)
  • 使用kettle同步全量数据到Elasticsearch(es)--elasticsearch-bulk-insert-plugin应用

    为了前端更快地进行数据检索,需要将数据存储到es中是一个很不错的选择。由于公司etl主要工具是kettle,这里介绍如何基于kettle的elasticsearch-bulk-insert-plugin插件将数据导入es。在实施过程中会遇到一些坑,这里记录解决方案。 可能会遇到的报错: 1、No elasticSearch nodes found 2、

    2024年02月01日
    浏览(59)
  • Redis主从架构、数据同步原理、全量同步、增量同步

    大家好,我是哪吒。 2023年再不会Redis,就要被淘汰了 图解Redis,谈谈Redis的持久化,RDB快照与AOF日志 Redis单线程还是多线程?IO多路复用原理 Redis集群的最大槽数为什么是16384个? Redis缓存穿透、击穿、雪崩到底是个啥?7张图告诉你 Redis分布式锁的实现方式 Redis分布式缓存、

    2024年02月07日
    浏览(54)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(33)
  • flinkcdc同步完全量数据就不同步增量数据了

    使用flinkcdc同步mysql数据,使用的是全量采集模型 startupOptions(StartupOptions.earliest()) 全量阶段同步完成之后,发现并不开始同步增量数据,原因有以下两个: 1.mysql中对应的数据库没有开启binlog 在/etc/my.cnf配置文件中,在[ mysqld ]添加以下内容 然后重启数据库 ,执行命令 和chec

    2024年02月11日
    浏览(23)
  • Maxwell - 增量数据同步工具

            今天来学习一个新的大数据小工具 Maxwell ,它和 Sqoop 很像。Sqoop主要用于在 Hadoop (比如 HDFS、Hive、HBase 等)和关系型数据库之间进行数据的批量导入和导出,而 Maxwell 则主要用于监控数据库的变化(通过监控 binlog ),并将变化的数据以JSON格式发布到消息队列(一

    2024年02月20日
    浏览(26)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包