【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

这篇具有很好参考价值的文章主要介绍了【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 🎉🎉欢迎光临🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟特别推荐给大家我的最新专栏《Redis实战与进阶》

本专栏纯属为爱发电永久免费!!!

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽http://suzee.blog.csdn.net/

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶,Redis实战,java Spring后端项目实战 所遇问题及解决方案,个人学习笔记,redis,java,spring,rabbitmq,后端

直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的  于是我想起了之前学过的技术栈 

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

目录

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

缺点也很明显:

应用场景:

Redis实现消息队列系统 实现步骤:

配置Redis:

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

实现消息的发布和订阅功能。

实战与改良

代码解释

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:

对了 中途遇到了这样一个错误

原因与分析:

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果


Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

  1. 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。

  2. 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。

  3. 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。

  4. 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。

  5. 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。

缺点也很明显:

  1. 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。

  2. 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。

应用场景:

适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑
如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。

Redis实现消息队列系统 实现步骤:

配置Redis:

  1. 首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
      redis:
        host: 
        port: 6379
        password: 
        lettuce:
          pool:
            max-active: 1000
            max-idle: 1000
            min-idle: 0
            time-between-eviction-runs: 10s
            max-wait: 10000
  2. 创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:

 @Bean
    public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(new StringRedisSerializer());
        return template;
    }

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

redisTemplate.setDefaultSerializer(new StringRedisSerializer());

实现消息的发布和订阅功能。

  • 发布消息:
    redisTemplate.convertAndSend("channel_name", "message_payload");

    在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。

  • 订阅消息:
  • 首先,创建一个MessageListener实现类来处理接收到的消息:

public class MessageListenerImpl implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 处理接收到的消息
        String channel = new String(message.getChannel());
        String payload = new String(message.getBody());
        // 执行自定义的逻辑
    }
}

创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:

LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();

创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:

RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称"));
listenerContainer.start();

通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。

以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增 

于是就有了第二种写法 :

实战与改良

/***
 * @title MessageManager
 * @author SUZE
 * @Date 2-17
 **/
@Component
public class ReservedMessageManager {
    private String ListenerId;
    private String UserId;
    private String message;
    private final RedisTemplate<String, String> redisTemplate;

    @Autowired
    public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        subscribeToChannel("reserved");
    }
    @Resource
    private SmsServer smsServer;

    public void publishMessage(String channel, reserveMessage message) {
        String  Message=serialize(message);
        redisTemplate.convertAndSend("channel_name", "message_payload");
        redisTemplate.convertAndSend(channel, Message);
    }
    // 接收到消息时触发的事件
    private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
        if (reserveMessage != null) {
            String userId = reserveMessage.getUserId();
            String ListenerId=reserveMessage.getListenerId();
            String message = reserveMessage.getMessage();
            //TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容
            switch (message){
                //TODO 消息要给两边都发 所以要发两份 发信息的文案
                case "wait":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "agree":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "refuse":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "over":
                    //这里要操作文档系统了

                    //拒绝的话 那就要监听一下
                    smsServer.sendSms(userId,ListenerId,message);
                    break;

            }
            //smsServer.sendSms(userId,ListenerId,message);
            // 其他处理逻辑...
        }
    }

    public void subscribeToChannel(String channel) {
        redisTemplate.execute((RedisCallback<Object>) (connection) -> {
            connection.subscribe((message, pattern) -> {
                String channelName = new String(message.getChannel());
                byte[] body = message.getBody();
                // 解析接收到的消息
                switch (channelName){
                    case "reserved":
                        reserveMessage reserveMessage = deserializeMessage(new String(body));
                        handleReserveMessage(channelName, reserveMessage);
                        break;
                    //还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由
                }
            }, channel.getBytes());
            return null;
        });
    }
    // 反序列化
    private reserveMessage deserializeMessage(String body) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(body, reserveMessage.class);
        } catch (IOException e) {
            // 处理反序列化异常
            e.printStackTrace();
            return null;
        }
    }

    // 序列化
    public String serialize(reserveMessage reserveMessage) throws SerializationException {
        if (reserveMessage == null) {
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsString(reserveMessage);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }

}

代码解释

  1. subscribeToChannel方法接受一个channel参数,用于指定要订阅的通道名称。
  2. redisTemplate.execute方法用于执行Redis操作,并传入一个RedisCallback回调函数。
  3. 回调函数使用lambda表达式的形式实现,接受一个connection参数,表示与Redis的连接。
  4. 在回调函数中,调用connection.subscribe方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。
  5. 在消息回调函数中,首先从message对象中获取通道名称和消息体。
  6. 使用new String(message.getChannel())将通道名称转换为字符串类型,并存储在channelName变量中。
  7. 使用message.getBody()获取消息体的字节数组表示,并存储在body变量中。
  8. switch语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。
  9. 对于"reserved"通道的处理,调用deserializeMessage方法将消息体反序列化为reserveMessage对象,并将其存储在名为reserveMessage的局部变量中。
  10. 调用handleReserveMessage方法,将通道名称和反序列化后的reserveMessage对象作为参数进行处理。
  11. handleReserveMessage方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms方法向指定的userIdlistenerId发送短信。

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:

