原来kafka也有事务啊,再也不担心消息不一致了

这篇具有很好参考价值的文章主要介绍了原来kafka也有事务啊,再也不担心消息不一致了。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

现在假定这么一个业务场景,从kafka中的topic获取消息数据,经过一定加工处理后,发送到另外一个topic中,要求整个过程消息不能丢失,也不能重复发送,即实现端到端的Exactly-Once精确一次消息投递。这该如何实现呢?

原来kafka也有事务啊,再也不担心消息不一致了

kafka事务介绍

针对上面的业务场景,kafka已经替我们想到了,在kafka 0.11版本以后,引入了一个重大的特性:幂等性和事务。

幂等性

这里提到幂等性的原因,主要是因为事务的启用必须要先开启幂等性,那么什么是幂等性呢?

幂等性是指生产者无论向kafka broker发送多少次重复的数据,broker 端只会持久化一条,保证数据不会重复。

幂等性通过生产者配置项enable.idempotence=true开启,默认情况下为true。

幂等性实现原理

原来kafka也有事务啊,再也不担心消息不一致了

  1. 每条消息都有一个主键,这个主键由 <PID, Partition, SeqNumber>组成。
  • PIDProducerID,每个生产者启动时,Kafka 都会给它分配一个 IDProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
  • Partition:消息需要发往的分区号。
  • SeqNumber:生产者,他会记录自己所发送的消息,给他们分配一个自增的 ID,这个 ID 就是 SeqNumber,是该消息的唯一标识,每发送一条消息,序列号加 1。
  1. 对于主键相同的数据,kafka 是不会重复持久化的,它只会接收一条。

幂等性缺点

根据幂等性的原理,我们发现它存在下面的缺点:

  • 只能保证单分区、单会话内的数据不重复
  • kafka 挂掉,重新给生产者分配了 PID,还是有可能产生重复的数据

那么如何实现跨分区、kafka broker重启也能保证不重复呢?这就要使用事务了。

事务

所谓事务,就是要求保证原子性,要么全部成功,要么全部失败。那么具体该如何开启呢?

  1. kafka要想开启事务必须要启用幂等性,即生产者配置enable.idempotence=true
  2. kafka生产者需要配置唯一的事务idtransactional.id, 最好为其设置一个有意义的名字。
  3. kafka消费端也有一个配置项isolation.level和事务有很大关系。
  • read_uncommitted:默认值,消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。
  • read_committed:消费端应用只能消费到提交的事务内的消息。

kafka事务 API

现在我们用java的api来实现一下前面这个“消费-处理-生产“的例子吧。

  1. 引入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
  1. 创建事务的生产者
Properties prodcuerProps = new Properties();
// kafka地址
prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化
prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 启用幂等性
producerProps.put("enable.idempotence", "true");
// 设置事务id
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
  • enable.idempotence配置项目为true
  • 设置transactional.id
  1. 创建事务的消费者
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put("group.id", "my-group-id");
// 设置consumer手动提交
consumerProps.put("enable.auto.commit", "false");
// 设置隔离级别,读取事务已提交的消息
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
//订阅主题
consumer.subscribe(Collections.singletonList("topic1"));
  • enable.auto.commit=false,设置手动提交消费者offset
  • 设置isolation.level=read_committed,消费事务已提交的消息
  1. 核心逻辑
// 初始化事务 
producer.initTransactions();
while(true) {
	// 拉取消息 
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
    if(!records.isEmpty()){
        // 准备一个 hashmap 来记录:"分区-消费位移" 键值对
        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
        // 开启事务 
        producer.beginTransaction();
        try {
            // 获取本批消息中所有的分区
            Set<TopicPartition> partitions = records.partitions();
            // 遍历每个分区
            for (TopicPartition partition : partitions) {
                // 获取该分区的消息
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                // 遍历每条消息
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    // 执行数据的业务处理逻辑
                    ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());
                    // 将处理结果写入 kafka
                    producer.send(outRecord);
                }

                // 将处理完的本分区对应的消费位移记录到 hashmap 中
                long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 事务提交的是即将到来的偏移量,这意味着我们需要加 1
                offsetsMap.put(partition,new OffsetAndMetadata(offset+1));
            }
            // 向事务管理器提交消费位移 
            producer.sendOffsetsToTransaction(offsetsMap,"groupid");
            // 提交事务 
            producer.commitTransaction();
        } catch(Exeception e) {
            e.printStackTrace();
            // 终止事务 
            producer.abortTransaction();
        }
    }
}
  • initTransactions(): 初始化事务
  • beginTransaction(): 开启事务
  • sendOffsetsToTransaction(): 在事务内提交已经消费的偏移量(主要用于消费者)
  • commitTransaction(): 提交事务
  • abortTransaction(): 放弃事务

kafka事务实现原理

kafka事务的实现引入了事务协调器,如下图所示:

原来kafka也有事务啊,再也不担心消息不一致了

  1. 生产者使用事务必须配置事务id, kafka根据事务id计算分配事务协调器
  2. 事务协调器返回pid,前面的幂等性中需要
  3. 开始发送消息到topic中,不过这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息
  4. 当生产者事务内的消息发送完毕,会向事务协调器发送 commitabort 请求,等待 kafka 响应
  5. 事务协调器收到请求后先持久化到内置事务主题__transaction_state中,__transaction_state默认有50个分区,每个分区负责一部分事务。事务划分是根据transactional.idhashcode%50,计算出该事务属于哪个分区。 该分区Leader副本所在的broker节点即为这个transactional.id对应的Transaction Coordinator节点,这也是上面第一步中的计算逻辑。
  6. 事务协调器后台会跟topic通信,告诉它们事务是成功还是失败的。
  • 如果是成功,topic会汇报自己已经收到消息,协调者收到主题的回应便确认了事务完成,并持久化这一结果。
  • 如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束。
  1. 持久化第6步中的事务成功或者失败的信息, 如果kafka broker配置max.transaction.timeout.ms之前既不提交也不中止事务, kafka broker将中止事务本身。 此属性的默认值为 15 分钟。

