Kafka源码简要分析

这篇具有很好参考价值的文章主要介绍了Kafka源码简要分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、生产者的初始化流程

二、生产者到缓冲队列的流程

三、Sender拉取数据到Kafka流程

四、消费者初始化

五、主题订阅原理

六、消费者抓取数据原理

七、消费者组初始化

八、消费者组消费流程

九、提交offset原理


一、生产者的初始化流程

  1. 首先获取事务id和客户端id(用到事物必须要事物id不然报错,每个生产者都需要唯一标识客户端id)
  2. 监控kafka相关情况的JmxReporter配置
  3. 然后获取分区器,如果用户有自定义的就读取配置的,如果没有配置就用默认分区器
  4. 然后key和value进行序列化
  5. 然后就读取自定义拦截器,可以定义多个拦截器,组成拦截器链
  6. 然后初始化控制单条日志的大小,默认是1m;缓冲区大小,默认32m;
  7. 创建内存池,缓存队列,初始化批次大小默认16k,压缩相关处理,默认是none,重试间隔时间默认100ms
  8. 连接kafka集群,获取元数据,才能知道要发送到哪个分区
  9. 创建sender线程,会有个创建sender的方法,sender线程负责拉取缓冲队列消息到Kafka,在方法里面会定义缓存请求的个数默认5个,然后请求超时的时间,然后创建一个网络请求客户端对象,会传入刚刚的参数还有客户端id,重试时间,发送缓冲区的大小128和接受缓冲区的大小32,还有acks等配置。sender继承了Runnbale接口,然后会new个sender线程出来用上面这些参数,然后返回。
  10. sender放到后台,启动sender线程

二、生产者到缓冲队列的流程

  1. 在执行到拦截器的时候就要调用一个onSend方法,如果有多个拦截器,每个拦截器都会走一次这个方法,这个方法就是拦截器对数据加工的
  2. 然后获取元数据,要根据主题的分区放到对应的缓存队列
  3. 序列化相关操作key和value的序列化和压缩
  4. 分区操作,如果指定了分区,直接分配到指定分区;没有指定就会根据分区器进行分配,没有指定key就会粘性分区处理(如果批次大小和活着时间到了不然就一直是那个,满足才能创建新队列用),如果指定key就根据key到hashcode进分区数取模,
  5. 保证(序列化和压缩后)数据大小能够传输,他去读取配置的消息最大值和缓冲区大小,如果有超过的抛异常
  6. 向缓存队列里面追加数据,获取或者创建一个队列按照分区,然后尝试添加数据(一般不成功,因为还没申请内存),然后根据16k和现在压缩后的总大小取最大值,申请内存就申请这个大小,内存池分配内存,然后sender线程拿走就了会释放内存。
  7. 如果批次大小满了或者有了新的批次需要创建,就唤醒sender线程把缓冲队列的数据拉取过去。

三、Sender拉取数据到Kafka流程

  1. 事务相关操作
  2. 获取元数据信息,为了知道发到哪个分区
  3. 判断32m缓存是否准备好,先获取队列的信息,先判断内存队列有没有数据
  4. 判断leader是不是空如果没有目标那还是会抛出异常,如果批次大小或时间满足一个条件,就会发送。
  5. 把所有请求按照节点为单位来发送请求,这样一台机器只需要建立一次连接
  6. 封装了个request然后通过网络客户端把数据发送过去
  7. 然后服务端还是通过网络客户端获取结果

四、消费者初始化

  1. 消费者组平衡
  2. 获取消费者组id和客户端id
  3. 设置请求服务端等待时间,默认30秒;重试时间,默认100毫秒
  4. 拦截器链相关处理
  5. key和value的反序列化
  6. 判断offset从什么位置开始消费
  7. 获取消费者元数据(重试时间、是否允许访问系统主题默认false,是否允许自动创建topic主题默认true)
  8. 连接Kafka集群
  9. 创建网络客户端对象(连接重试时间默认50ms,最大重试时间1s,发送缓冲区128kb和接受缓冲区64kb大小)
  10. 指定消费者分区分配策略
  11. 创建coordinator对象
  12. 设置自动提交offset时间,默认5s,配置抓取数据的参数(最少抓取多少最大一次抓取多少等)

五、主题订阅原理

  1. 传入要订阅的主题,如果为null直接抛出异常
  2. 注册负载均衡监听器,如果消费者组中有节点挂了,要通知其他消费者
  3. 按照主题自动订阅进行分配

六、消费者抓取数据原理

  1. 他首先先初始化消费者组和队列
  2. 然后回调消息会到缓冲队列,然后去队列抓取数据,最多一次500条
  3. 然后抓取后拦截器开始处理数据

七、消费者组初始化

  1. 先判断coordinator不为null那就说明为消费者组
  2. 如果没有指定分区分配策略会抛出异常
  3. 判断coordinator是否准备好,他会循环创建查找coordinator的请求并发送,并获取服务器返回到结果

