SpringBoot+RabbitMq实现数据批量接收,批量操作

这篇具有很好参考价值的文章主要介绍了SpringBoot+RabbitMq实现数据批量接收,批量操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

例如:在某些场景下,会在极短时间产生大量数据,这时候单条数据入库就不太适合,我们可以堆积到一定数量进行批量入库,刚好呢,RabbitMQ提供了这个堆积的过程,我们就只需要实现批量入库操作即可,因此在此记录一下。


一、官方网站

官方文档地址
rabbitmq批量读取,java,java-rabbitmq,rabbitmq,spring boot

二、使用步骤

注意:SpringBoot版本必须是2.2.0以上,我是直接用的最新的
rabbitmq批量读取,java,java-rabbitmq,rabbitmq,spring boot
rabbitmq批量读取,java,java-rabbitmq,rabbitmq,spring boot

1.引入RabbitMQ的依赖

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.4.6</version>
</dependency>

2.RabbitConfig

package com.example.demomq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Administrator
 **/
@Configuration
public class RabbitMQConfig {

    public static final String TEST_EXCHANGE = "test_exchange";
    public static final String TEST_ROUTING_KEY = "test_routing_key";
    public static final String TEST_QUEUE = "test_queue";

    @Bean
    Queue testQueue() {
        return new Queue(TEST_QUEUE, true, false, false);
    }

    @Bean
    DirectExchange testExchange() {
        return new DirectExchange(TEST_EXCHANGE, true, false);
    }

    @Bean
    Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with(TEST_ROUTING_KEY);
    }

    @Bean
    SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory());
        factory.setBatchListener(true);
        //每次接两条
        factory.setBatchSize(2);
        //十秒内没有数据再入队列,也执行
        factory.setReceiveTimeout(1000L * 10);
        factory.setConsumerBatchEnabled(true);
        return factory;
    }

    @Bean
    CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }
}

3.消息生产者

package com.example.demomq.pojo;

import lombok.Data;
import java.util.Date;
/**
 * @author Administrator
 **/
@Data
public class MessagePO {
    /**
     * 消息内容
     */
    private String content;
    /**
     * 消息的时间
     */
    private Date messageTime;
}


package com.example.demomq.production;

import cn.hutool.json.JSONUtil;
import com.example.demomq.config.RabbitMQConfig;
import com.example.demomq.pojo.MessagePO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @author Administrator
 **/
@Component
public class Production implements ApplicationRunner {

    private static final int NUM  = 9;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        for (int i = 0; i < NUM; i++) {
            //睡眠一秒
            Thread.sleep(1000);

            MessagePO message = new MessagePO();
            message.setMessageTime(new Date());
            message.setContent("我是消息内容:" + i);

            rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_EXCHANGE, RabbitMQConfig.TEST_ROUTING_KEY, JSONUtil.toJsonStr(message));
        }
    }
}

4.消费者

package com.example.demomq.consumer;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONConfig;
import cn.hutool.json.JSONUtil;
import com.example.demomq.config.RabbitMQConfig;
import com.example.demomq.pojo.MessagePO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

/**
 * @author Administrator
 **/
@Component
public class ConsumerMessage {

    @RabbitListener(queues = RabbitMQConfig.TEST_QUEUE, containerFactory = "consumerBatchContainerFactory")
    public void listenerMessage(List<String> messagePOList) {
        List<MessagePO> messagePOS = messagePOList.stream().map(item -> JSONUtil.toBean(item, MessagePO.class)).collect(Collectors.toList());
        JSONConfig jsonConfig = JSONConfig.create().setDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(JSONUtil.parseArray(messagePOS, jsonConfig));
        System.out.println("当前时间:" + DateUtil.now());
        System.out.println("=================================================================================");
        
        
        //TODO批量入库操作
    }
}

5.运行截图

rabbitmq批量读取,java,java-rabbitmq,rabbitmq,spring boot


总结

以上就是我理解的RabbitMQ批量接收和运用,在此记录。文章来源地址https://www.toymoban.com/news/detail-611045.html

到了这里,关于SpringBoot+RabbitMq实现数据批量接收,批量操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 本地模拟发送、接收RabbitMQ数据

    日常开发中,当线上RabbitMQ坏境还没准备好时,可在本地模拟发送、接收消息 Docker安装RabbitMQ 【SpringCloud】整合RabbitMQ六大模式应用(入门到精通) Spring RabbitMQ 配置多个虚拟主机(vhost)

    2024年02月21日
    浏览(30)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(32)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(28)
  • 【RabbitMQ】| Lion带你 (超详细) 从0到1使用SpringBoot操作RabbitMQ

    Spring Boot操作RabbitMQ是一种非常有用的技术,它可以提高应用程序的性能、可靠性和可伸缩性,为开发人员提供更好的开发和维护体验。下面是它的一些优点: 提高应用程序的可靠性和稳定性。RabbitMQ作为一种基于消息传递的中间件,可以将消息传递给应用程序,从而避免了高

    2024年02月01日
    浏览(25)
  • Java操作RabbitMq并整合SpringBoot

    秋风阁-北溪入江流 RabbitMq自带有专门的管理界面,可以在其管理界面对RabbitMq进行管理查看等操作。 RabbitMq的管理界面的对外端口为 15672 ,当我们启动RabbitMq后,需要启动管理界面插件后才能访问界面。 通过参数配置连接RabbitMq 通过amqp协议连接RabbitMq queueDeclarePassive: 创建或

    2024年02月16日
    浏览(50)
  • 黑马点评用rabbitmq实现优惠券秒杀下单后的异步操作数据库数据

    通过@Bean注入MessageConverter,保证消息的正确传输 修改后的代码 监听器

    2024年04月15日
    浏览(34)
  • RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

    我们先看一串代码,并思考一下为什么要先入库然后发MQ: 如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。 ① 消息从生产者发送到Broker 生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接

    2024年02月11日
    浏览(39)
  • SpringBoot整合实现RabbitMQ

    本文大纲 一.RabbitMQ介绍 二.RabbitMQ的工作原理 2.1 RabbitMQ的基本结构 2.2 组成部分说明 2.3 生产者发送消息流程 2.4 消费者接收消息流程 三.SpringBoot 整合实现RabbitMQ 3.1创建mq-rabbitmq-producer(生产者)发送消息 3.1.1pom.xml中添加相关的依赖 3.1.2 配置application.yml 3.1.3 配置RabbitMQ常量类

    2024年02月17日
    浏览(36)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(43)
  • RabbitMQ——高级特性(SpringBoot实现)

    本篇文章的内容与我之前如下这篇文章一样,只是使用技术不同,本篇文章使用SpringBoot实现RabbitMQ的高级特性! RabbitMQ——高级特性_小曹爱编程!的博客-CSDN博客 RabbitMQ——高级特性:1、RabbitMQ高级特性;2、RabbitMQ应用问题;3、RabbitMQ集群搭建 https://blog.csdn.net/weixin_62993347/

    2023年04月21日
    浏览(20)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包