创建延时队列、springboot配置多个rabbitmq

这篇具有很好参考价值的文章主要介绍了创建延时队列、springboot配置多个rabbitmq。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

创建延时队列

queue.file_delay_destroy
x-dead-letter-exchange:	exchange.file_delay_destroy
x-message-ttl:	259200000
259200000为3天,1000为1秒

创建延时队列、springboot配置多个rabbitmq,MQ,java,rabbitmq

创建普通队列

queue.file_destroy

创建普通交换机

exchange.file_delay_destroy

type选择fanout
创建延时队列、springboot配置多个rabbitmq,MQ,java,rabbitmq

交换机绑定普通队列

(图中已经绑定,红框为绑定过程)
创建延时队列、springboot配置多个rabbitmq,MQ,java,rabbitmq

普通队列绑定交换机

(图中已经绑定,红框为绑定过程)
创建延时队列、springboot配置多个rabbitmq,MQ,java,rabbitmq

延时队列

springboot配置多个rabbitmq

延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@EqualsAndHashCode
@Data
public class MQMessage implements Serializable {
  private JSONObject msg;
  private String messageId;   //存储消息发送的唯一标识
}
import com.sxqx.entity.MQMessage;

public interface MQMessageSender {
  /**
   *
   * @param queue 消息队列名称
   * @param msg 消息
   */
  void send(String queue, MQMessage msg);
}

RabbitConfig配置类

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RabbitConfig {

  @Primary
  @Bean(name="mq1ConnectionFactory")
  public ConnectionFactory mq1ConnectionFactory(
          @Value("${spring.rabbitmq.mq1.host}") String host,
          @Value("${spring.rabbitmq.mq1.port}") int port,
          @Value("${spring.rabbitmq.mq1.username}") String username,
          @Value("${spring.rabbitmq.mq1.password}") String password,
          @Value("${spring.rabbitmq.mq1.virtual-host}") String virtualHost
  ){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
  }

  @Bean(name="mq2ConnectionFactory")
  public ConnectionFactory mq2ConnectionFactory(
          @Value("${spring.rabbitmq.mq2.host}") String host,
          @Value("${spring.rabbitmq.mq2.port}") int port,
          @Value("${spring.rabbitmq.mq2.username}") String username,
          @Value("${spring.rabbitmq.mq2.password}") String password,
          @Value("${spring.rabbitmq.mq2.virtual-host}") String virtualHost
  ){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
  }

  @Primary
  @Bean(name="mq1RabbitTemplate")
  public RabbitTemplate mq1RabbitTemplate(
          @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory
  ){
    RabbitTemplate mq1RabbitTemplate = new RabbitTemplate(connectionFactory);
    mq1RabbitTemplate.setMessageConverter(jsonMessageConverter());
    return mq1RabbitTemplate;
  }

  @Bean(name="mq2RabbitTemplate")
  public RabbitTemplate mq2RabbitTemplate(
          @Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory
  ){
    RabbitTemplate mq2RabbitTemplate = new RabbitTemplate(connectionFactory);
    mq2RabbitTemplate.setMessageConverter(jsonMessageConverter());
    return mq2RabbitTemplate;
  }

  @Bean
  public Jackson2JsonMessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }


  @Bean(name = "mq1Factory")
  @Primary
  public SimpleRabbitListenerContainerFactory mq1Factory(
          SimpleRabbitListenerContainerFactoryConfigurer configurer,
          @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory
  ) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
  }

  @Bean(name = "mq2Factory")
  public SimpleRabbitListenerContainerFactory mq2Factory(
          SimpleRabbitListenerContainerFactoryConfigurer configurer,
          @Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory
  ) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
  }
}

mq1

@Component
public class RabbitMQ1MessageSender  implements MQMessageSender, RabbitTemplate.ConfirmCallback{
  @Resource(name = "mq1RabbitTemplate")
  private RabbitTemplate mq1RabbitTemplate;
  Log log = LogFactory.getLog(RabbitMQ1MessageSender.class);
  @Autowired
  public RabbitMQ1MessageSender() {
  }

  @PostConstruct
  public void init(){
    mq1RabbitTemplate.setConfirmCallback(this);
  }

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if(!ack){
      log.error("消息接收失败" + cause);
      // 我们这里要做一些消息补发的措施
      System.out.println("id="+correlationData.getId());
    }
  }

  @Override
  public void send(String routingKey, MQMessage msg) {
    String jsonString = JsonConverter.bean2Json(msg);
    if (jsonString != null) {
      mq1RabbitTemplate.convertAndSend(routingKey, jsonString);
    }
  }
}

mq2

