Kafka3.0.0版本——生产者 数据去重

这篇具有很好参考价值的文章主要介绍了Kafka3.0.0版本——生产者 数据去重。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、数据传递语义

1.1、至少一次

  • 至少一次(At Least Once )的含义
    生产者发送数据到kafka集群,kafka集群至少接收到一次数据。
  • 至少一次的条件:
    ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

1.2、最多一次

  • 最多一次(At Most Once )的含义
    生产者发送数据到kafka集群,kafka集群最多接收到一次数据。
  • 最多一次的条件:
    ACK级别设置为0

1.3、精确一次

  • 精确一次( Exactly Once )的含义
    对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
  • 精确一次的条件:
    幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2)

二、幂等性

2.1、幂等性原理

  • 幂等性:指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

2.2、重复数据的判断标准

  • 具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的Partition 表示分区号Sequence Number是单调自增的
  • 所以幂等性只能保证的是在单分区单会话内不重复

2.3、如何使用幂等性

  • 开启参数 enable.idempotence 默认为 true,false关闭。
  • 官网描述如下图:
    Kafka3.0.0版本——生产者 数据去重

三、生产者 事务

3.1、Kafka事务原理

Kafka3.0.0版本——生产者 数据去重

3.2、Kafka事务注意事项

  • Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务
  • 开启事务,必须开启幂等性 。

3.3、Kafka事务的5个API

3.3.1、初始化事务API
  • 初始化事务
    Kafka3.0.0版本——生产者 数据去重
3.3.2、开启事务API
  • 开启事务
    Kafka3.0.0版本——生产者 数据去重
3.3.3、在事务内提交已经消费的偏移量API
  • 在事务内提交已经消费的偏移量(主要用于消费者)
    Kafka3.0.0版本——生产者 数据去重
3.3.4、提交事务API
  • 提交事务
    Kafka3.0.0版本——生产者 数据去重
3.3.5、放弃事务API
  • 放弃事务(类似于回滚事务的操作)
    Kafka3.0.0版本——生产者 数据去重

3.4、单个 Producer,使用事务保证消息的仅一次发送的示例代码

  • 示例代码

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerTranactions {
    
        public static void main(String[] args) {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            //指定事务id
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactionalId_01");
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //初始化事务
            kafkaProducer.initTransactions();
            //开启事务
            kafkaProducer.beginTransaction();
    
            try {
                //5、调用 send 方法,发送消息
                for (int i = 0; i < 5; i++) {
                    kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i));
                }
                //提交事务
                kafkaProducer.commitTransaction();
            } catch (Exception e) {
                //终止事务
                kafkaProducer.abortTransaction();
            } finally {
                //6、关闭资源
                kafkaProducer.close();
            }
        }
    }
    
  • 在kafka集群上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.27:9092 --topic news
    
  • 在 IDEA 中执行代码,观察kafka集群控制台中是否接收到消息。
    Kafka3.0.0版本——生产者 数据去重文章来源地址https://www.toymoban.com/news/detail-429689.html

到了这里,关于Kafka3.0.0版本——生产者 数据去重的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 大数据开发之Kafka(概述、快速入门、生产者)

    大数据开发之Kafka(概述、快速入门、生产者)

    Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。 目前企业中比较常见的消息队列产品主要有Kafka、ActiveM

    2024年01月19日
    浏览(8)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(9)
  • SparkStreaming学习——读取socket的数据和kafka生产者的消息

    SparkStreaming学习——读取socket的数据和kafka生产者的消息

    目录 一、Spark Streaming概述 二、添加依赖 三、配置log4j 1.依赖下载好后打开IDEA最左侧的外部库 2.找到spark-core 3.找到apache.spark目录 4.找到log4j-defaults.properties文件 5.将该文件放在资源目录下,并修改文件名 6.修改log4j.properties第19行的内容 四、Spark Streaming读取Socket数据流 1.代码编

    2023年04月27日
    浏览(8)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(12)
  • kafka-保证数据不重复-生产者开启幂等性和事务的作用?

    kafka-保证数据不重复-生产者开启幂等性和事务的作用?

    适用于消息在写入到服务器日志后,由于网络故障,生产者没有及时收到服务端的 ACK 消息,生产者误以为消息没有持久化到服务端,导致生产者重复发送该消息,造成了消息的重复现象,而幂等性就是为了解决该问题。 通过3个值的唯一性去重: PID:生产者ID 分区号 seq:单

    2024年02月14日
    浏览(8)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(10)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(10)
  • Kafka 生产者

    Kafka 生产者

    目录 一、kafka生产者原理 二、kafka异步发送 配置kafka 创建对象,发送数据 带回调函数的异步发送 同步发送   三、kafka生产者分区 分区策略 指定分区:  指定key: 什么都不指定: 自定义分区器 四、生产者提高吞吐量 五、数据的可靠性 ACK应答级别 数据完全可靠条件 可靠性

    2023年04月15日
    浏览(6)
  • (三)Kafka 生产者

    (三)Kafka 生产者

    创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基

    2024年02月09日
    浏览(6)
  • kafka学习-生产者

    kafka学习-生产者

    目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 在Kafka中保存的数据都是字节数组。 消息发送前,需要将消息序列化为字节数组进行发送。

    2024年02月09日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包