Kafka原理之消费者

这篇具有很好参考价值的文章主要介绍了Kafka原理之消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、消费模式

1、pull(拉)模式(kafka采用这种方式)

consumer采用从broker中主动拉取数据。
存在问题:如果kafka中没有数据,消费者可能会陷入循环中,一直返回空数据

2、push(推)模式

由broker决定消息发送频率,很难适应所有消费者的消费速率。

二、总体工作流程

kafka pull 拉取数据,Kafka,kafka,java,分布式

案例一:单独消费者,并订阅主题

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
		kafkaConsumer.subscribe(topicList);
		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record.key() + "---------" + record.value());
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

控制台输出
image.png

案例二:单独消费者,订阅主题+分区

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题+分区
		List<TopicPartition> topicPartitionList = new ArrayList<>();
		topicPartitionList.add(new TopicPartition("first", 0));
		kafkaConsumer.assign(topicPartitionList);
		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record.key() + "---------" + record.value());
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

只消费了发往分区0的数据
image.pngimage.png

案例三:消费者组

启动多个消费案例一的消费者,会自动指定消费的分区(partition)
启动3个消费者,一个消费者消费一个分区

image.png

三、消费者组

由多个consumer组成(条件:groupid相同),是逻辑上的一个订阅者。

  • 每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
  • 消费者组之间互不影响

1、初始化流程

coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择=groupid的hashCode值%50(__consumer_offsets的分区数量)
例如:groupid的hashCode=1,1%50=1,那么__consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大,消费者组下所有的消费者提交offset的时候,就往这个分区去提交offset

  • 1.组内每个消费者向选中的coordinator节点发送joinGroup请求
  • 2.coordinator节点选择一个consumer作为leader
  • 3.coordinator节点把要消费的topic情况,发送给消费者leader
  • 4.消费者leader负责制定消费方案
  • 5.把消费方案发送给coordinator节点
  • 6.coordinator节点把消费方案发送给各consumer
  • 7.每个消费者都会和coordinator节点保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理的时间过长(max.poll.interval.ms=5分钟),也会被移除,并触发再平衡

kafka pull 拉取数据,Kafka,kafka,java,分布式

2、分区分配以及再平衡

到底由哪个消费者来消费哪个partition的数据

  • 分配策略:Range、RoundRobin、Sticky、CooperativeStick
  • 配置参数:partition.assignment.strategy(默认:Range+CooperativeStick)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//设置分区分配策略,多个策略使用逗号拼接
		properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
        //再平衡的时候,会触发ConsumerRebalanceListener
		kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {
			// 重新分配完分区之前调用
			@Override
			public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
				System.out.println("==============回收的分区=============");
				for (TopicPartition partition : partitions) {
					System.out.println("partition = " + partition);
				}
			}

			// 重新分配完分区后调用
			@Override
			public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
				System.out.println("==============重新得到的分区==========");
				for (TopicPartition partition : partitions) {
					System.out.println("partition = " + partition);
				}
			}
		});
		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record);
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

range
  • 分配策略:对同一个topic里面的分区序号排序,对消费者按字母排序,通过partition数量/consumer数量(如果除不尽,那么前面几个消费者将会多消费1个分区)

这个只是针对一个topic而言,C0消费者多消费一个分区影响不是很大,但是如果这个消费者组消费多个topic,容易产生数据倾斜

  • 再平衡机制:某一个消费者挂掉后,45秒内产生的数据,将会由某一个消费者代为消费;45秒后产生的数据,会重新分配
RoundRobin
  • 分配策略:对集群中所有的Topic而言,把所有的partition和所有的consumer都列出来,然后按照hashCode进行排序,最后通过轮询算法来分配partition给各个消费者
  • 再平衡机制:轮询分配(不是按数据,是按分区)
Sticky
  • 分配策略:分配带粘性,执行一次新的分配时,考虑原有的分配
  • 再平衡机制:打散,尽量均匀分配(不是按数据,是按分区)

四、offset

1、默认维护位置

主题:__consumer_offset
key:group.id + topic + 分区号
value:当前offset的值

每隔一段时间,kafka内部会对这个topic进行压缩(compact),也就是每一个group.id + topic + 分区号保留最新数据

2、自动提交offset

是否开启自动提交:enable.auto.commit默认true
自动提交时间间隔:auto.commit.interval.ms默认5s

基于时间的提交,难以把握

3、手动提交offset

类别:同步提交(commitSync)、异步提交(commitAsync)
相同点:提交一批数据的最高偏移量
不同点:同步阻塞当前现场,失败会自动重试;异步没有重试机制,可以提交失败。

