分布式消息队列Kafka(四)- 消费者

这篇具有很好参考价值的文章主要介绍了分布式消息队列Kafka(四)- 消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.Kafka消费方式

分布式消息队列Kafka(四)- 消费者

2.Kafka消费者工作流程

(1)总体工作流程

分布式消息队列Kafka(四)- 消费者

(2)消费者组工作流程

分布式消息队列Kafka(四)- 消费者

3.消费者API

(1)单个消费者消费

分布式消息队列Kafka(四)- 消费者

实现代码

package com.zrclass.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
	public static void main(String[] args) {
		// 1.创建消费者的配置对象
		Properties properties = new Properties();
		// 2.给消费者配置对象添加参数
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
		"hadoop102:9092");
		// 配置序列化 必须
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
		StringDeserializer.class.getName());
		
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
		StringDeserializer.class.getName());
		// 配置消费者组(组名任意起名) 必须
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
		// 创建消费者对象
		KafkaConsumer<String, String> kafkaConsumer = new 
		KafkaConsumer<String, String>(properties);
		// 注册要消费的主题(可以消费多个主题)
		ArrayList<String> topics = new ArrayList<>();
		topics.add("first");
		kafkaConsumer.subscribe(topics);
		// 拉取数据打印
		while (true) {
			// 设置 1s 中消费一批数据
			ConsumerRecords<String, String> consumerRecords = 
			kafkaConsumer.poll(Duration.ofSeconds(1));
			// 打印消费到的数据
			for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
				System.out.println(consumerRecord);
			}
		}
	} 
 }
(2)单个消费者指定分区消费

分布式消息队列Kafka(四)- 消费者

代码实现:

 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
 // 消费某个主题的某个分区数据
 ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
 topicPartitions.add(new TopicPartition("first", 0));
 kafkaConsumer.assign(topicPartitions);
(3)消费者组消费

分布式消息队列Kafka(四)- 消费者

复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个消费者消费

4.kafka的分区分配及再平衡策略

(1)kafka的分区分配策略(默认策略是Range + CooperativeSticky)
  • 一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据。
  • Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。

分布式消息队列Kafka(四)- 消费者

1)每个consumer都发送JoinGroup请求

2)选出一个consumer作为leader

3)coordinator把要消费的topic情况发送给leader消费者

4)leader消费者会负责制定消费方案

5)leader消费者把消费方案发给coordinator

6)Coordinator把leader消费者的消费方案下发给各个consumer

7)每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的过长(max.poll.interval.ms5分钟),也会触发再 平衡

分布式消息队列Kafka(四)- 消费者

(2)range分区策略及再平衡

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IxCP3AOK-1682405725356)(分布式消息队列Kafka.assets/image-20230425112937754.png)]

分区策略测试:

1)修改主题 first 为 7 个分区。

2)复制 CustomConsumer 类,创建由三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
 public static void main(String[] args) throws InterruptedException {
	Properties properties = new Properties();
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
	for (int i = 0; i < 7; i++) {
		kafkaProducer.send(new ProducerRecord<>("first", i, "test", "java"));
	}
	kafkaProducer.close();
 } 
}

4)观看 3 个消费者分别消费哪些分区的数据

CustomConsumer消费0,1,2号分区数据;

CustomConsumer1消费3,4分区数据;

CustomConsumer2消费5,6分区数据;

分区分配再平衡策略:

1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 3、4 号分区数据。

2 号消费者:消费到 5、6 号分区数据。

0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、1、2、3 号分区数据。

2 号消费者:消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

(3)RoundRobin 分区分配再平衡

轮询的方式

(4)Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

5.kafka offset偏移量

(1)kafaka内置主题__consumer_offsets

分布式消息队列Kafka(四)- 消费者

kafka内置主题__consumer_offsets里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

(2)消费内置主题__consumer_offsets

0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

2)采用命令行方式,创建一个新的 topic。

[zrclass@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server  hadoop102:9092 --create --topic first--partitions 2 --replication-factor 2 

3)启动生产者往 first生产数据。

[zrclass@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic  first --bootstrap-server hadoop102:9092 

4)启动消费者消费 first数据。

[zrclass@hadoop104 kafka]$ bin/kafka-console-consumer.sh -- bootstrap-server hadoop102:9092 --topic first --group test 

