【Java】SpringBoot中实现Redis Stream队列

这篇具有很好参考价值的文章主要介绍了【Java】SpringBoot中实现Redis Stream队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot实现Redis Stream队列

前言

简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。

jdk:1.8

springboot-version:2.6.3

redis:5.0.1(5版本以上才有Stream队列)

准备工作

1pom

redis 依赖包(version 2.6.3)

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2 yml

spring: 
  redis:
    database: 0
    host: 127.0.0.1

3 RedisStreamUtil工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class RedisStreamUtil {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 创建消费组
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link String}
	 */
	public String oup(String key, String group) {
		return redisTemplate.opsForStream().createGroup(key, group);
	}

	/**
	 * 获取消费者信息
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link StreamInfo.XInfoConsumers}
	 */
	public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
		return redisTemplate.opsForStream().consumers(key, group);
	}

	/**
	 * 查询组信息
	 *
	 * @param key 键名称
	 * @return
	 */
	public StreamInfo.XInfoGroups queryGroups(String key) {
		return redisTemplate.opsForStream().groups(key);
	}

	// 添加Map消息
	public String addMap(String key, Map<String, Object> value) {
		return redisTemplate.opsForStream().add(key, value).getValue();
	}

	// 读取消息
	public List<MapRecord<String, Object, Object>> read(String key) {
		return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
	}

	// 确认消费
	public Long ack(String key, String group, String... recordIds) {
		return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
	}

	// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
	public Long del(String key, String... recordIds) {
		return redisTemplate.opsForStream().delete(key, recordIds);
	}

	// 判断是否存在key
	public boolean hasKey(String key) {
		Boolean aBoolean = redisTemplate.hasKey(key);
		return aBoolean != null && aBoolean;
	}
}


代码实现

生产者发送消息

生产者发送消息,在Service层创建addMessage方法,往队列中发送消息。

代码中addMap()方法第一个参数为key,第二个参数为value,该key要和后续配置的保持一致,暂时先记住这个key。

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**
     * 发送一个消息
     *
     * @return {@code Object}
     */
    @Override
    public Object addMessage() {
        RedisUser redisUser = new RedisUser();
        redisUser.setAge(18);
        redisUser.setName("hcr");
        redisUser.setEmail("156ef561@gmail.com");

        Map<String, Object> message = new HashMap<>();
        message.put("user", redisUser);

        String recordId = redisStreamUtil.addMap("mystream", message);
        return recordId;
    }
}

controller接口方法

@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    @GetMapping("/addMessage")
    public Object addMessage() {
        return redisStreamMqService.addMessage();
    }
}

调用测试,查看redis中是否正常添加数据。

接口返回数据

1702622585248-0

查看redis中的数据
java springboot redis stream 实现消息队列,spring boot,java,redis

消费者监听消息进行消费

创建RedisConsumersListener监听器

