Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦

这篇具有很好参考价值的文章主要介绍了Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

前言

本篇博客是一篇elasticsearch的使用案例,包括结合MybatisPlus使用ES,如何保证MySQL和es的数据一致性,另外使用了RabbitMQ进行解耦,自定义了发消息的方法。

其他相关的Elasticsearch的文章列表如下:

  • Elasticsearch的Docker版本的安装和参数设置 & 端口开放和浏览器访问

  • Elasticsearch的可视化Kibana工具安装 & IK分词器的安装和使用

  • Elasticsearch的springboot整合 & Kibana进行全查询和模糊查询

引出


1.elasticsearch的使用案例,包括结合MybatisPlus使用ES;
2.如何保证MySQL和es的数据一致性;
3.使用了RabbitMQ进行解耦,自定义了发消息的方法。文章来源地址https://www.toymoban.com/news/detail-719303.html

结合MybatisPlus使用ES

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

        <!--mysql驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--        druid-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>

        <!--  springboot 整合mybaits plus   -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2.进行配置

package com.tianju.es.config;


import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

/**
 * 你也可以不继承 AbstractElasticsearchConfiguration 类,而将 ESConfig 写成一般的配置类的型式。
 * 不过继承 AbstractElasticsearchConfiguration 好处在于,它已经帮我们配置好了elasticsearchTemplate 直接使用。
 */
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
    @Override
    public RestHighLevelClient elasticsearchClient() {
        ClientConfiguration clientConfiguration =
                ClientConfiguration.builder()
                        .connectedTo("192.168.111.130:9200")
                        .build();
        return RestClients.create(clientConfiguration).rest();
    }
}

3.实体类上加入注解

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.math.BigDecimal;

/**
 * 产品,包括库存,价格信息
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("finance_sku")
@Document(indexName = "finance_sku")
public class FinanceSkuES {
    @TableId(value = "ID",type = IdType.AUTO)
    private Long id;
    @TableField("finance_sku_describe")
    @Field(index = true,analyzer = "ik_smart",
            searchAnalyzer = "ik_smart",type = FieldType.Text)
    private String detail; // 详情
    @TableField("finance_sku_price")
    private BigDecimal price;
    @TableField("finance_sku_stock")
    private Long stock;
    @TableField("finance_state")
    private Integer status;
}

参数解释

@Document(indexName = "books", shards = 1, replicas = 0)
@Data
public class Book {
    @Id
    @Field(type = FieldType.Integer)
    private Integer id;
    
    @Field(type = FieldType.Keyword)
    private String title;
    
    @Field(type = FieldType.Text)
    private String press;
    
    @Field(type = FieldType.Keyword)
    private String author;
    
    @Field(type = FieldType.Keyword,index=false)
    private BigDecimal price;
    
    @Field(type = FieldType.Text)
    private String description;
}
  • @Document :注解会对实体中的所有属性建立索引;
    indexName = “books” :表示创建一个名称为 “books” 的索引;
    shards = 1 : 表示只使用一个分片;
    replicas = 0 : 表示不使用复制备份;
    index = false: 不能索引查询
  • @Field(type = FieldType.Keyword) : 用以指定字段的数据类型。

4.创建操作的 Repository

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

从它的祖先们那里继承了大量的现成的方法,除此之外,它还可以按 spring data 的规则定义特定的方法。

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.mapper;

import com.tianju.es.entity.FinanceSkuES;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * 操作es,类似于之前的mapper
 */
@Repository
public interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {

    /**
     * 根据关键字进行 分词 分页查询 sku数据
     * @param detail 查询条件
     * @param pageable 分页
     * @return
     */
    Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable);

    /**
     * 根据id进行删除
     * @param id
     */
    void removeFinanceSkuESById(Long id);

}

5.初始化es中的数据

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

运行的后台信息

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

查看es页面的信息,index management

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

6.进行全查询以及分页

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

进行全查询

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

{
  "content": [
    {
      "id": 1,
      "detail": "HUAWEI MateBook X Pro 2023 微绒典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨蓝",
      "price": 13999.0,
      "stock": 50,
      "status": 1
    },
    {
      "id": 2,
      "detail": "HUAWEI Mate 60 Pro+ 16GB+1TB 宣白",
      "price": 9999.0,
      "stock": 60,
      "status": 1
    },
    {
      "id": 3,
      "detail": "iPhone 15 Pro Max 超视网膜 XDR 显示屏",
      "price": 9299.0,
      "stock": 46,
      "status": 1
    },
    {
      "id": 4,
      "detail": "MacBook Air Apple M2 芯片 8 核中央处理器 8 核图形处理器 8GB 统一内存 256GB 固态硬盘",
      "price": 8999.0,
      "stock": 60,
      "status": 1
    }
  ],
  "pageable": {
    "sort": {
      "empty": true,
      "sorted": false,
      "unsorted": true
    },
    "offset": 0,
    "pageSize": 4,
    "pageNumber": 0,
    "paged": true,
    "unpaged": false
  },
  "totalElements": 4,
  "last": true,
  "totalPages": 1,
  "number": 0,
  "size": 4,
  "sort": {
    "empty": true,
    "sorted": false,
    "unsorted": true
  },
  "numberOfElements": 4,
  "first": true,
  "empty": false
}