总结

本文讲解了通过kafka事务可以实现端到端的精确一次的消息语义,通过事务机制,KAFKA 实现了对多个 topic 的多个 partition 的原子性的写入,通过一个例子了解了一下如何使用事物。同时也简单介绍了事务实现的原理,它底层必须要依赖kafka的幂等性机制,同时通过类似“二段提交”的方式保证事务的原子性。

欢迎关注个人公众号【JAVA旭阳】交流学习!文章来源地址https://www.toymoban.com/news/detail-472949.html

到了这里,关于原来kafka也有事务啊,再也不担心消息不一致了的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 漏洞扫描工具大全,妈妈再也不用担心我挖不到漏洞了

    Acunetix Web Vulnerability Scanner(简称AWVS)是一款知名的网络漏洞扫描工具,它通过网络爬虫测试你的网站安全,检测流行安全漏洞。 AWVS常见功能: 网站爬行

    2024年02月09日
    浏览(33)
  • 初识Linux(下).妈妈再也不用担心我Linux找不到门了

    “我会定期分享我的学习和工作经验,也欢迎大家留言和交流,让我们共同学习和进步!感谢大家的支持!” 系列文章 初识Linux(上).妈妈再也不用担心我Linux找不到门了。 初识Linux(中).妈妈再也不用担心我Linux找不到门了。 初识Linux(下).妈妈再也不用担心我Linux找不到门了。

    2024年02月05日
    浏览(36)
  • Spring Boot 项目代码混淆,实战来了,再也不用担心代码泄露了!

    简单就是把代码跑一哈,然后我们的代码 .java文件 就被编译成了 .class 文件 就是针对编译生成的 jar/war 包 里面的 .class 文件 逆向还原回来,可以看到你的代码写的啥。 比较常用的反编译工具 JD-GUI ,直接把编译好的jar丢进去,大部分都能反编译看到源码: 那如果不想给别人反

    2023年04月26日
    浏览(32)
  • 多数人都不会用,有了这些视频APP,再也不担心失效!

    阿虚储物间里一大热门下载内容就是影视类APP了 但相信有这类需求的粉丝都知道:这类APP要么你忍受烦人的广告,要么就找去广告版, 但去广告版有个最大的问题就是经!常!失!效! 其实阿虚早就介绍过不少更稳定的影视APP了,只是可能很多粉丝都没注意到 今天阿虚就来

    2024年02月11日
    浏览(205)
  • 有了这些开源 Icon 库,妈妈再也不担心我的 UI 太丑啦!

    Remix Icon 是一套面向设计师和开发者的开源图标库,所有的图标均可免费用于个人项目和商业项目。 与拼凑混搭的图标库不同,Remix Icon 的每一枚图标都是由设计师按照统一规范精心绘制的,在拥有完美像素对齐的基础上,确保每一枚图标风格一致且简洁易读。 图标以 24x24

    2024年02月11日
    浏览(43)
  • 使用ChatGPT+MindShow一键生成PPT,以后再也不用担心制作PPT啦

    💖 作者简介:大家好,我是阿牛,全栈领域优质创作者。😜 📝 个人主页:馆主阿牛🔥 🎉 支持我:点赞👍+收藏⭐️+留言📝 💬格言:迄今所有人生都大写着失败,但不妨碍我继续向前!🔥 我们经常会有制作ppt的需求,尤其大学里面的小组报告,什么班会团课之类的,

    2023年04月23日
    浏览(40)
  • Z-Libary最新地址检测,再也不用担心找不到ZLibary了

    Z-Library。世界上最大的数字图书馆。 如果你知道了一本书的书名,那在Z-Library上基本上都可以找到进行下载, Z-Library 有很多入口,分为官方和民间镜像。官方自己做了个跳转站点,会自动寻找官方可用网站。一般用官方入口即可,但也存在所有官方入口均封闭情况,此时建议

    2024年02月08日
    浏览(28)
  • 解析不同种类的StableDiffusion模型Models,再也不用担心该用什么了

    Stable Diffusion是一个基于Latent Diffusion Models(潜在扩散模型,LDMs)的文图生成(text-to-image)模型。具体来说,Stable Diffusion在 LAION-5B 的一个子集上训练了一个Latent Diffusion Models,该模型专门用于文图生成。Latent Diffusion Models通过在一个潜在表示空间中迭代“去噪”数据来生成图

    2023年04月19日
    浏览(31)
  • 用Python制作抢购脚本,自动抢购飞天茅台,再也不要担心手慢无了

    前段时间老逛刷朋友圈,有个朋友发文说:每天早上 10 点守着,花了七天终于抢到了!!!并配上了一个茅台的图片。 老逛不喝酒也不懂酒,就去问了这哥们啥情况,这哥们说在京东抢了一瓶茅台酒,只花了 1499 元,这瓶酒原价 3000 左右。 我去京东看了看,搜索「茅台」第

    2024年02月05日
    浏览(52)
  • Kafka消息传递保障——事务与幂等

    消息传递保障对于分布式系统的可靠性至关重要。在分布式系统中消息传递保障是确保系统可靠性的核心问题之一。系统需要确保消息能够按照预期的方式进行传递,以满足业务需求。 Kafka是一种分布式的消息队列系统,作为消息中间件常用于实现基于发布/订阅模型的消息传

    2024年02月12日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包