(Kafka源码三)Kafka的缓存机制

这篇具有很好参考价值的文章主要介绍了(Kafka源码三)Kafka的缓存机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在Kafka的架构中,若没有缓存机制,在RecordAccumulator中对于不再使用的批次对象,需要进行回收,释放这些对象所占用的内存,为了降低GC的压力,Kafka作者设计了一个缓存池的机制,从而实现了实现消息批次的内存复用。本文主要讲解缓冲池的设计。

首先来看下kafka生产者到RecordAccumulator这边的架构流程

kafka 生产者缓存,kafka,缓存,分布式,linq,后端

append()

生产者发送消息的send方法会依次调用以下方法,首先通过场景驱动的方式来看看生产者的源码是如何通过append()方法将消息添加到batches的双端队列中,首先模拟三个线程(线程一,线程二,线程三)往同一个分区写入数据,执行append操作,假设每次都是线程一先获取到锁,通过synchronized分段加锁,三个线程都能成功的将消息插入batches中,并且成功的将所申请的内存空间释放掉。可以看出append方法为了保证线程的安全性以及并发能力,append方法不是在方法上直接加锁,而是在内部采用分段加锁的方式,进一步提高了并发能力,执行流程图如下

kafka 生产者缓存,kafka,缓存,分布式,linq,后端

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return doSend(interceptedRecord, callback);
    }
 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
 /*
    把消息放入accumulator(默认大小32M),然后accumulator把消息封装
    成为一个一个的消息批次的发送。
   */
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
//append()方法不是直接锁住整个方法,而是在内部采用分段加锁的方式,进一步提高了并发能力
 public  RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        try {
            /*
             根据分区获取对应的队列(每个分区对应一个双端队列),若队列已经存在,则使用已存在的队列,
             若不存在,则创建新的对垒
             */
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
          //先到达的线程锁住队列
            synchronized (dq) {
        		//若连接已经关闭,则抛出异常
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                
                // 尝试往队列里面的批次里添加数据,第一次尝试添加失败,因为还没有给消息批次对象分配内存
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                //若添加结果不为null,说明添加成功,返回添加结果
                if (appendResult != null)
                    return appendResult;
            }//释放锁

             //在消息批次的大小和消息大小之间取一个较大值作为要申请内存的大小
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
             //根据所需内存的大小分配内存块
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
					
            synchronized (dq) {
             //若连接已经关闭,则抛出异常
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                 //再一次尝试将消息写入消息批次中(还是失败,虽然分配了内存,但还没有创建消息批次对象)
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                //不为null,说明添加成功
                if (appendResult != null) {
                    //释放内存,将消息批次所占用的内存空间返回内存池
                    free.deallocate(buffer);
                    //返回添加结果
                    return appendResult;
                }   
                 //根据内存块的大小创建消息批次对象
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
						//尝试往消息批次对象写数据,这时就可以写入成功了
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                 //将消息批次对象添加到双端队列的尾部
                dq.addLast(batch);
                incomplete.add(batch);
                //返回添加结果
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }//释放锁
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

BufferPool

首先来看看缓冲池的数据结构

public final class BufferPool {
    //缓冲池内存的总大小(默认大小为32M)
    private final long totalMemory;
    //单个消息批次(ByteBuffer)的大小,默认16K
    private final int poolableSize;
    // 由于在多线环境下,多个线程可能并发分配和回收消息批次,为了保证线程安全,使用控制并发。
    private final ReentrantLock lock;
    //双端队列实现的free缓冲池
    private final Deque<ByteBuffer> free;
    //该队列用于存放那些申请不到足够内存而被阻塞的线程所对应的条件变量
    private final Deque<Condition> waiters;
     //可用内存(可用内存大小 = totalMemory-free缓冲池的大小),初始值等于总内存大小
    private long availableMemory;
    	...
 public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    ...
    // 总内存
    this.totalMemory = memory;
    // 可用内存的大小默认等于总内存大小
    this.availableMemory = memory;
  }
}

BufferPool一开始的内存结构图如下

kafka 生产者缓存,kafka,缓存,分布式,linq,后端

allocate()

allocate()申请内存的流程图
kafka 生产者缓存,kafka,缓存,分布式,linq,后端

