Apache Kafka - 重识消费者

这篇具有很好参考价值的文章主要介绍了Apache Kafka - 重识消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


Apache Kafka - 重识消费者

概述

Kafka是一个分布式的消息队列系统,它的出现解决了传统消息队列系统的吞吐量瓶颈问题。

Kafka的高吞吐量、低延迟和可扩展性使得它成为了很多公司的首选消息队列系统。

在Kafka中,消息被分成了不同的主题(Topic),每个主题又被分成了不同的分区(Partition)。

生产者(Producer)将消息发送到指定的主题中,而消费者(Consumer)则从指定的主题中读取消息。

接下来我们将介绍Kafka消费者相关的知识。

Kafka消费者的工作原理

Kafka消费者从指定的主题中读取消息,消费者组(Consumer Group)则是一组消费者的集合,它们共同消费一个或多个主题。在一个消费者组中,每个消费者都会独立地读取主题中的消息。当一个主题有多个分区时,每个消费者会读取其中的一个或多个分区。消费者组中的消费者可以动态地加入或退出,这样就可以实现消费者的动态扩展。

Kafka消费者通过轮询(Polling)方式从Kafka Broker中读取消息。当一个消费者从Broker中读取到一条消息后,它会将该消息的偏移量(Offset)保存在Zookeeper或Kafka内部主题中。消费者组中的消费者会协调并平衡分区的分配,保证每个消费者读取的分区数量尽可能均衡。

Kafka消费者的配置

  1. bootstrap.servers

    该参数用于指定Kafka集群中的broker地址,多个地址以逗号分隔。消费者会从这些broker中获取到集群的元数据信息,以便进行后续的操作。

  2. group.id

    该参数用于指定消费者所属的消费组,同一消费组内的消费者共同消费一个主题的消息。如果不指定该参数,则会自动生成一个随机的group.id。

  3. enable.auto.commit

    该参数用于指定是否启用自动提交offset。如果设置为true,则消费者会在消费消息后自动提交offset;如果设置为false,则需要手动提交offset。

  4. auto.commit.interval.ms

    该参数用于指定自动提交offset的时间间隔,单位为毫秒。只有当enable.auto.commit设置为true时,该参数才会生效。

  5. session.timeout.ms

    该参数用于指定消费者与broker之间的会话超时时间,单位为毫秒。如果消费者在该时间内没有发送心跳包,则会被认为已经失效,broker会将其从消费组中移除。

  6. max.poll.records

    该参数用于指定每次拉取消息的最大条数。如果一次拉取的消息数量超过了该参数指定的值,则消费者需要等待下一次拉取消息。

  7. auto.offset.reset

    该参数用于指定当消费者第一次加入消费组或者offset失效时,从哪个位置开始消费。可选值为latest和earliest,分别表示从最新的消息和最早的消息开始消费。

  8. max.poll.interval.ms

    该参数用于指定两次poll操作之间的最大时间间隔,单位为毫秒。如果消费者在该时间内没有进行poll操作,则被认为已经失效,broker会将其从消费组中移除。

  9. fetch.min.bytes

    该参数用于指定每次拉取消息的最小字节数。如果一次拉取的消息数量不足该参数指定的字节数,则消费者需要等待下一次拉取消息。

  10. fetch.max.wait.ms

    该参数用于指定拉取消息的最大等待时间,单位为毫秒。如果在该时间内没有获取到足够的消息,则返回已经获取到的消息。


Kafka消费者的实现

Kafka消费者的实现可以使用Kafka提供的高级API或者低级API。高级API封装了低级API,提供了更加简洁、易用的接口。下面分别介绍一下这两种API的使用方法。

高级API

使用高级API可以更加方便地实现Kafka消费者。下面是一个使用高级API实现Kafka消费者的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

在上面的代码中,我们首先创建了一个Properties对象,用于存储Kafka消费者的配置信息。然后创建了一个KafkaConsumer对象,并指定了要消费的主题。最后使用poll方法从Broker中读取消息,并对每条消息进行处理。

低级API

使用低级API可以更加灵活地实现Kafka消费者。下面是一个使用低级API实现Kafka消费者的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "fal	VCC se");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        consumer.commitSync(Collections.singletonMap(record.topicPartition(), new OffsetAndMetadata(record.offset() + 1)));
    }
}

在上面的代码中,我们首先创建了一个Properties对象,用于存储Kafka消费者的配置信息。然后创建了一个KafkaConsumer对象,并指定了要消费的主题。最后使用poll方法从Broker中读取消息,并对每条消息进行处理。在处理完每条消息后,我们使用commitSync方法手动提交偏移量。


导图

Apache Kafka - 重识消费者

总结

Kafka消费者是Kafka消息队列系统中的重要组成部分,它能够从指定的主题中读取消息,并进行相应的处理。在使用Kafka消费者时,需要注意消费者组ID、自动提交偏移量、偏移量重置策略以及消息处理方式等配置信息。

Apache Kafka - 重识消费者文章来源地址https://www.toymoban.com/news/detail-459883.html

到了这里,关于Apache Kafka - 重识消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(42)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(37)
  • kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

    默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. 二、定义两个消费者,给其配置上述PartitionAssignor. 在kafka创建只有一个分区的topic : study2023 创建一个生产者往study2023这个 topic发送消息: 分别

    2024年02月10日
    浏览(37)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(80)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题数据案例__订阅主题)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题中数据,所下图所示: 注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。 1.2、案例代码 代码 1.3、测试 在 Kafka 集群控制台,创建firstTopic主题 在 IDEA中

    2024年02月09日
    浏览(39)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(45)
  • kafka在创建KafkaConsumer消费者时,发生Exception in thread “main“ org.apache.kafka.common.KafkaException: Faile

    原因:可能是序列化和反序列化没正确使用。将以下代码修改正确再次运行。 将以上代码的 StringDeserializer 反序列化,确认无误!!!

    2024年02月13日
    浏览(53)
  • 多个消费者订阅一个Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    记录 :466 场景 :一个KafkaProducer在一个Topic发布消息,多个消费者KafkaConsumer订阅Kafka的Topic。每个KafkaConsumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zha

    2024年02月16日
    浏览(44)
  • 多个消费者订阅一个Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    记录 :465 场景 :一个Producer在一个Topic发布消息,多个消费者Consumer订阅Kafka的Topic。每个Consumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包