redis stream restTemplate消息监听队列框架搭建

这篇具有很好参考价值的文章主要介绍了redis stream restTemplate消息监听队列框架搭建。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
    </parent>




<dependencies>

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: TODO
 **/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {

    private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);

    // 创建一个线程池
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    @Override
    public void onMessage(MapRecord message) {
        // 异步处理消息
        threadPoolExecutor.execute(()->{
            System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));
        });

    }
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;

import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.time.Duration;
import java.util.stream.Collectors;

/**
 * @Description: TODO
 **/
@Configuration
public class RedisMQConfig {

    @Autowired
    private RedisMQListener redisMQListener;

    @Autowired
    private RedisUtils redisUtils;

    private static RedisTemplate<Object, Object> redisTemplate;
    private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);

    public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
        if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {
            StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);
            if (xInfoGroups.isEmpty()) {
                redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
            } else {
                if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {
                    redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
                }
            }
        } else {
            redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
        }
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofSeconds(1)).build();
        StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);
        streamMessageListenerContainer.start();
        return subscription;
    }

}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")
    public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {
        String key = jsonObject.getString("key");
        String value = jsonObject.getString("value");
        MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));
        redisTemplate.opsForStream().add(mapRecord);
        System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());
        return formatResult(null);
    }

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

redis stream restTemplate消息监听队列框架搭建,框架,redis,数据库,缓存

redis stream restTemplate消息监听队列框架搭建,框架,redis,数据库,缓存文章来源地址https://www.toymoban.com/news/detail-792032.html

到了这里,关于redis stream restTemplate消息监听队列框架搭建的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Laravel 队列监听Superviso进程管理(消息队列后台监听)

    在 Ubuntu 系统上,可以使用以下命令安装 Supervisor: 在CentOS: 安装完成后,Supervisor 的配置文件会被放置在  /etc/supervisor/conf.d/  目录下 在  /etc/supervisor/conf.d/  目录下创建一个新的配置文件,例如  laravel-worker.conf : 编辑配置文件: 在打开的文件中,添加以下配置: 运行以

    2024年02月02日
    浏览(28)
  • Redis队列Stream、Redis多线程详解(二)

    足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题。 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获

    2023年04月18日
    浏览(30)
  • 【Java】SpringBoot中实现Redis Stream队列

    简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。 jdk:1.8 springboot-version:2.6.3 redis:5.0.1(5版本以上才有Stream队列) 1 pom redis 依赖包(version 2.6.3) 2 yml 3 RedisStreamUtil 工具类 生产者发送消息 生产者发送消息,在Service层创建 addMessage 方法,往

    2024年04月11日
    浏览(26)
  • 【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

     🎉🎉欢迎光临🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟特别推荐给大家我的最新专栏 《Redis实战与进阶》 本专栏纯属为爱发电永久免费!!! 这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.csdn.net/ 我们用的是云

    2024年02月20日
    浏览(37)
  • Redis如何实现消息队列

    Redis可以通过List数据结构实现简单的消息队列。在Redis中,我们可以使用 LPUSH 命令将消息推送到列表的左侧,使用 RPOP 命令从列表的右侧获取消息。这样,就可以实现一个先进先出(FIFO)的消息队列。 下面是一个使用Redis实现消息队列的简单示例: 首先,确保你已经安装了

    2024年02月14日
    浏览(26)
  • redis实现消息队列

    消息队列(Message Queue)是一种常见的软件架构模式,用于在分布式系统中传递和处理异步消息。它解耦了发送消息的应用程序和接收消息的应用程序之间的直接依赖关系,使得消息的发送者和接收者可以独立地演化和扩展。 消息队列的基本原理是发送者将消息发送到一个中

    2024年02月09日
    浏览(23)
  • Redis消息队列

    消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色: 消息队列:存储和管理消息,也被称为消息代理(Message Broker) 生产者:发送消息到消息队列 消费者:从消息队列获取消息并处理消息 Redis的list数据结构是一个双向链表,很容易

    2024年02月02日
    浏览(28)
  • 实战:Spring Cloud Stream消息驱动框架整合rabbitMq

    相信很多同学都开发过WEB服务,在WEB服务的开发中一般是通过缓存、队列、读写分离、削峰填谷、限流降级等手段来提高服务性能和保证服务的正常投用。对于削峰填谷就不得不用到我们的MQ消息中间件,比如适用于大数据的kafka,性能较高支持事务活跃度高的rabbitmq等等,MQ的

    2024年02月08日
    浏览(35)
  • Redis 消息队列和发布订阅

    采用redis 三种方案: ● 生产者消费者:一个消息只能有一个消费者 ● 发布者订阅者:一个消息可以被多个消费者收到 ● stream模式:实现队列和广播模式 Producer调用redis的lpush往特定key里放消息,Consumer调用brpop去不断监听key。 1、利用redis的链表,存储数据,实现队列模式

    2024年01月18日
    浏览(31)
  • 基于Redis实现消息队列的实践

    消息队列是一种典型的发布/订阅模式,是专门为异步化应用和分布式系统设计的,具有高性能、稳定性及可伸缩性的特点,是开发分布式系统和应用系统必备的技术之一。目前,针对不同的业务场景,比较成熟可靠的消息中间件产品有RocketMQ、Kafka、RabbitMq等,基于Redis再去实

    2024年02月07日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包