再来看看内存分配allocate()的源码


 public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        //若申请的内存的大小超过了内存池的总大小(默认32M),则抛出异常
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");
        //加锁
        this.lock.lock();
        try {
  
            //若申请的内存大小等于干好等于一个消息批次(默认大小16k)并且free缓冲池不
            //为空,则直接返回free队列头部的ByteBuffer
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();
					//free缓冲池的总大小
            int freeListSize = this.free.size() * this.poolableSize;
            	//若总内存(free缓冲池大小+可用内存大小)大小 >= 要申请的内存大小
            if (this.availableMemory + freeListSize >= size) {
              	//若可用内存不够但是free缓冲池不为空,则依次将free首部的ByteBuffer加入到可用内存中
                freeUp(size);
                //对可用内存做内存扣减
                this.availableMemory -= size;
                //释放锁
                lock.unlock();
                //根据申请的内存大小分配内存空间
                return ByteBuffer.allocate(size);
            } else {
                //总内存不够 < 申请的内存大小
                //accumulated用于记录已经分配了多少内存空间
                int accumulated = 0;
                ByteBuffer buffer = null;
               //创建lock的条件变量
                Condition moreMemory = this.lock.newCondition();
                //当前线程最多可以等待多少时间
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                //将当前线程的条件变量加入waitter等待队列中,当前线程阻塞,等待其他线程释放内存
                this.waiters.addLast(moreMemory);
							//根据申请内存的大小,内存池一点一点去分配,直到满足所申请的内存大小
                while (accumulated < size) {
                //记录当前系统时间
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
									//当等待的时间到了或者被其他线程唤醒了,当前线程就恢复继续运行
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }
							
                    if (waitingTimeElapsed) {
                    	//从等待队列中移除当前线程的条件变量
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }
								//计算还剩余多少可以等待的时间
                    remainingTimeToBlockNs -= timeNs; 
								//若申请大小刚好等于一个消息批次的大小并且free缓冲池不为空
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        //从free队列首部中获取ByteBuffer
                        buffer = this.free.pollFirst();
                        //更新内存累加值
                        accumulated = size;
                    } else {
											//size-accumlated表示还需要多少内存空间,从free缓冲池中划分还需要的内存空间
                        freeUp(size - accumulated);
                        // 两者取较小值,表示可以分配的内存
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        //从可用内存中扣减所需内存
                        this.availableMemory -= got;
                        //计算已经分配了多少内存
                        accumulated += got;
                    }
                }
                //从等待队列中的首部移除阻塞线程的condition
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
							//如果可用内存大于0或者free缓冲池不为空,唤醒等待队列中的线程
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    //等待队列不为空
                    if (!this.waiters.isEmpty())
                    //唤醒首部等待的线程
                        this.waiters.peekFirst().signal();
                }
                //释放锁
                lock.unlock();
                //若buffer为空,就分配内存
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                //返回已经分配内存的buffer
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
            //释放锁
                lock.unlock();
        }
    }

deallocate()

释放内存的方法分为两种情况,若释放内存大小等于消息批次的大小,则直接将内存添加到free缓冲池队列的尾部,以达到内存复用的效果,若不等于一个消息批次的大小,则将内存的数值添加到可用内存的数值大小中,然后这块内存就等待JVM的GC回收,不论哪种情况,若等待队列不为空,都会唤醒等待队列waiters首部中被阻塞的线程,让该线程继续从内存池中分配内存。

释放内存的流程相比申请内存要简洁些,deallocate()释放内存的源码如下

 public void deallocate(ByteBuffer buffer, int size) {
        //加锁
        lock.lock();
        try {
            //若释放的内存大小等于一个消息批次的大小
            if (size == this.poolableSize && size == buffer.capacity()) {
                //首先将内存的内容清空
                buffer.clear();
                //把内存放入free队列缓冲池中
                this.free.add(buffer);
            } else {
                //若释放的内存大小不等于消息批次的大小,就直接将该内存块的空间大小添加到可用内存
                //的数值大小,等待JVM的GC回收
                this.availableMemory += size;
            }
					//从等待队列的首部移除阻塞的线程
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                //唤醒正在等待分配内存的线程
                moreMem.signal();
        } finally {
        //释放锁
            lock.unlock();
        }
    }

对于内存的分配以及释放可以归纳为如下四种情景:
(1)申请16K的内存且free缓冲池不为空
这种情况直接从free缓冲池队列首部弹出大小为16K的ByteBuffer,用完后首先使用clear()方法清空内存里面的内存,再添加到free缓冲池队列的尾部,以达到内存复用的效果,接着唤醒等待队列waiters首部等待分配内存的线程。
kafka 生产者缓存,kafka,缓存,分布式,linq,后端

(2)申请16K的内存且free()缓冲池为空
这种情况直接从可用内存中划分大小为16K的内存空间,用完后添加到free缓冲池队列的尾部,以便下次使用,
接着唤醒等待队列waiters首部中被阻塞的线程,让该线程继续从内存池中分配内存。
kafka 生产者缓存,kafka,缓存,分布式,linq,后端

(3)申请非16K的内存且free缓冲池为空
这种情况直接从可用内存划分非16k的内存,用完后将其归还可用内存中,后续被JVM的GC回收。
kafka 生产者缓存,kafka,缓存,分布式,linq,后端

