Spring Boot 整合Redis实现消息队列

这篇具有很好参考价值的文章主要介绍了Spring Boot 整合Redis实现消息队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、简介

  本篇文章主要来讲Spring Boot 整合Redis实现消息队列,实现redis用作消息队列有多种方式,比如:

  • 基于List rpush+lpop lpush+rpop
  • 基于List rpush+blpop lpush+brpop (阻塞式获取消息)
  • 基于Sorted Set 的优先级队列
  • Redis Stream (Redis5.0版本开始)
  • Pub/Sub 机制

  不过这里讲的是Pub/Sub 机制的,这种方式优缺点大致如下:

优点:

  • 一个消息可以发布到多个消费者
  • 消费者可以同时订阅多个信道,因此可以接收多种消息(处理时先根据信道判断)
  • 消息即时发送,消费者会自动接收到信道发布的消息

缺点:

  • 消息发布时,如果客户端不在线,则消息丢失
  • 消费者处理消息时出现了大量消息积压,则可能会断开通道,导致消息丢失
  • 消费者接收消息的时间不一定是一致的,可能会有差异(业务处理需要判重)

二、maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.alian</groupId>
    <artifactId>redis-message-queue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-message-queue</name>
    <description>redis-message-queue</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <project.package.directory>target</project.package.directory>
        <java.version>1.8</java.version>
        <!--com.fasterxml.jackson 版本-->
        <jackson.version>2.9.10</jackson.version>
        <!--lombok 版本-->
        <lombok.version>1.16.14</lombok.version>
        <!--阿里巴巴fastjson 版本-->
        <fastjson.version>1.2.68</fastjson.version>
        <!--junit 版本-->
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--redis依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!--用于序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!--java 8时间序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!--JSON-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!--日志输出-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、编码实现

3.1、配置文件

application.properties

# 端口
server.port=8090
# 上下文路径
server.servlet.context-path=/redisQueue

# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
#spring.redis.host=192.168.0.193
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=10
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=20000
# 读时间(毫秒)
spring.redis.timeout=10000
# 连接超时时间(毫秒)
spring.redis.connect-timeout=10000

3.2、配置类

RedisConfiguration.java

package com.alian.queue.config;

import com.alian.queue.listener.RedisMessageListenerListener;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

@Slf4j
@Configuration
@EnableCaching
public class RedisConfiguration {

    /**
     * redis配置
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // 实例化redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        //设置连接工厂
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // key采用String的序列化
        redisTemplate.setKeySerializer(keySerializer());
        // value采用jackson序列化
        redisTemplate.setValueSerializer(valueSerializer());
        // Hash key采用String的序列化
        redisTemplate.setHashKeySerializer(keySerializer());
        // Hash value采用jackson序列化
        redisTemplate.setHashValueSerializer(valueSerializer());
        //执行函数,初始化RedisTemplate
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("TOPIC_USER");
    }

    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, channelTopic());
        return redisMessageListenerContainer;
    }

    /**
     * key类型采用String序列化
     *
     * @return
     */
    private RedisSerializer<String> keySerializer() {
        return new StringRedisSerializer();
    }

    /**
     * value采用JSON序列化
     *
     * @return
     */
    private RedisSerializer<Object> valueSerializer() {
        //设置jackson序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        //设置序列化对象
        jackson2JsonRedisSerializer.setObjectMapper(getMapper());
        return jackson2JsonRedisSerializer;
    }


    /**
     * 使用com.fasterxml.jackson.databind.ObjectMapper
     * 对数据进行处理包括java8里的时间
     *
     * @return
     */
    private ObjectMapper getMapper() {
        ObjectMapper mapper = new ObjectMapper();
        //设置可见性
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //默认键入对象
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        //设置Java 8 时间序列化
        JavaTimeModule timeModule = new JavaTimeModule();
        timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
        timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
        //禁用把时间转为时间戳
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        mapper.registerModule(timeModule);
        return mapper;
    }

}

  这里就是在整合redis的前提下(如果不懂可以参考:SpringBoot整合redis(redis支持单节点和集群)),然后新增了如下配置:

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("TOPIC_USER");
    }

    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, channelTopic());
        return redisMessageListenerContainer;
    }

  RedisMessageListenerContainer 是为Redis消息侦听器 MessageListener 提供异步行为的容器。处理侦听、转换和消息分派的低级别详细信息。它与低级别Redis(每个订阅一个连接)相反,容器只使用一个连接,该连接对所有注册的侦听器都是“多路复用”的,消息调度是通过任务执行器完成的。容器以惰性方式使用连接(仅当至少配置了一个侦听器时才使用连接),同时添加和删除侦听器具有未定义的结果,强烈建议对这些方法进行相应的同步/排序。

  • 创建一个Redis消息监听器容器
  • 设置 Redis 连接工厂
  • 将监听器和管道名想绑定
  • 配置管道名和推送消息时的管道名要一致,不然监听器监听不到消息

