springboot下使用rabbitMQ之开发配置方式(二)

这篇具有很好参考价值的文章主要介绍了springboot下使用rabbitMQ之开发配置方式(二)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

springboot下使用rabbitMQ之传参及序列化(二)

消息参数传递在开发中也是个坑,不论使用内置的SimpleMessageConverter还是Jackson2JsonMessageConverter均无法让Consumer接收动态参数

一.序列化的问题

首先贴出具体代码以及测试用例:

  • 消费者
    @RabbitListener(queues = "text.queue")
    @RabbitHandler(isDefault = true)
    public void exec(@Payload Map dto, Message message, Channel channel){
        // 注意,发送的消息类型必须是实现了Serializable接口的类型,消费者接口类型不能随便写!
        LOG.info(RabbitMQCfgEnum.TEXT +"接收到消息:{}", dto);
        // 设置手动确认才会需要执行此
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),Boolean.TRUE);
    }
  • 生产者(测试用例)
    private Connection buidConnection()throws Exception{
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();//MQ采用工厂模式来完成连接的创建
        //2.在工厂对象中设置连接信息(ip,port,virtualhost,username,password)
        factory.setHost("10.156.122.215");//设置MQ安装的服务器ip地址
        factory.setPort(5672);//设置端口号
        factory.setVirtualHost("vhost");//设置虚拟主机名称
        //MQ通过用户来管理
        factory.setUsername("shadow");//设置用户名称
        factory.setPassword("shadow");//设置用户密码
        //3.通过工厂对象获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
    
    @Test
    public void test02()throws Exception{
        Connection connection = this.buidConnection();
        Channel channel = connection.createChannel();
        Map<String,Object> data = new HashMap<>(4);
        data.put("ordeNo", SeqGenUtil.genSeq());
        data.put("timestamp",System.currentTimeMillis());
        String json = JacksonUtil.toJsonString(data);
        channel.basicPublish(RabbitMQCfgEnum.TEXT.exchange, RabbitMQCfgEnum.TEXT.routingKey, null,json.getBytes(StandardCharsets.UTF_8));
        //关闭连接
        channel.close();
        connection.close();
    }

执行测试用例,它居然抛错了:

2023-07-17 14:10:24.037 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.rabbit.retry.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@3ad45d58(byte[55])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=text_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, consumerQueue=text.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(java.util.Map,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@7839ec46]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.util.Map] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=c28511a8-1bbe-9d57-879c-c5763ba40129, amqp_consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, amqp_lastInBatch=false, timestamp=1689574224032}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted
2023-07-17 14:10:24.038 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.r.listener.ConditionalRejectingErrorHandler:170 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
	at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
	... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(java.util.Map,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@7839ec46]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 14 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.util.Map] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=c28511a8-1bbe-9d57-879c-c5763ba40129, amqp_consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, amqp_lastInBatch=false, timestamp=1689574224032}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted

看吧json字符串是无法序列化为Map😂

试试用dto来接收这个json字符串看:

  • 消费者
    @RabbitListener(queues = "text.queue")
    @RabbitHandler(isDefault = true)
    public void exec(@Payload MQMessageDTO dto, Message message, Channel channel){
        // 注意,发送的消息类型必须是实现了Serializable接口的类型,消费者接口类型不能随便写!
        LOG.info(RabbitMQCfgEnum.TEXT +"接收到消息:{}", dto);
        // 设置手动确认才会需要执行此
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),Boolean.TRUE);
    }
    
  • 消费者接收对象
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.io.Serializable;

@JsonIgnoreProperties(ignoreUnknown = true)
public class MQMessageDTO implements Serializable {

    /**
     * 序列化标识
     */
    private static final long serialVersionUID = 1L;

    private String ordeNo;
    private Long timestamp;

    public String getOrdeNo() {
        return ordeNo;
    }

    public void setOrdeNo(String ordeNo) {
        this.ordeNo = ordeNo;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "MQMessageDTO{" +
                "ordeNo='" + ordeNo + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }

}

再执行下测试用例看看:

2023-07-17 14:15:14.484 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.rabbit.retry.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@20c64033(byte[55])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=text_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-a17LF_eoKfyC9ynPPdJmfw, consumerQueue=text.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(com.mee.api.common.dto.MQMessageDTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@616a95dd]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mee.api.common.dto.MQMessageDTO] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=42e348e4-aca4-fc9e-46f3-b52c83295a0c, amqp_consumerTag=amq.ctag-a17LF_eoKfyC9ynPPdJmfw, amqp_lastInBatch=false, timestamp=1689574514480}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted
2023-07-17 14:15:14.485 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.r.listener.ConditionalRejectingErrorHandler:170 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
	at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
	... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(com.mee.api.common.dto.MQMessageDTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@616a95dd]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 14 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mee.api.common.dto.MQMessageDTO] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=42e348e4-aca4-fc9e-46f3-b52c83295a0c, amqp_consumerTag=amq.ctag-a17LF_eoKfyC9ynPPdJmfw, amqp_lastInBatch=false, timestamp=1689574514480}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted

