kafka消费者详解,根据实际生产解决问题

这篇具有很好参考价值的文章主要介绍了kafka消费者详解,根据实际生产解决问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.首先kafka每创建一个消费者就是一个消费者组,必须指定groupip

2.两个消费者组之间不相互影响,消费同一个主题的同一个分区,两个消费者组不相互影响,各自记录自己的offset

3.在开发中如果没有指定每个消费者去消费特定的分区,那么kafka默认是按照roundRobin轮询的方式分配消费者消费分区的,如果指定了消费者消费特定的分区,那么,就会按照指定的分区消费,那么具体如何指定特定分区消费呢?看代码

  // 0 配置
        Properties properties = new Properties();

        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 1 创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题对应的分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);

        // 3 消费数据
        while (true){
            //Duration.ofSeconds(1) 就是等待一秒钟的意思,如果等待1秒后仍然没有消息,则返回空
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

在topicPartitions 中加入多个对象,每个对象指定该消费者消费的主题和对应的分区,再通过assign方法加入消费者配置中

那自定义消费哪个分区有什么用呢?

那作用大了,假如我们未来有一个消费者组,只有一台的能力比较强,处理快,那么就可以指定它来消费kafka多的那个分区,而且这样我们可以自由的指定每个消费者消费哪个分区,有更强的拓展性

消费者可能出现重复消费和漏消费的情况,如何解决?

这个根据实际情况来定,这个问题主要出现在kafka的某个消费者节点宕机后,可能就会出现这样的问题,那么如何完全解决呢?就是使用事务消费?但是如果项目中可以接受部分消息丢失,就没必要使用了,因为会造成挺大的性能损耗的,上代码

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 禁用自动提交
props.put("isolation.level", "read_committed"); // 设置隔离级别为读已提交
props.put("transactional.id", "my-transactional-id"); // 设置事务ID

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));

 try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000); // 设置拉取的超时时间

                seataTransactionManager.begin(); // 开启 Seata 分布式事务

                try {
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("Received message: " + record.value());
                        // 在这里处理消息的逻辑
                        // ...
                    }

                    seataTransactionManager.commit(); // 提交事务
                    consumer.commitSync(); // 手动提交偏移量
                } catch (Exception e) {
                    seataTransactionManager.rollback(); // 回滚事务
                    consumer.seekToBeginning(records.partitions()); // 将消费者偏移量重置到消息的起始位置,以重新消费
                    throw new RuntimeException("Failed to consume messages", e);
                }
            }
        } finally {
            consumer.close();
        }

那么在生产中消费着具体如何编写呢?

在生产环境中使用 Kafka 的 poll 模式来消费数据,可以按照以下步骤进行配置和实现:

1. 添加 Kafka 依赖:在项目的 `pom.xml` 文件中添加 Kafka 相关的依赖,例如:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置 Kafka 连接信息:在 `application.properties` 或 `application.yml` 中配置 Kafka 的连接信息,包括 bootstrap servers(Kafka 服务器地址)、group id(消费者组ID)等配置项。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group

3. 创建 Kafka 消费者配置类:创建一个 Kafka 消费者配置类,用于配置 KafkaConsumer 的属性。可以根据实际需求进行自定义配置。
 

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 其他可选配置项
        return props;
    }
}

4. 创建消息处理器:创建一个消息处理器,用于处理从 Kafka 主题中消费的消息。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    public void handleMessage(ConsumerRecord<String, String> record) {
        // 处理接收到的消息
        String key = record.key();
        String value = record.value();
        // ... 进行业务处理
    }
}

5. 创建 Kafka 消费者:创建一个 Kafka 消费者,并使用 KafkaConsumerConfig 中定义的配置。

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

@Component
public class MyKafkaConsumer {

    private final KafkaConsumer<String, String> kafkaConsumer;
    private final MyMessageHandler messageHandler;

    @Autowired
    public MyKafkaConsumer(Map<String, Object> consumerConfigs, MyMessageHandler messageHandler) {
        this.kafkaConsumer = new KafkaConsumer<>(consumerConfigs);
        this.messageHandler = messageHandler;
    }

    @PostConstruct
    public void startConsuming() {
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));

        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                records.forEach(record -> messageHandler.handleMessage(record));
            }
        }).start();
    }

    // 可以根据需要添加关闭消费者的方法
}

在上述示例中,使用 `KafkaConsumerConfig` 类定义

了 KafkaConsumer 的配置,并通过构造函数注入到 `MyKafkaConsumer` 类中。在 `startConsuming()` 方法中,创建了一个新线程来进行消费,不断地使用 `poll()` 方法轮询获取消息,并通过 `MyMessageHandler` 处理消息。

请注意,在实际生产环境中,你可能还需要添加更多的配置和处理,例如异常处理、消费者的关闭和资源释放、消息提交偏移量的控制等。以上示例仅提供了一个基本的框架,你可以根据实际需求进行扩展和调整。

好,消费者就介绍到这里,后边我们介绍在生产中如何选择硬件以及kafka每个组件具体的优化方案,以及如何配置文章来源地址https://www.toymoban.com/news/detail-498576.html

到了这里,关于kafka消费者详解,根据实际生产解决问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(53)
  • Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(77)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(44)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(58)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(87)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(48)
  • Kafka - 3.x 消费者 生产经验不完全指北

    Kafka引入了消费者事务(Consumer Transactions)来确保在消息处理期间维护端到端的数据一致性。这使得消费者能够以事务的方式处理消息,包括从Kafka中读取消息、处理消息和提交消息的offset。以下是有关Kafka消费者事务的详细信息: 事务的引入 :Kafka 0.11.0版本引入了消费者事

    2024年02月06日
    浏览(36)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(49)
  • 探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(92)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包