Springboot使用kafka事务-生产者方

这篇具有很好参考价值的文章主要介绍了Springboot使用kafka事务-生产者方。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在上一篇文章中,我们使用了springboot的AOP功能实现了kafka的分布式事务,但是那样实现的kafka事务是不完美的,因为请求进来之后分配的是不同线程,但不同线程使用的kafka事务却是同一个,这样会造成多请求情况下的事务失效。

而解决这个问题的方法,就是每个线程都使用一个新的事务生产者去发送一条新的事务消息,然后这个事务还要和当前线程进行绑定,实现不同线程之间的事务隔离。

通常来说,这个繁杂的过程虽然我们可以实现,但是始终没有框架研发者做的那么完美,所以,我们首先要去看一下框架的作者有没有实现这个功能。

幸运地是,上述功能在kafka之中是有实现的,而且首次实现的时间是在2017年,所以我们可以直接使用作者提供的基于springboot的事务管理功能。

注入kafka事务

在springboot中启用kafka的事务,有两种方式,第一种方式为使用springboot提供的自动配置,第二种是自己往容器中注入。

方式一:springboot自动注入

想要使用自动注入,我们只需要在配置文件中加入transaction-id-prefix即可,配置文件如下:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      #bootstrap-servers: localhost:9010
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      transaction-id-prefix: test

这样配置之后,就开启了kafka的事务。

方式一弊端

这样虽然可以直接使用springboot自动装配功能,但是却有下面两个弊端

  • 只能使用一个kafka的集群地址
  • 全局开启了事务,有的方法并不需要全局开启事务
    所以一旦有多个kafka的地址需要配置,或者只想让部分方法使用事务,那么就可以使用第二种方法来解决,那就是自己往容器里面添加kafka的事务管理器。

方式二:向spring容器中添加自定义kafka事务管理器

在kafka事务管理器中,有三个重要的对象,分别是ProducerFactory、KafkaTemplate、KafkaTransactionManager,他们的作用如下:

  • ProducerFactory,用来创建kafka的生产者对象
  • KafkaTemplate,springboot封装的kafka模版
  • KafkaTransactionManager,kafka的事务管理器
    想要往spring容器中添加自定义的kafka事务管理器,其实就是添加一个自定义的KafkaTransactionManager对象,那么我们只需要想办法构造一个KafkaTransactionManager就好。

利用springboot的配置类,我们能很轻松的做到这一点。
第一步,构造一个配置类KafkaAndDataTransactionConfig,加上@Configuration注解。

@Configuration
public class KafkaAndDataTransactionConfig {
}

第二步,构建一个ProducerFactory对象的Bean,交给spring容器。

	@Resource
    NacosDiscoveryProperties nacosDiscoveryProperties;
	/**
     * 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同
     * @param props yaml文件中的定义属性
     */
    @Bean
    ProducerFactory<String, String> pf1(KafkaProperties props) {
        Map<String, Object> pProps = props.buildProducerProperties();
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        return new DefaultKafkaProducerFactory<>(pProps);
    }

注意其中的nacosDiscoveryProperties变量,这是用来获取实例在nacos中的ip地址,因为在多实例的情况下需要保证每一个事务id的唯一,才不会被kafka的事务管理器识别为失效事务生产者,从而导致事务冲突失效。
第三步,创建一个KafkaTransactionManager对象的Bean,添加到spring容器。

	/**
     * 注入一个kafka事务管理器,这个事务管理器使用事务id
     * @param pf1
     * @return
     */
    @Bean
    KafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTransactionManager<>(pf1);
    }

只需要将创建好的生产者bean,作为构造参数传入即可。
通过以上三步,我们就得到了一个支持事务的kafka事务管理器了,不过,此时我们还少创建了一个KafkaTemplate,没有这个对象我们将完不成事务发送的管控。

