RabbitMQ可靠性消息发送(java实现)

这篇具有很好参考价值的文章主要介绍了RabbitMQ可靠性消息发送(java实现)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客

一、整体架构

step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性;

step2:Producer发送消息到MQ Broker;

step3:Producer收到 broker 返回的确认消息;

step4:更改消息记录库的状态(定义三种状态:0待确认、1已确认、2确认失败);

step5:定时任务获取长时间处于待确认状态的消息;

step6:Producer重试发送消息;

step7:重试次数超过3次,将消息状态更新为确认失败,后续根据具体业务再处理确认失败的消息;

RabbitMQ可靠性消息发送(java实现)

二、消息记录的增删改查

1. 当前项目名为 rabbit-core-producer,为了实现消息记录入库,需要跟数据库打交道,这里首先添加依赖:

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
	    <dependency>
	      <groupId>org.mybatis.spring.boot</groupId>
	      <artifactId>mybatis-spring-boot-starter</artifactId>
	      <version>1.1.1</version>
	    </dependency>
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>druid</artifactId>
		    <version>1.1.10</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency> 

 2. 消息记录的建表语句 rabbit-producer-message-schema.sql

-- 表 broker_message.broker_message 结构
CREATE TABLE IF NOT EXISTS `broker_message` (
  `message_id` varchar(128) NOT NULL,
  `message` varchar(4000),
  `try_count` int(4) DEFAULT 0,
  `status` varchar(10) DEFAULT '',
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

将 rabbit-producer-message-schema.sql 放在 rabbit-core-producer 项目下的 /src/main/resources/rabbit-producer-message-schema.sql, rabbit-core-producer项目 在 RabbitMQ基础组件封装—整体结构 有具体说明(当前博客是 RabbitMQ基础组件封装—整体结构 的其中一个章节)。

3. 数据源的配置文件 rabbit-producer-message.properties

rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=root
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

同样需要将该文件放在 rabbit-core-producer 项目下的 /src/main/resources/rabbit-producer-message.properties。

因为上面配置中有用到数据库 broker_message,所以需要自己提前建好一个数据库 broker_message。

4. BrokerMessage.java

public class BrokerMessage implements Serializable {
	
	private static final long serialVersionUID = 7447792462810110841L;

	private String messageId;

    private Message message;

    private Integer tryCount = 0;

    private String status;

    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    // getter、setter方法省略
}

5. BrokerMessageMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" >
  <resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" >
    <id column="message_id" property="messageId" jdbcType="VARCHAR" />
    <result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
    <result column="try_count" property="tryCount" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="VARCHAR" />
    <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
    select 
    <include refid="Base_Column_List" />
    from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>
  <insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message (message_id, message, try_count, 
      status, next_retry, create_time, 
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER}, 
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP}, 
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        message_id,
      </if>
      <if test="message != null" >
        message,
      </if>
      <if test="tryCount != null" >
        try_count,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="nextRetry != null" >
        next_retry,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
      <if test="message != null" >
        #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    <set >
      <if test="message != null" >
        message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    set message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  

  <update id="changeBrokerMessageStatus" >
    update broker_message bm
    set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
      	bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}  
  </update>
  
  
  <select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
  	<![CDATA[
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
	    and bm.next_retry < sysdate()
    ]]>
  </select>
  
  <select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
  </select>
  
  
   <update id="update4TryCount" >
    update broker_message bm
    set bm.try_count = bm.try_count + 1,
      bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
   </update>
   
   
</mapper>

6. BrokerMessageMapper.java

@Mapper
public interface BrokerMessageMapper {
	
    int deleteByPrimaryKey(String messageId);
    
    int insert(BrokerMessage record);

    int insertSelective(BrokerMessage record);

    BrokerMessage selectByPrimaryKey(String messageId);

    int updateByPrimaryKeySelective(BrokerMessage record);

    int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

    int updateByPrimaryKey(BrokerMessage record);
	
	void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);

	List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);
	
	List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);
	
	int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);
	
}

7. MessageStoreService.java(这里不加接口类了,直接在MessageStoreService.java中写具体逻辑实现)

@Service
public class MessageStoreService {