还是一样的抛错😂

没办法,我们得用先前能走通的调试下Consumer的Message这个参数看:

springboot下使用rabbitMQ之开发配置方式(二)

看到了有个貌似是参数类型的东东出现在了inferredArgumentType这个参数位置,对头~,可以试着将类型放进去不就好了嘛,试试看~

  • 消费者
    @RabbitListener(queues = "default.queue")
    @RabbitHandler(isDefault = true)
    public void exec(@Payload MQMessageDTO dto, Message message, Channel channel){
        // 注意,发送的消息类型必须是实现了Serializable接口的类型,消费者接口类型不能随便写!
        LOG.info(RabbitMQCfgEnum.DEFAULT +"接收到消息:{}", dto);
    }
  • 测试用例
    @Test
    public void test03()throws Exception{
        Connection connection = this.buidConnection();
        Channel channel = connection.createChannel();
        MQMessageDTO data = new MQMessageDTO();
        data.setOrdeNo(SeqGenUtil.genSeq());
        data.setTimestamp(System.currentTimeMillis());
       MessageProperties messageProperties = new MessageProperties();
        messageProperties.setInferredArgumentType(MQMessageDTO.class);
        messageProperties.setContentType("application/x-java-serialized-object");
        Message message = new SimpleMessageConverter().toMessage(data, messageProperties);
        channel.basicPublish(RabbitMQCfgEnum.TEXT.exchange, RabbitMQCfgEnum.TEXT.routingKey, null,message.getBody());
        //关闭连接
        channel.close();
        connection.close();
    }

试着运行了下测试用例,发现还是抛错😂

2023-07-17 15:31:26.051 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.rabbit.retry.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@2ac0f60d(byte[214])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=text_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-YZyegHnG7Y_t5bio0BmSvQ, consumerQueue=text.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(com.mee.api.common.dto.MQMessageDTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@3388051d]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mee.api.common.dto.MQMessageDTO] for GenericMessage [payload=byte[214], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=51ebd117-d95b-463a-f43b-7feff582b194, amqp_consumerTag=amq.ctag-YZyegHnG7Y_t5bio0BmSvQ, amqp_lastInBatch=false, timestamp=1689579086047}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted
2023-07-17 15:31:26.052 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.r.listener.ConditionalRejectingErrorHandler:170 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
	at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
	... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(com.mee.api.common.dto.MQMessageDTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@3388051d]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 14 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mee.api.common.dto.MQMessageDTO] for GenericMessage [payload=byte[214], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=51ebd117-d95b-463a-f43b-7feff582b194, amqp_consumerTag=amq.ctag-YZyegHnG7Y_t5bio0BmSvQ, amqp_lastInBatch=false, timestamp=1689579086047}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted

可能思路还是存在问题,这样吧,先用RabbitTemplate推一个消息看
主要代码如下:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDefaultDirect() {
        MQMessageDTO data = new MQMessageDTO();
        data.setOrdeNo(SeqGenUtil.genSeq());
        data.setTimestamp(System.currentTimeMillis());
        // 注意,发送的消息必须是实现了Serializable类型的消息,接收的时候也应该是这个类型,不可乱写
        rabbitTemplate.convertAndSend(RabbitMQCfgEnum.DEFAULT.exchange, RabbitMQCfgEnum.DEFAULT.routingKey,data);
        LOG.info("已发送::{}",data );
    }

能收到消息:

2023-07-17 15:36:17.354 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] -> INFO  c.m.a.common.service.impl.MQConsumerDefaultHandler:49 - 
    DEFAULT接收到消息:MQMessageDTO{ordeNo='2307171536109000', timestamp=1689579370802}

花了幾分鐘,試著在RabbitTemplate下找到了这段代码:

	protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
			Message message) throws IOException {
		BasicProperties convertedMessageProperties = this.messagePropertiesConverter
				.fromMessageProperties(message.getMessageProperties(), this.encoding);
		channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
	}

