SpringBoot+Redis stream实现消息队列

这篇具有很好参考价值的文章主要介绍了SpringBoot+Redis stream实现消息队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、前言

二、下载Redis及引入Redis依赖

三、配置消费者及消费组

四,配置Redsi及初始化stream、消费组、消费者


一、前言

相较于 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等重量级的消息队列中间件,Redis在需求量小的情况下,也可以作为消息中间件来使用。Redis作为消息队列使用,常见的有List、发布/订阅模型以及在Redis5以后出现的Stream。Stream相较于前两种,最大的优点就是可以持久化。

二、下载Redis及引入Redis依赖

下载Redis5以上的客户端,win版下载地址

pom中引入redis依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

三、配置消费者及消费组

application.yml中配置stream key,消费组和消费者可配置多个。

redis:
  mq:
    streams:
      # key名称
      - name:  RARSP:REPORT:READ:VS
        groups:
          # 消费组名称
          - name: VS_GROUP
            消费者名称
            consumers: VS-CONSUMER-A,VS-CONSUMER-B
      # key2
      - name: RARSP:REPORT:READ:BLC
        groups:
          - name: BLC_GROUP
            consumers: BLC-CONSUMER-A,BLC-CONSUMER-B
      # key3
      - name: RARSP:REPORT:READ:HD
        groups:
          - name: HD_GROUP
            consumers: HD-CONSUMER-A,HD-CONSUMER-B
     

自定义三个实体类RedisMqGroup、RedisMqStream、RedisMq,对应application.yml中的配置

public class RedisMqGroup {


    private String name;

    private String[] consumers;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String[] getConsumers() {
        return consumers;
    }

    public void setConsumers(String[] consumers) {
        this.consumers = consumers;
    }
}
public class RedisMqStream {
    public    String name;
    public   List<RedisMqGroup> groups;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public List<RedisMqGroup> getGroups() {
        return groups;
    }

    public void setGroups(List<RedisMqGroup> groups) {
        this.groups = groups;
    }
}
@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
public class RedisMq {
    public   List<RedisMqStream> streams;

    public List<RedisMqStream> getStreams() {
        return streams;
    }

    public void setStreams(List<RedisMqStream> streams) {
        this.streams = streams;
    }
}

四,配置Redsi及初始化stream、消费组、消费者

@Slf4j
@Configuration
public class RedisConfiguration {

    @Resource
    private RedisTemplate redisTemplate;
    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisMq redisMq;


    /**
     * 处理乱码
     * @return
     */
    @Bean
    public RedisTemplate redisTemplateInit() {
        // key序列化
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        //val实例化
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        //value hashmap序列化
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        //key haspmap序列化
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public List<Subscription> subscription(RedisConnectionFactory factory){
        List<Subscription> resultList = new ArrayList<>();
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                 // 一次最多获取多少条消息
                .batchSize(5)
                .executor(executor)
                .pollTimeout(Duration.ofSeconds(1))
               // .errorHandler()
                .build();
        for (RedisMqStream redisMqStream :redisMq.getStreams()) {
            String streamName = redisMqStream.getName();
            RedisMqGroup redisMqGroup = redisMqStream.getGroups().get(0);

            initStream(streamName,redisMqGroup.getName());
            var listenerContainer = StreamMessageListenerContainer.create(factory,options);
            // 手动ask消息
            Subscription subscription = listenerContainer.receive(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());
            // 自动ask消息
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
            resultList.add(subscription);
            listenerContainer.start();
        }
        return resultList;
    }

    private void initStream(String key, String group){
        boolean hasKey = redisStreamUtil.hasKey(key);
        if(!hasKey){
            Map<String,Object> map = new HashMap<>(1);
            map.put("field","value");
            //创建主题
            String result = redisStreamUtil.addMap(key, map);
            //创建消费组
            redisStreamUtil.createGroup(key,group);
            //将初始化的值删除掉
            redisStreamUtil.del(key,result);
            log.info("stream:{}-group:{} initialize success",key,group);
        }
    }

}

Redis工具类

 
@Component
public class RedisStreamUtil {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 创建消费组
     * @param key stream-key值
     * @param group 消费组
     * @return java.lang.String
     */
    public String createGroup(String key, String group){
        return stringRedisTemplate.opsForStream().createGroup(key, group);
    }

    /**
     * 获取消费者信息
     * @param key stream-key值
     * @param group 消费组
     * @return org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers
     */
    public StreamInfo.XInfoConsumers queryConsumers(String key, String group){
        return stringRedisTemplate.opsForStream().consumers(key, group);
    }

    /**
     * 添加Map消息
     * @param key stream对应的key
     * @param value 消息数据
     * @return
     */
    public String addMap(String key, Map<String, Object> value){
        return stringRedisTemplate.opsForStream().add(key, value).getValue();
    }

    /**
     * 读取消息
     * @param: key
     * @return java.util.List<org.springframework.data.redis.connection.stream.MapRecord<java.lang.String,java.lang.Object,java.lang.Object>>
     */
    public List<MapRecord<String, Object, Object>> read(String key){
        return stringRedisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    }