注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

5)查看消费者消费主题__consumer_offsets。

[zrclass@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm
atter" --from-beginning

[offset,first,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)

[offset,first,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
(3)kafka默认自动提交偏移量

分布式消息队列Kafka(四)- 消费者

 // 配置消费者组
 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 // 是否自动提交 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 // 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
 //3. 创建 kafka 消费者
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 //4. 设置消费主题 形参是列表
 consumer.subscribe(Arrays.asList("first"));
(4)手动提交offset

分布式消息队列Kafka(四)- 消费者

1)同步提交offset

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例。

package com.zrclass.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {
	public static void main(String[] args) {
		// 1. 创建 kafka 消费者配置类
		Properties properties = new Properties();
		// 2. 添加配置参数
		// 添加连接
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		// 配置序列化 必须
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		// 配置消费者组
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
		// 是否自动提交 offset
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		//3. 创建 kafka 消费者
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
		//4. 设置消费主题 形参是列表
		consumer.subscribe(Arrays.asList("first"));
		//5. 消费数据
		while (true){
		// 读取消息
		ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
		// 输出消息
		for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
			System.out.println(consumerRecord.value());
		}
		// 同步提交 offset
		consumer.commitSync();
	  }
	}
 }

2)异步提交offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

// 异步提交 offset
 consumer.commitAsync();
(5)指定offset消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。

2)latest(默认值):自动将偏移量重置为最新偏移量。

3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

分布式消息队列Kafka(四)- 消费者

(6)指定消费时间重新消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

package com.zrclass.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerForTime {
	public static void main(String[] args) {
		// 0 配置信息
		Properties properties = new Properties();
		// 连接
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		// key value 反序列化
		
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
		// 1 创建一个消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		// 2 订阅一个主题
		ArrayList<String> topics = new ArrayList<>();
		topics.add("first");
		kafkaConsumer.subscribe(topics);
		Set<TopicPartition> assignment = new HashSet<>();
		while (assignment.size() == 0) {
		kafkaConsumer.poll(Duration.ofSeconds(1));
		// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
		assignment = kafkaConsumer.assignment();
		}
		HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
		// 封装集合存储,每个分区对应一天前的数据
		for (TopicPartition topicPartition : assignment) {
			timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
		}
		// 获取从 1 天前开始消费的每个分区的 offset
		Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
		// 遍历每个分区,对每个分区设置消费时间。
		for (TopicPartition topicPartition : assignment) {
			OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
			// 根据时间指定开始消费的位置
			if (offsetAndTimestamp != null){
			kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
			}
		}
		// 3 消费该主题数据
		while (true) {
			ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
			for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
			System.out.println(consumerRecord);
			}
		}
	} 
 }
(7)重复消费和漏消费

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

分布式消息队列Kafka(四)- 消费者

怎么能做到既不漏消费也不重复消费呢?使用消费者事务。

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如 MySQL)。

分布式消息队列Kafka(四)- 消费者

(8)消费者调优(提高吞吐量)

数据积压(消费者如何提高吞吐量)

分布式消息队列Kafka(四)- 消费者文章来源地址https://www.toymoban.com/news/detail-425613.html

到了这里,关于分布式消息队列Kafka(四)- 消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(43)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(37)
  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(51)
  • kafka 分布式的情况下,如何保证消息的顺序消费?

    目录 一、什么是分布式 二、kafka介绍 三、消息的顺序消费 四、如何保证消息的顺序消费   分布式是指将计算任务分散到多个计算节点上进行并行处理的一种计算模型。在分布式系统中,多台计算机通过网络互联,共同协作完成任务。每个计算节点都可以独立运行,并且可以

    2024年02月10日
    浏览(53)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(51)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)
  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(60)
  • zookeeper+kafka分布式消息队列集群的部署

    目录 一、zookeeper 1.Zookeeper 定义 2.Zookeeper 工作机制 3.Zookeeper 特点 4.Zookeeper 数据结构 5.Zookeeper 应用场景 (1)统一命名服务 (2)统一配置管理 (3)统一集群管理 (4)服务器动态上下线 6.Zookeeper 选举机制 (1)第一次启动选举机制 (2)非第一次启动选举机制 7.部署zookeepe

    2024年02月14日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包