4.指定offset消费

如果没有初始偏移量(消费者第一次消费)或者服务器上不存在当前偏移量(被删除),如何指定offset进行消费
auto.offset.reset=earliest(默认) | latest | none
在代码中设置方式为properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")

  • earliest:自动将偏移量重置为最早的偏移量(--from-beginning)
  • latest:自动将偏移量重置为最新的偏移量
  • none:没有偏移量,抛出异常

除了这三中,还可以自己来指定位置或者指定时间
指定位置开始消费案例:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
		kafkaConsumer.subscribe(topicList);
		Set<TopicPartition> assignment = new HashSet<>();
		while (assignment.size() == 0){
			kafkaConsumer.poll(Duration.ofSeconds(1));
			//获取到消费者分区分配信息(有了分区分配信息才能开始消费)
			assignment = kafkaConsumer.assignment();
		}
		//遍历所有分区,并指定offset从100的位置开始消费
		for (TopicPartition partition : assignment) {
			kafkaConsumer.seek(partition, 100);
		}

		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record);
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

指定时间开始消费案例:把指定的时间转为offset

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
		kafkaConsumer.subscribe(topicList);
		Set<TopicPartition> assignment = new HashSet<>();
		while (assignment.size() == 0){
			kafkaConsumer.poll(Duration.ofSeconds(1));
			//获取到消费者分区分配信息(有了分区分配信息才能开始消费)
			assignment = kafkaConsumer.assignment();
		}
		HashMap<TopicPartition, Long> timestampMap = new HashMap<>();
		for (TopicPartition partition : assignment) {
			//一天前的毫秒数
			timestampMap.put(partition, System.currentTimeMillis() - 1*24*3600*1000);
		}
		//获取毫秒数对应的offset位置
		Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampMap);
		OffsetAndTimestamp offsetAndTimestamp;
		//给每个patition设置offset位置
		for (TopicPartition partition : assignment) {
			offsetAndTimestamp = offsetAndTimestampMap.get(partition);
			kafkaConsumer.seek(partition, offsetAndTimestamp.offset());
		}

		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record);
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

五、消费者事务

使用消费者事务,进行精准一次消费,将消费过程和提交offset过程做原子操作绑定。解决重复消费和漏消费问题文章来源地址https://www.toymoban.com/news/detail-704677.html

  • 重复消费:由自动提交offset引起。
  • 漏消费:设置手动提交offset,提交offset时,数据还未落盘,消费者进程被kill,那么offset已经提交,但是数据未处理,导致这部分内存中数据丢失

六、数据挤压

  • 消费能力不足:增加分区数量,同时提高消费者数量(注意:分区数量≥消费者数量)
  • 处理不及时: 拉去数据 / 处理时间 < 生产速度 拉去数据/处理时间<生产速度 拉去数据/处理时间<生产速度,提高每批次拉去的数量。fetch.max.bytes(一次拉取得最大字节数,默认:5242880=50m)max.poll.records(一次poll数据最大条数,默认:500条)

到了这里,关于Kafka原理之消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(46)
  • Kafka消费者无法消费数据,解决

    作为一个在项目中边学边用的实习生,真的被昨天还好好的今天就不能消费数据的kafka折磨到了,下面提供一点建议,希望能对大家有所帮助。 //操作前集群都关了 1.首先去kafka-home的config目录下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092    如果有配置liste

    2024年02月17日
    浏览(52)
  • Kafka入门,漏消费和重复消费, 消费者事务,数据积压(二十四)

    重复消费:已经消费了数据,但是offset没提交。 漏消费:先提交offset后消费,有可能会造成数据得漏消费 如果向完成consumer端得进准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如MySQ

    2024年02月15日
    浏览(42)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题数据案例__订阅主题)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题中数据,所下图所示: 注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。 1.2、案例代码 代码 1.3、测试 在 Kafka 集群控制台,创建firstTopic主题 在 IDEA中

    2024年02月09日
    浏览(40)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(40)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(45)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(49)
  • springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据

    在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。 按照官方文档创建一个监听。 官方文档地址 Kafka

    2023年04月15日
    浏览(39)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • 大数据开发之Kafka(broker、消费者、eagle监控、kraft模式)

    4.1.1 Zookeeper存储的Kafka的信息 1、查看zookeeper中的kafka节点所存储的信息 启动Zookeeper客户端 通过ls命令列出kafka节点内容 2、zookeeper中存储的kafka信息 在zookeeper的服务端存储的Kafka相关信息: 1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器 2)/kafka/brokers/topics/first/partitions/0/state {“l

    2024年01月21日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包