第四步,创建KafkaTemplate

	/**
     * 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务
     * @param pf1
     * @return
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTemplate<>(pf1);
    }

经过以上代码,我们就得到了一个完整的kafka事务管理器了。
全部代码如下:

@Configuration
public class KafkaAndDataTransactionConfig {
	@Resource
    NacosDiscoveryProperties nacosDiscoveryProperties;
	/**
     * 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同
     * @param props yaml文件中的定义属性
     */
    @Bean
    ProducerFactory<String, String> pf1(KafkaProperties props) {
        Map<String, Object> pProps = props.buildProducerProperties();
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());
        return new DefaultKafkaProducerFactory<>(pProps);
	/**
     * 注入一个kafka事务管理器,这个事务管理器使用事务id
     * @param pf1
     * @return
     */
    @Bean
    KafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTransactionManager<>(pf1);
    }
    /**
     * 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务
     * @param pf1
     * @return
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {
        return new KafkaTemplate<>(pf1);
    }

}

增加DataSourceTransaction事务管理器

默认情况,DataSourceTransaction事务管理器springboot会帮我们自动配置,但是在使用了kafka的事务之后,会存在一个类的加载冲突,导致DataSourceTransaction没有被springboot自动加载到,所以我们还需要自己将DataSourceTransaction事务管理加入进来。
在上面的代码中,再加入以下代码

	//构造器注入DataSource和transactionManagerCustomizers
	private final DataSource dataSource;
    private final TransactionManagerCustomizers transactionManagerCustomizers;
	KafkaAndDataTransactionConfig(DataSource dataSource,
                      ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
        this.dataSource = dataSource;
        this.transactionManagerCustomizers = transactionManagerCustomizers.getIfAvailable();
    }
	/**
     * @Bean 去掉了ConditionalOnMissingBean 避免注入了kafka事务管理器后,springboot不再注入DataSourceTransactionManager
     * @Primary  作为主事务管理器,这样在使用@Transactional时,就会使用DataSourceTransactionManager
     * @param properties
     * @return
     */
    @Bean
    @Primary
    public DataSourceTransactionManager dstm(DataSourceProperties properties) {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(this.dataSource);
        if (this.transactionManagerCustomizers != null) {
            this.transactionManagerCustomizers.customize(transactionManager);
        }
        return transactionManager;
    }

增加ChainedKafkaTransactionManager管理器

在实际开发中,有时候一个方法需要既支持kafka的事务,又需要支持JDBC的事务,这个时候为了兼容两者的事务,我们需要将两者的事务放到同一个事务管理器中,让他们两个构成一个事务。kafka的作者为我们提供了ChainedKafkaTransactionManager这个对象,来支持这个操作,只需要加入以下代码即可

	 //多个事务管理器构成一个事务,使用ChainedKafkaTransactionManager管理,是因为可以自动偏移kafka事务给消费者
	@Bean 
    public ChainedKafkaTransactionManager kafkaAndDataSourceTransactionManager(DataSourceTransactionManager transactionManager,
                                                                        @Autowired @Qualifier("kafkaTransactionManagerWithTxId") KafkaTransactionManager<?, ?> kafkaTransactionManager){
        return new ChainedKafkaTransactionManager<>(transactionManager, kafkaTransactionManager);
    }

以上,就是kafka集成springboot的方案,接下来,看看怎么使用

使用

基于以上的配置,一共有三种使用方式

  • 只使用kafka事务
  • 只使用JDBC事务
  • 同时使用kafka和JDBC事务

针对于上面的三种情况的切换,其实就是使用不同Transactional注解中的value值切换不同的事务管理器,事务的指定都在service层的实现类中。

只使用kafka事务

	//指定事务模版为自定义模版
    @Resource(name = "kafkaTemplateWithTxId")
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")
    public void transation() {
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");
        kafkaTemplate.send(stringStringProducerRecord);
    }

只使用JDBC事务

不需要指定任何的事务管理器

	@Override
    @Transactional(rollbackFor = Exception.class)
    public void transationOfJdbc() {
        xxxService.update(user);
    }

同时使用kafka和JDBC事务

指定自定义的事务管理器

	//指定事务模版为自定义模版
    @Resource(name = "kafkaTemplateWithTxId")
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")
    public void transationAll() {
	
        xxxService.update(user);
        spreadMonitorService.sendMsg();
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");
        kafkaTemplate.send(stringStringProducerRecord);
    }

结语

以上,就是在springboot中生产端实现事务的方法,总结一下,一共分为以下几步

  • 增加kafka事务管理器
  • 增加JDBC事务管理器
  • 增加事务链事务管理器
  • 使用三种事务管理器

下一篇,将写springboot中消费端如何配置。


引用资料:
kafka官网
kafka的github
spring-kafka官网文章来源地址https://www.toymoban.com/news/detail-677700.html

到了这里,关于Springboot使用kafka事务-生产者方的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(43)
  • Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    浏览(45)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(53)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(45)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(51)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(62)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(47)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(41)
  • (三)Kafka 生产者

    创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基

    2024年02月09日
    浏览(39)
  • Kafka 生产者

    目录 一、kafka生产者原理 二、kafka异步发送 配置kafka 创建对象,发送数据 带回调函数的异步发送 同步发送   三、kafka生产者分区 分区策略 指定分区:  指定key: 什么都不指定: 自定义分区器 四、生产者提高吞吐量 五、数据的可靠性 ACK应答级别 数据完全可靠条件 可靠性

    2023年04月15日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包