import com.sxqx.entity.MQMessage;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Component
public class RabbitMQ2MessageSender implements MQMessageSender, RabbitTemplate.ConfirmCallback{
  @Resource(name = "mq2RabbitTemplate")
  private RabbitTemplate mq2RabbitTemplate;
  Log log = LogFactory.getLog(RabbitMQ2MessageSender.class);
  @Autowired
  public RabbitMQ2MessageSender() {
  }
  
  @PostConstruct
  public void init(){
    mq2RabbitTemplate.setConfirmCallback(this);
  }

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if(!ack){
      log.error("消息接收失败" + cause);
      // 我们这里要做一些消息补发的措施
      System.out.println("id="+correlationData.getId());
    }
  }

  @Override
  public void send(String routingKey, MQMessage msg) {
    String jsonString = JsonConverter.bean2Json(msg);
    if (jsonString != null) {
      mq2RabbitTemplate.convertAndSend(routingKey, jsonString);
    }
  }
}

application-prod.yaml

spring:
  rabbitmq:
    mq1:
      username: guest
      password: guest
      host: mq1_ip
      port: 5672
      virtual-host: /
      publisher-returns: true
      publisher-confirm-type: simple
      listener:
        simple:
          acknowledge-mode: auto # 手动应答
          prefetch: 10 #每次从队列中取一个,轮询分发,默认是公平分发
          retry:
            max-attempts: 5 # 重试次数
            enabled: true # 开启重试
          concurrency: 5
          max-concurrency: 10
    mq2:
      username: guest
      password: guest
      host: mq2_ip
      port: 5672
      virtual-host: /
      publisher-returns: true
      publisher-confirm-type: simple
      listener:
        simple:
          acknowledge-mode: auto # 手动应答
          prefetch: 10 #每次从队列中取一个,轮询分发,默认是公平分发
          retry:
            max-attempts: 5 # 重试次数
            enabled: true # 开启重试
          concurrency: 5
          max-concurrency: 10

mq1消费端,发消息给mq2

@Component
public class SyncWeatherLivePicMessageReceiver  implements IMessageReceiver {
    private final IWeatherLivePicMapper weatherLivePicMapper;
    private final RabbitMQ2MessageSender rabbitMQ2MessageSender;

    @Autowired
    public SyncWeatherLivePicMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper,
                                             RabbitMQ2MessageSender rabbitMQ2MessageSender) {
        this.weatherLivePicMapper = weatherLivePicMapper;
        this.rabbitMQ2MessageSender = rabbitMQ2MessageSender;
    }

    Log log = LogFactory.getLog(SyncWeatherLivePicMessageReceiver.class);
    private FtpHelper ftpHelperIns1;
    private FtpHelper ftpHelperIns2;
    private FTPFileFilter ftpFileFilter;
    @RabbitListener(queuesToDeclare = {
            @Queue(name = "sxqxgzbgxw_weather_live_pic")
    })
    @RabbitHandler
    @Override
    public void onMessageReceived(String mqMessageString) {
        JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);
        JsonNode msg = jsonNode.findValue("msg");
        JsonNode JsonNodeParams = msg.findValue("params");
        Map<String, Object> params = JsonConverter.jsonNode2HashMap(JsonNodeParams);
        if (params.size() > 0) {
            String times = params.get("times").toString();
            String serverFrom = params.get("serverFrom").toString();
			......
            
			// 清除数据
			MQMessage mqMessage = new MQMessage();
			JSONObject jsonObject = new JSONObject();
			jsonObject.put("filePath", "/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times);
			mqMessage.setMsg(jsonObject);
			rabbitMQ2MessageSender.send("queue.file_delay_destroy", mqMessage);

			// 清除DB记录
			MQMessage mqMessage2 = new MQMessage();
			JSONObject jsonObject2 = new JSONObject();
			jsonObject2.put("tableName", "WEATHER_LIVE_PIC");
			jsonObject2.put("picUrl", "http://外网ip/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times +"/"+ type + "/" + "gjzjqyz/"+ fileNameFrom);
			mqMessage2.setMsg(jsonObject);
			rabbitMQ2MessageSender.send("queue.db_delay_destroy", mqMessage2);                
        }
    }
}

mq2消费端用于递归删除文件

import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.utils.dataConverter.JsonConverter;
import com.sxqx.utils.file.FileHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.File;

@Component
public class FileDestroyMessageReceiver implements IMessageReceiver {
  Log log = LogFactory.getLog(FileDestroyMessageReceiver.class);
  @RabbitListener(queuesToDeclare = {
      @Queue(name = "queue.file_destroy")
  })
  @RabbitHandler
  @Override
  public void onMessageReceived(String mqMessageString) {
    JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);
    JsonNode msg = jsonNode.findValue("msg");
    String filePath = msg.findValue("filePath").asText();

