三、Kafka生产者

这篇具有很好参考价值的文章主要介绍了三、Kafka生产者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

3.1 生产者消息发送流程

1 发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker
【RecordAccumulator缓冲的结构:每一个分区对应一个双端队列deque,存放的单元是ProducerBatch,一个Batch中存放了多个Record,那么存消息是自动放到尾端,而读取消息(发送线程读取)是从头部开始的,目的是让发送的消息更加紧凑,节约空间,提高效率。注意这个大的缓冲池,默认是32M,如果超出了会阻塞send()方法,可以设置参数来调节这个大小。】

三、Kafka生产者,Kafka,kafka



3.2 异步发送 API

public class CustomProducer {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}



3.3 同步发送数据

只需在异步发送的基础上,再调用一下 get()方法即可。

public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i)).get();
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

3.4 生产者分区

1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

2 生产者发送消息的分区策略

  • 指定分区
  • 没有指定分区,但是传入了key值
  • 既没有指定分区,也没有传入key值
    三、Kafka生产者,Kafka,kafka

3 自定义分区器

1)需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2)定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取数据 atguigu  hello
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("atguigu")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3)使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

三、Kafka生产者,Kafka,kafka



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
三、Kafka生产者,Kafka,kafka

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}


3.6 数据可靠性

1、ACK应答级别

  • 0:生产者发送过来的数据,不需要等数据落盘应答
  • 1:生产者发送过来的数据,Leader收到数据后应答
  • -1 :生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。-1和all等价。
    三、Kafka生产者,Kafka,kafka



ACK=-1时存在的问题?
Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据对可靠性要求比较高的场景,



数据重复性问题
三、Kafka生产者,Kafka,kafka

2、代码演示

// acks
properties.put(ProducerConfig.ACKS_CONFIG,“1”);
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);

public class CustomProducerAcks {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG,"1");

        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}



3.7 数据去重

1、三个概念

保证消息至少被消费一次 :ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

保证消息最多被消费一次: ACK级别设置为0

保证消息只能精确消费一次: (ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2) + 消息消费的幂等性

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。



2 幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭



3 生产者事务

1) kafka的事务原理
这里讲的不好,待会找另外的视频看一下

2)kafka事务的api
Kafka 的事务一共有如下 5 个 API


        // 1 初始化事务
        void initTransactions();
        // 2 开启事务
        void beginTransaction() throws ProducerFencedException;
        // 3 在事务内提交已经消费的偏移量(主要用于消费者)
        void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets,String consumerGroupId) throws ProducerFencedException;
        // 4 提交事务
        void commitTransaction() throws ProducerFencedException;
        // 5 放弃事务(类似于回滚事务的操作)
        void abortTransaction() throws ProducerFencedException;

事务使用的demo

public class CustomProducerTranactions {

    public static void main(String[] args) {
        // 0 配置
        Properties properties = new Properties();
        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");
        // 指定对应的key和value的序列化类型 key.serializer
//      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
            }
            int i = 1 / 0;
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.abortTransaction();
        } finally {
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
}

3.8 数据有序性

三、Kafka生产者,Kafka,kafka

3.9 数据乱序

三、Kafka生产者,Kafka,kafka文章来源地址https://www.toymoban.com/news/detail-660738.html

到了这里,关于三、Kafka生产者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka-生产者

    Kafka在实际应用中,经常被用作高性能、可扩展的消息中间件。 Kafka自定义了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息。 在实践生产过程中,一套API封装良好、灵活易用的客户端可以避免开发人员重复劳动,提高开发效率,也

    2024年01月20日
    浏览(27)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(30)
  • Kafka生产者

    1.acks 如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。 缺点:如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息就丢失了 优点:因为生产者不需要等待服务器的响应,所有他可以以网络能够支持的最大速度发送消息,从而

    2024年01月19日
    浏览(31)
  • 三、Kafka生产者

    1 发送原理 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 【RecordAccumulator缓冲的结构: 每一个分区对应一

    2024年02月12日
    浏览(30)
  • (三)Kafka 生产者

    创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基

    2024年02月09日
    浏览(32)
  • Kafka 生产者

    目录 一、kafka生产者原理 二、kafka异步发送 配置kafka 创建对象,发送数据 带回调函数的异步发送 同步发送   三、kafka生产者分区 分区策略 指定分区:  指定key: 什么都不指定: 自定义分区器 四、生产者提高吞吐量 五、数据的可靠性 ACK应答级别 数据完全可靠条件 可靠性

    2023年04月15日
    浏览(35)
  • kafka学习-生产者

    目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 在Kafka中保存的数据都是字节数组。 消息发送前,需要将消息序列化为字节数组进行发送。

    2024年02月09日
    浏览(31)
  • Kafka(生产者)

    目 前 企 业 中 比 较 常 见 的 消 息 队 列 产 品 主 要 有 Kafka(在大数据场景主要采用 Kafka 作为消息队列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 传统消息队列的应用场景 传统的消息队列的主要应用场景包括: 缓存/消峰 、 解耦 和 异步通信 。 缓冲/消峰: 有助于控制和优化数据流经过

    2024年02月11日
    浏览(34)
  • 【Kafka】高级特性:生产者

    整个流程如下: Producer创建时,会创建一个Sender线程并设置为守护线程。 生产消息时,内部其实是异步流程;生产的消息先经过拦截器-序列化器-分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。 批次发送的条件为:缓冲区数据大小达到batch.size或者

    2024年01月24日
    浏览(26)
  • Kafka生产者相关概念

    Kafka中消息是以topic进行分类的,Producer生产消息,Consumer消费消息,都是面向topic的。 Topic是逻辑上的概念,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件中存储的就是producer生产的数据。 写入方式 producer采用推(push)模式将消息发布到broker,每条消息都

    2024年04月13日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包