RabbitMQ手动签收消息

这篇具有很好参考价值的文章主要介绍了RabbitMQ手动签收消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ手动签收消息

这里讲解SpringBoot使用RabbitMQ进行有回调的用法和消费者端手动签收消息的用法。文章来源地址https://www.toymoban.com/news/detail-666590.html

1、pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/>
    </parent>
    <groupId>com.example.demo</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-demo</name>
    <description>rabbitmq-demno</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2、配置文件

server:
  port: 9090
spring:
  application:
    name: rabbit-confirm
  rabbitmq:
    template:
      # 使用return-callback时必须设置mandatory为true
      mandatory: true
    # 消息发送到交换机确认机制,是否确认回调
    publisher-confirm-type: correlated
    # 消息发送到交换机确认机制,是否返回回调
    publisher-returns: true
    listener:
      simple:
        # 并发消费者初始化值
        concurrency: 5
        # 最大值
        max-concurrency: 10
        # 每个消费者每次监听时可拉取处理的消息数量
        prefetch: 20
        # 确认模式设置为手动签收
        acknowledge-mode: manual
    username: zsx242030
    password: zsx242030
    virtual-host: /

3、定义配置类

package com.example.demo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfiguration {

    /**
     * 声明confirm.message队列
     */
    @Bean
    public Queue confirmQueue() {
        return new Queue("confirm.message");
    }

    /**
     * 声明一个名为exchange-2的交换机
     */
    @Bean
    public TopicExchange exchange2() {
        return new TopicExchange("exchange-2");
    }

    /**
     * 将confirm.message的队列绑定到exchange-2交换机
     */
    @Bean
    public Binding bindMessage1() {
        return BindingBuilder.bind(confirmQueue()).to(exchange2()).with("confirm.message");
    }
}

4、定义生产者

package com.example.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.sql.Timestamp;
import java.time.LocalDateTime;

@Component
@Slf4j
public class ConfirmProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 如果消息没有到exchange,则confirm回调,ack=false
     * 如果消息到达exchange,则confirm回调,ack=true
     * exchange到queue成功,则不回调return
     * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        if (!ack) {
            log.error("消息发送失败:correlationData: {},cause: {}", correlationData, cause);
        } else {
            log.info("消息发送成功:correlationData: {},ack: {}", correlationData, ack);
        }
    };

    private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->
            log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);

    /**
     * 发送消息
     *
     * @param message 消息内容
     */
    public void send(String message) {
        // 构建回调返回的数据
        CorrelationData correlationData = new CorrelationData();
        Timestamp time = Timestamp.valueOf(LocalDateTime.now());
        correlationData.setId(time + "");
        Message message1 = MessageBuilder.withBody(message.toString().getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                // 将CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,然后人工处理
                .setCorrelationId(correlationData.getId())
                .build();
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);
    }
}

5、定义消费者

package com.example.demo.config;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class ConfirmConsumer {

    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "confirm.message",durable = "true"),
                    exchange = @Exchange(value = "exchange-2",type = "topic"),
                    key = "confirm.message"))
    public void receive(String message, Message message1, Channel channel) throws IOException {
        log.info("消费者收到消息:{}", message);
        long deliverTag = message1.getMessageProperties().getDeliveryTag();
        //第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型
        //为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。
        channel.basicAck(deliverTag, false);
    }
}

6、创建controller调用

package com.example.demo.controller;

import com.example.demo.config.ConfirmProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ConfirmController {

    @Resource
    private ConfirmProducer confirmProducer;

    @GetMapping("/confirm-message")
    public void confirmMessage() {
        confirmProducer.send("hello confirm message");
    }
}

7、启动类

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitDemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(RabbitDemoApplication.class, args);
	}

}

8、测试