带条件分页查询

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

注意分页查询的page从0开始,尝试发现需要输入分词器分词后最小单元,比如hu不是最小单元,而HUAWEI是

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

分词器进行分词的结果

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

es和mysql的数据一致性

延迟双删

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // 把es看做是缓存,如何保证es 和 mysql的 数据一致性?
        // 延迟双删的模式
        // 1.先删除缓存 es
        skuESMapper.deleteAll();
        // 2.更新数据库 mysql
        updateById(financeSkuES);
        // 3.延时操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4.再次删除缓存 es
        skuESMapper.deleteAll();

        // 5.最后更新缓存 es
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: "+byId);
        return byId.get();
    }

上面代码有不妥的地方,我这里是修改,结果一开始直接从es中全部删除,应该是根据id把修改的数据删除,然后把修改好的数据set进es里面

加锁的方式

感觉好像没什么用的样子,就是用了一下加锁

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

用rabbitmq进行解耦

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

配置yml文件

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

spring:
  main:
    allow-circular-references: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    ### 本地的数据库
    url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: root
    password: 123

  # redis的相关配置
  redis:
    host: 119.3.162.127
    port: 6379
    database: 0
    password: Pet3927

  # rabbitmq相关
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    virtual-host: /test

    # 生产者保证消息可靠性
    publisher-returns: true
    publisher-confirm-type: correlated

    # 设置手动确认
    listener:
      simple:
        acknowledge-mode: manual

rabbitmq的配置类

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

将Java对象转换成json字符串传输

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String ES_EXCHANGE = "es_exchange";
    public static final String ES_QUEUE = "es_queue";
    public static final String ES_KEY = "es_key";

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(ES_EXCHANGE);
    }

    @Bean
    public Queue esQueue(){
        return new Queue(ES_QUEUE);
    }

    @Bean
    public Binding esQueueToDirectExchange(){
        return BindingBuilder.bind(esQueue())
                .to(directExchange())
                .with(ES_KEY);
    }

    /**
     * 将对象转换为json字符串
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());// 修改转换器
        return rabbitTemplate;
    }

}

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

callback回调方法

package com.tianju.es.rabbit;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * 生产者消息可靠性
 */
// RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
@Configuration
@Slf4j
public class CallbackConfig
        implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 初始化
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    /**
     * 不管成功或者失败都会执行
     * @param correlationData correlation对象需要在 发送消息时候 给
     * @param ack true表示成功,false表示发送失败
     * @param cause 如果失败的话,会写失败原因;如果成功,返回为null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.debug("ack是否成功:"+ack);
        log.debug("cause信息:"+cause);

        if (correlationData!=null){
            JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody());
            String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();
            String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();
            log.debug("消息体:"+jsonObject);
            log.debug("交换机:"+exchange);
            log.debug("路由key:"+routingKey);
        }


        if (ack){
            return;
        }

        // 失败了

        // 1、重试重试上限次数(默认值5)每重试一次时间间隔会增加
        // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
        // 重发上限次数(默认值5)超过阈值会转人工处理

        // 2、把消息体、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
        // 重发上限次数(默认值5)超过阈值会转人工处理
        // 2.1需要把相关的信息存放到数据中,表字段:消息体、交换机名称、路由键、状态、次数
        // 2.2定时任务(单体:spring定时任务  分布式:XxL-job),发送消息




    }


    /**
     * 只有失败了才会执行
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。

    }
}

自定义发消息工具类

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.common.util;


import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;


@Slf4j
public class RabbitUtil {

    /**
     * 延迟队列,发送消息,到达时间后进入死信队列中
     * @param rabbitTemplate 调用的rabbitTemplate
     * @param redisTemplate 用来在redis里面存token
     * @param msg 发送的消息
     * @param token 发送的token,用于保证幂等性
     * @param ttl 如果是延迟消费,则消息的过期时间,到达改时间后进入死信交换机,到死信队列中
     * @param exchange 交换机名字
     * @param routingKey 路由键名字
     * @param <T> 发送消息的实体类
     */
    public static  <T> void sendMsg(RabbitTemplate rabbitTemplate,
                                    StringRedisTemplate redisTemplate,
                                    T msg,String token,Integer ttl,
                                    String exchange,String routingKey) {

        log.debug("给交换机[{}]通过路由键[{}]发送消息 {},token为{}",exchange,routingKey,msg,token);

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                redisTemplate.opsForValue().set(token, token,5*60000);
                message.getMessageProperties().setMessageId(token);
                if (ttl!=null){
                    message.getMessageProperties().setExpiration(ttl.toString());
                }
                return message;
            }
        };

        CorrelationData correlationData = new CorrelationData();
        // 消息体
        Message message = new Message(JSON.toJSONBytes(msg));
        // 交换机名称
        message.getMessageProperties().setReceivedExchange(exchange);
        // 路由键
        message.getMessageProperties().setReceivedRoutingKey(routingKey);
        correlationData.setReturnedMessage(message);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(exchange, // 发给交换机
                routingKey, // 根据这个routingKey就会给到TTL队列,到时间成死信,发给死信交换机,到死信队列
                msg,
                messagePostProcessor,
                correlationData
        );
    }
}

