@RabbitListener 消息队列 消息序列化

这篇具有很好参考价值的文章主要介绍了@RabbitListener 消息队列 消息序列化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MessageConvert

涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析。RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等

  • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
  • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {

    public static final String WINCALLCDR_QUEUE = "WINCHANCDR_QUEUE";

    //生产者
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //发送消息进行序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    //消费者
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory mqConnectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(mqConnectionFactory);
        //--加上这句  自定义MessageConverter
        factory.setMessageConverter(new RabbitMessageConverter());
        //反序列化
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());

        //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
        return factory;
    }
}

自定义MessageConverter

在一些场景下我们希望在消息发送到MQ之前或者接受消息前对消息做一些自定义处理,这个时候就需要自定义MessageConverter了

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class RabbitMessageConverter implements MessageConverter {

    /**
     * 发送消息时转换
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(), messageProperties);
    }

    /**
     * 接受消息时转换
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return new String(message.getBody());
    }
}

@RabbitListener 用法

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理。@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过<rabbit:annotation-driven/>来设置@RabbitListener的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener。

注意
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常

配置消费者

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.icsoc.axt.job.config.RabbitMQConfig;
import net.icsoc.axt.job.dto.WinCallCdrDTO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Component
@Slf4j
public class CallListener {
    @Autowired
    RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void convertAndSendOrder() {
         //创建生产数据
         String jsonStr ="{user_id:234}"
         rabbitTemplate.convertAndSend("exchange.topic", "routingKey.aa", jsonStr);
    }

    @RabbitListener(queues = RabbitMQConfig.WINCALLCDR_QUEUE, containerFactory = "rabbitListenerContainerFactory")
    public void winCallCdr(String messsageBody) {

        //log.info("winCallCdr消费者收到消息  : " + messsageBody);
        WinCallCdrDTO winCallCdrDTO = JSON.parseObject(messsageBody, WinCallCdrDTO.class);
        try {
            exectueSaveWinCallCdrData2Db(winCallCdrDTO);
            log.info("winCallCdr成功消费消息 {}", winCallCdrDTO.getCallId());
        } catch (DataAccessException e) {
            log.error("消费winCallCdr异常 {} {}", messsageBody, e);
        }
    }
}

和 @RabbitHandler 搭配使用

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {

    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

@Payload 与 @Headers

使用 @Payload 和 @Headers 注解可以消息中的 body 与 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body:"+body);
    System.out.println("Headers:"+headers);
}

也可以获取单个 Header 属性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body:"+body);
    System.out.println("token:"+token);
}

通过 @RabbitListener 注解声明 Binding

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
        value = @Queue(value = "consumer_queue",durable = "true"),
        key = "key.#"
))
public void processMessage1(Message message) {
    System.out.println(message);
}

自动确认

生产者产生10笔消息,自动确认模式下,消息处理成功,消费者才会去获取下一笔消息;消息处理抛出异常,那么将会消息重回队列。自动确认分四种情况(第一就是正常消费,其他三种为异常情况)

  • 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  • 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
  • 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  • 抛出其他的异常,消息会被拒绝,且requeue = true

手动确认

常用API

  • channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);   ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
  • channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);  Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
  • channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);   nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列

当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行

消息重发送到队尾

可能会出现堆积

    //消费者处理消息缓慢
    @RabbitListener(queues = {"kinson1"})
    public void receiver3(Message msg, Channel channel) throws IOException {
        try {
            //打印数据
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【开始】:{}",message);
            if("0".equals(message)){
                throw new RuntimeException("0的消息消费异常");
            }
            log.info("【结束】:{}", message);
            //ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            //捕获异常后,重新发送到指定队列,自动ack不抛出异常即为ack
            channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                    msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    msg.getBody());
        }
    }

 如何处理异常消息

如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。解决这个问题可以采取两种方案:

1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,直接丢弃该消息方案。

2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。

将业务队列绑定死信队列,当消息被丢弃后,进入到死信队列(代码修复后监听死信队列补偿消息)。可以避免我们手动的恢复消息。

@Component
@Slf4j
public class CustomerRev {

    @RabbitListener(queues = {"kinson1"})
    public void receiver3(Message msg, Channel channel) throws IOException {
        try {
            //打印数据
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【开始】:{}",message);
            if("0".equals(message)){
                throw new RuntimeException("0的消息消费异常");
            }
            log.info("【结束】:{}", message);
        } catch (Exception e) {
            //捕获异常后,重新发送到指定队列,自动确认不抛出异常即为ack
            Integer retryCount;
            Map<String, Object> headers = msg.getMessageProperties().getHeaders();
            if(!headers.containsKey("retry-count")){
                retryCount=0;
            }else {
                retryCount = (Integer)headers.get("retry-count");
            }
            //判断是否满足最大重试次数(重试3次)
            if(retryCount++<3) {
                headers.put("retry-count",retryCount);
                //重新发送到MQ中
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
                channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                        msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
                        msg.getBody());
            }
        }
    }
}

重试机制如何合理配置

重试机制能保证某些场景下消息能被消费掉。适合重试场景:大部分属于读取,如调用第三方接口、网络波动问题、暂时调用不了、网络连接等。重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。

采坑:不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          # 开启消费者重试机制(默认就是true,false则取消重试机制)
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间距(单位:秒)
          initial-interval: 2s

以上配置消息会重试5次,如果一直失败,RabbitMQ放弃消费了文章来源地址https://www.toymoban.com/news/detail-730835.html

到了这里,关于@RabbitListener 消息队列 消息序列化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • @RabbitListener的作用详解

    @RabbitListener用于在Spring Boot应用程序中创建消费者端接收和处理消息的方法。它是基于Spring AMQP和RabbitMQ实现的,可以用于消费者端消费RabbitMQ队列中的消息。 具体来说,@RabbitListener的作用是: 声明该方法是一个RabbitMQ消息监听器,用于接收指定队列中的消息。 自动创建和配置

    2024年02月13日
    浏览(25)
  • @RabbitListener详解

    @RabbitListener 是用于在 Spring AMQP 中消息监听的注解。它允许在 Spring 应用程序中声明消息监听器。在使用 @RabbitListener 注解的方法上,可以接收来自 RabbitMQ 队列的消息。这些方法可以通过使用 @RabbitHandler 注解标记,并在方法中声明一个参数来表示要接收的消息。 例如:

    2024年02月14日
    浏览(24)
  • rabbitmq整合springboot:ChannelAwareMessageListener和@RabbitListener的使用

    Springboot中使用Rabbimq监听队列中有两种方式,一种是@RabbitListener注解的方式,一种是实现springboot:ChannelAwareMessageListener接口的方式 前者使用如下: 消费者: 生产者: 后者使用方式: 配置文件:

    2024年02月12日
    浏览(40)
  • Spring Boot 中的 @RabbitListener 注解是什么,原理,如何使用

    在 RabbitMQ 中,消息的接收需要通过监听队列来实现。在 Spring Boot 应用程序中,可以使用 @RabbitListener 注解来监听队列,并在接收到消息时执行指定的方法。本文将介绍 @RabbitListener 注解的原理、使用方法和常见应用场景。 @RabbitListener 注解是 Spring AMQP 框架中的一个关键组件,

    2024年02月09日
    浏览(64)
  • 【序列化与反序列化】关于序列化与反序列化MessagePack的实践

    在进行序列化操作之前,我们还对系统进行压测,通过 jvisualvm 分析cpu,线程,垃圾回收情况等;运用火焰图 async-profiler 分析系统性能,找出程序中占用CPU资源时间最长的代码块。 代码放置GitHub:https://github.com/nateshao/leetcode/tree/main/source-code/src/main/java/com/nateshao/source/code/ser

    2024年02月11日
    浏览(57)
  • 【网络】序列化反序列化

    在前文《网络编程套接字》中,我们实现了服务器与客户端之间的字符串通信,这是非常简单的通信,在实际使用的过程中,网络需要传输的不仅仅是字符串,更多的是结构化的数据(类似于 class , struct 类似的数据)。 那么我们应该怎么发送这些结构化的数据呢? 如果我们

    2024年02月05日
    浏览(43)
  • 序列化,反序列化之实例

    介绍文章 __construct() 当一个对象创建时自动调用 __destruct() 当对象被销毁时自动调用 (php绝大多数情况下会自动调用销毁对象) __sleep() 使**用serialize()函数时触发 __wakeup 使用unserialse()**函数时会自动调用 __toString 当一个对象被当作一个字符串被调用 __call() 在对象上下文中调用不

    2024年02月14日
    浏览(45)
  • Qt 对象序列化/反序列化

    阅读本文大概需要 3 分钟 日常开发过程中,避免不了对象序列化和反序列化,如果你使用 Qt 进行开发,那么有一种方法实现起来非常简单和容易。 我们知道 Qt 的元对象系统非常强大,基于此属性我们可以实现对象的序列化和反序列化操作。 比如有一个学生类,包含以下几

    2024年02月13日
    浏览(42)
  • 【网络】协议定制+序列化/反序列化

    如果光看定义很难理解序列化的意义,那么我们可以从另一个角度来推导出什么是序列化, 那么究竟序列化的目的是什么? 其实序列化最终的目的是为了对象可以 跨平台存储,和进行网络传输 。而我们进行跨平台存储和网络传输的方式就是IO,而我们的IO支持的数据格式就是

    2024年02月08日
    浏览(43)
  • 协议,序列化,反序列化,Json

    协议究竟是什么呢?首先得知道主机之间的网络通信交互的是什么数据,像平时使用聊天APP聊天可以清楚,用户看到的不仅仅是聊天的文字,还能够看到用户的头像昵称等其他属性。也就可以证明网络通信不仅仅是交互字符串那么简单。事实上网络通信还可能会通过一个结构

    2024年02月13日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包