es和数据库同步方案

这篇具有很好参考价值的文章主要介绍了es和数据库同步方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

5.5 课程信息索引同步

5.5.1 技术方案

通过向索引中添加课程信息最终实现了课程的搜索,我们发现课程信息是先保存在关系数据库中,而后再写入索引,这个过程是将关系数据中的数据同步到elasticsearch索引中的过程,可以简单成为索引同步。

通常项目中使用elasticsearch需要完成索引同步,索引同步的方法很多:

1、针对实时性非常高的场景需要满足数据的及时同步,可以同步调用,或使用Canal去实现。

1)同步调用即在向MySQL写数据后远程调用搜索服务的接口写入索引,此方法简单但是耦合代码太高。

2)可以使用一个中间的软件canal解决耦合性的问题,但存在学习与维护成本。

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,实现将MySQL的数据同步到消息队列、Elasticsearch、其它数据库等,应用场景十分丰富。

es和数据库同步方案,elasticsearch,大数据,搜索引擎

它的地址:

github地址:https://github.com/alibaba/canal

版本下载地址:https://github.com/alibaba/canal/releases

文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal基于mysql的binlog技术实现数据同步,什么是binlog,它是一个文件,二进制格式,记录了对数据库更新的SQL语句,向数据库写数据的同时向binlog文件里记录对应的sql语句。当数据库服务器发生了故障就可以使用binlog文件对数据库进行恢复。

所以,使用canal是需要开启mysql的binlog写入功能,Canal工作原理如下:

es和数据库同步方案,elasticsearch,大数据,搜索引擎

1、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump

协议

2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

3、canal 解析 binary log 对象(原始为 byte 流)

详细使用Canal进行索引同步的步骤参考:Canal实现索引同步.pdf

2、当索引同步的实时性要求不高时可用的技术比较多,比如:MQ、Logstash、任务调度等。

MQ:向mysql写数据的时候向mq写入消息,搜索服务监听MQ,收到消息后写入索引。使用MQ的优势是代码解耦,但是需要处理消息可靠性的问题有一定的技术成本,做到消息可靠性需要做到生产者投递成功、消息持久化以及消费者消费成功三个方面,另外还要做好消息幂等性问题。但是MQ会产生数据可靠性问题,在之前的文档之我们讲解了MQ的消息可靠性。使用MQ技术实现难度还是比较大的。

Logstash: 开源实时日志分析平台 ELK包括Elasticsearch、Kibana、Logstash,Logstash负责收集、解析和转换日志信息,可以实现MySQL与Elasticsearch之间的数据同步。也可以实现解耦合并且是官方推荐,但需要增加学习与维护成本。

任务调度:向mysql写数据的时候记录修改记录,开启一个定时任务根据修改记录将数据同步到Elasticsearch。 

我们实现这个同步功能使用索引同步的方案进行解决。

实现代码

1、拷贝CourseIndex 模型类到内容管理model 工程的dto包下。

2、在内容管理服务中添加FeignClient

Java
package com.xuecheng.content.feignclient;

import com.xuecheng.content.model.dto.CourseIndex;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

/**
 * @description 搜索服务远程接口
 * @author Mr.M
 * @date 2022/9/20 20:29
 * @version 1.0
 */
@FeignClient(value = "search",fallbackFactory = SearchServiceClientFallbackFactory.class)
public interface SearchServiceClient {

 @PostMapping("/search/index/course")
 public Boolean add(@RequestBody CourseIndex courseIndex);
}

定义SearchServiceClientFallbackFactory :

Java
@Slf4j
@Component
public class SearchServiceClientFallbackFactory implements FallbackFactory<SearchServiceClient> {
    @Override
    public SearchServiceClient create(Throwable throwable) {

        return new SearchServiceClient() {

            @Override
            public Boolean add(CourseIndex courseIndex) {
                throwable.printStackTrace();
                log.debug("调用搜索发生熔断走降级方法,熔断异常:", throwable.getMessage());

                return false;
            }
        };
    }
}

3、编写课程索引任务执行方法

完善CoursePublishTask类中的saveCourseIndex方法

Java
//保存课程索引信息
public void saveCourseIndex(MqMessage mqMessage,long courseId){
    log.debug("保存课程索引信息,课程id:{}",courseId);

    //消息id
    Long id = mqMessage.getId();
    //消息处理的service
    MqMessageService mqMessageService = this.getMqMessageService();
    //消息幂等性处理
    int stageTwo = mqMessageService.getStageTwo(id);
    if(stageTwo > 0){
        log.debug("课程索引已处理直接返回,课程id:{}",courseId);
        return ;
    }

    Boolean result = saveCourseIndex(courseId);
    if(result){
        //保存第一阶段状态
        mqMessageService.completedStageTwo(id);
    }
}