	@Autowired
	private BrokerMessageMapper brokerMessageMapper;
	
	public int insert(BrokerMessage brokerMessage) {
		return this.brokerMessageMapper.insert(brokerMessage);
	}
	
	public BrokerMessage selectByMessageId(String messageId) {
		return this.brokerMessageMapper.selectByPrimaryKey(messageId);
	}

	public void succuess(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_OK.getCode(),
				new Date());
	}
	
	public void failure(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_FAIL.getCode(),
				new Date());
	}
	
	public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
		return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
	}
	
	public int updateTryCount(String brokerMessageId) {
		return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
	}
}

三、整合数据源

1. 读取配置文件,生成数据源,RabbitProducerDataSourceConfiguration.java

@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
	
	private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
	
	@Value("${rabbit.producer.druid.type}")
	private Class<? extends DataSource> dataSourceType;
	
	@Bean(name = "rabbitProducerDataSource")
	@Primary
	// 以这个rabbit.producer.druid.jdbc为前缀的属性值都会注入到DataSource中
	@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
	public DataSource rabbitProducerDataSource() throws SQLException {
		DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
		LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
		return rabbitProducerDataSource;
	}
	
    public DataSourceProperties primaryDataSourceProperties(){
        return new DataSourceProperties();
    }
    
    public DataSource primaryDataSource(){
        return primaryDataSourceProperties().initializeDataSourceBuilder().build();
    }
	
}

2. 执行指定的sql脚本 ,BrokerMessageConfiguration.java

/**
 * 	$BrokerMessageConfiguration 
 * 	帮我执行SQL脚本
 * 	帮我进行数据库表结构的创建
 *
 */
@Configuration
public class BrokerMessageConfiguration {

    @Autowired
    private DataSource rabbitProducerDataSource;

    /**
     * 加载 rabbit-producer-message-schema.sql 脚本(这是一个建表语句)
     */
    @Value("classpath:rabbit-producer-message-schema.sql")
    private Resource schemaScript;
    
    @Bean
    public DataSourceInitializer initDataSourceInitializer() {
    	System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
        final DataSourceInitializer initializer = new DataSourceInitializer();
        // 设置之前生成的数据源
        initializer.setDataSource(rabbitProducerDataSource);
        // 执行指定的sql脚本
        initializer.setDatabasePopulator(databasePopulator());
        return initializer;
    }

    /**
     * 执行指定的sql脚本
     * @return
     */
    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(schemaScript);
        return populator;
    }
}

3. 接下来是和 Mybatis 配置相关的文件:RabbitProducerMyBatisConfiguration.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration执行完才能执行,即数据源生成之后才能执行当前类
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {

	@Resource(name= "rabbitProducerDataSource")
	private DataSource rabbitProducerDataSource;
	
	@Bean(name="rabbitProducerSqlSessionFactory")
	public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
		
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(rabbitProducerDataSource);
		ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
		try {
			// mapper.xml文件加载,这些配置本可以写在 application.yml 中,但是由于要作为一个基础组件,所以写在代码里,跟业务层面解绑,让业务层面无感知
			bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));
			SqlSessionFactory sqlSessionFactory = bean.getObject();
			sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
			return sqlSessionFactory;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	@Bean(name="rabbitProducerSqlSessionTemplate")
	public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
	
}

4. Mapper扫描配置相关的文件: RabbitProducerMybatisMapperScanerConfig.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration执行完才能执行,即数据源生成之后才能执行当前类
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {
	
	@Bean(name="rabbitProducerMapperScannerConfigurer")
    public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
        // mapper.java文件加载,这些配置本可以写在 application.yml 中,但是由于要作为一个基础组件,所以写在代码里,跟业务层面解绑,让业务层面无感知
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");
        return mapperScannerConfigurer;
    }

}

四、可靠性发送消息代码实现