对了 中途遇到了这样一个错误

错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

原因与分析:

reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。
为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。
在你的reserveMessage类中

这个是改好的封装类:
 

@Data
public class reserveMessage {
    private String UserId;
    private String ListenerId;
    private String message;


    public reserveMessage() {
        // 默认构造函数
    }
    public reserveMessage(String userId, String ListenerId,String message) {
        this.UserId = userId;
        this.ListenerId = ListenerId;
        this.message=message;
    }


}

实际业务中的测试

发布服务

【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶,Redis实战,java Spring后端项目实战 所遇问题及解决方案,个人学习笔记,redis,java,spring,rabbitmq,后端

订阅服务(监听服务)

【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶,Redis实战,java Spring后端项目实战 所遇问题及解决方案,个人学习笔记,redis,java,spring,rabbitmq,后端

测试结果

【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶,Redis实战,java Spring后端项目实战 所遇问题及解决方案,个人学习笔记,redis,java,spring,rabbitmq,后端

成功

这里面的打印是代替了原本业务中的短信发送 也算是成了

这一期就到这 感谢观看文章来源地址https://www.toymoban.com/news/detail-829495.html

到了这里,关于【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQ消息队列详解以及MQ重复消费问题

    https://blog.csdn.net/qq_44240587/article/details/104630567 核心的就是:解耦、异步、削锋 现有ABCDE五个系统,最初的时候BCD三个系统都要调用A系统的接口获取数据,一切都很正常,但是突然,D系统说:我不要了,你不用给我传数据了,A系统无奈,只能修改代码,将调用D系统的代码删除

    2024年04月13日
    浏览(25)
  • MQ-消息队列-RabbitMQ

    MQ(Message Queue) 消息队列 ,是基础数据结构中“ 先进先出 ”的一种 数据结构 。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由

    2024年02月09日
    浏览(22)
  • 消息队列(MQ)面试

    目录 讲一讲MQ 面试官: 在你之前的项目中,你是否使用过消息队列(MQ)?能详细介绍一下你在项目中如何使用MQ吗? 在用户和用户之间的多对多聊天通信中如何使用,请具体来讲一下。 那你可以讲一下消息的确认机制、消息重发机制吗,如何保证不出现消息丢失或者乱序的

    2024年02月09日
    浏览(19)
  • 消息队列MQ

    MQ的原理可以简单概括为生产者将消息发送到队列中,消费者从队列中获取消息进行处理。具体来说,MQ的原理包括以下几个方面: 生产者:生产者将消息发送到MQ服务器中,消息可以是文本、对象、文件等形式。生产者可以使用API或者其他工具将消息发送到MQ服务器,同时可

    2024年02月03日
    浏览(18)
  • python消息队列4种方法及使用场景

    Python 有许多消息队列实现,其中一些最流行的包括: 一:RabbitMQ 是一个高度可靠的消息队列系统,用于发送和接收消息,支持多种消息协议。一个开源的消息队列系统,具有高可用性、高可靠性和高可扩展性等特点,适用于以下场景: 异步任务处理:当应用需要异步执行任

    2024年02月03日
    浏览(31)
  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(28)
  • MQ消息队列篇:三大MQ产品的必备面试种子题

    MQ(消息队列)是一种FIFO(先进先出)的数据结构,主要用于实现异步通信、削峰平谷和解耦等功能。它通过将生产者生成的消息发送到队列中,然后由消费者进行消费。这样,生产者和消费者之间就不存在直接的耦合关系。 其中,MQ的优势主要体现在以下几个方面: 异步通

    2024年02月14日
    浏览(18)
  • RaabitMQ(三) - RabbitMQ队列类型、死信消息与死信队列、懒队列、集群模式、MQ常见消息问题

    这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需

    2024年02月14日
    浏览(26)
  • IM即时通讯开发MQ消息队列

    消息是互联网信息的一种表现形式,是人利用计算机进行信息传递的有效载体,比如即时通讯网坛友最熟悉的即时通讯消息就是其具体的表现形式之一。   消息从发送者到接收者的典型传递方式有两种:     1)一种我们可以称为即时消息:即消息从一端发出后(消息发送者

    2024年02月12日
    浏览(48)
  • Java使用Redis实现消息队列

    近期刷Java面试题刷到了“如何使用Redis实现消息队列”,解答如下: 一般使用 list 结构作为队列, rpush 生产消息, lpop 消费消息。当 lpop 没有消息的时候,要适当sleep 一会再重试。若不使用sleep,则可以用指令blpop(该指令在没有消息的时候,它会阻塞住直到消息到来) rp

    2024年02月21日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包