private Boolean saveCourseIndex(Long courseId) {

    //取出课程发布信息
    CoursePublish coursePublish = coursePublishMapper.selectById(courseId);
    //拷贝至课程索引对象
    CourseIndex courseIndex = new CourseIndex();
    BeanUtils.copyProperties(coursePublish,courseIndex);
    //远程调用搜索服务api添加课程信息到索引
    Boolean add = searchServiceClient.add(courseIndex);
    if(!add){
        XueChengPlusException.cast("添加索引失败");
    }
    return add;

}文章来源地址https://www.toymoban.com/news/detail-672989.html

到了这里,关于es和数据库同步方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 数据库➡ES,数据更新同步

            mysql5.7(自己部署),elasticsearch:7.4.2(自己部署),centos7.0(虚拟环境),python3.8(自己部署),pycharm(自己安装);--建议可用docker部署mysql和es         ①、mysql数据,estable表字段(ID--unique,VALUE--str is json format)唯一ID,json格式的字符串,(表中存的为需要更新

    2024年01月17日
    浏览(32)
  • 《Spring Boot 实战派》--13.集成NoSQL数据库,实现Elasticsearch和Solr搜索引擎

             关于搜索引擎 我们很难实现 Elasticseach 和 Solr两大搜索框架的效果;所以本章针对两大搜索框架,非常详细地讲解 它们的原理和具体使用方法, 首先 介绍什么是搜索引擎 、如何用 MySQL实现简单的搜索引擎,以及Elasticseach 的 概念和接口类; 然后介绍Elasticseach

    2023年04月09日
    浏览(77)
  • MySQL数据库同步方案

    MYSQL主从同步架构是目前使用最多的数据库架构之一,主从同步使得数据可以从一个数据库服务器复制到其他服务器上,在复制数据时,一个服务器充当主服务器(master),其余的服务器充当从服务器(slave)。 如上图所示,192.168.4.10(主机名为\\\"10.mysql\\\")作为MySQL主数据库,192.168.4.

    2024年02月10日
    浏览(35)
  • 【ElasticSearch】ES与MySQL数据同步方案及Java实现

    elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。 操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用 同步调用方式

    2024年02月15日
    浏览(38)
  • 【ElasticSearch】深入探索 ElasticSearch 对数据的聚合、查询自动补全、与数据库间的同步问题以及使用 RabbitMQ 实现与数据库间的同步

    在本文中,我们将深入探讨 ElasticSearch 在数据处理中的关键功能,包括数据聚合、查询自动补全以及与数据库的同步问题。 首先,我们将聚焦于 ElasticSearch 强大的聚合功能,解释什么是聚合以及如何通过 DSL 语句和 RestClient 实现各种聚合操作。这一功能能够让我们更深入地了

    2024年02月08日
    浏览(36)
  • 数据库同步 Elasticsearch 后数据不一致,怎么办?

    Q1:Logstash 同步 postgreSQL 到 Elasticsearch 数据不一致。 在使用 Logstash 从 pg 库中将一张表导入到 ES 中时,发现 ES 中的数据量和 PG 库中的这张表的数据量存在较大差距。如何快速比对哪些数据没有插入?导入过程中,Logstash 日志没有异常。PG 中这张表有 7600W。 Q2:mq 异步双写数

    2024年02月15日
    浏览(30)
  • 【es数据库】python 使用Elasticsearch数据库

    Elasticsearch是一个开源的高扩展性搜索引擎,它可以快速地存储、搜索和分析大量的数据。 使用Python语言和Elasticsearch,可以轻松地创建和操作“数据库”和“数据库表”,而且具备分布式和高扩展性的特点,适用于大规模数据存储与搜索场景。 ES是一种文档数据库,它并不像

    2024年02月12日
    浏览(30)
  • 【ES数据库】Elasticsearch安装使用

    Elasticsearch 和 MongoDB/Redis 类似,是非关系型数据库,从索引文档到文档能被搜索到只有一个轻微的延迟,是采用Restful API标准的可扩展和高可用的实时数据分析的全文搜索工具 Elastic Search 的实现原理是,利用内置分词器(Analyzer)对数据库文本进行分词,将解析出的和数据

    2024年02月04日
    浏览(32)
  • datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)

    1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控) 2.datax版本:自己编译的DataX-datax_v202210 3.hdfs版本:3.1.3 4.hive版本:3.1.2 1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个

    2023年04月23日
    浏览(33)
  • elasticsearch 安装 (es数据库安装详细)

    以下操作在debian11下,其它linux版本相同 安装的是8.6.2版本(2023.3),可以直接复制使用 以下操作默认在root下进行,如果切换用户会说明 1.下载安装包 注意:如果要安装kibana,版本尽量要一致 主体程序从这里下载 链接: es 2.创建es的用户 3.创建es存储位置 存放在/var/es(根据自

    2024年02月05日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包