进行消息的发送

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

接口

package com.tianju.es.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.tianju.es.entity.FinanceSkuES;

public interface SkuService extends IService<FinanceSkuES> {

    /**
     * 延迟双删的方式,保证es 缓存 和 mysql数据库的数据一致性
     * @param financeSkuES 修改的数据
     * @return
     */
    FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);

    /**
     * 加锁的方式,不过感觉没啥用的样子
     * @param financeSkuES
     * @return
     */
    FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);

    /**
     * 通过rabbitmq进行解耦
     * @param financeSkuES
     * @return
     */
    String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
}

实现类

package com.tianju.es.service.impl;

import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tianju.common.util.RabbitUtil;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import com.tianju.es.mapper.SkuMybatisPlusMapper;
import com.tianju.es.rabbit.RabbitConfig;
import com.tianju.es.service.SkuService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Collection;
import java.util.Optional;
import java.util.UUID;

@Service
public class SkuServiceImpl extends ServiceImpl<SkuMybatisPlusMapper,FinanceSkuES>
        implements SkuService {

    @Autowired
    private SkuESMapper skuESMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // 把es看做是缓存,如何保证es 和 mysql的 数据一致性?
        // 延迟双删的模式
        // 1.先删除缓存 es
        skuESMapper.deleteAll();
        // 2.更新数据库 mysql
        updateById(financeSkuES);
        // 3.延时操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4.再次删除缓存 es
        skuESMapper.deleteAll();

        // 5.最后更新缓存 es
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: "+byId);
        return byId.get();
    }

    @Override
    public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {
        // 第二种方式加锁
        String uuid = UUID.randomUUID().toString();
        // 相当于setnx指令
        Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);
        try {
            if (skuLock){ // 抢到了锁
                skuESMapper.deleteAll();
                updateById(financeSkuES);
            }
        }finally {
            if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){
                stringRedisTemplate.delete("skuLock");
            }
        }
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: "+byId);
        return byId.get();
    }

    @Override
    public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {
        // 采用rabbitmq进行解耦

        updateById(financeSkuES);
        FinanceSkuES skuES = getById(financeSkuES.getId());
        String uuid = IdUtil.fastUUID();

        RabbitUtil.sendMsg(
                rabbitTemplate,stringRedisTemplate,skuES,uuid,null,
                RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY
        );

        return "已经发送消息:"+skuES;
    }
}

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

接收到消息,更新es

接收到消息进行es的更新,把原来的删除,把最新的set进去

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

package com.tianju.es.rabbit;