我这里使用的是主题订阅:ChannelTopic,你也可以使用模式匹配:PatternTopic,从而匹配多个信道。

3.3、监听器

package com.alian.queue.listener;

import com.alian.queue.service.ConsumeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedisMessageListenerListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ConsumeService consumeService;

    /**
     * 消息处理
     *
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(pattern);
        log.info("onMessage --> 消息通道是:{}", channel);
        try {
            RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
            Object deserialize = valueSerializer.deserialize(message.getBody());
            log.info("反序列化的结果:{}", deserialize);
            if (deserialize == null) {
                return;
            }
            String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
            log.info("计算得到的key: {}", md5DigestAsHex);
            Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS);
            if (Boolean.TRUE.equals(result)) {
                // redis消息进行处理
                consumeService.processMessage(channel, deserialize.toString());
                log.info("处理redis消息完成");
            } else {
                log.info("其他服务处理中");
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("处理redis消息异常:", e);
        }

    }

}

  我们实现MessageListener 接口,就可以通过onMessage()方法接收到消息了,该方法有两个参数:

  • 参数 message getBody() 方法以二进制形式获取消息体, getChannel() 以二进制形式获取消息通道
  • 参数 pattern 二进制形式的消息通道(实际和 message.getChannel() 返回值相同)

3.4、消费服务

ConsumeService.java

package com.alian.queue.service;

import com.alian.queue.dto.UserDto;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ConsumeService {

    public void processMessage(String channel,String message) {
        // 可以根据channel再继续映射到不同的实现
        UserDto userDto = JSONObject.parseObject(message, UserDto.class);
        log.info("接收的结果:{}", userDto);
        // 做业务...
        // 还可以分布式锁幂等处理
    }
}

  这个就是消息处理了,可以根据channel再继续映射到不同的实现,然后业务也可以继续使用分布式锁进行逻辑判断处理,这里就不具体去操作了。

3.5、实体

UserDto.java

package com.alian.queue.dto;

import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

@Data
public class UserDto implements Serializable {

    private String id;//员工ID

    private String name;//员工姓名

    private int age;//员工年龄

    private String department;//部门

    private double salary;//工资

    private LocalDateTime hireDate;//入职时间

    public UserDto() {

    }

    /*
     *  简单的构造方法用于测试
     */
    public UserDto(String id, String name, int age, String department, double salary, LocalDateTime hireDate) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.department = department;
        this.salary = salary;
        this.hireDate = hireDate;
    }
}

四、验证

  我们把上面的服务启动多个实例,这里就用端口区别,分别是 8090 8091,然后发送消息到 Redis,下面使我们的测试类:

@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RedisMessageQueueTest {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Test
    public void sendMessage() {
        UserDto userDto = new UserDto("BAT002", "包雅馨", 25, "财务部", 8800.0, LocalDateTime.of(2016, 11, 10, 8, 30, 0));
        // 注意这里的通道名【TOPIC_USER】要和RedisMessageListenerContainer里面配置的一致
        redisTemplate.convertAndSend("TOPIC_USER", JSON.toJSONString(userDto));
        log.info("发送成功");
    }

}

  注意这里的通道名要和 RedisMessageListenerContainer 里配置的一致,不然消息发送不出去的。

8090的日志

onMessage --> 消息通道是:TOPIC_USER
反序列化的结果:{"age":25,"department":"财务部","hireDate":"2016-11-10T08:30:00","id":"BAT002","name":"包雅馨","salary":8800.0}
计算得到的key: c69dc1563cc892718bf3ee0c5b90320b
接收的结果:UserDto(id=BAT002, name=包雅馨, age=25, department=财务部, salary=8800.0, hireDate=2016-11-10T08:30)
处理redis消息完成

8091的日志

onMessage --> 消息通道是:TOPIC_USER
反序列化的结果:{"age":25,"department":"财务部","hireDate":"2016-11-10T08:30:00","id":"BAT002","name":"包雅馨","salary":8800.0}
计算得到的key: c69dc1563cc892718bf3ee0c5b90320b
其他服务处理中

  从结果上可以看到,分布式服务都可以接收到消息,但是最终只有一台服务会真正进行业务的处理,因为我这里使用了最简单的分布式锁来控制了,实际上 redisTemplate.opsForValue().setIfAbsent() 并不是最优解,尤其是在集群模式,我这里只是为了演示要有这么一个操作,推荐还是使用 Redisson 去做分布式锁更可靠。如果有不懂的可以参考我之前的文章:SpringBoot基于Redisson实现分布式锁并分析其原理

五、优化

  其实我们还可以继续去优化我们的配置,消息的接收都是在频繁的创建线程,从而占用系统资源,我们可以通过线程池的方式去优化,RedisMessageListenerContainer 类中有一个方法setTaskExecutor(Executor taskExecutor)可以为监听容器配置线程池。配置线程池以后,所有的线程都会由该线程池产生,因此我们可以通过调节线程池来控制队列监听的速率。修改步骤大致如下:

5.1、注册任务执行器

  RedisConfiguration 新注册Bean:Executor

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置核心线程数
        executor.setCorePoolSize(5);
        //设置最大线程数
        executor.setMaxPoolSize(10);
        //设置任务队列容量
        executor.setQueueCapacity(10);
        //设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        //设置默认线程名称(线程前缀名称,有助于区分不同线程池之间的线程比如:taskExecutor-)
        executor.setThreadNamePrefix("taskExecutor-");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //设置允许核心线程超时
        executor.setAllowCoreThreadTimeOut(true);
        return executor;
    }

5.2、配置任务执行器

  redisMessageListenerContainer.setTaskExecutor(taskExecutor())

	@Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        // 设置连接工厂
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        // 绑定监听器和信道
        redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, channelTopic());
        // 配置任务执行器
        redisMessageListenerContainer.setTaskExecutor(taskExecutor());
        return redisMessageListenerContainer;
    }

5.3、启用异步执行器

  RedisConfiguration 增加注解@EnableAsync,为了其他操作也能用到异步操作处理。文章来源地址https://www.toymoban.com/news/detail-647342.html

@Slf4j
@EnableAsync
@Configuration
@EnableCaching
public class RedisConfiguration {
   //...
}

到了这里,关于Spring Boot 整合Redis实现消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • (六)、Springboot+Redis实现通用消息队列stater

    其实除了主流的各大消息中间件ActiveMQ, RocketMQ,RabbitMQ,Kafka之外,其实Redis也是支持消息队列功能的。 而有时候我们不需要引入消息队列中间件,跟缓存中间件Redis一起一起共用一个Redis作为消息中间件也是可以的,这样就少用了一个组件。 1)、使用stream实现点对点消息模式

    2024年02月10日
    浏览(40)
  • Spring Boot整合Redis实现订单超时处理

    🎉欢迎来到架构设计专栏~Spring Boot整合Redis实现订单超时处理 ☆* o(≧▽≦)o *☆嗨~我是IT·陈寒🍹 ✨博客主页:IT·陈寒的博客 🎈该系列文章专栏:架构设计 📜其他专栏:Java学习路线 Java面试技巧 Java实战项目 AIGC人工智能 数据结构学习 🍹文章作者技术和水平有限,如果文

    2024年02月03日
    浏览(45)
  • 【Spring】SpringBoot整合Redis,用Redis实现限流(附Redis解压包)

       📝个人主页:哈__ 期待您的关注  本文介绍SpringBoot整合Redis并且进行接口的限流,文章主要介绍的是一种思想,具体代码还要结合实际。 Redis的解压包我放在了百度网盘上,有需要的可以下载。 Redis-x64-3.0.504 解压码:uhwj 我们在本地解压下载的Redis压缩包,打开解压后的

    2024年04月09日
    浏览(52)
  • 微服务 Spring Boot 整合Redis 实现优惠卷秒杀 一人一单

    CSDN话题挑战赛第2期 参赛话题:Java技术分享 在分布式系统中,经常需要使用 全局唯一ID 查找对应的数据。产生这种ID需要保证系统全局唯一,而且要高性能以及占用相对较少的空间。 全局唯一ID在数据库中一般会被设成 主键 ,这样为了保证数据插入时索引的快速建立,还需

    2024年02月03日
    浏览(47)
  • Spring Boot进阶(63):「超详细」利用 Redis 实现高效延时队列:踩坑、优化、实践

            提到延时队列,相信各位同学并不会陌生,JDK原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢?举一个常见的例子,比如淘宝下单30分钟内,若没有支付,则自动取消订单,这该如何实现

    2024年02月07日
    浏览(44)
  • 第三章 Spring Boot 整合 Kafka消息队列 消息者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年02月22日
    浏览(45)
  • Spring Boot 整合SpringSecurity和JWT和Redis实现统一鉴权认证

    本文主要讲了Spring Security文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是青衿🥇 ☁️博客首页:CSDN主页放风讲故事 🌄每日一句:努力一点,优秀一点 Spring Security Spring Security是一个强大且高度可定制的身份验证和访问控制框架。它是保护基

    2024年02月05日
    浏览(50)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(45)
  • Spring Boot整合Redis

    Redis是一个开源(BSD许可)的、内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件,并提供多种语言的API。 Redis支持多种类型的数据结构,如 字符串(strings)、散列(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)与范围查询、bitmaps、 hyperlo

    2024年02月09日
    浏览(38)
  • springboot 整合redis 延迟消息功能

    1、redis配置项一定要顶格,修改redis如下图的配置:yes 改为 no 2、 3、springboot中配置redis 4、监听器 5、container 6.代码中发布消息

    2024年02月09日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包