import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    public final RedisStreamUtil redisStreamUtil;

    /**
     * 监听器
     *
     * @param message
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // stream的key值
        String streamKey = message.getStream();
        //消息ID
        RecordId recordId = message.getId();
        //消息内容
        Map<String, String> msg = message.getValue();
        log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);

        //处理逻辑

        //逻辑处理完成后,ack消息,删除消息,group为消费组名称
        StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);
        xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
        redisStreamUtil.del(streamKey, recordId.getValue());
    }
}

创建RedisConfig配置类,配置监听

package cn.hcr.config;

import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@Slf4j
public class RedisConfig {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    /**
     * redis序列化
     *
     * @param redisConnectionFactory
     * @return {@code RedisTemplate<String, Object>}
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();
        
        //该key和group可根据需求自定义配置
        String streamName = "mystream";
        String groupname = "mygroup";

        initStream(streamName, groupname);
        var listenerContainer = StreamMessageListenerContainer.create(factory, options);
        // 手动ask消息
        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
        // 自动ask消息
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
        listenerContainer.start();
        return subscription;
    }

    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //创建主题
            String result = redisStreamUtil.addMap(key, map);
            //创建消费组
            redisStreamUtil.oup(key, group);
            //将初始化的值删除掉
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
}


redisTemplate:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组文章来源地址https://www.toymoban.com/news/detail-847713.html

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。
Template:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

到了这里,关于【Java】SpringBoot中实现Redis Stream队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用SpringBoot利用Redis实现消息队列

    随着互联网的发展,消息队列的应用越来越广泛。消息队列可以解决系统之间的异步通信问题,提高系统的可靠性和可扩展性。在Java开发中,Redis作为一种高性能的缓存和消息队列系统,被广泛应用。本文将介绍如何使用SpringBoot中利用Redis实现消息队列。 在Redis中,List是一种

    2024年02月14日
    浏览(31)
  • (六)、Springboot+Redis实现通用消息队列stater

    其实除了主流的各大消息中间件ActiveMQ, RocketMQ,RabbitMQ,Kafka之外,其实Redis也是支持消息队列功能的。 而有时候我们不需要引入消息队列中间件,跟缓存中间件Redis一起一起共用一个Redis作为消息中间件也是可以的,这样就少用了一个组件。 1)、使用stream实现点对点消息模式

    2024年02月10日
    浏览(37)
  • Redis消息队列——Redis Stream

    “消息队列”是在消息的传输过程中保存消息的容器。“消息”是在两台计算机间传送的数据单位。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可

    2023年04月16日
    浏览(36)
  • 腾讯云短信服务实现 Java 发送手机验证码(SpringBoot+Redis 实现)

    前置:需要腾讯云的账号,后期授权需要,不需要买云服务器,有需要的可以购买短信套餐(几块钱) 搜索框输入短信,可以买一个短信套餐包,便宜不贵,进入短信服务的控制台 发送短信有频率限制,企业用户可以修改设置 之后我们需要对短信内容进行设置      类型有网站

    2024年02月09日
    浏览(46)
  • springboot+redis+mysql+quartz-通过Java操作redis的KEYS*命令获取缓存数据定时更新数据库

    代码讲解: 3-点赞功能-定时持久化到数据库(pipeline+lua)-完善过程2_哔哩哔哩_bilibili https://www.bilibili.com/video/BV1w14y1o7BV 本文章代码: blogLike_schedule/like03 · xin麒/XinQiUtilsOrDemo - 码云 - 开源中国 (gitee.com) https://gitee.com/flowers-bloom-is-the-sea/XinQiUtilsOrDemo/tree/master/blogLike_schedule/like03 数据

    2024年02月15日
    浏览(47)
  • Redis队列Stream、Redis多线程详解(二)

    足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题。 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获

    2023年04月18日
    浏览(37)
  • 基于springboot+Redis的前后端分离项目之消息队列(六)-【黑马点评】

    🎁🎁资源文件分享 链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA?pwd=eh11 提取码:eh11 我们来回顾一下下单流程 当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤 1、查询优惠卷 2、判断秒杀库存是否足够 3、查询订单

    2024年02月12日
    浏览(40)
  • 【java缓存、redis缓存、guava缓存】java中实现缓存的几种方式

    这种方式可以简单实现本地缓存,但是实际开发中不推荐使用,下面我们来实现一下这种方式。 首先创建一个管理缓存的类 这个类中有一个静态代码块,静态代码块会在类加载时就执行,我们可以在这里完成对缓存的初始化,决定缓存内一开始就有哪些数据 另外我们还可以

    2024年02月16日
    浏览(33)
  • Java语言开发的AI智慧导诊系统源码springboot+redis 3D互联网智导诊系统源码

    Java 语言开发的AI智慧导诊系统源码 springboot+redis 3D 互联网智导诊系统源码 智慧导诊解决盲目就诊问题,减轻分诊工作压力。降低挂错号比例,优化就诊流程,有效提高线上线下医疗机构接诊效率。可通过人体画像选择症状部位,了解对应病症信息和推荐就医科室。 智慧导诊

    2024年04月23日
    浏览(44)
  • 前后端分离java开发图形验证码+谷歌开源Kaptcha使用(Springboot+redis实现图形验证码校验)

    注册 - 登录 - 修改密码 一般需要发送验证码,但是容易被攻击恶意调用。 手机短信轰炸机是批量、循环给手机无限发送各种网站的注册验证码短信的方法。 短信一条5分钱,如果被大盗刷大家自己计算邮箱通知不用钱,但被大盗刷,带宽、连接等都被占用,导致无法正常使用

    2024年01月19日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包