http://localhost:9090/confirm-message
2022-07-05 18:20:43.043  INFO 4492 --- [nectionFactory1] com.example.demo.config.ConfirmProducer  : 消息发送成功:correlationData: CorrelationData [id=2022-07-05 18:20:43.025],ack: true
2022-07-05 18:20:43.046  INFO 4492 --- [ntContainer#0-5] com.example.demo.config.ConfirmConsumer  : 消费者收到消息:hello confirm message

到了这里,关于RabbitMQ手动签收消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 中的 RabbitMQ 的消息接收配置是什么,原理,如何使用

    RabbitMQ 是一个流行的消息队列系统,它可以用于在应用程序之间传递消息。Spring Boot 提供了对 RabbitMQ 的支持,我们可以使用 Spring Boot 中的 RabbitMQ 消息接收配置来接收 RabbitMQ 中的消息。本文将介绍 RabbitMQ 的消息接收配置的原理和如何在 Spring Boot 中使用它。 在 RabbitMQ 中,消

    2024年02月13日
    浏览(47)
  • 手动清除RabbitMQ队列的消息缓存

           在个人的springboot项目中使用到了rabbitmq作为消息队列中间件,但是在项目的调试过程中出现了某些错误,导致rabbitmq的生产者端循环产生了多条消息给消费者,而消费者又因为该错误无法及时将消息处理掉,所以在消费端积压了多条消息;在修复了错误重启项目后

    2024年02月11日
    浏览(39)
  • 手动清理RabbitMq队列中的消息

    一、手动删除队列中指定个数的消息 打开RabbitMq管理页面,进入队列。 点击 Get messages Requeue 改成No Mesaages 设置一个值 点击Get messages 二、一次清理队列中的所有消息 打开RabbitMq管理页面,进入队列。 点击Purge 点击按钮Purge Messages

    2024年02月11日
    浏览(34)
  • springBoot-rabbitMq手动确认消息

    代码基础怎么写我就不说了,看我的另一篇博客 springBoot整合RabbitMQ(Demo)_我要用代码向我喜欢的女孩表白的博客-CSDN博客 假设你要手动ack,怎么做呢? 通常自动是,mq发给服务端,服务端收到了就当处理过了,但是我们要保证数据不丢失。所以得处理完了,才告诉mq说我做

    2024年02月11日
    浏览(33)
  • SpringBoot集成RabbitMq,RabbitMq消费与生产,消费失败重发机制,发送签收确认机制

           这里spring-boot依赖版本为2.3.7版本,RabbitMq集成amqp包,版本在spring-boot中有涵盖,不单独指明版本了。 Exchange 交换机配置 队列queue配置 将队列和交换机绑定, 并设置用于匹配键 配置加载 RabbitTemplate 生产者 消费者 消息确认签收配置        消息确认签收机制不过多

    2024年01月21日
    浏览(43)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(70)
  • [Spring boot] Spring boot 整合RabbitMQ实现通过RabbitMQ进行项目的连接

     🍳作者:天海奈奈 💭眼过千遍不如手锤一遍:推荐一款模拟面试,斩获大厂 o f f e r ,程序员的必备刷题平台 − − 牛客网  👉🏻点击开始刷题之旅 目录 什么是RabbitMQ   消息队列:接受并转发消息,类似于快递公司 消息队列的优点 消息队列的特性 RabbitMQ特点 RabbitMQ核

    2024年01月24日
    浏览(81)
  • 【Spring Boot】Spring Boot集成RabbitMQ

    Spring Boot提供了`spring-boot-starter-amqp`组件,只需要简单地配置即可与Spring Boot无缝集成。下面通过示例演示集成RabbitMQ实现消息的接收和发送。 步骤01 配置pom包。 创建Spring Boot项目并在pom.xml文件中添加spring-bootstarter-amqp等相关组件依赖: 在上面的示例中,引入Spring Boot自带的

    2024年02月06日
    浏览(49)
  • Spring Clould 消息队列 - RabbitMQ

      视频地址:微服务(SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式)  同步和异步通讯 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个

    2024年02月12日
    浏览(40)
  • Spring Boot 整合RabbitMQ

    第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Spring Cloud Netflix 之 Hystrix 第九章 代码管理gitlab 使用 第十章 Spr

    2024年02月05日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包