【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?

这篇具有很好参考价值的文章主要介绍了【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题描述

在一个事务内完成插入操作,通过MQ异步通知其他微服务进行事件处理。
由于是在事务内发送,其他服务消费消息,查询数据时还不存在如何解决呢?


解决方案

通过spring-tx包的TransactionSynchronizationManager事务管理器解决。

public abstract class TransactionSynchronizationManager {

	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<>("Transaction synchronizations");

	/**
	 * Return if transaction synchronization is active for the current thread.
	 * Can be called before register to avoid unnecessary instance creation.
	 * @see #registerSynchronization
	 */		
	public static boolean isSynchronizationActive() {
		return (synchronizations.get() != null);
	}

	
	/**
	 * Register a new transaction synchronization for the current thread.
	 * Typically called by resource management code.
	 * <p>Note that synchronizations can implement the
	 * {@link org.springframework.core.Ordered} interface.
	 * They will be executed in an order according to their order value (if any).
	 * @param synchronization the synchronization object to register
	 * @throws IllegalStateException if transaction synchronization is not active
	 * @see org.springframework.core.Ordered
	 */
	public static void registerSynchronization(TransactionSynchronization synchronization)
			throws IllegalStateException {

		Assert.notNull(synchronization, "TransactionSynchronization must not be null");
		Set<TransactionSynchronization> synchs = synchronizations.get();
		if (synchs == null) {
			throw new IllegalStateException("Transaction synchronization is not active");
		}
		synchs.add(synchronization);
	}
	
}

Rocketmq方法封装,通过TransactionSynchronizationManager.isSynchronizationActive()判断当前方法的调用是否在事务内。
如果是,则注册一个事务同步适配器,在事务提交后发送消息。
否则直接发送。

    /**
     * 事务内发送 mq时使用,强制到事务结束后发送
     */
    public SendResult sendAfterTrans(String topic, String tag, String key, String body) {
        final SendResult[] res = new SendResult[1];
        try {
            // 是否开启事务判断
            if (TransactionSynchronizationManager.isSynchronizationActive()) {
                log.debug("Mysql事务内Mq消息发送  延迟到事务提交后 waiting……");
                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                    @Override
                    public void afterCommit() {
                        log.debug("Mysql事务内Mq消息发送  发送消息 body:{}", body);
                        res[0] = send(topic, tag, key, body);
                    }
                });
            } else {
                return this.send(topic, tag, key, body);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return res[0];
    }

【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?,企业级开发常用实用,BUG,bug,rocketmq,mysql,spring,java文章来源地址https://www.toymoban.com/news/detail-680641.html

到了这里,关于【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot整合rabbitmq 实现消息发送和消费

    Spring Boot提供了RabbitMQ的自动化配置,使得整合RabbitMQ变得非常容易。 首先,需要在pom.xml文件中引入amqp-client和spring-boot-starter-amqp依赖: 接下来需要在application.properties文件中配置RabbitMQ连接信息: 然后编写消息发送者: 其中,my-exchange和my-routing-key是需要自己定义的交换机和

    2024年02月07日
    浏览(42)
  • 记一次rabbitmq消息发送成功,消费丢失问题

    测试数据归档,偶现数据未归档 idea线上调试,log日志,数据库消息发送记录,代码分块重复执行看哪块出的问题,结果均无问题,最后使用rabbitmq trace日志,发现问题所在 1. 什么是Trace Trace 是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。可通过插

    2024年02月16日
    浏览(51)
  • kafka入门(一):kafka消息发送与消费

    kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Consumer Group (消费者组) 每个消费

    2024年04月12日
    浏览(44)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(45)
  • 手拉手安装Kafka2.13发送和消费消息

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。 Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz 解压tar -xzf kafka_2.13-3.7.0.tgz Kafka依赖Zook

    2024年04月29日
    浏览(48)
  • Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

    erlang 和 rabbitmq 版本说明 https://www.rabbitmq.com/which-erlang.html 确认需要安装的mq版本以及对应的erlang版本。 RabbitMQ下载地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下载地址: https://packagecloud.io/rabbitmq/erlang RabbitMQ延迟消息插件下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exc

    2024年02月08日
    浏览(51)
  • RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?

    更多RocketMQ内容,见专栏:https://blog.csdn.net/saintmm/category_11280399.html 消息轨迹简单来说就是日志,其把消息的生产、存储、消费等所有的访问和操作日志。 在项目中存在发送方与消费方相互“扯皮”的情况: 发送方说消息已经发送成功,而消费方说没有消费到。 这时我们就希

    2024年01月17日
    浏览(51)
  • java实现阿里云rocketMQ消息的发送与消费(tcp协议sdk)

    登录阿里云官网,先申请rocketMQ,再申请Topic、Group ID,然后就是参考阿里云的JAVA SDK进行编程实现。 环境要求: 安装JDK 1.8或以上版本 安装Maven 安装Java SDK 参照 阿里云 官方文档,来一步一步操作。 文档提供的SDK有 TCP 和Http协议,这里使用 TCP协议 来实现rocketMQ消息的发送与消

    2024年02月07日
    浏览(43)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包