java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

这篇具有很好参考价值的文章主要介绍了java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.生产者推送数据

常用参数

bootstrap.servers:Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。

acks:Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认消息;all表示生产者会等待所有副本都确认消息。确认模式越高,可靠性越高,但延迟也越大。

retries:消息发送失败时的重试次数。默认值为0,表示不进行重试。可以将其设置为大于0的值,例如3,表示最多重试3次。

batch.size:消息批量发送的大小。当生产者累积到一定数量的消息时,会将其打包成一个批次一次性发送给Broker。默认值为16384字节,即16KB。

linger.ms:消息发送的延迟时间。生产者会等待一定的时间,以便将更多的消息打包成一个批次一次性发送给Broker。默认值为0,表示立即发送。设置较大的值可以提高吞吐量,但可能会增加消息的延迟。

buffer.memory:生产者可用于缓存消息的内存大小。默认值为33554432字节,即32MB。如果生产者生产消息的速度快于发送消息的速度,可能会导致缓存溢出。可以调整该参数来适应生产者的生产速度。

key.serializer:Key的序列化器。Kafka消息可以包含Key和Value,Key和Value都需要进行序列化。该参数指定Key的序列化器。

value.serializer:Value的序列化器。该参数指定Value的序列化器。

max.block.ms:生产者在发送消息之前等待Broker元数据信息的最长时间。如果在该时间内无法获取到Broker元数据信息,则会抛出TimeoutException异常。默认值为60000毫秒,即60秒。

compression.type:消息压缩类型。可选值为none、gzip、snappy、lz4。默认值为none,表示不进行压缩。压缩可以减少消息的传输大小,提高网络带宽的利用率,但会增加CPU的消耗。

interceptor.classes:消息拦截器列表。可以指定多个消息拦截器对消息进行加工处理。例如,可以在消息中添加时间戳、添加消息来源等信息。
以上参数只是一部分,Kafka生产者还有更多参数可以进行配置。需要根据实际情况选择合适的参数进行配置。

例子

下面是一个单例模式配置 kafka生产者的例子(避免多次创建实例,减少资源的消耗)

public class SingletonKafkaProducerExample {
    private static SingletonKafkaProducerExample instance;
    private static Producer<String, String> producer;
    private SingletonKafkaProducerExample() {
    //参数设置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:端口");
        props.put("acks", "all");
        props.put("max.block.ms",120000);//默认60s
        props.put("retries", 3)//默认0;
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("request.timeout.ms",60*1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
//sasl认证 (根据实际情况看是否配置)
		props.put("security.protocol", "SASL_PLAINTEXT");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';");
		producer = new KafkaProducer<>(props);
		logger.info("kafka连接成功");
    }
    public static SingletonKafkaProducerExample getInstance() {
        if (instance == null) {
            synchronized (SingletonKafkaProducerExample.class) {
                if (instance == null) {
                    instance = new SingletonKafkaProducerExample();
                }
            }
        }
        return instance;
    }
    public void sendMessage(String topic, String key, String value) {
        try {
        //这里也可以不用设置key和partition,例如不设置分区 系统会使用轮询算法自动匹配partition
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("发送消息到" + metadata.topic() + "失败:" + exception.getMessage());
                } else {
                    System.out.println("发送消息到" + metadata.topic() + "成功:partition=" + metadata.partition() + ", offset=" + metadata.offset());
                }
            });
            future.get(); // 等待返回数据
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("发送消息失败:" + e.getMessage());
        } 
    }
    public void closeProducer() {
        producer.close();
    }
}


以上参数配置只是案例,实际参数配置需要根据业务情况自己设置
下面是生产的方法介绍:

close(): 关闭生产者,释放相关资源。
close(Duration timeout): 在指定的超时时间内关闭生产者,释放相关资源。
initTransactions(): 初始化事务,启用事务支持。
beginTransaction(): 开始事务。
send(ProducerRecord<K, V> record): 发送一条消息记录到指定的主题。
send(ProducerRecord<K, V> record, Callback callback): 发送一条消息记录,并附带一个回调函数用于异步处理发送结果。
send(ProducerRecord<K, V> record, ProducerCallback<T> callback): 发送一条消息记录,并使用自定义的回调函数处理发送结果。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId): 将消费者组的偏移量提交给事务。
partitionsFor(String topic): 获取指定主题的分区信息。
metrics(): 获取生产者的度量指标信息。
flush(): 将所有已挂起的消息立即发送到Kafka服务器,等待服务器确认后再返回。
commitTransaction(): 提交当前事务。
abortTransaction(): 中止当前事务。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata): 将消费者组的偏移量和消费者组元数据提交给事务。

可能遇见的问题

1.多个topic发送消息的时候总有1.2发送失败 报Failed to update metadata after 60000ms
这种情况出现的原因可能是Kafka集群中Broker的元数据信息还没有被更新到Kafka客户端中,导致Kafka客户端无法连接到指定的Broker。

解决

增加等待时间:可以通过设置max.block.ms属性来增加等待时间
提高重试次数:可以通过设置retries属性来提高重试次数
检查Broker配置
检查网络连接
检查Kafka版本

如果下面3个都没问题,就增加等待时间和重试次数。本人遇到这样的问题解决了

消费者 推送数据

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置消费者参数
		Properties props = new Properties();
		/*
		bootstrap.servers
		Kafka集群中Broker的地址列表,格式为"hostname:port",例如:"localhost:9092"。可以配置多个Broker,用逗号分隔。
		*/
		props.put("bootstrap.servers", "ip:port");
		/*
		group.id
		消费者组的名称,同一个消费者组中的消费者会共享消费消息的责任。例如:"test"。
		*/
		props.put("group.id", "test");
		/*
		enable.auto.commit
		是否自动提交偏移量,默认为true。如果为false,则需要手动提交偏移量。
		*/
		props.put("enable.auto.commit", "true");
		/*
		session.timeout.ms
		消费者会话超时时间(毫秒),如果消费者在该时间内没有向Kafka Broker发送心跳,则会被认为已经失效。默认10000毫秒。
		*/
		props.put("session.timeout.ms", "30000");
		/*
		auto.offset.reset
		如果消费者在初始化时没有指定偏移量或指定的偏移量不存在,则从哪个位置开始消费,默认latest,即从最新的消息开始消费。其他可选值为earliest和none。
		*/
		props.put("auto.offset.reset", "earliest");
		/*
		
		key.deserializer
		key的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
		*/
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		/*
		value.deserializer
		value的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
		*/
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		/*
		max.poll.records
		每次拉取消息的最大记录数,默认500条。
		*/
		props.put("max.poll.records", "10000");
		/*
		fetch.min.bytes
		每次拉取的最小字节数,默认1字节。
		
		fetch.max.bytes
		每次拉取的最大字节数,默认52428800字节,即50MB。
		
		fetch.max.wait.ms
		最长等待时间(毫秒),如果在该时间内没有拉取到任何消息,则返回空结果。默认500毫秒。
		*/
		props.put("fetch.min.bytes", "1024");
		props.put("fetch.max.bytes", "1048576");
		props.put("fetch.max.wait.ms", "500");
		
		/*
		max.partition.fetch.bytes
		每个分区最大拉取字节数,默认1048576字节,即1MB。
		*/
		props.put("max.partition.fetch.bytes", "1024");
		/*
		connections.max.idle.ms
		最大空闲连接时间(毫秒),超过该时间则连接被认为已经过期并关闭。默认540000毫秒,即9分钟。
		*/
		props.put("connections.max.idle.ms", "540000");
		
		/*
		request.timeout.ms
		请求超时时间(毫秒),如果在该时间内没有收到Broker的响应,则认为请求失败。默认30000毫秒。
		*/
		props.put("request.timeout.ms", "40000");
		/*
		retry.backoff.ms
		重试等待时间(毫秒),如果请求失败,则等待一段时间后再次重试。默认500毫秒。
		*/
		props.put("retry.backoff.ms", "500");
		
		/*
		security.protocol
		安全协议类型,例如SSL或SASL_SSL。
		
		ssl.keystore.location
		SSL证书的路径和名称。
		
		ssl.keystore.password
		SSL证书的密码。
		
		ssl.truststore.location
		SSL信任证书库的路径和名称。
		
		ssl.truststore.password
		SSL信任证书库的密码。
		*/
		props.put("security.protocol", "SSL");
		props.put("ssl.keystore.location", "/path/to/keystore");
		props.put("ssl.keystore.password", "password");
		props.put("ssl.truststore.location", "/path/to/truststore");
		props.put("ssl.truststore.password", "password");
        // 创建Kafka消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(Arrays.asList("my-topic"));
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(6);
        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // 获取消息所在分区的编号
                int partition = record.partition();
                // 将消息提交给对应的线程进行处理
                executor.submit(new MessageHandler(record.value(), partition));
            }
        }
    }
    // 消息处理器
    static class MessageHandler implements Runnable {
        private final String message;
        private final int partition;
        public MessageHandler(String message, int partition) {
            this.message = message;
            this.partition = partition;
        }
        @Override
        public void run() {
            // 对消息进行处理
            System.out.printf("Partition %d: Message received: %s%n", partition, message);
        }
    }
}

