关于Kafka事务处理的详细讲解

这篇具有很好参考价值的文章主要介绍了关于Kafka事务处理的详细讲解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka事务

producer可能给多个topic,多个partition发送消息,这些消息组成一个事务,这些消息需要对consumer同时可见或者同时不可见。Kafka事务需要在producer端处理,consumer端不需要做特殊处理,跟普通消息消费一样

1.事务流程

关于Kafka事务处理的详细讲解,kafka,linq,数据库

 整个流程步骤:

  事务初始化: InitTransactions,事务初始化是一次性的而事务开启、提交/回滚则一致循环运行

  开启事务: beginTransaction

  发送消息,向n个topic发送多条:producer.send

  提交事务: commitTransaction

  回滚事务:abortTransaction

 2、事务配置

Producer

Consumer

transactional.id

事务ID,类型为String字符串,默认为空,客户端自定义,例如"order_bus"

isolation.level

事务隔离级别,默认为空,开启事务的话,需要将其设置为"read_committed"

enable.idempotence

消息幂等开关,true/false,默认为false,当配置了transactional.id,此项一定要设置为true,否则会抛出客户端配置异常

transaction.timeout.ms

事务超时时间,默认为10秒,最长为15分钟

enable.idempotence设置为true时,kafka会检查如下一些级联配置

配置项

内容要求

说明

acks

要求此配置项必须设置为all

响应必须要设置为all,也就是leader存储消息,并且所有follower也存储了消息后再返回,保证消息的可靠性

retries

> 0

因为幂等特性保证了数据不会重复,在需要强可靠性的前提下,需要用户设置的重试次数 > 0

max.in.flight.requests.per.connection

<= 5

此项配置是表明在producer还未收到broker应答的最大消息批次数量。该值设置的越大,标识可允许的吞吐越高,同时也越容易造成消息乱序

3、事务初始化

参与方:Producer、Broker

事务初始化由producer端触发,执行事务初始化主要做以下两个操作:

a)定位TransactionCoordinator

b)初始化producerId

事务初始化代码:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

3.1 定位TransactionCoordinator(获取tc)

通过producer发送Coordinator请求(请求中包含自定义transactionid)到broker,broker获取到transactionid做hashcode,hashcode对kafka内部topic _transaction_state默认分区(默认50)取模,获取分区对应的broker作为transactionCoordinator

TransactionCoordinator可以理解为分布式事务中二(三)阶段提交的事务协调者。Kafka事务中TransactionCoordinator本质也是一个broker,只是这个broker起到针对当前事务的协调作用,所有的事务操作都需要直接发送给这个指定的broker

 3.2初始化ProducerId(Producer向tc获取)

Producer获取到TransactionCoordinator后,便需要向TransactionCoordinator发送请求获取producerId以及Epoch。可以认为producerId+Epoch是对事务型producer唯一标识。

 Producer在事务(初始化阶段)启动之前向TransactionCoordinator申请producerId,TransactionCoordinator服务在分配producerId后会将producerId和Epoch持久到事务topic _transaction_state中,这样就算producer宕机重启后也能处理未完成的事务

后续producer向broker发送请求也需要携带producerid和Epoch,这两个参数含义如下

参数

类型

含义

ProducerId

Long

从0开始,对应Producer端配置的TransactionId,他们存在映射关系,可以通过TransactionId来查询ProducerId;映射关系存储在kafka内部topic __transaction_state

Epoch

Short

从0开始,Producer每次重启,此项值都会+1;当超过short最大值后,ProducerId+1

 初始化ProducerId后,transactionCoordinator向_transaction_state中写入一条key-value数据(该数据持久化在broker端)此数据key是transactionid,此时事务状态为empty,同时transactionCoordinator向producer返回producerId和Epoch。transactionCoordinator向_transaction_state存储的数据格式如下:

Key

Value

TransactionId

producerId

8

从0开始,依次递增

epoch

2

从0开始,依次递增

transactionTimeoutMs

4

事务超时时间,默认10秒,最大15分钟

transactionStatus

1

