【项目实战】Java 开发 Kafka 消费者

这篇具有很好参考价值的文章主要介绍了【项目实战】Java 开发 Kafka 消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏

【项目实战】Java 开发 Kafka 消费者,Kafka专栏,java,kafka,linq,原力计划

1、什么是 Kafka 消费者

【项目实战】Java 开发 Kafka 消费者,Kafka专栏,java,kafka,linq,原力计划

🔔 Kafka 消费者是使用 Apache Kafka 消息队列系统的应用程序,它们用于从 Kafka 主题(topics)中读取消息。消费者订阅一个或多个主题,并从这些主题中拉取消息以进行处理。消费者可以以不同的方式配置,例如,可以指定消息的偏移量(offset)以从特定位置开始消费消息,还可以指定消费者组(consumer group)以实现消息的分组消费。消费者是 Kafka 中的重要组成部分,用于实现可靠的消息传递和数据处理。

2、 Java 如何使用 Kafka 消费者

首先引入依赖

<dependency>
 	<groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

以下代码创建一个简单的 Kafka 消费者:

package com.pany.camp.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 *
 * @description: Kafka 消费者
 * @copyright: @Copyright (c) 2022
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0
 * @createTime: 2023-06-26 18:04
 */
public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
                        record.key(), record.value(), record.partition(), record.offset());
            });
        }
    }
}

“test-topic” 替换为你要消费的 Kafka 主题的名称,将 “localhost:9092” 替换为 Kafka 服务器的地址,将 “test-group” 替换为你的消费者组的唯一标识符。

3、SpringBoot 如何使用 Kafka 消费者/ 消费者组

消费者是通过 @KafkaListener 监听消息获取的,案例如下:

package com.pany.camp.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 *
 * @description:  kafka 消费者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-07-14 8:31
 */
@Component
public class kafkaConsumerListenerExample {

    @KafkaListener(topics = "your_topic_name", groupId = "your_consumer_group_id")
    public void consume(ConsumerRecord<?, ?> record) {
        Optional<?> value = Optional.ofNullable(record.value());
        // 进行消息处理逻辑
        System.out.println("print message: " + value);
    }
}

4、@KafkaListener 参数说明

@KafkaListener 用于监听 Kafka 主题中的消息,并在消息到达时执行相应的逻辑。

KafkaListener 注解有以下参数:

  • 🐔 id :监听器的唯一标识符。如果未指定,默认为方法的名称。
  • 🐔 topics :要监听的 Kafka 主题列表。可以监听多个主题,以逗号分隔。
  • 🐔 topicPattern :要监听的 Kafka 主题的正则表达式模式。与 topics 参数互斥,只能选择其中一个。
  • 🐔 containerFactory :用于创建 Kafka 容器的工厂类。如果未指定,默认使用默认工厂类。
  • 🐔 concurrency :并发处理消息的线程数。默认为 1。
  • 🐔 groupId :Kafka 消费者组的标识符。如果未指定,默认为应用程序的名称。
  • 🐔 clientIdPrefix :Kafka 消费者的客户端 ID 的前缀。如果未指定,默认为空。
  • 🐔 autoStartup :是否在应用程序启动时自动启动监听器。默认为 true。
  • 🐔 properties :Kafka 消费者的其他属性配置。
    这些参数可以根据具体需求进行配置,以便创建适合的 Kafka 消息监听器。

5、你必须知道的 @KafkaListener 实现原理

KafkaListener 的底层实现原理涉及到 Spring Kafka 的注解处理器和 Kafka 消费者的集成。
注解的处理过程:

1. 注解处理器:

  • Spring Kafka 框架中的注解处理器负责解析和处理 KafkaListener 注解。
  • 在应用程序启动时,注解处理器会扫描所有的 Bean,查找带有 KafkaListener 注解的方法,并为每个方法创建一个 KafkaMessageListenerContainer 对象。
  • 注解处理器会解析注解上的参数,例如指定的 Kafka 主题、消费者组 ID、反序列化器等。

2. 创建 Kafka 消费者:

  • KafkaMessageListenerContainer 对象负责创建和管理底层的 Kafka 消费者。
  • 根据注解上的参数,KafkaMessageListenerContainer 会创建一个或多个 Kafka 消费者,并配置消费者的相关属性。
  • 消费者的创建和配置是通过 Apache Kafka 客户端库实现的,Spring Kafka 提供了对客户端库的封装和集成。

3. 消费 Kafka 消息:

  • KafkaMessageListenerContainer 对象会启动 Kafka 消费者,开始轮询 Kafka 服务器以获取新的消息。
  • 当有消息到达时,消费者会将消息传递给带有 KafkaListener 注解的方法进行处理。
  • 方法可以根据业务逻辑进行相应的处理,例如解析消息内容、进行数据处理、调用其他服务等。
  • 处理完成后,可以选择确认消息的消费,或者抛出异常以触发重试机制。

4. 并发处理消息:

  • KafkaMessageListenerContainer 支持并发处理消息,可以通过配置 concurrency 参数来指定处理消息的线程数。
  • 如果设置了并发处理,KafkaMessageListenerContainer 会为每个线程创建一个独立的 Kafka 消费者,并通过分区分配策略将消息分配给不同的线程进行处理。
  • 这样可以提高消息处理的吞吐量。

写在最后

Kafka消费者的优点是具有高吞吐量和低延迟的特性,能够处理大规模的消息流。它支持水平扩展,可以通过增加消费者实例来提高处理能力。此外,Kafka消费者还具有消息的持久化和可靠性保证,能够处理高并发的消息消费。

然而,Kafka消费者也存在一些缺点。首先,Kafka消费者的配置和管理相对复杂,需要关注许多参数和细节。其次,Kafka消费者在处理消息时需要自行管理偏移量,确保消息的有序性和正确性,这对于开发人员来说可能需要额外的工作。此外,Kafka消费者对于实时性要求较高的场景可能不太适用,因为消息的传递可能存在一定的延迟。

总的来说,Kafka消费者适用于需要处理大规模消息流的场景,对于数据的可靠性和持久化有较高要求,但在配置和管理上需要额外的注意和工作。

💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃
【项目实战】Java 开发 Kafka 消费者,Kafka专栏,java,kafka,linq,原力计划文章来源地址https://www.toymoban.com/news/detail-589180.html

到了这里,关于【项目实战】Java 开发 Kafka 消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    浏览(49)
  • 大数据开发之Kafka(broker、消费者、eagle监控、kraft模式)

    4.1.1 Zookeeper存储的Kafka的信息 1、查看zookeeper中的kafka节点所存储的信息 启动Zookeeper客户端 通过ls命令列出kafka节点内容 2、zookeeper中存储的kafka信息 在zookeeper的服务端存储的Kafka相关信息: 1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器 2)/kafka/brokers/topics/first/partitions/0/state {“l

    2024年01月21日
    浏览(54)
  • Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

    Kafka消费者提交Offset的策略有 自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了 手动提交Offset 消费者在消费消息时/后,再提交o

    2024年02月08日
    浏览(49)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(47)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(49)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(47)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(44)
  • Kafka-消费者组消费流程

    消费者向kafka集群发送消费请求,消费者客户端默认每次从kafka集群拉取50M数据,放到缓冲队列中,消费者从缓冲队列中每次拉取500条数据进行消费。   

    2024年02月12日
    浏览(46)
  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包