【ElasticSearch】ES与MySQL数据同步方案及Java实现

这篇具有很好参考价值的文章主要介绍了【ElasticSearch】ES与MySQL数据同步方案及Java实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、同步实现思路

elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。
【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java

1、方案一:同步调用

操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java
同步调用方式下,业务耦合太多。

2、方案二:异步通知

引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。
【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java

3、方案三:监听binlog

使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合
【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java
其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。


三种实现方式的对比:
【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java

二、实现ES与MySQL数据同步

1、导入hotel-admin工程

启动服务,访问localhost:{spring.service.port}

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java
在hotel-admin服务中,模拟MySQL数据的增删改查。

2、项目分析

mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;

  • 声明exchange、queue、RoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据

模型如下:

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java

3、SpringAMQP整合

  • 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>    		
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


  • 临时启动个rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# 访问host:15672,用户和密码为默认的guest
  • 在hotel-admin中的application.yml,添加mq连接信息
spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: guest # 用户名
      password: guest # 密码


  • 最后,记得给消费方也引入AMQP依赖,并添加上mq的连接信息

4、声明队列和交换机

  • 在常量目录下定义队列和交换机的名字
package cn.llg.hotel.constants;

public class HotelMqConstants {
	//交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    //新增和修改队列
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    //删除队列
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    //RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    public static final String DELETE_KEY = "hotel.delete";
}

  • 接下来声明队列和交换机,可以基于注解,也可以基于Bean,后者复杂些,这里演示后者
package cn.llg.hotel.config;

import cn.llg.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @date 2023/7/12
 */
@Configuration
public class MqConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
    }

    /**
     * 绑定队列和交换机关系
     */
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder
                .bind(insertQueue())
                .to(topicExchange())
                .with(HotelMqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder
                .bind(deleteQueue())
                .to(topicExchange())
                .with(HotelMqConstants.DELETE_KEY);
    }
}


5、发送消息MQ

注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:

  • 交换机名称
  • routingKey
  • 消息内容,这里消息体尽量小些,别把一整个对象发过去
package cn.llg.hotel.web;

import cn.llg.hotel.constants.HotelMqConstants;
import cn.llg.hotel.pojo.Hotel;
import cn.llg.hotel.pojo.PageResult;
import cn.llg.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        // 新增酒店
        hotelService.save(hotel);
        // 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
    }
    //其他接口
    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }
    
    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }
}



6、监听MQ消息

hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。

  • 新建类HotelListener类,并加@Component注解以Bean的形式管理
package cn.llg.hotel.mq;

import cn.llg.hotel.constants.HotelMqConstants;
import cn.llg.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @date 2023/7/13
 */
@Component
public class HotelListener {

    @Resource
    IHotelService hotelService;

    /**
     * 监听酒店新增或者修改的业务
     * id接受一个Long,因为发送过来的是一个Long id
     * @param id 酒店ID
     */
    @RabbitListener(queues = HotelMqConstants.INSERT_QUEUE_NAME)
    public void listenHotelInsertAndUpdate(Long id){
        hotelService.insertDocById(id);
    }

    /**
     * 监听酒店删除业务
     */
    @RabbitListener(queues = HotelMqConstants.DELETE_QUEUE_NAME)
    public void listenHotelDelete(Long id){
        hotelService.deleteDocById(id);
    }
}
  • 拿到MQ中的酒店id后,使用JavaHighLevelClient对象来更新ES数据
package cn.llg.hotel.service;

import cn.llg.hotel.domain.dto.RequestParams;
import cn.llg.hotel.domain.pojo.Hotel;
import cn.llg.hotel.domain.vo.PageResult;
import com.baomidou.mybatisplus.extension.service.IService;


public interface IHotelService extends IService<Hotel> {

    void insertDocById(Long id);

