1.首先kafka每创建一个消费者就是一个消费者组,必须指定groupip
2.两个消费者组之间不相互影响,消费同一个主题的同一个分区,两个消费者组不相互影响,各自记录自己的offset
3.在开发中如果没有指定每个消费者去消费特定的分区,那么kafka默认是按照roundRobin轮询的方式分配消费者消费分区的,如果指定了消费者消费特定的分区,那么,就会按照指定的分区消费,那么具体如何指定特定分区消费呢?看代码
// 0 配置
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first",0));
kafkaConsumer.assign(topicPartitions);
// 3 消费数据
while (true){
//Duration.ofSeconds(1) 就是等待一秒钟的意思,如果等待1秒后仍然没有消息,则返回空
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
在topicPartitions 中加入多个对象,每个对象指定该消费者消费的主题和对应的分区,再通过assign方法加入消费者配置中
那自定义消费哪个分区有什么用呢?
那作用大了,假如我们未来有一个消费者组,只有一台的能力比较强,处理快,那么就可以指定它来消费kafka多的那个分区,而且这样我们可以自由的指定每个消费者消费哪个分区,有更强的拓展性
消费者可能出现重复消费和漏消费的情况,如何解决?
这个根据实际情况来定,这个问题主要出现在kafka的某个消费者节点宕机后,可能就会出现这样的问题,那么如何完全解决呢?就是使用事务消费?但是如果项目中可以接受部分消息丢失,就没必要使用了,因为会造成挺大的性能损耗的,上代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 禁用自动提交
props.put("isolation.level", "read_committed"); // 设置隔离级别为读已提交
props.put("transactional.id", "my-transactional-id"); // 设置事务ID
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 设置拉取的超时时间
seataTransactionManager.begin(); // 开启 Seata 分布式事务
try {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
// 在这里处理消息的逻辑
// ...
}
seataTransactionManager.commit(); // 提交事务
consumer.commitSync(); // 手动提交偏移量
} catch (Exception e) {
seataTransactionManager.rollback(); // 回滚事务
consumer.seekToBeginning(records.partitions()); // 将消费者偏移量重置到消息的起始位置,以重新消费
throw new RuntimeException("Failed to consume messages", e);
}
}
} finally {
consumer.close();
}
那么在生产中消费着具体如何编写呢?
在生产环境中使用 Kafka 的 poll 模式来消费数据,可以按照以下步骤进行配置和实现:
1. 添加 Kafka 依赖:在项目的 `pom.xml` 文件中添加 Kafka 相关的依赖,例如:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 配置 Kafka 连接信息:在 `application.properties` 或 `application.yml` 中配置 Kafka 的连接信息,包括 bootstrap servers(Kafka 服务器地址)、group id(消费者组ID)等配置项。
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
3. 创建 Kafka 消费者配置类:创建一个 Kafka 消费者配置类,用于配置 KafkaConsumer 的属性。可以根据实际需求进行自定义配置。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 其他可选配置项
return props;
}
}
4. 创建消息处理器:创建一个消息处理器,用于处理从 Kafka 主题中消费的消息。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;
@Component
public class MyMessageHandler {
public void handleMessage(ConsumerRecord<String, String> record) {
// 处理接收到的消息
String key = record.key();
String value = record.value();
// ... 进行业务处理
}
}
5. 创建 Kafka 消费者:创建一个 Kafka 消费者,并使用 KafkaConsumerConfig 中定义的配置。
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
@Component
public class MyKafkaConsumer {
private final KafkaConsumer<String, String> kafkaConsumer;
private final MyMessageHandler messageHandler;
@Autowired
public MyKafkaConsumer(Map<String, Object> consumerConfigs, MyMessageHandler messageHandler) {
this.kafkaConsumer = new KafkaConsumer<>(consumerConfigs);
this.messageHandler = messageHandler;
}
@PostConstruct
public void startConsuming() {
kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> messageHandler.handleMessage(record));
}
}).start();
}
// 可以根据需要添加关闭消费者的方法
}
在上述示例中,使用 `KafkaConsumerConfig` 类定义
了 KafkaConsumer 的配置,并通过构造函数注入到 `MyKafkaConsumer` 类中。在 `startConsuming()` 方法中,创建了一个新线程来进行消费,不断地使用 `poll()` 方法轮询获取消息,并通过 `MyMessageHandler` 处理消息。
请注意,在实际生产环境中,你可能还需要添加更多的配置和处理,例如异常处理、消费者的关闭和资源释放、消息提交偏移量的控制等。以上示例仅提供了一个基本的框架,你可以根据实际需求进行扩展和调整。文章来源:https://www.toymoban.com/news/detail-498576.html
好,消费者就介绍到这里,后边我们介绍在生产中如何选择硬件以及kafka每个组件具体的优化方案,以及如何配置文章来源地址https://www.toymoban.com/news/detail-498576.html
到了这里,关于kafka消费者详解,根据实际生产解决问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!