一、生产者如何提高吞吐量
- 设置批次大小,batch.size 默认 16K。
- 设置等待时间,linger.ms 默认 0。
- 设置缓冲区大小,buffer.memory 默认 32M
- 设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd。
二、生产者提高吞吐量代码示例
-
代码
package com.xz.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * 生产者提高吞吐量 * 1、设置批次大小,batch.size 默认 16K * 2、设置等待时间,linger.ms 默认 0 * 3、设置缓冲区大小,buffer.memory 默认 32M * 4、设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd * */ public class CustomProducerHandlingCapacity { public static void main(String[] args) throws InterruptedException { //1、创建 kafka 生产者的配置对象 Properties properties = new Properties(); //2、给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); //3、指定对应的key和value的序列化类型 key.serializer value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); /** * 4、提高吞吐量 * */ //设置缓冲区大小 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //设置批次大小 properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); //设置等待时间 linger.ms properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); //设置压缩 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //5、创建生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //6、调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null){ System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition()); } } }); Thread.sleep(2); } //7、关闭资源 kafkaProducer.close(); } }
-
在kafka集群上开启 Kafka 消费者
[root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.27:9092 --topic news
-
在 IDEA 中执行代码,观察kafka集群控制台中是否接收到消息。
文章来源:https://www.toymoban.com/news/detail-409111.html -
测试吞吐量需要大数据量,因此lz不做大数据量示例演示。文章来源地址https://www.toymoban.com/news/detail-409111.html
到了这里,关于Kafka3.0.0版本——生产者如何提高吞吐量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!