import com.rabbitmq.client.Channel;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class ESListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private SkuESMapper skuESMapper;

    @RabbitListener(queues = RabbitConfig.ES_QUEUE)
    public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {
        String messageId = message.getMessageProperties().getMessageId();
        log.debug("进行业务----> 监听到队列{}的消息,messageId为{}",financeSkuES,messageId);

        try {
            // 幂等性
            if (redisTemplate.delete(messageId)){
                // 根据id删除原有的 es 数据
                // 然后把新的数据set进来
                log.debug("处理es的业务,删除原有的,替换最新的");
                skuESMapper.removeFinanceSkuESById(financeSkuES.getId());
                skuESMapper.save(financeSkuES);
            }

            // 手动签收消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }catch (Exception e){
            // 幂等性
            redisTemplate.opsForValue().set(messageId,messageId,5*60000);

            // 1、重试重试上限次数(默认值5) 每重试一次时间间隔会增加
            // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
            // 重发上限次数(默认值5)超过阈值会转人工处理

            // 已知的消息,交换机,路由器,消息 message.getBody()  消息发送给的是监听的队列

            try {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq

后台打印的日志

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦,SpringBoot,# Message Queue,elasticsearch,mysql,rabbitmq


总结

1.elasticsearch的使用案例,包括结合MybatisPlus使用ES;
2.如何保证MySQL和es的数据一致性;
3.使用了RabbitMQ进行解耦,自定义了发消息的方法。

到了这里,关于Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Logstash同步mysql数据到Elasticsearch(亲自踩坑)_将mysql中的数据导入es搜索引擎利用logstash(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月28日
    浏览(52)
  • 淘宝太细了:mysql 和 es 的5个一致性方案,你知道吗?

    在40岁老架构师 尼恩的 读者交流群 (50+)中,最近有小伙伴拿到了一线互联网企业如拼多多、极兔、有赞、希音的面试资格,遇到一几个很重要的面试题: 说5种mysql 和 elasticsearch 数据一致性方案 与之类似的、其他小伙伴遇到过的问题还有: Mysql 和 ES 数据一致性问题及方案?

    2024年02月13日
    浏览(35)
  • DataX实现Mysql与ElasticSearch(ES)数据同步

    jdk1.8及以上 python2 查看是否安装成功 查看python版本号,判断是否安装成功 在datax/job下,json格式,具体内容及主要配置含义如下 mysqlreader为读取mysql数据部分,配置mysql相关信息 username,password为数据库账号密码 querySql:需要查询数据的sql,也可通过colums指定需要查找的字段(

    2024年02月05日
    浏览(60)
  • 如何保证ES和数据库的数据一致性?

    在业务中,我们通常需要把数据库中的数据变更同步到ES中,那么如何保证数据库和ES的一致性呢?通常有以下几种做法: 双写 在代码中,对数据库和ES进行双写,并且先操作本地数据库,后操作ES,而且还需要把两个操作放到一个事务中:  在以上逻辑中,如果写数据库成功

    2024年04月28日
    浏览(53)
  • 【ElasticSearch】ES与MySQL数据同步方案及Java实现

    elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。 操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用 同步调用方式

    2024年02月15日
    浏览(51)
  • SpringCloud分布式搜索引擎、数据聚合、ES和MQ的结合使用、ES集群的问题

    目录 数据聚合 聚合的分类 ​编辑 DSL实现Bucket聚合 ​编辑  DSL实现Metrics聚合​编辑 RestAPI实现聚合  对接前端接口​编辑  自定义分词器​编辑 Completion suggester查询 Completion suggester查询 酒店数据自动补全 实现酒店搜索框界面输入框的自动补全  数据同步问题分析​编辑 同

    2024年02月16日
    浏览(50)
  • JAVA面试题分享二百五十五:mysql 和 es 的5个一致性方案,你知道吗?

    目录 问题场景分析 方案一:同步双写 方案二:异步双写 方案2.1 使用内存队列(如阻塞队列)异步 方案2.2 使用消息队列(如阻塞队列)异步 方案三:定期同步 方案四:数据订阅 方案五:etl 工具 咱们的生产需求上,为了便于商品的聚合搜索,高速搜索,采用两大优化方案

    2024年02月04日
    浏览(45)
  • Elasticsearch实战(二十三)---ES数据建模与Mysql对比 一对多模型

    我们如何把Mysql的模型合理的在ES中去实现? 就需要你对要存储的数据足够的了解,及对应用场景足够的深入分析,才能建立一个合适的模型,便于你后期扩展 一对一 模型 一对多 模型 多对多 模型 1.一对多 模型 我们现在有两个模型, 一个商品Product, 一个分类Category , 我们对比下一

    2024年02月08日
    浏览(59)
  • Elasticsearch实战(二十二)---ES数据建模与Mysql对比 一对一模型

    我们如何把Mysql的模型合理的在ES中去实现? 就需要你对要存储的数据足够的了解,及对应用场景足够的深入分析,才能建立一个合适的模型,便于你后期扩展 实体之间的关系: 一对一 模型 一对一(1:1):一个实体最多只能能另一个实体相关联,另一个实体如是。 例:一个只能

    2024年02月10日
    浏览(58)
  • 【ES数据库】Elasticsearch安装使用

    Elasticsearch 和 MongoDB/Redis 类似,是非关系型数据库,从索引文档到文档能被搜索到只有一个轻微的延迟,是采用Restful API标准的可扩展和高可用的实时数据分析的全文搜索工具 Elastic Search 的实现原理是,利用内置分词器(Analyzer)对数据库文本进行分词,将解析出的和数据

    2024年02月04日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包