Java整合RabbitMQ实现生产消费(7种通讯方式)

这篇具有很好参考价值的文章主要介绍了Java整合RabbitMQ实现生产消费(7种通讯方式)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境说明

  • RabbitMQ环境,参考RabbitMQ环境搭建
  • Java版本:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

工程搭建

  1. 创建maven项目
  2. pom.xml文件引入RabbitMQ依赖
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>

连接RabbitMQ

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConnections {

    public static final String RABBITMQ_HOST = "127.0.0.1";
    public static final int RABBITMQ_PORT = 5672;
    public static final String RABBITMQ_USERNAME = "guest";
    public static final String RABBITMQ_PASSWORD = "guest";
    public static final String RABBITMQ_VIRTUAL_HOST = "/";

    /**
     * 构建RabbitMQ连接对象
     *
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置Rabbitmq连接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
        //3.返回连接对象
        return factory.newConnection();
    }

}

通讯模式

1.简单通讯

即一个生产者可以向一个队列发送消息,一个消费者可以尝试从一个队列接收数据。如下图:
Java整合RabbitMQ实现生产消费(7种通讯方式)

    public final static String HELLO_QUEUE_NAME = "hello";
    @Test
    public void publish_hello() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null);
        //4.发布消息
        String msg = "hello,world";
        channel.basicPublish("", HELLO_QUEUE_NAME, null, msg.getBytes());
    }
	@Test
    public void consume_hello() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received '" + message + "'");
        };
        channel.basicConsume(HELLO_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

2.工作队列通讯

与简单通讯一样,当消费能力不足或想要提高吞吐时可添加多个消费者进行处理业务。如下图,队列中的消息会逐条被C1和C2消费。
Java整合RabbitMQ实现生产消费(7种通讯方式)

	public final static String WORK_QUEUE_NAME = "work";

    @Test
    public void publish_work_queue() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        //4.发布消息
        String msg = "hello,work queue";
        channel.basicPublish("", WORK_QUEUE_NAME, null, msg.getBytes());
    }
        @Test
    public void consume_work_queue1() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume1 Received '" + message + "'");
        };
        channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

    @Test
    public void consume_work_queue2() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume2 Received '" + message + "'");
        };
        channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

3.发布/订阅通讯

工作队列背后的假设是,每个任务只交付给一个消费者做同一件事。如果要交付给多个消费者做不同的事,需要引入交换机实现一个完整的消息传递模型,这种模式被称为“发布/订阅”。如下图,消息会发布到交换机中,交换机向绑定的队列同时发送消息,最终C1和C2会同时消费此条消息。
Java整合RabbitMQ实现生产消费(7种通讯方式)

public final static String PUB_EXCHANGE_NAME = "pub-ex";
    public final static String PUB1_QUEUE_NAME = "pub-que1";
    public final static String PUB2_QUEUE_NAME = "pub-que2";

    @Test
    public void publish_pub_sub() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建交换机
        channel.exchangeDeclare(PUB_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //4.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null);
        //5.绑定队列
        channel.queueBind(PUB1_QUEUE_NAME, PUB_EXCHANGE_NAME, "");
        channel.queueBind(PUB2_QUEUE_NAME, PUB_EXCHANGE_NAME, "");
        //6.发布消息
        String msg = "hello,pub/sub";
        channel.basicPublish(PUB_EXCHANGE_NAME, "", null, msg.getBytes());
    }
    @Test
    public void consume_pub_sub1() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("pub_sub1 Received '" + message + "'");
        };
        channel.basicConsume(PUB1_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

    @Test
    public void consume_pub_sub2() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("pub_sub2 Received '" + message + "'");
        };
        channel.basicConsume(PUB2_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

4.路由通讯

发布/订阅模式是交换机将一条消息同时路由给多个队列,“路由”模式可以将消息通过交换机指定到某个队列中从而被消费。如下图,交换机将所有类型的日志路由到一个队列中,将error类型的日志路由到另一个队列中。
Java整合RabbitMQ实现生产消费(7种通讯方式)

    public final static String ROUT_EXCHANGE_NAME = "rout-ex";
    public final static String ROUTALL_QUEUE_NAME = "rout-queall";
    public final static String ROUTONE_QUEUE_NAME = "rout-queone";

    @Test
    public void publish_routing() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建交换机
        channel.exchangeDeclare(ROUT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //4.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null);
        //5.绑定队列
        channel.queueBind(ROUTALL_QUEUE_NAME, ROUT_EXCHANGE_NAME, "all");
        channel.queueBind(ROUTONE_QUEUE_NAME, ROUT_EXCHANGE_NAME, "one");
        //6.发布消息
        String msg1 = "hello,1-all";
        String msg2 = "hello,2-all";
        String msg3 = "hello,1-one";
        channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg1.getBytes());
        channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg2.getBytes());
        channel.basicPublish(ROUT_EXCHANGE_NAME, "one", null, msg3.getBytes());
    }
    @Test
    public void consume_routing_all() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_routing_all Received '" + message + "'");
        };
        channel.basicConsume(ROUTALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

    @Test
    public void consume_routing_one() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_routing_one Received '" + message + "'");
        };
        channel.basicConsume(ROUTONE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

5.主题通讯

“路由”模式仍然有局限性——它不能基于多个标准进行路由。主题可以带来很大的灵活性,发送到主题交换的消息不能有任意的routing_key,它必须是一个用点分隔的单词列表,routing_key有两种重要的特殊情况:

  • *只能代替一个词。
  • #可以替换零个或多个单词。
    Java整合RabbitMQ实现生产消费(7种通讯方式)
    public final static String TOPIC_EXCHANGE_NAME = "topic-ex";
    public final static String TOPICALL_QUEUE_NAME = "topic-queall";
    public final static String TOPICONE_QUEUE_NAME = "topic-queone";

    @Test
    public void publish_topic() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建交换机
        channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //4.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null);
        //5.绑定队列
        channel.queueBind(TOPICALL_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "*.all.*");
        channel.queueBind(TOPICONE_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "#.one");
        //6.发布消息
        String msg1 = "hello.all.world";
        String msg2 = "hello.world.one";
        channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.all.world", null, msg1.getBytes());
        channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.world.one", null, msg2.getBytes());
    }
        @Test
    public void consume_topic_all() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_topic_all Received '" + message + "'");
        };
        channel.basicConsume(TOPICALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

    @Test
    public void consume_topic_one() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_topic_one Received '" + message + "'");
        };
        channel.basicConsume(TOPICONE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

6.RPC通讯

RabbitMQ作为消息中间件可以达到应用解耦效果,如果想达到RPC远程调用同步返回结果,RabbitMQ同样支持,其原理如下:

  • 发布者发送消息时指定一个回调队列和唯一id
  • 消费者处理完成后将结果发送到回调队列中
  • 发布者按照唯一id接收消息并处理

如下图
Java整合RabbitMQ实现生产消费(7种通讯方式)

    public final static String RPC_QUEUE_NAME = "rpc-que";
    public final static String RPCCALLBACK_QUEUE_NAME = "rpc-callback-que";

    @Test
    public void publish_rpc() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        // replyTo回调队列
        channel.queueDeclare(RPCCALLBACK_QUEUE_NAME, false, false, false, null);
        //4.发布消息
        String msg = "hello rpc";
        String correlationId = UUID.randomUUID().toString();
        /*AMQP 协议预先定义了一组与消息一起使用的14个属性。除了以下属性外,大多数属性很少使用:
        deliveryMode:将消息标记为持久(值为2)或瞬时(任何其他值)。
        contentType:用于描述编码的mime类型。例如,对于常用的JSON编码,最好将此属性设置为:application/JSON。
        replyTo:通常用于命名回调队列。
        correlationId:用于将RPC响应与请求关联。*/
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(RPCCALLBACK_QUEUE_NAME).correlationId(correlationId).build();
        //5.回调响应结果
        channel.basicPublish("", RPC_QUEUE_NAME, basicProperties, msg.getBytes());
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String recid = delivery.getProperties().getCorrelationId();
            if (correlationId.equalsIgnoreCase(recid)) System.out.println("rpc-callback-que   '" + message + "'");
        };
        channel.basicConsume(RPCCALLBACK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }
        @Test
    public void consume_rpc() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_rpc Received '" + message + "'");
            String correlationId = delivery.getProperties().getCorrelationId();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
            String replyTo = delivery.getProperties().getReplyTo();
            String callbackmsg = "rpc callback";
            channel.basicPublish("", replyTo, basicProperties, callbackmsg.getBytes());
        };
        channel.basicConsume(RPC_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

7.Publisher确认通讯

Publisher确认是RabbitMQ扩展以实现可靠发布。当在通道上启用发布者确认时,代理将异步确认客户端发布的消息,这意味着它们已在服务器端得到处理。

    public final static String CONFIRM_EXCHANGE_NAME = "confirm-ex";
    public final static String CONFIRM_QUEUE_NAME = "confirm-que";

    @Test
    public void publish_confirm() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.开启确认选项
        channel.confirmSelect();
        //4.构建交换机
        channel.exchangeDeclare(CONFIRM_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //5.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null);
        //6.绑定队列
        String right_routing_key = "confirm";
        String error_routing_key = "confirm_err";
        channel.queueBind(CONFIRM_QUEUE_NAME, CONFIRM_EXCHANGE_NAME, right_routing_key);
        //7.消息到达交换机确认监听
        channel.addConfirmListener((sequenceNumber, multiple) -> {
            System.out.println("消息成功发送到交换机");
        }, (sequenceNumber, multiple) -> {
            System.err.println("消息未发送到交换机,补偿操作。");
        });
        //8.消息到达队列确认监听
        channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
            System.err.format("消息 %s 未路由到指定队列: %s, replyText: %s,replyCode: %d%n", body, routingKey, replyText, replyCode);
        });
        //设置消息持久化
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
        //7.发布消息
        String msg = "hello confirm";
        channel.basicPublish(CONFIRM_EXCHANGE_NAME, error_routing_key,true, basicProperties, msg.getBytes());
        System.in.read();
    }

	@Test
    public void consume_ack() throws IOException, TimeoutException {
        //1.获取连接对象
        Connection connection = MQConnections.getConnection();
        //2.构建Channl
        Channel channel = connection.createChannel();
        //3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
        channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null);
        //4.监听消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_routing_one Received '" + message + "'");
            //消息处理后手动ACK
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // ack为false
        channel.basicConsume(CONFIRM_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

代码仓库

https://gitee.com/codeWBG/learn_rabbitmq文章来源地址https://www.toymoban.com/news/detail-442930.html

到了这里,关于Java整合RabbitMQ实现生产消费(7种通讯方式)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot整合Kafka简单配置实现生产消费

    *本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考 spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html 搭建Kafka环境,参考Kafka集群环境搭建及使用 Java环境:JDK1.8 Maven版本:apach

    2024年02月16日
    浏览(31)
  • Java编写简易rabbitmq生产者与消费者

    开发时经常与其它系统用rabbitmq对接,当需要自测时,还是自己写rabbitmq生产者、消费者自测方便些。 下面总结下不用框架、使用java编写简易rabbitmq的方法。 (1)如果是maven,那就用 (2)如果没用maven,那就找一个 amqp-client-3.3.4.jar 文件,然后引入项目 说明: (1)其中的配置,按照

    2024年01月16日
    浏览(36)
  • 【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

    这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。 目录 一、防止消息丢失 1.1、消息确认机制(生产者) (1)生产者丢失消息 (2)生产者消息确认机制 1.2、消息确认机制(消费者) (1)消费者丢失消息

    2024年02月02日
    浏览(44)
  • c++实现RabbitMQ简单的生产者和消费者

    基本思想:利用c++实现RabbitMQ简单的生产者和消费者 CMakeList.txt   producer.cpp 队列中存储5条生产者消息  consumer.cpp

    2024年02月16日
    浏览(30)
  • springboot整合rabbitmq 实现消息发送和消费

    Spring Boot提供了RabbitMQ的自动化配置,使得整合RabbitMQ变得非常容易。 首先,需要在pom.xml文件中引入amqp-client和spring-boot-starter-amqp依赖: 接下来需要在application.properties文件中配置RabbitMQ连接信息: 然后编写消息发送者: 其中,my-exchange和my-routing-key是需要自己定义的交换机和

    2024年02月07日
    浏览(31)
  • 优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

    前言 SpringBoot 集成 RabbitMQ 公司老大觉得使用注解太繁琐了,而且不能动态生成队列所以让我研究是否可以动态绑定,所以就有了这个事情。打工人就是命苦没办法,硬着头皮直接就上了,接下来进入主题吧。 需求思路分析 根据老大的需求,大致分为使用配置文件进行配置,

    2024年02月16日
    浏览(28)
  • 【JAVA】生产环境kafka重复消费问题记录

    业务系统每周都有定时任务在跑,由于是大任务因此采用分而治之思想将其拆分为多个分片小任务采用 kafka异步队列消费 的形式来减少服务器压力,每个小任务都会调用后台的c++算法,调用完成之后便会回写数据库的成功次数。今天观测到定时任务的分片小任务存在被重复消

    2024年04月12日
    浏览(31)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(37)
  • 【Kafka】Java实现数据的生产和消费

    Kafka 是由 LinkedIn 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的 基于发布订阅模式的消息引擎系统 。 Broker:消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群; T

    2023年04月19日
    浏览(39)
  • SpringBoot集成RabbitMq,RabbitMq消费与生产,消费失败重发机制,发送签收确认机制

           这里spring-boot依赖版本为2.3.7版本,RabbitMq集成amqp包,版本在spring-boot中有涵盖,不单独指明版本了。 Exchange 交换机配置 队列queue配置 将队列和交换机绑定, 并设置用于匹配键 配置加载 RabbitTemplate 生产者 消费者 消息确认签收配置        消息确认签收机制不过多

    2024年01月21日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包