事务状态(

0-Empty 事务刚开始时init是这个状态

1-Ongoing

2-PrepareCommit

3-PrepareAbort

4-CompleteCommit

5-CompleteAbort

6-Dead

7-PrepareEpochFence

topicTotalNum

4

当前事务关联的所有topic总和

topicNameLen

2

topic长度

topicName

X

topic内容

partitionNum

4

partition的个数

partitionIds

X

例如有n个partition,X = n * 4,每个partition占用4 byte

transactionLastUpdateTimestampMs

8

最近一次事务操作的更新时间戳

transactionStartTimestampMs

8

事务启动的时间戳

4、事务启动

参与方:Producer

producer.beginTransaction();

5、事务消息发送 

 参与方:Producer、 broker

producer.send()

5.1 消息发送-Producer

Producer在接收到producerId后就可以正常发送消息,不过在发送消息前,需要将这些消息的分区地址上传到transactionCoordinator。transactionCoordinator会将这些分区地址持久化到事务topic _transaction_state(持久化这些消息分区作用是为了后边6.2节中事务提交阶段知道该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送).;

之后Producer便可以向对应分区发送消息

5.2 消息发送-TransactionCoordinator

消息发送阶段,TransactionCoordinator主要记录当前事务消息所在分区信息,即更新_transaction_state状态

5.3 消息发送 - broker

 消息发送时,broker工作是维护LSO(log stable offset),一个分区可能存储了多个事务消息,也有可能存储多个非事务普通消息,而LSO为第一个正在进行中的事务消息的offset

如下图:

关于Kafka事务处理的详细讲解,kafka,linq,数据库

 6、事务提交

参与方:Producer、Broker

producer.commitTransaction()

6.1事务提交-Producer

事务提交阶段,producer只是触发提交动作,并携带以下所需参数:

transactionId:事务Id,即客户自定义字符串

producerId:由transaction Coordinator生成,递增

epoch:transaction Coordinator生成,递增

committed:true

6.2 事务提交 - transactionCoordinator

producer提交事务——>TransactionCoordinator收到后,请求将状态修改为PrepareCommit——>TransactionCoordinator向Producer响应——>TransactionCoordinator向各个broker发送control marker消息,broker收到后将消息存储下来,用来比较当前事务已经成功提交——>待各个broker存储control marker后,coordinator将事务状态改为commit,事务结束

7、事务取消

参与方:producer、transactionCoordinator

producer.abortTransaction()

7.1 事务取消-producer

事务取消阶段,producer只是触发取消动作,并携带以下所需参数:

transactionId:事务Id,即客户自定义字符串

producerId:由transaction Coordinator生成,递增

epoch:transaction Coordinator生成,递增

committed:false

7.2 事务取消-TransactionCoordinator

事务取消除了由Producer触发外,还有可能由Coordinator触发,例如“事务超时”,Coordinator有个定时器,定时扫描那些已经超时的事务文章来源地址https://www.toymoban.com/news/detail-786384.html

8、示例代码

8.1 producer

package com.example.demo.transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class TransactionProducer {
    private static Properties getProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "47.52.199.53:9092");
        props.put("retries", 2); // 重试次数
        props.put("batch.size", 100); // 批量发送大小
        props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
        props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送
        props.put("client.id", "producer-syn-2"); // 发送端id,便于统计
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id","producer-1"); // 每台机器唯一
        props.put("enable.idempotence",true); // 设置幂等性
        return props;
    }
    public static void main(String[] args) {
          KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
          // 初始化事务
          producer.initTransactions();try {
                    Thread.sleep(2000);
                    // 开启事务
                    producer.beginTransaction();
                    // 发送消息到producer-syn
                    producer.send(new ProducerRecord<String, String>("producer-syn","test3"));
                    // 发送消息到producer-asyn
                    Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>("producer-asyn","test4"));
                    // 提交事务
                    producer.commitTransaction();
                }catch (Exception e){
                    e.printStackTrace();
                    // 终止事务
                    producer.abortTransaction();
                }
    }
}

8.2 producer+consumer