好家伙,原来需要传一个 BasicProperties 对象,copy之~

  • 测试用例
    @Test
    public void test03()throws Exception{
        Connection connection = this.buidConnection();
        //mq提供Channel来将处理消息
        //创建Channel
        Channel channel = connection.createChannel();
        MQMessageDTO data = new MQMessageDTO();
        data.setOrdeNo(SeqGenUtil.genSeq());
        data.setTimestamp(System.currentTimeMillis());
        Message message = new SimpleMessageConverter().toMessage(data, new MessageProperties());
        AMQP.BasicProperties prop = new DefaultMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), "UTF-8");

        channel.basicPublish(RabbitMQCfgEnum.TEXT.exchange, RabbitMQCfgEnum.TEXT.routingKey, prop,message.getBody());
        //关闭连接
        channel.close();
        connection.close();
    }
  • 执行结果
2023-07-17 15:41:20.880 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> INFO  c.m.api.common.service.impl.MQConsumerTextHandler:43 - 
    TEXT接收到消息:MQMessageDTO{ordeNo='2307171541179000', timestamp=1689579677767}

基本结论:
在springboot框架下mq消费者接收生产者的消息内容,如果想统一封装则只能用字符串的形式!这点儿很重要,因为发送方配置中指定了发送消息的内容类型~
但是,如果生产者与消费者不是在一个应用,能不能用同一个类型接收呢,这是个好问题,试试看~

二.模拟发送方与接收方参数测试

为了真实模拟,我将上面使用过的这个消息体MQMessageDTO copy一份到其他包里面,生产者与消费者使用不同包下的MQMessageDTO

springboot下使用rabbitMQ之开发配置方式(二)

最终的代码是这样子:

  • 消费者
    @RabbitListener(queues = "text.queue")
    @RabbitHandler(isDefault = true)
    public void exec(@Payload com.mee.api.common.dto.MQMessageDTO dto, Message message, Channel channel){
        // 注意,发送的消息类型必须是实现了Serializable接口的类型,消费者接口类型不能随便写!
        LOG.info(RabbitMQCfgEnum.TEXT +"接收到消息:{}", dto);
        // 设置手动确认才会需要执行此
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),Boolean.TRUE);
    }
  • 生产者(测试用例)
    @Test
    public void test03()throws Exception{
        Connection connection = this.buidConnection();
        //mq提供Channel来将处理消息
        //创建Channel
        Channel channel = connection.createChannel();
        com.mee.api.common.dto.tmp.MQMessageDTO data = new com.mee.api.common.dto.tmp.MQMessageDTO();
        data.setOrdeNo(SeqGenUtil.genSeq());
        data.setTimestamp(System.currentTimeMillis());
        Message message = new SimpleMessageConverter().toMessage(data, new MessageProperties());
        AMQP.BasicProperties prop = new DefaultMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), "UTF-8");

        channel.basicPublish(RabbitMQCfgEnum.TEXT.exchange, RabbitMQCfgEnum.TEXT.routingKey, prop,message.getBody());
        //关闭连接
        channel.close();
        connection.close();
    }
  • 测试结果
2023-07-17 15:55:03.410 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.rabbit.retry.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=text_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-aS3e5aUBnUMAoQQlxU7QQQ, consumerQueue=text.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(com.mee.api.common.dto.MQMessageDTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@292b4ff]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.mee.api.common.dto.tmp.MQMessageDTO] to [com.mee.api.common.dto.MQMessageDTO] for GenericMessage [payload=MQMessageDTO{ordeNo='2307171555019000', timestamp=1689580501335}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=31472e0d-b0a5-676e-2899-e1542410937f, amqp_consumerTag=amq.ctag-aS3e5aUBnUMAoQQlxU7QQQ, amqp_lastInBatch=false, contentType=application/x-java-serialized-object, timestamp=1689580503404}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted
2023-07-17 15:55:03.411 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN  o.s.a.r.listener.ConditionalRejectingErrorHandler:170 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
	at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
	at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
	... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(com.mee.api.common.dto.MQMessageDTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@292b4ff]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 14 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.mee.api.common.dto.tmp.MQMessageDTO] to [com.mee.api.common.dto.MQMessageDTO] for GenericMessage [payload=MQMessageDTO{ordeNo='2307171555019000', timestamp=1689580501335}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=31472e0d-b0a5-676e-2899-e1542410937f, amqp_consumerTag=amq.ctag-aS3e5aUBnUMAoQQlxU7QQQ, amqp_lastInBatch=false, contentType=application/x-java-serialized-object, timestamp=1689580503404}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
	at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
	... 27 common frames omitted

这上面有一句话很重要:

Cannot convert from [com.mee.api.common.dto.tmp.MQMessageDTO] to [com.mee.api.common.dto.MQMessageDTO]

所以在不同的应用下使用mq ,需要用相同包下的相同类传递参数。