/**
 * 	$RabbitBrokerImpl 真正的发送不同类型的消息实现类
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

	@Autowired
	private RabbitTemplateContainer rabbitTemplateContainer;
	
	@Autowired
	private MessageStoreService messageStoreService;
	
	/**
	 * 可靠性消息发送
	 */
	@Override
	public void reliantSend(Message message) {
		message.setMessageType(MessageType.RELIANT);
		BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
		if(bm == null) {
			//1. 把数据库的消息发送日志先记录好
			Date now = new Date();
			BrokerMessage brokerMessage = new BrokerMessage();
			brokerMessage.setMessageId(message.getMessageId());
			brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
			//tryCount默认等于0 所以在最开始发送的时候不需要进行设置
			brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
			brokerMessage.setCreateTime(now);
			brokerMessage.setUpdateTime(now);
			brokerMessage.setMessage(message);
			messageStoreService.insert(brokerMessage);			
		}
		//2. 执行真正的发送消息逻辑
		sendKernel(message);
	}

	@Override
	public void rapidSend(Message message) {
		// 省略...
	}
	
	/**
	 * 	$sendKernel 发送消息的核心方法 使用异步线程池进行发送消息
	 * @param message
	 */
	private void sendKernel(Message message) {
		AsyncBaseQueue.submit((Runnable) () -> {
			CorrelationData correlationData =
					// 回调函数confirm中需要用到message.getMessageId(), message.getMessageType()。所以可以放在CorrelationData中
					new CorrelationData(String.format("%s#%s#%s",
							message.getMessageId(),
							System.currentTimeMillis(),
							message.getMessageType()));
			String topic = message.getTopic();
			String routingKey = message.getRoutingKey();
			RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
			rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
			log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());			
		});
	}

	@Override
	public void confirmSend(Message message) {
		// 省略...
	}

	@Override
	public void sendMessages() {
		// 省略...
	}

}

并且在回调函数中,也要添加相应的逻辑:

/**
 * 	$RabbitTemplateContainer池化封装
 * 	每一个topic 对应一个RabbitTemplate
 *	1.	提高发送的效率
 * 	2. 	可以根据不同的需求制定化不同的RabbitTemplate, 比如每一个topic 都有自己的routingKey规则
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

	private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
	
	private Splitter splitter = Splitter.on("#");
	
	private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
	
	@Autowired
	private ConnectionFactory connectionFactory;
	
	@Autowired
	private MessageStoreService messageStoreService;
	
	public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
		// 省略...
	}

	/**
	 * 	无论是 confirm 消息 还是 reliant 消息 ,发送消息以后 broker都会去回调confirm
	 */
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		// 	具体的消息应答
		List<String> strings = splitter.splitToList(correlationData.getId());
		String messageId = strings.get(0);
		long sendTime = Long.parseLong(strings.get(1));
		String messageType = strings.get(2);
		if(ack) {
			//	当Broker 返回ACK成功时, 就是更新一下日志表里对应的消息发送状态为 SEND_OK
			
			// 	如果当前消息类型为reliant 我们就去数据库查找并进行更新
			if(MessageType.RELIANT.endsWith(messageType)) {
				this.messageStoreService.succuess(messageId);
			}
			log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
		} else {
			log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
			
		}
	}
}

上面大部分代码都是在实现迅速类型的消息发送时已经编写了,只是在 confirm()方法中添加了:

            // 	如果当前消息类型为reliant 我们就去数据库查找并进行更新
			if(MessageType.RELIANT.endsWith(messageType)) {
				this.messageStoreService.succuess(messageId);
			}

五、定时任务获取长时间处于待确认状态的消息并重新发送

1. 实现分布式定时任务

这里的定时任务是使用 ElasticJob,并对其进行封装,封装在项目 rabbit-task中,封装成为了两个注解 @EnableElasticJob 和 @ElasticJobConfig 。

具体的 ElasticJob 的使用和封装过程可参考教程:ElasticJob使用与封装

2.  将封装好的项目 rabbit-task 添加到 当前项目中并使用

(1)引入 rabbit-task 的依赖

        <dependency>
    		<groupId>com.bfxy.base.rabbit</groupId>
    		<artifactId>rabbit-task</artifactId> 
    		<version>0.0.1-SNAPSHOT</version>			
  		</dependency>

(2)使用注解@EnableElasticJob