    if (filePath.contains("/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000")) {
      File file = new File(filePath);
      if (file.exists()) {
        FileHelper.deleteFile(file);
      }
    } else {
      log.info("有人想删除设定之外的文件");
      log.info(filePath);
    }
  }
}

FileHelper工具类递归删除文件或文件夹

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;

public class FileHelper {
    /**
     * 递归删除文件或文件夹
     * @param directory
     */
    public static void deleteFile(File directory) {
        if (!directory.exists()) {
            return;
        }

        File[] files = directory.listFiles();
        if (files!=null) {//如果包含文件进行删除操作
            for (File value : files) {
                if (value.isFile()) {
                    //删除子文件
                    value.delete();
                } else if (value.isDirectory()) {
                    //通过递归的方法找到子目录的文件
                    deleteFile(value);
                }
                value.delete();//删除子目录
            }
        }
        directory.delete();
    }
}

mq2消费端用于删除数据库数据

import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.mapper.remote.xugugzb.weatherlivepic.IWeatherLivePicMapper;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Component
public class DBDestroyMessageReceiver implements IMessageReceiver {
  private final IWeatherLivePicMapper weatherLivePicMapper;
  @Autowired
  public DBDestroyMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper){
    this.weatherLivePicMapper = weatherLivePicMapper;
  }
  Log log = LogFactory.getLog(DBDestroyMessageReceiver.class);
  @RabbitListener(queuesToDeclare = {
      @Queue(name = "queue.db_destroy")
  })
  @RabbitHandler
  @Override
  public void onMessageReceived(String mqMessageString) {
    JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);
    JsonNode msg = jsonNode.findValue("msg");
    String tableName = msg.findValue("tableName").asText();
    String picUrl = msg.findValue("picUrl").asText();

    if (picUrl.contains("/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/")) {
      if (Objects.equals("WEATHER_LIVE_PIC",tableName)) {
        weatherLivePicMapper.deleteWeatherLivePic(picUrl);
      }
    } else {
      log.info("有人想删除设定之外的数据");
      log.info(picUrl);
    }
  }
}

nginx.conf

上传静态资源至linux path,配置nginx.conf,使浏览器可以直接访问静态资源文章来源地址https://www.toymoban.com/news/detail-666823.html

server {
	listen       80;
	server_name  60.204.202.112;
	add_header   Cache-Control no-store;
	charset	     utf-8;

	location / {
		root   /mnt/sxqxgxw-gzb-front/dist/;
	try_files $uri $uri/ /index.html;
		index  index.html index.htm;
	}
	#访问/dataSharingStatic时,相当于访问/data/static/dataSharingStatic路径下资源
	location /dataSharingStatic {
		root   /data/static;
		autoindex   on;
		add_header  Access-Control-Allow-Origin *;
		add_header  Access-Control-Allow-Headers X-Requestd-With;
		add_header  Access-Control-Allow-Methods GET,POST,OPTIONS;
	}
	
	error_page   500 502 503 504  /50x.html;
	location = /50x.html {
		root   html;
	}
	
	location /api/ {
		proxy_pass    http://60.204.202.112:8896;
		rewrite	  ^/api/(.*)$ /$1 break;
		add_header    Access-Control-Allow-Origin *;
		add_header    Access-Control-Allow-Headers X-Requestd-With;
		add_header	  Access-Control-Allow-Methods GET,POST,OPTIONS;
	}
   
}

到了这里,关于创建延时队列、springboot配置多个rabbitmq的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQ-消息队列-RabbitMQ

    MQ(Message Queue) 消息队列 ,是基础数据结构中“ 先进先出 ”的一种 数据结构 。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由

    2024年02月09日
    浏览(38)
  • 【mq】RabbitMq批量删除队列

    ​由于部分公司同事使用RabbitMq时,没有将Client设置为autodelete,导致大量冗余队列。其中这些队列又是无routekey队列,收到了批量的订阅消息,占用服务器内存。 ​如何将这些无用的队列删除成为一个问题?经过多次摸索,在rabbitmq management api里面找到了方案:

    2024年01月25日
    浏览(32)
  • RabbitMQ --- 惰性队列、MQ集群

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。 解决消息堆积有三种思路: 增加更多消费者,提高消费速度。也就是我们之前说的work

    2024年02月03日
    浏览(39)
  • RabbitMQ如何实现延时队列

    RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列。 1. 延时队列的概念 延

    2024年02月16日
    浏览(30)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(49)
  • 【技术分享】四、RabbitMQ “延时队列”

    延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。 ex: 定时任务:十分钟后执行某种操作。 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

    2024年02月08日
    浏览(47)
  • .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(33)
  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(38)
  • Golang RabbitMQ实现的延时队列

    之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。 延迟队列是一种特殊类型的消息队列 ,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景

    2024年02月10日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包