浅析Spring-kafka源码——消费者模型的实现

这篇具有很好参考价值的文章主要介绍了浅析Spring-kafka源码——消费者模型的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot项目中的消费端实现而言,Spring-kafka没有用原生的ConsumerConnector,,而是借助原生client的拉取消息功能做了自己的消费模型的实现,提供了@KafkaListener注解这种方式实现消费。

开发中在使用Spring-kafka时,一般也都是通过使用@KafkaListener注解的方法来实现消息监听和消费。本文就是介绍基于这个注解实现的消费端模型原理。

Kafka消费模型

我们在使用@KafkaListener注解实现消费者时消费者模型是这样的:

每个@KafkaListener注解对应有一个ConcurrentMessageListenerContainer容器,容器中会创建concurrency数量的 KafkaMessageListenerContainer 容器。默认不配置concurrency参数时这俩容器都是一对一的关系。

KafkaMessageListenerContainer 容器中又包含了一个SimpleAsyncTaskExecutor线程池和KafkaMessageListenerContainer.ListenerConsumer实例任务。

每个KafkaMessageListenerContainer相当于一个消费者线程, 这个线程会不断向 server 端发起 poll 请求来获取消息。

浅析Spring-kafka源码——消费者模型的实现

Kafka消费端启动过程源码

在Spring项目启动整体流程中,Kafka消费端模型的启动初始化依赖于KafkaListenerAnnotationBeanPostProcessor 这个处理器以及Spring生命周期中的i文章来源地址https://www.toymoban.com/news/detail-494345.html

到了这里,关于浅析Spring-kafka源码——消费者模型的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(37)
  • kakfa 3.5 kafka服务端处理消费者客户端拉取数据请求源码

    kafka服务端接收生产者数据的API在KafkaApis.scala类中,handleFetchRequest方法 replicaManager.fetchMessages 最后通过这个方法获得日志 通过readFromLocalLog查询数据日志 val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) 遍历主题分区分别执行read内部函数执行查询操作 方法内部通过 partition.fe

    2024年02月10日
    浏览(28)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(33)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(33)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(52)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(33)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(33)
  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(35)
  • Kafka-消费者组消费流程

    消费者向kafka集群发送消费请求,消费者客户端默认每次从kafka集群拉取50M数据,放到缓冲队列中,消费者从缓冲队列中每次拉取500条数据进行消费。   

    2024年02月12日
    浏览(37)
  • Kafka3.0.0版本——消费者(消费者组原理)

    1.1、消费者组概述 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 注意: (1)、消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 (2)、消费者组之间互不影响。所有的消费者

    2024年02月09日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包