(4)申请非16K的内存且可用内存不够但free缓冲池不为空的情况
首先从free缓冲池中将ByteBuffer释放回可用内存中,直到满足申请的内存大小,再从可用内存获取申请的内存,用完后再释放回可用内存,后续被JVM的GC回收。
kafka 生产者缓存,kafka,缓存,分布式,linq,后端总结:通过以上源码分析可以知道,如果申请的内存大小刚好等于batchSize,那么会直接从free缓冲池中申请内存,当用完后直接将内存归还到free队列,达到内存复用的效果,同时也减轻了GC的压力;若申请的内存大小比batchSize还要大,此时就不会从free缓冲池申请ByteBuffer了,而是直接到可用内存中申请一块新的内存空间,当用完后需要将这块内存归还到可用内存中,等待JVM的GC回收,此时更糟糕的一种情况是可用内存比申请内存小,需要不断的将free队列首部的内存释放到可用内存中,free缓冲池中的内存越来越少,意味着可用内存需要更多JVM的GC回收。因此,需要根据业务消息的大小,适当调整batchSize的大小,避免频繁的GC回收。

CopyOnWriteMap

Kafka可以处理大量的数据并且具备高并发能力,其中一个原因就在于借助了并发容器,在RecordAccumulator的成员变量中有一个用CopyOnWriteMap实现的batches,从而实现了读写分离以及高并发读的能力。

public final class RecordAccumulator {
	    //TopicPartition(分区):Deque<RecordBatch> (队列)
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;//构造方法
 public RecordAccumulator(int batchSize,
                             long totalSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             Metrics metrics,
                             Time time) {
                             ...
                 //CopyOnWriteMap实现的batches
        this.batches = new CopyOnWriteMap<>();
    }

batches的结构图

kafka 生产者缓存,kafka,缓存,分布式,linq,后端
CopyOnWriteMap工作原理:当一个线程尝试修改 Map(比如添加、删除或更新键值对)时,它会先创建一个 Map 的副本,然后在副本上进行修改。修改完成后,会用新的副本替换原始 Map。这样,读取 Map 的线程可以继续访问原始 Map,而不会受到修改操作的影响。

CopyOnWriteMap的优缺点

  • 优点:它可以在读多写少的场景下提供较好的性能。因为读操作不需要加锁,而写操作只需要替换引用,所以具有很高的并发读性能。

  • 缺点:每次修改操作都需要创建一个新的副本,这会增加内存消耗和垃圾回收的压力。

总结

本文主要讲解了线程添加消息(append()方法源码)到RecordAccumulator的源码流程,以及在添加的过程中所涉及到内存的申请与释放的源码分析,最后介绍了RecordAccumulator通过CopyOnWriteMap实现的batches,从而实现了读写分离与高并发读的能力。下一章将从源码角度详细介绍Broker是如何处理生产者发送过来的消息。文章来源地址https://www.toymoban.com/news/detail-852075.html

到了这里,关于(Kafka源码三)Kafka的缓存机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 入门到起飞 - 生产者参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR?

    上回书我们讲了,生产者发送消息流程解析传送门 那么这篇我们来看下,生产者发送消息时几个重要的参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR? bootstrap.servers : Kafka 集群地址 host1:port1,host2:port2,host3:port3 不需要写Kafka集群中全部的broker地址,但是也不要写

    2024年02月15日
    浏览(27)
  • Kafka 生产者投递内存池源码刨铣

    大体逻辑是这样的。。 初始化BufferPool的时候会指定BufferPool的大小 以及内存块(poolableSize)的大小。 在申请内存的时候如果申请的内存大小大于指定的内存块大小就会抛出异常,无法申请。 如果是等于poolableSize的话,就会判断free中是否有,有的话就直接从free中取出,没有的话

    2024年01月22日
    浏览(46)
  • Spring-Kafka生产者源码分析

    本文主要概括Spring Kafka生产者发送消息的主流程 代码准备: SpringBoot项目中maven填加以下依赖 消息发送使用 KafkaTemplate 启动类 KafkaAutoConfiguration 有两个地方需要关注 其中的 ProducerFactory 使用的是 DefaultKafkaProducerFactory 在发送消息之前,Spring Kafka会先创建 Producer ,返回的是 Clos

    2024年02月09日
    浏览(29)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(37)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(37)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(49)
  • Apache Kafka - 重识Kafka生产者

    Kafka 生产者是 Apache Kafka 中的一个重要组件,它负责将数据发送到 Kafka 集群中。在实时数据处理和流式处理应用程序中,Kafka 生产者扮演着非常重要的角色。 这里我们将介绍 Kafka 生产者的概念、工作原理以及如何使用 Kafka 生产者。 Kafka 生产者是一种用于将数据发送到 Kafk

    2024年02月05日
    浏览(31)
  • [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题

    [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题 Kafka是一个高度可扩展的分布式流平台,用于构建实时数据管道和流处理应用程序。作为一个广泛使用的消息代理系统,Kafka在数据传输方面表现出色,但是在极端情况下,它可能会出现生产者阻塞的问题。这可能会导致

    2024年02月11日
    浏览(34)
  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(32)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包