前言
例如:在某些场景下,会在极短时间产生大量数据,这时候单条数据入库就不太适合,我们可以堆积到一定数量进行批量入库,刚好呢,RabbitMQ提供了这个堆积的过程,我们就只需要实现批量入库操作即可,因此在此记录一下。
一、官方网站
官方文档地址
二、使用步骤
注意:SpringBoot版本必须是2.2.0以上,我是直接用的最新的
1.引入RabbitMQ的依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.6</version>
</dependency>
2.RabbitConfig
package com.example.demomq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Administrator
**/
@Configuration
public class RabbitMQConfig {
public static final String TEST_EXCHANGE = "test_exchange";
public static final String TEST_ROUTING_KEY = "test_routing_key";
public static final String TEST_QUEUE = "test_queue";
@Bean
Queue testQueue() {
return new Queue(TEST_QUEUE, true, false, false);
}
@Bean
DirectExchange testExchange() {
return new DirectExchange(TEST_EXCHANGE, true, false);
}
@Bean
Binding testBinding() {
return BindingBuilder.bind(testQueue()).to(testExchange()).with(TEST_ROUTING_KEY);
}
@Bean
SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory());
factory.setBatchListener(true);
//每次接两条
factory.setBatchSize(2);
//十秒内没有数据再入队列,也执行
factory.setReceiveTimeout(1000L * 10);
factory.setConsumerBatchEnabled(true);
return factory;
}
@Bean
CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
return factory;
}
}
3.消息生产者
package com.example.demomq.pojo;
import lombok.Data;
import java.util.Date;
/**
* @author Administrator
**/
@Data
public class MessagePO {
/**
* 消息内容
*/
private String content;
/**
* 消息的时间
*/
private Date messageTime;
}
package com.example.demomq.production;
import cn.hutool.json.JSONUtil;
import com.example.demomq.config.RabbitMQConfig;
import com.example.demomq.pojo.MessagePO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
/**
* @author Administrator
**/
@Component
public class Production implements ApplicationRunner {
private static final int NUM = 9;
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
for (int i = 0; i < NUM; i++) {
//睡眠一秒
Thread.sleep(1000);
MessagePO message = new MessagePO();
message.setMessageTime(new Date());
message.setContent("我是消息内容:" + i);
rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_EXCHANGE, RabbitMQConfig.TEST_ROUTING_KEY, JSONUtil.toJsonStr(message));
}
}
}
4.消费者
package com.example.demomq.consumer;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONConfig;
import cn.hutool.json.JSONUtil;
import com.example.demomq.config.RabbitMQConfig;
import com.example.demomq.pojo.MessagePO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author Administrator
**/
@Component
public class ConsumerMessage {
@RabbitListener(queues = RabbitMQConfig.TEST_QUEUE, containerFactory = "consumerBatchContainerFactory")
public void listenerMessage(List<String> messagePOList) {
List<MessagePO> messagePOS = messagePOList.stream().map(item -> JSONUtil.toBean(item, MessagePO.class)).collect(Collectors.toList());
JSONConfig jsonConfig = JSONConfig.create().setDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(JSONUtil.parseArray(messagePOS, jsonConfig));
System.out.println("当前时间:" + DateUtil.now());
System.out.println("=================================================================================");
//TODO批量入库操作
}
}
5.运行截图
文章来源:https://www.toymoban.com/news/detail-611045.html
总结
以上就是我理解的RabbitMQ批量接收和运用,在此记录。文章来源地址https://www.toymoban.com/news/detail-611045.html
到了这里,关于SpringBoot+RabbitMq实现数据批量接收,批量操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!