总结:springboot下mq序列化的方式有三

  • 1.使用默认的SimpleMessageConverter实现序列化

  • 2.配置使用Jackson2JsonMessageConverter(jackson),配置如下

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory,ObjectMapper objectMapper){
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
        simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper));
        return simpleRabbitListenerContainerFactory;
    }

    @Bean
    public RabbitTemplate jackRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jsonConvert(ObjectMapper objectMapper){
        return new Jackson2JsonMessageConverter(objectMapper);
    }
  • 3.统一String传递消息内容,接收时用如下方式还原为String
String msgStr = new String(bt,StandardCharsets.UTF_8);

如需要转换为特定类型请自行使用fastjson,jackson,gson等等方式将字符串序列化为指定对象即可~文章来源地址https://www.toymoban.com/news/detail-578524.html

到了这里,关于springboot下使用rabbitMQ之开发配置方式(二)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【SpringBoot】两种配置文件, 详解 properties 和 yml 的语法格式, 使用方式, 读取配置

    各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: 📕 JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 📗 Java数据结构: 顺序表, 链表, 堆, 二叉树, 二叉搜索树, 哈希表等 📘 JavaEE初阶: 多线程, 网络编程, TCP/IP协议, HTTP协议

    2024年02月10日
    浏览(52)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(52)
  • 一文带你如何用SpringBoot+RabbitMQ方式来收发消息

    预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换。 本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管

    2024年02月09日
    浏览(40)
  • 安装RabbitMQ及配置Centos7 方式(2)

    自行搭建学习参考使用,这里采用的Centos7 方式,这已经是多年前的方式了,现在主流方式是容器化安装、部署,docker、ks8,同学们可自行去学习参考。 环境:centos7 、otp_src_21.3、rabbitmq-server-generic-unix-3.7.9、c++。 注意 : Erlang 和 RabbitMQ版本对照 RabbitMQ版本 Erlang最低版本要求

    2024年03月10日
    浏览(50)
  • 创建延时队列、springboot配置多个rabbitmq

    type选择fanout (图中已经绑定,红框为绑定过程) (图中已经绑定,红框为绑定过程) 延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作 RabbitConfig配置类 mq1 mq2 application-prod.yaml mq1消费端,发消息给mq2 mq2消费端用于递归删除文件 FileHelper工具类递归删除文件或文

    2024年02月11日
    浏览(39)
  • SpringBoot RabbitMQ收发消息、配置及原理

    今天分析SpringBoot通过自动配置集成RabbitMQ的原理以及使用。 RabbitMQ是基于AMQP协议的message broker,所以我们首先要对AMQP做一个简单的了解。 AMQP (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers. AMQP是A

    2024年02月20日
    浏览(39)
  • SpringBoot项目配置多个RabbitMQ解决方案

    目前有一个Spring Boot项目,已经接入一个RabbitMQ Broker,由于业务扩展,需要新增一个RabbitMQ Broker进行消费,由于单个Broker时通过Spring默认配置进行使用,因此需要做出修改 pom 增加properties配置 增加配置类 增加properties配置 修改配置类

    2024年02月11日
    浏览(37)
  • springboot-rabbitmq 实现动态配置监听容器

    1.1.1从factories我们可以看到mq的启动配置类 1.1.2然后我们找到 RabbitAutoConfiguration ,发现它引入了 RabbitAnnotationDrivenConfiguration 这个配置类 1.1.3进入 RabbitAnnotationDrivenConfiguration 滑到最低部看到这里引入了 @EnableRabbit 这个注解,找个注解里面又引出 RabbitBootstrapConfiguration 这个配置类

    2023年04月09日
    浏览(73)
  • 【SpringBoot快速入门】(2)SpringBoot的配置文件与配置方式详细讲解

    之前我们已经学习的Spring、SpringMVC、Mabatis、Maven,详细讲解了Spring、SpringMVC、Mabatis整合SSM的方案和案例,上一节我们学习了SpringBoot的开发步骤、工程构建方法以及工程的快速启动,从这一节开始,我们开始学习SpringBoot配置文件。接下来,我们逐步开始学习,本教程所有示例

    2024年02月03日
    浏览(37)
  • Springboot读取配置的一些方式

    从配置文件中获取属性应该是 SpringBoot 开发中最为常用的功能之一,但就是这么常用的功能,仍然有很多开发者在这个方面踩坑。 下面整理了几种获取配置属性的方式, 弄清配置加载、读取的底层原理 ,一旦出现问题可以分析。 以下示例源码 Springboot 版本均为 2.7.6 一、E

    2024年02月15日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包