package com.example.demo.transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class TransactionProducer {
    private static Properties getProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "47.52.199.53:9092");
        props.put("retries", 2); // 重试次数
        props.put("batch.size", 100); // 批量发送大小
        props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
        props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送
        props.put("client.id", "producer-syn-2"); // 发送端id,便于统计
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id","producer-1"); // 每台机器唯一
        props.put("enable.idempotence",true); // 设置幂等性
        return props;
    }
    public static void main(String[] args) {
          KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
          // 初始化事务
          producer.initTransactions();try {
                    Thread.sleep(2000);
                    // 开启事务
                    producer.beginTransaction();
                    // 发送消息到producer-syn
                    producer.send(new ProducerRecord<String, String>("producer-syn","test3"));
                    // 发送消息到producer-asyn
                    Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>("producer-asyn","test4"));
                    // 提交事务
                    producer.commitTransaction();
                }catch (Exception e){
                    e.printStackTrace();
                    // 终止事务
                    producer.abortTransaction();
                }
    }
}

到了这里,关于关于Kafka事务处理的详细讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka复习:(23)事务

    一、生产者,开启事务。 二、消费者,设置隔离级别为\\\"read_committed\\\" 三、运行结果,按照上述配置,当生产者发送消息并从kafka broker获取到offset后就会sleep,在生产者sleep的时候,消费者是获取不到消息的,只有sleep完成并提交事务之后,消费者才会获取到消息

    2024年02月10日
    浏览(24)
  • kafka-事务

    事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。 幂等性引入了Porducer ID 事务属性引入了Transaction Id属性。 使用场景 enable.idempotence = true,transactional.id不设置:只支持幂等性。 enable.i

    2024年02月14日
    浏览(19)
  • Kafka事务机制:原理和实践

    Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用程序。它不仅以高吞吐量、可扩展性和容错能力著称,还提供了事务支持,以确保数据的完整性和一致性。在这篇博客中,我们将深入探讨 Kafka 的事务机制,了解其原理,并通过一个实际的例子来说明

    2024年02月03日
    浏览(36)
  • Kafka消息传递保障——事务与幂等

    消息传递保障对于分布式系统的可靠性至关重要。在分布式系统中消息传递保障是确保系统可靠性的核心问题之一。系统需要确保消息能够按照预期的方式进行传递,以满足业务需求。 Kafka是一种分布式的消息队列系统,作为消息中间件常用于实现基于发布/订阅模型的消息传

    2024年02月12日
    浏览(37)
  • SpringBoot使用kafka事务-消费者方

    在上一篇文章中,写到了如何在springboot中生产者如何使用kafka的事务,详情链接:Springboot使用kafka事务-生产者方 那么,这一篇就接着上篇所写的内容,讲解一下再springboot中消费者如何使用kafka的事务。 在springboot中kafka的消费者配置也和生产者一样,有两种配置的方式: 第一

    2024年02月08日
    浏览(41)
  • Springboot使用kafka事务-生产者方

    在上一篇文章中,我们使用了springboot的AOP功能实现了kafka的分布式事务,但是那样实现的kafka事务是不完美的,因为请求进来之后分配的是不同线程,但不同线程使用的kafka事务却是同一个,这样会造成多请求情况下的事务失效。 而解决这个问题的方法,就是每个线程都使用

    2024年02月11日
    浏览(43)
  • kafka生产者幂等与事务

    目录 前言: 幂等 事务 总结:  参考资料  Kafka 消息交付可靠性保障以及精确处理一次语义的实现。 所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种: 最多一次(at most once):消息可能会丢失,但绝不会被

    2024年02月12日
    浏览(36)
  • kafka-- kafka集群 架构模型职责分派讲解

    生产者将消息发送到相应的Topic,而消费者通过从Topic拉取消息来消费 Kafka奇数个节点 消费者consumer会将消息拉去过来 生产者producer会将消息发送出去 数据管理 放在zookeeper

    2024年02月12日
    浏览(29)
  • Kafka系列之:一次性传送和事务消息传递

    本文档概述了加强 Kafka 消息传递语义的提案。 Kafka 目前至少提供一次语义,即。当进行可靠性调整时,可以保证用户每条消息写入都将至少保留一次,而不会丢失数据。由于生产者重试,流中可能会出现重复。例如,代理可能会在提交消息和向生产者发送确认之间崩溃,导

    2024年02月16日
    浏览(46)
  • kafka中幂等性producer和事务性producer

    在Kafka中,“幂等性生产者”的概念是指一种特性,它确保消息在生产者的发送操作被重试时仅发送一次。幂等性是一种重要的特性,因为在分布式系统中,网络问题或其他故障可能导致生产者发送的消息在传输过程中失败,从而需要重新发送。如果生产者没有幂等性保证,

    2024年02月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包