elasticsearch实现mysql数据同步

这篇具有很好参考价值的文章主要介绍了elasticsearch实现mysql数据同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。

常见的数据同步方案有三种:

  • 同步调用

  • 异步通知

  • 监听binlog

以下使用异步通知同步elasticsearch的数据 

引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<!--fastjson-->
   <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.66</version>
</dependency>

配置elasticsearch

import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@MapperScan(basePackages = "cn.itcast.hotel.mapper")
@SpringBootApplication
public class HotelAdminApplication {

    public static void main(String[] args) {
        SpringApplication.run(HotelAdminApplication.class, args);
    }

    // 最新版
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    @Bean
    public RestHighLevelClient client(){
        return  new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://43.139.59.28:9200")
        ));
    }

}

配置交换机、队列

声明交换机、队列名称 

constatnts包下新建一个类MqConstants,存储交换机和队列的名称

public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

声明交换机、队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

发送MQ消息  

在增、删、改业务中分别发送MQ消息

@Resource
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
    }

接收MQ消息

接收到MQ消息要做的事情包括:

  • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库

  • 删除消息:根据传递的hotel的id删除索引库中的一条数据

文档类

获取es传来的值文章来源地址https://www.toymoban.com/news/detail-697048.html



import lombok.Data;
import lombok.NoArgsConstructor;
 
@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    // 排序时的 距离值
    private Object distance;
    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
    }
}
model类 
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("tb_hotel")
public class Hotel {
    @TableId(type = IdType.INPUT)
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String longitude;
    private String latitude;
    private String pic;
}
 service类
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Resource
    private RestHighLevelClient client;


    @Override
    public void deleteById(Long id) {
        try {
            // 1.准备Request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void insertById(Long id) {
        try {
            // 0.根据id查询酒店数据
            Hotel hotel = getById(id);
            // 转换为文档类型
            HotelDoc hotelDoc = new HotelDoc(hotel);

            // 1.准备Request对象
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
            // 2.准备Json文档
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3.发送请求
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
监听器
import cn.itcast.hotel.constatnts.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听酒店新增或修改的业务
     * @param id 酒店id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id){
        hotelService.insertById(id);
    }

    /**
     * 监听酒店删除的业务
     * @param id 酒店id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
}

到了这里,关于elasticsearch实现mysql数据同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java SpringBoot API 实现ES(Elasticsearch)搜索引擎的一系列操作(超详细)(模拟数据库操作)

    小编使用的是elasticsearch-7.3.2 基础说明: 启动:进入elasticsearch-7.3.2/bin目录,双击elasticsearch.bat进行启动,当出现一下界面说明,启动成功。也可以访问http://localhost:9200/ 启动ES管理:进入elasticsearch-head-master文件夹,然后进入cmd命令界面,输入npm run start 即可启动。访问http

    2024年02月04日
    浏览(54)
  • ElasticSearch搜索引擎:数据的写入流程

    (1)ES 客户端选择一个节点 node 发送请求过去,这个节点就是协调节点 coordinating node  (2)协调节点对 document 进行路由,通过 hash 算法计算出数据应该落在哪个分片 shard 上,然后根据节点上维护的 shard 信息,将请求转发到对应的实际处理节点node上 shard = hash(document_id) %

    2023年04月14日
    浏览(63)
  • SpringBoot封装Elasticsearch搜索引擎实现全文检索

    注:本文实现了Java对Elasticseach的分页检索/不分页检索的封装 ES就不用过多介绍了,直接上代码: 创建Store类(与ES字段对应,用于接收ES数据) Elasticsearch全文检索接口:不分页检索 Elasticsearch全文检索接口:分页检索 本文实现了Java对Elasticsearch搜索引擎全文检索的封装 传入

    2024年02月04日
    浏览(44)
  • 搜索引擎(大数据检索)论述[elasticsearch原理相关]

    首先需要大致知道搜索引擎有大致几类:1.全文搜索引擎 2.垂直搜索引擎 3.类目搜索引擎等。 1.全文搜索引擎:是全文本覆盖的,百度,google等都是全文本搜索,就是我搜一个词项“方圆”,那么这个词项可以是数字平方的概念,可以是一个人名,可以是一首歌等,所有的相

    2023年04月08日
    浏览(52)
  • 【搜索引擎2】实现API方式调用ElasticSearch8接口

    1、理解ElasticSearch各名词含义 ElasticSearch对比Mysql Mysql数据库 Elastic Search Database 7.X版本前有Type,对比数据库中的表,新版取消了 Table Index Row Document Column mapping Elasticsearch是使用Java开发的,8.1版本的ES需要JDK17及以上版本;es默认带有JDK,如果安装es环境为java8,则会默认使用自带

    2024年04月17日
    浏览(40)
  • 基于Elasticsearch与Hbase组合框架的大数据搜索引擎

    本项目为学校大数据工程实训项目,共开发4周,答辩成绩不错。代码仓库放文章尾,写的不好,代码仅供参考。 对于结构化数据 ,因为它们具有特定的结构,所以我们一般都是可以通过关系型数据库(MySQL,Oracle 等)的二维表(Table)的方式存储和搜索,也可以建立索引。

    2024年02月09日
    浏览(63)
  • Elasticsearch (ES) 搜索引擎: 数据类型、动态映射、多类型(子字段)

    原文链接:https://xiets.blog.csdn.net/article/details/132348634 版权声明:原创文章禁止转载 专栏目录:Elasticsearch 专栏(总目录) ES 映射字段的 数据类型 ,官网文档参考:Field data types。 下面是 ES 常用的一些基本数据类型。 字符串 类型: keyword :类型。 text :文本类型。

    2024年03月23日
    浏览(64)
  • 用SpringBoot和ElasticSearch实现网盘搜索引擎,附源码,详细教学

    可以扫描小程序码体验,切换到搜索Tabbar。 小程序端界面实现 网页端实现界面 对外提供的api 接口声明 接口实现 执行搜索策略。 提供2种搜索策略,分别是MySQL和ElasticSearch搜索策略。在配置文件进行配置搜索策略。 搜索类型枚举 配置文件中的搜索策略相关配置 es搜索策略实

    2024年02月08日
    浏览(42)
  • elasticsearch[五]:深入探索ES搜索引擎的自动补全与拼写纠错:如何实现高效智能的搜索体验

    前一章讲了搜索中的拼写纠错功能,里面一个很重要的概念就是莱文斯坦距离。这章会讲解搜索中提升用户体验的另一项功能 - [自动补全]。本章直接介绍 ES 中的实现方式以及真正的搜索引擎对自动补全功能的优化。 大家对上面的这个应该都不陌生,搜索引擎会根据你输入的

    2024年01月24日
    浏览(59)
  • 如何使用内网穿透工具实现Java远程连接本地Elasticsearch搜索分析引擎

    简单几步,结合Cpolar 内网穿透工具实现Java 远程连接操作本地分布式搜索和数据分析引擎Elasticsearch。 Cpolar内网穿透提供了更高的安全性和隐私保护,通过使用加密通信通道,Cpolar技术可以确保数据传输的安全性,这为用户和团队提供了更可靠的保护,使他们能够放心地处理和

    2024年02月04日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包