他这整个消费者组初始化就是判断coordinator有没有准备好文章来源地址https://www.toymoban.com/news/detail-724658.html

八、消费者组消费流程

  1. 他会用判断coordinator是不是空,是的话就等待
  2. 他上来先去队列拉取数据,一般是拉取不到的
  3. 他先构造请求的入参(最少一次抓多少,最多抓多少,超时时间等待)然后调用send
  4. 他送后返回future,通过回调获取数据的
  5. 他会循环遍历数据获取分区,获取分区的数据,如果有数据就放到消息队列里面
  6. 然后就调用从队列拉取数据的方法拉取,然后他有大小限制最大500,他会循环一波一波拉取过去
  7. 然后放到拦截器走加工操作

九、提交offset原理

  • 同步提交:找到coordinator然后调用commitOffset进行发送,然后不停dowhile循环,调用发送提交请求,然后等待回调获取结果,一直循环到成功为止。
  • 异步提交:他还是用coordinator去提交但是他不等待结果,他new了个监听等待结果。

到了这里,关于Kafka源码简要分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息中间件之Kafka(一)

    高性能的消息中间件,在大数据的业务场景下性能比较好,kafka本身不维护消息位点,而是交由Consumer来维护,消息可以重复消费,并且内部使用了零拷贝技术,性能比较好 Broker持久化消息时采用了MMAP的技术,Consumer拉取消息时使用的sendfile技术 Kafka是最初由Linkedin公司开发,

    2024年01月20日
    浏览(52)
  • 消息中间件之Kafka(二)

    1.1 为什么要对topic下数据进行分区存储? 1.commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上, 相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据 2.提高并行度 1.2 如何在多个partition中保证顺序消费? 方案一

    2024年01月21日
    浏览(49)
  • 【Java面试丨消息中间件】Kafka

    1. 介绍 使用kafka在消息的收发过程都有可能会出现消息丢失 (1)生产者发送消息到broker丢失 (2)消息在broker中存储丢失 (3)消费者从broker接收消息丢失 2. 生产者发送消息到broker丢失 设置异步发送:同步发送会发生阻塞,一般使用异步发送方式发送消息 消息重试:由于网

    2024年02月11日
    浏览(47)
  • 消息中间件,RabbitMQ,kafka常见面试题

    RabbitMQ和Kafka都是消息队列系统,可以用于流处理。流处理是指对高速、连续、增量的数据进行实时处理。 RabbitMQ 和 Kafka 的相同点有以下几个: 都是消息队列系统,可以用于流处理、异步通信、解耦等场景 都是开源的,有活跃的社区和丰富的文档 都支持分布式部署,具有高

    2024年02月04日
    浏览(40)
  • 【消息中间件MQ系列】Spring整合kafka并设置多套kafka配置

            圣诞节的到来,程序员不会收到圣诞老人的🎁,但可以自己满足一下自己,所以,趁着有时间,就记录一下这会儿撸了些什么代码吧!!!         因为业务原因,需要在系统内新增其他的kakfa配置使用,所以今天研究的是怎么在系统内整合多套kafka配置使用。

    2024年02月01日
    浏览(96)
  • 架构师系列- 消息中间件(13)-kafka深入应用

    1)配置文件  2)启动信息 4.2.1 发送类型 KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法 详细代码参考:AsyncProducer.java 消费者使用:KafkaConsumer.java 1)同步发送 通过swagger发送,控制台可以正常打印send result swagger访问地址:http://localhost:808

    2024年04月29日
    浏览(40)
  • ActiveMQ、RabbitMQ、Kafka、RocketMQ消息中间件技术选型

    消息中间件是分布式系统中重要的组件之一,用于实现异步通信、解耦系统、提高系统可靠性和扩展性。在做消息中间件技术选型时,需要考虑多个因素,包括可靠性、性能、可扩展性、功能丰富性、社区支持和成本等。本文将五种流行的消息中间件技术:ActiveMQ、RabbitMQ、

    2024年02月11日
    浏览(50)
  • 【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka

    作者简介 前言 博主之前写过一个完整的MQ系列,包含RabbitMQ、RocketMQ、Kafka,从安装使用到底层机制、原理。专栏地址: https://blog.csdn.net/joker_zjn/category_12142400.html?spm=1001.2014.3001.5482 本文是该系列的清单综述,会拉通来聊一下三大MQ的特点和各种适合的场景。 目录 1.概述 1.1.M

    2024年02月09日
    浏览(53)
  • SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的发送方:生产者 消息的接收方:消费者 同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送 异步消息:不需要接收方回应就可以进行下一步的发送 什么是消息队列? 当此时有很多个用户同时访问服务器,需要服务器进行操作,但此

    2024年04月27日
    浏览(52)
  • 消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在构建分布式系统时,选择适合的消息中间件是至关重要的决策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是当前流行的消息中间件之一,它们各自具有独特的特点和适用场景。本文将对这四种消息中间件进行综合比较,帮助您在项目中作出明智的选择。 1. RabbitMQ 特点: 消息模

    2024年02月20日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包