在当前项目 rabbit-core-producer 中的 自动装配类 中添加注解 @EnableElasticJob,使得当 应用程序启动的时候,就能对 ZooKeeper注册中心进行初始化,以及 ElasticJob的定时任务解析类 ElasticJobConfParser 的初始化。

/**
 * 	$RabbitProducerAutoConfiguration 自动装配 
 *
 */
@EnableElasticJob
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {


}

(3)实现定时任务的具体处理逻辑并在类上加注解@EnableElasticJob

这里为了消息的可靠性发送,我们需要抓取 超时却仍处于待确认状态 的消息,进行重新发送消息。这里使用 ElasticJob 的流式定时任务 DataFlowJob。

@Component
@ElasticJobConfig(
		name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
		cron= "0/10 * * * * ?",
		description = "可靠性投递消息补偿任务",
		overwrite = true,
		shardingTotalCount = 1
		)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage>{

	@Autowired
	private MessageStoreService messageStoreService;
	
	@Autowired
	private RabbitBroker rabbitBroker;
	
	private static final int MAX_RETRY_COUNT = 3;
	
	@Override
	public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
		// 抓取状态为未确认,而且 next_retry 小于当前时间的这些消息,为了确定百分百能发送成功,需要再进行重发
		List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
		log.info("--------@@@@@ 抓取数据集合, 数量:	{} 	@@@@@@-----------" , list.size());
		return list;
	}

	@Override
	public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {
		
		dataList.forEach( brokerMessage -> {
			
			String messageId = brokerMessage.getMessageId();
			if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
				// 重试次数大于3,就不再进行重发了,直接认为发送失败,更改标记为失败
				this.messageStoreService.failure(messageId);
				log.warn(" -----消息设置为最终失败,消息ID: {} -------", messageId);
			} else {
				//	每次重发的时候要更新一下try_count和next_retry字段
				this.messageStoreService.updateTryCount(messageId);
				// 	重发消息
				this.rabbitBroker.reliantSend(brokerMessage.getMessage());
			}
			
		});
	}
}

上面的代码中加入了注解

@ElasticJobConfig(
      name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
      cron= "0/10 * * * * ?",
      description = "可靠性投递消息补偿任务",
      overwrite = true,
      shardingTotalCount = 1
      )

则该类中的逻辑会定时执行。

对于重发消息的代码 this.rabbitBroker.reliantSend(brokerMessage.getMessage());,之前已经做过说明,这里不再赘述。文章来源地址https://www.toymoban.com/news/detail-442204.html

到了这里,关于RabbitMQ可靠性消息发送(java实现)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(56)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(49)
  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(52)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(57)
  • 消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

    官方网址 源码:https://kafka.apache.org/downloads 快速开始:https://kafka.apache.org/documentation/#gettingStarted springcloud整合 发送消息流程 主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。 Map集合:存储了主线程的消息。 Sender线程:真正的

    2024年03月10日
    浏览(45)
  • RabbitMQ 能保证消息可靠性吗

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    浏览(47)
  • RabbitMQ之消息的可靠性传递

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 RabbitMQ之消息的可靠性传递 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在当今的信息化时代,消息传递在企业级应用和分布式

    2024年01月19日
    浏览(44)
  • 如何保证 RabbitMQ 的消息可靠性?

    项目开发中经常会使用消息队列来 完成异步处理、应用解耦、流量控制等功能 。虽然消息队列的出现解决了一些场景下的问题,但是同时也引出了一些问题,其中使用消息队列时如何保证消息的可靠性就是一个常见的问题。 如果在项目中遇到需要保证消息一定被消费的场景

    2024年02月07日
    浏览(49)
  • rabbitmq如何保证消息的可靠性

    RabbitMQ可以通过以下方式来保证消息的可靠性: 在发布消息时,可以设置消息的delivery mode为2,这样消息会被持久化存储在磁盘上,即使RabbitMQ服务器重启,消息也不会丢失。 可以创建持久化的队列,这样即使RabbitMQ服务器重启,队列也不会丢失。 在消费者端,可以 设置手动

    2024年01月23日
    浏览(54)
  • Rabbitmq怎么保证消息的可靠性?

    一、消费端消息可靠性保证 : 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成

    2024年04月14日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包