    /**
     * 确认消费
     * @param key
     * @param group
     * @param recordIds
     * @return java.lang.Long
     */
    public Long ack(String key, String group, String... recordIds){
        return stringRedisTemplate.opsForStream().acknowledge(key, group, recordIds);
    }



    /**
     * 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
     * @param: key
     * @param: recordIds
     * @return java.lang.Long
     */
    public Long del(String key, String... recordIds){
        return stringRedisTemplate.opsForStream().delete(key, recordIds);
    }

    /**
     * 判断是否存在key
     * @param key
     * @return
     */
    public boolean hasKey(String key){
        Boolean aBoolean = stringRedisTemplate.hasKey(key);
        return aBoolean==null?false:aBoolean;
    }
}

五、生产消息、消费消息

生产消息代码

Map<String,Object> message = new HashMap<>(2);         
 message.put("body","消息主题" );
 message.put("sendTime", "消息发送时间");
 String streamKey = "";//stream的key值,对应application.yml中配置的
 redisStreamUtil.addMap(streamKey, message);

消费消息文章来源地址https://www.toymoban.com/news/detail-595193.html

@Slf4j
@Component
public class ReportReadMqListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
          // stream的key值
          String streamKey = message.getStream();
          //消息ID
          RecordId recordId = message.getId();
          //消息内容
          Map<String, String> msg = message.getValue(); 
          //TODO 处理逻辑
         
          //逻辑处理完成后,ack消息,删除消息,group为消费组名称
          redisStreamUtil.ack(streamKey,group,recordId.getValue());
          redisStreamUtil.del(streamKey,recordId.getValue());
    }
    
}

到了这里,关于SpringBoot+Redis stream实现消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis消息队列——Redis Stream

    “消息队列”是在消息的传输过程中保存消息的容器。“消息”是在两台计算机间传送的数据单位。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可

    2023年04月16日
    浏览(26)
  • redis — 基于Spring Boot实现redis延迟队列

    1. 业务场景 延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种: 在各种购物平台上下单,订单超过30分钟未支付

    2024年02月13日
    浏览(29)
  • 使用SpringBoot利用Redis实现消息队列

    随着互联网的发展,消息队列的应用越来越广泛。消息队列可以解决系统之间的异步通信问题,提高系统的可靠性和可扩展性。在Java开发中,Redis作为一种高性能的缓存和消息队列系统,被广泛应用。本文将介绍如何使用SpringBoot中利用Redis实现消息队列。 在Redis中,List是一种

    2024年02月14日
    浏览(26)
  • (六)、Springboot+Redis实现通用消息队列stater

    其实除了主流的各大消息中间件ActiveMQ, RocketMQ,RabbitMQ,Kafka之外,其实Redis也是支持消息队列功能的。 而有时候我们不需要引入消息队列中间件,跟缓存中间件Redis一起一起共用一个Redis作为消息中间件也是可以的,这样就少用了一个组件。 1)、使用stream实现点对点消息模式

    2024年02月10日
    浏览(27)
  • redis stream restTemplate消息监听队列框架搭建

    整体思路         1. pom增加redis依赖;         2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;         3. 将消息订阅bean及监听器注册到配置中; 1. pom 2. 消息监听器实现代码 3. redis订阅bean及监听器注册 4. 测试生产消息 消息监听成功 4.1 生产消息 4.2 消息监听器

    2024年01月16日
    浏览(25)
  • Spring Boot进阶(63):「超详细」利用 Redis 实现高效延时队列:踩坑、优化、实践

            提到延时队列,相信各位同学并不会陌生,JDK原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢?举一个常见的例子,比如淘宝下单30分钟内,若没有支付,则自动取消订单,这该如何实现

    2024年02月07日
    浏览(33)
  • 【Java】SpringBoot中实现Redis Stream队列

    简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。 jdk:1.8 springboot-version:2.6.3 redis:5.0.1(5版本以上才有Stream队列) 1 pom redis 依赖包(version 2.6.3) 2 yml 3 RedisStreamUtil 工具类 生产者发送消息 生产者发送消息,在Service层创建 addMessage 方法,往

    2024年04月11日
    浏览(26)
  • 在Spring Boot中使用Redis 5的Stream

    Redis是一个开源的高性能键值对存储系统,而Redis 5引入了新的数据结构——Stream(流)。Stream可以用于高效地保存和处理事件流数据。在本文中,我们将学习如何在Spring Boot应用程序中使用Redis 5的Stream功能。 步骤1:配置Redis连接 首先,确保您的Spring Boot应用程序已经添加了

    2024年02月09日
    浏览(26)
  • 【Spring Boot 3】【Redis】消息发布及订阅

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月21日
    浏览(36)
  • Redis如何实现消息队列

    Redis可以通过List数据结构实现简单的消息队列。在Redis中,我们可以使用 LPUSH 命令将消息推送到列表的左侧,使用 RPOP 命令从列表的右侧获取消息。这样,就可以实现一个先进先出(FIFO)的消息队列。 下面是一个使用Redis实现消息队列的简单示例: 首先,确保你已经安装了

    2024年02月14日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包