    void deleteDocById(Long id);
}
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Resource
    RestHighLevelClient client;

    @Override
    public void insertDocById(Long id) {
        try {
            //0.根据ID查数据,并转为文档类型
            Hotel hotel = getById(id);
            HotelDoc hotelDoc = new HotelDoc(hotel);
            //1.准备request
            IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
            //2.准备DSL
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            //3.发送请求
            client.index(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteDocById(Long id) {

        try {
            //1.准备request
            DeleteRequest request = new DeleteRequest("hotel",id.toString());
            //2.发送请求
            client.delete(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }
}

最后补充下上面的Hotel和HotelDoc之间的转换关系:

@Data
@TableName("tb_hotel")
public class Hotel {
    @TableId(type = IdType.INPUT)
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String longitude;
    private String latitude;
    private String pic;
}

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    //距离
    private Object distance;
    //是否充广告
    private Boolean isAD;
    //ES中的completion,后面存数组,这里可以对应成List
    private List<String> suggestion;

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        if(this.business.contains("/")){
            //此时business有多个值,需要分开后放入suggestion
            String[] arr = this.business.split("/");
            //添加元素
            this.suggestion = new ArrayList<>();
            Collections.addAll(this.suggestion,arr);
            this.suggestion.add(this.brand);
        }else{
            this.suggestion = Arrays.asList(this.brand,this.business);
        }

    }
}

7、测试同步功能

重启两个服务,查看MQ:

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java

点击队列查看详情,可以看到绑定交换机成功:

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java
接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java
在酒店搜索页面搜一下:

【ElasticSearch】ES与MySQL数据同步方案及Java实现,ElasticSearch,elasticsearch,mysql,java

可以看到ES数据跟随MySQL更新成功!文章来源地址https://www.toymoban.com/news/detail-558244.html

到了这里,关于【ElasticSearch】ES与MySQL数据同步方案及Java实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • elasticsearch 实现与mysql 数据同步

    mysql8相关的安装可以看下另一篇博客 https://editor.csdn.net/md/?articleId=135905811 1.下载安装logstash 2.logstash 配置 logstash.yml 3.pipelines.yml 配置 同步方式: 1.logstash 2.go-mysql-elasticsearch 3.canal(阿里云) 一.logstash 1.安装mysql-connector-java 插件(需与mysql 版本一致) 2.配置logstash.conf 3.启动logsta

    2024年04月12日
    浏览(32)
  • 使用Logstash同步mysql数据到Elasticsearch(亲自踩坑)_将mysql中的数据导入es搜索引擎利用logstash(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月28日
    浏览(45)
  • 利用MQ实现mysql与elasticsearch数据同步

    1.声明exchange、queue、RoutingKey 2. 在hotel-admin中进行增删改(SQL),完成消息发送 3. 在hotel-demo中完成消息监听,并更新elasticsearch数据 4. 测试同步 我这里的mq是挂在了docker上,虚拟机地址是192.168.116.128。到时候这个根据自己的项目改就行 在hotel-demo中,定义配置类,声明队列、交

    2024年02月09日
    浏览(28)
  • Elasticsearch实战-数据同步(解决es数据增量同步)

    之前测试的数据都是一次从mysql导入到es,随着时间的推移,每天都有可能发生增删改查,不可能每次都全量同步,所以需要考虑增量同步问题。 缺点: 耦合性高,服务之间会相互影响 依赖消息队列的可靠性 启动:端口8099

    2024年02月11日
    浏览(63)
  • docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

    🚀 本文提供的指令完全可以按顺序逐一执行,已进行了多次测试。因此如果你是直接按照我本文写的指令一条条执行的,而非自定义修改过,执行应当是没有任何问题的。 🚀 本文讲述:使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elas

    2024年02月02日
    浏览(43)
  • 本地部署Canal笔记-实现MySQL与ElasticSearch7数据同步

    本地搭建canal实现mysql数据到es的简单的数据同步,仅供学习参考 建议首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki 本地搭建MySQL数据库 本地搭建ElasticSearch 本地搭建canal-server 本地搭建canal-adapter 本地环境为window11,大部分组件采用docker进行部署,MySQL采用8.0.27, 推荐

    2024年02月02日
    浏览(43)
  • 基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)

    1、Canal简介   Canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。   Canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向My

    2024年02月10日
    浏览(38)
  • DolphinScheduler 调度 DataX 实现 MySQL To ElasticSearch 增量数据同步实践

    基于SQL查询的 CDC(Change Data Capture): 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的

    2024年02月03日
    浏览(39)
  • 一种Mysql和Mongodb数据同步到Elasticsearch的实现办法和系统

    本文分享自天翼云开发者社区《一种Mysql和Mongodb数据同步到Elasticsearch的实现办法和系统》,作者:l****n 核心流程如下:   核心逻辑说明: MySQL Binlog解析 : 首先,从MySQL的二进制日志(Binlog)中解析出表名。这一步骤非常关键,因为我们只关注特定表的数据变更。 进一步,我

    2024年02月05日
    浏览(38)
  • Elasticsearch 系列(六)- ES数据同步和ES集群

    本章将和大家分享ES的数据同步方案和ES集群相关知识。废话不多说,下面我们直接进入主题。 1、数据同步问题 Elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,Elasticsearch也必须跟着改变,这个就是Elasticsearch与mysql之间的数据同步。 在微服务中,负责酒

    2024年04月28日
    浏览(78)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包