以上参数根据自己需求填写
可以根据分区 使用多线程执行

下面是消费者的方法讲解文章来源地址https://www.toymoban.com/news/detail-562168.html

subscribe(Collection<String> topics): 订阅一个或多个主题,开始消费这些主题中的消息。
unsubscribe(): 取消订阅当前已经订阅的所有主题,停止消费消息。
poll(Duration timeout):Kafka服务器拉取一批消息记录,该方法会阻塞指定的超时时间,等待服务器返回消息。如果在超时时间内没有收到消息,则返回空记录。
commitSync(): 同步方式提交消费者的消费偏移量(offset),表示消息已成功消费。
commitSync(Duration timeout): 在指定的超时时间内同步提交消费者的消费偏移量。
commitAsync(): 异步方式提交消费者的消费偏移量,不等待提交结果。
commitAsync(OffsetCommitCallback callback): 异步方式提交消费者的消费偏移量,并在提交完成后执行回调函数。
seek(TopicPartition partition, long offset): 将消费者的偏移量(offset)设置为指定分区的指定偏移量,以便从指定位置开始消费消息。
seekToBeginning(Collection<TopicPartition> partitions): 将消费者的偏移量设置为指定分区的最早可用偏移量,重新从分区起始位置开始消费消息。
seekToEnd(Collection<TopicPartition> partitions): 将消费者的偏移量设置为指定分区的最新可用偏移量,继续消费分区中尚未消费的消息。
seekByTimestamp(Map<TopicPartition, Long> timestampsToSearch): 根据时间戳搜索偏移量,并将消费者的偏移量设置为找到的偏移量。
assignment(): 获取当前分配给消费者的所有分区。
pause(Collection<TopicPartition> partitions): 暂停指定分区的消息消费,消费者将不再继续接收这些分区的消息。
resume(Collection<TopicPartition> partitions): 恢复被暂停的指定分区的消息消费,使消费者可以继续接收这些分区的消息。
close(): 关闭消费者,释放相关资源。

到了这里,关于java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka系列之:Kafka生产者和消费者

    batch.size:只有数据积累到batch.size之后,sender才会发送数据,默认16K。 linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。 0:生产者发送过来的数据,不需要等数据罗盘应答。 1:生产者发送过来的

    2023年04月09日
    浏览(45)
  • kafka生产者和消费者配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月12日
    浏览(48)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(53)
  • Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(77)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(44)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(87)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(48)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(58)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(49)
  • 探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(92)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包