Kafka 3.5: How the Kafka Broker Handles Consumer Fetch Requests (Source Code Walkthrough)


I. The broker-side method that receives consumer fetch requests

The broker-side API that receives consumer fetch requests lives in the KafkaApis.scala class, in the handleFetchRequest method.

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
	//code omitted
	request.header.apiKey match {
          //consumer fetch requests are handled by this branch
        case ApiKeys.FETCH => handleFetchRequest(request)
        //code omitted
    }    	
        
 }  
def handleFetchRequest(request: RequestChannel.Request): Unit = {
    //Read the request's API version (versionId) and client ID (clientId) from the request header.
    val versionId = request.header.apiVersion
    val clientId = request.header.clientId
    //Extract the FetchRequest body from the request.
    val fetchRequest = request.body[FetchRequest]
    //Depending on the request version, resolve the topic-ID-to-name mapping (topicNames): for version 13 and above use metadataCache.topicIdsToNames(), otherwise use an empty map.
    val topicNames =
      if (fetchRequest.version() >= 13)
        metadataCache.topicIdsToNames()
      else
        Collections.emptyMap[Uuid, String]()
    //Using the topic-name mapping, resolve the fetch data (fetchData) and the topics to forget (forgottenTopics).
    val fetchData = fetchRequest.fetchData(topicNames)
    val forgottenTopics = fetchRequest.forgottenTopics(topicNames)
    //Create a fetch context (fetchContext) that manages the processing of this fetch request. It carries the request version, the fetch metadata, whether the request comes from a follower, the fetch data, the forgotten topics and the topic-name mapping.
    val fetchContext = fetchManager.newContext(
      fetchRequest.version,
      fetchRequest.metadata,
      fetchRequest.isFromFollower,
      fetchData,
      forgottenTopics,
      topicNames)
    //Initialize two mutable buffers: erroneous collects per-partition errors, interesting collects the partitions this request actually needs data for.
    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
    val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
  //The fetch request comes from a follower replica,
    if (fetchRequest.isFromFollower) {
      //so its permissions must be verified. If authorization succeeds,
      // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
      if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
        //iterate over every partition and add it to erroneous or interesting depending on its state
        fetchContext.foreachPartition { (topicIdPartition, data) =>
          if (topicIdPartition.topic == null)
            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
          else if (!metadataCache.contains(topicIdPartition.topicPartition))
            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
          else
            interesting += topicIdPartition -> data
        }
      } else {
        //If authorization fails, add every partition to erroneous.
        fetchContext.foreachPartition { (topicIdPartition, _) =>
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
        }
      }
    } else {
      //The fetch request comes from an ordinary Kafka consumer
      // Regular Kafka consumers need READ permission on each partition they are fetching.
      val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
      fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
        if (topicIdPartition.topic == null)
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
        else
          partitionDatas += topicIdPartition -> partitionData
      }
      //Verify READ permission on each partition; depending on the result, add the partition to erroneous or interesting.
      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
      partitionDatas.foreach { case (topicIdPartition, data) =>
        if (!authorizedTopics.contains(topicIdPartition.topic))
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
        else if (!metadataCache.contains(topicIdPartition.topicPartition))
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += topicIdPartition -> data
      }
    }
  //code omitted
   //If none of the requested topics passed validation or exist, call processResponseCallback directly with an empty result
    if (interesting.isEmpty) {
      processResponseCallback(Seq.empty)
    } else {
      // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
      // no bytes were recorded in the recent quota window
      // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
      //For a follower fetch, maxQuotaWindowBytes is set to Int.MaxValue; otherwise it is derived from how many bytes this client fetched recently,
      // and is then compared with the request's maxBytes and the broker config to produce the fetchMaxBytes and fetchMinBytes values below
      val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
        Int.MaxValue
      else
        quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
      //Based on the request type and the quota limit, compute the fetch's maximum byte count (fetchMaxBytes) and minimum byte count (fetchMinBytes)
      val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
      val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)

      val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
        // Fetch API version 11 added preferred replica logic
        Optional.of(new DefaultClientMetadata(
          fetchRequest.rackId,
          clientId,
          request.context.clientAddress,
          request.context.principal,
          request.context.listenerName.value))
      } else {
        Optional.empty()
      }
      //Build a FetchParams object holding the request parameters
      val params = new FetchParams(
        versionId,
        fetchRequest.replicaId,
        fetchRequest.replicaEpoch,
        fetchRequest.maxWait,
        fetchMinBytes,
        fetchMaxBytes,
        FetchIsolation.of(fetchRequest),
        clientMetadata
      )

      // call the replica manager to fetch messages from the local replica
      //replicaManager.fetchMessages reads messages from the local replica and passes processResponseCallback as the callback that handles the response
      replicaManager.fetchMessages(
        params = params,
        fetchInfos = interesting,
        quota = replicationQuota(fetchRequest),
        responseCallback = processResponseCallback,
      )
    }
}    
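For reference, the minBytes, maxBytes and maxWait fields used above are populated by the consumer from its own fetch.min.bytes, fetch.max.bytes and fetch.max.wait.ms settings. Below is a minimal client-side sketch of those knobs (bootstrap address, group id and topic name are placeholder values), not part of the broker code:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FetchTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // These map to FetchRequest.minBytes / maxBytes / maxWait on the wire;
        // the broker then caps maxBytes again with its own fetch.max.bytes and the quota window.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}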

replicaManager.fetchMessages is the method that ultimately reads the log data.

/**
   * Fetch messages from a replica, and wait until enough data can be fetched and return;
   * the callback function will be triggered either when timeout or required fetch info is satisfied.
   * Consumers may fetch from any replica, but followers can only fetch from the leader.
   */
  def fetchMessages(
    params: FetchParams,
    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
    quota: ReplicaQuota,
    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
  ): Unit = {
    // check if this fetch request can be satisfied right away
    //Call readFromLocalLog to read messages from the local log and store the results in logReadResults.
    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
    var bytesReadable: Long = 0
    var errorReadingData = false
    var hasDivergingEpoch = false
    var hasPreferredReadReplica = false
    val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
    //Based on the read results, update bytesReadable (bytes available to return), errorReadingData (whether a read error occurred), hasDivergingEpoch (whether a diverging epoch was found) and hasPreferredReadReplica (whether a preferred read replica exists).
    logReadResults.foreach { case (topicIdPartition, logReadResult) =>
      brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
      if (logReadResult.error != Errors.NONE)
        errorReadingData = true
      if (logReadResult.divergingEpoch.nonEmpty)
        hasDivergingEpoch = true
      if (logReadResult.preferredReadReplica.nonEmpty)
        hasPreferredReadReplica = true
      bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
      logReadResultMap.put(topicIdPartition, logReadResult)
    }

    // respond immediately if 1) fetch request does not want to wait
    //                        2) fetch request does not require any data
    //                        3) has enough data to respond
    //                        4) some error happens while reading data
    //                        5) we found a diverging epoch
    //                        6) has a preferred read replica
    if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
      hasDivergingEpoch || hasPreferredReadReplica) {
      val fetchPartitionData = logReadResults.map { case (tp, result) =>
        val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
        tp -> result.toFetchPartitionData(isReassignmentFetch)
      }
      responseCallback(fetchPartitionData)
    } else {
      //Otherwise build a DelayedFetch operation and put it into the delayed-fetch purgatory (delayedFetchPurgatory) so the request completes once its conditions are met.
      // construct the fetch results from the read results
      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
      fetchInfos.foreach { case (topicIdPartition, partitionData) =>
        logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
          val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
        })
      }
      val delayedFetch = new DelayedFetch(
        params = params,
        fetchPartitionStatus = fetchPartitionStatus,
        replicaManager = this,
        quota = quota,
        responseCallback = responseCallback
      )

      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

      // try to complete the request immediately, otherwise put it into the purgatory;
      // this is because while the delayed fetch operation is being created, new requests
      // may arrive and hence make this operation completable.
      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
    }
  }
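To make the branch above easier to follow, here is a hypothetical, heavily simplified restatement of the "respond immediately or park in purgatory" decision; the class and method names are invented for illustration and do not exist in the Kafka codebase:

/** Hypothetical illustration of the decision made in ReplicaManager.fetchMessages. */
final class FetchDecision {
    static boolean respondImmediately(long maxWaitMs,
                                      int fetchInfoCount,
                                      long bytesReadable,
                                      int minBytes,
                                      boolean errorReadingData,
                                      boolean hasDivergingEpoch,
                                      boolean hasPreferredReadReplica) {
        return maxWaitMs <= 0                       // 1) the request does not want to wait
                || fetchInfoCount == 0              // 2) the request asks for no partitions
                || bytesReadable >= minBytes        // 3) enough data has already accumulated
                || errorReadingData                 // 4) reading the local log failed
                || hasDivergingEpoch                // 5) a diverging epoch was detected
                || hasPreferredReadReplica;         // 6) the client should fetch from another replica
    }
}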

The actual log data is looked up via readFromLocalLog.

II. Iterating over the requested topic partitions and reading each one

 /**
   * Read from multiple topic partitions at the given offset up to maxSize bytes
   */
  def readFromLocalLog(
    params: FetchParams,
    readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
    quota: ReplicaQuota,
    readFromPurgatory: Boolean
  ): Seq[(TopicIdPartition, LogReadResult)] = {
    val traceEnabled = isTraceEnabled

    def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
      //Pull a few fields out of fetchInfo: fetchOffset (the fetch offset), maxBytes (the maximum bytes to fetch) and logStartOffset (the follower's log start offset).
      val offset = fetchInfo.fetchOffset
      val partitionFetchSize = fetchInfo.maxBytes
      val followerLogStartOffset = fetchInfo.logStartOffset
      //Compute the adjusted maximum byte count adjustedMaxBytes as the smaller of fetchInfo.maxBytes and limitBytes.
      val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
      try {
        if (traceEnabled)
          trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
            s"remaining response limit $limitBytes" +
            (if (minOneMessage) s", ignoring response/partition size limits" else ""))
        //Look up the Partition object for this topic partition
        val partition = getPartitionOrException(tp.topicPartition)
        //Capture the current timestamp fetchTimeMs
        val fetchTimeMs = time.milliseconds

        //Check that the topic ID in the fetch request/session matches the topic ID in the log; if not, throw InconsistentTopicIdException.
        val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
        if (!hasConsistentTopicId(topicId, partition.topicId))
          throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")
        // If we are the leader, determine the preferred read-replica
        //Based on the client metadata and other conditions, pick the replica the client should fetch from.
        val preferredReadReplica = params.clientMetadata.asScala.flatMap(
          metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

        if (preferredReadReplica.isDefined) {
          //If a preferred read replica was found, skip the local read and build a LogReadResult that simply tells the client which (non-leader) replica to fetch from.
          replicaSelectorOpt.foreach { selector =>
            debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
              s"${preferredReadReplica.get} for ${params.clientMetadata}")
          }
          // If a preferred read-replica is set, skip the read
          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
          LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
            divergingEpoch = None,
            highWatermark = offsetSnapshot.highWatermark.messageOffset,
            leaderLogStartOffset = offsetSnapshot.logStartOffset,
            leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
            followerLogStartOffset = followerLogStartOffset,
            fetchTimeMs = -1L,
            lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
            preferredReadReplica = preferredReadReplica,
            exception = None)
        } else {
          // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
          //Attempt the read, then build a LogReadResult describing the data fetched from this partition.
          val readInfo: LogReadInfo = partition.fetchRecords(
            fetchParams = params,
            fetchPartitionData = fetchInfo,
            fetchTimeMs = fetchTimeMs,
            maxBytes = adjustedMaxBytes,
            minOneMessage = minOneMessage,
            updateFetchState = !readFromPurgatory
          )

          val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
            // If the partition is being throttled, simply return an empty set.
            new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
          } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
            // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
            // progress in such cases and don't need to report a `RecordTooLargeException`
            new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
          } else {
            readInfo.fetchedData
          }
          //Return the resulting LogReadResult
          LogReadResult(info = fetchDataInfo,
            divergingEpoch = readInfo.divergingEpoch.asScala,
            highWatermark = readInfo.highWatermark,
            leaderLogStartOffset = readInfo.logStartOffset,
            leaderLogEndOffset = readInfo.logEndOffset,
            followerLogStartOffset = followerLogStartOffset,
            fetchTimeMs = fetchTimeMs,
            lastStableOffset = Some(readInfo.lastStableOffset),
            preferredReadReplica = preferredReadReplica,
            exception = None
          )
        }
      } catch {
      //code omitted
      }
    }

    var limitBytes = params.maxBytes
    val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
    var minOneMessage = !params.hardMaxBytesLimit
    readPartitionInfo.foreach { case (tp, fetchInfo) =>
      val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
      //Size of the returned record batch, in bytes.
      val recordBatchSize = readResult.info.records.sizeInBytes
      // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
      //If recordBatchSize is greater than 0, set minOneMessage to false: once a non-empty partition has returned data, request- and partition-level size limits are enforced again.
      if (recordBatchSize > 0)
        minOneMessage = false
      limitBytes = math.max(0, limitBytes - recordBatchSize)
      //Append (tp -> readResult) to result
      result += (tp -> readResult)
    }
    result
  }
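The byte-budget accounting in the loop above (shrink limitBytes by every batch read, and stop forcing "at least one message" once any partition has returned data) can be illustrated with the following standalone sketch; the types and numbers are toy stand-ins, not Kafka classes:

import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical illustration of the byte-budget loop in readFromLocalLog. */
final class ByteBudgetExample {
    public static void main(String[] args) {
        // partition name -> bytes that a read of that partition would return (toy numbers)
        Map<String, Integer> batchSizes = new LinkedHashMap<>();
        batchSizes.put("demo-topic-0", 0);     // empty partition
        batchSizes.put("demo-topic-1", 700);
        batchSizes.put("demo-topic-2", 500);

        int limitBytes = 1000;                 // analogue of FetchRequest.maxBytes
        boolean minOneMessage = true;          // the first non-empty read may exceed the limits

        for (Map.Entry<String, Integer> entry : batchSizes.entrySet()) {
            // pretend the read returns the whole batch while budget (or the minOneMessage escape hatch) remains
            int recordBatchSize = (limitBytes > 0 || minOneMessage) ? entry.getValue() : 0;
            if (recordBatchSize > 0)
                minOneMessage = false;         // once data flowed, size limits apply strictly again
            limitBytes = Math.max(0, limitBytes - recordBatchSize);
            System.out.printf("%s read=%d remainingBudget=%d%n", entry.getKey(), recordBatchSize, limitBytes);
        }
    }
}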

The loop calls the inner read function once per topic partition via val readResult = read(tp, fetchInfo, limitBytes, minOneMessage);
inside read, the data is fetched through partition.fetchRecords.

1. Choosing a suitable replica to read the local log (since version 2.4, fetches can be served by a non-leader replica)

Inside the read helper of the readFromLocalLog method shown above:

val preferredReadReplica = params.clientMetadata.asScala.flatMap(
          metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
def findPreferredReadReplica(partition: Partition,
                               clientMetadata: ClientMetadata,
                               replicaId: Int,
                               fetchOffset: Long,
                               currentTimeMs: Long): Option[Int] = {
    //partition.leaderIdIfLocal returns an Option[Int] holding the leader replica's ID.
    // If the local broker is the leader it returns that ID, otherwise None.
    partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
      // Don't look up preferred for follower fetches via normal replication
      //If replicaId is a valid broker ID, this is a follower fetch, so no preferred replica is returned; otherwise continue below.
      if (FetchRequest.isValidBrokerId(replicaId))
        None
      else {
        replicaSelectorOpt.flatMap { replicaSelector =>
          //Get the endpoint information of the partition's replicas via metadataCache.getPartitionReplicaEndpoints
          val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
            new ListenerName(clientMetadata.listenerName))
          //Create a mutable.Set[ReplicaView] named replicaInfoSet to hold the eligible replicas.
          val replicaInfoSet = mutable.Set[ReplicaView]()
          //Iterate over the partition's remote replicas (partition.remoteReplicas); for each replica:
          //take a snapshot of its state (replica.stateSnapshot), and
          //if its brokerId is in the ISR and its log range contains the requested fetchOffset, add it to replicaInfoSet.
          partition.remoteReplicas.foreach { replica =>
            val replicaState = replica.stateSnapshot
            if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
                replicaState.logEndOffset >= fetchOffset &&
                replicaState.logStartOffset <= fetchOffset) {

              replicaInfoSet.add(new DefaultReplicaView(
                replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
                replicaState.logEndOffset,
                currentTimeMs - replicaState.lastCaughtUpTimeMs
              ))
            }
          }
          //Build a DefaultReplicaView for the leader replica and add it to replicaInfoSet.
          val leaderReplica = new DefaultReplicaView(
            replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
            partition.localLogOrException.logEndOffset,
            0L
          )
          replicaInfoSet.add(leaderReplica)
          //Build a DefaultPartitionView describing the partition: the replica set plus the leader replica.
          val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
          //Call replicaSelector.select to pick a replica according to the configured strategy, then use collect to turn the selection into a replica ID.
          replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
            // Even though the replica selector can return the leader, we don't want to send it out with the
            // FetchResponse, so we exclude it here
            //Exclude the leader replica from the selection and return the remaining replica's ID.
            case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
          }
        }
      }
    }
  }

replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect is what picks the replica. By default the leader replica is used, but since Kafka 2.4 a fetch can also be served from a non-leader replica, i.e. a follower replica.

In the code:

  • the guard case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id decides whether a non-leader replica is returned.

In the configuration:

  • On the broker side, set replica.selector.class. Its default behavior (LeaderSelector) means consumers fetch from the leader replica; change it to RackAwareReplicaSelector so consumers fetch from a replica in their own rack. You also need to set broker.rack to tell Kafka which rack/data center each broker lives in.
  • On the consumer side, set client.rack; the consumer will then fetch from a broker whose broker.rack matches it (see the sketch below).
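A minimal consumer-side sketch of that rack-aware setup (the rack id and addresses are placeholders; the broker additionally needs replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector and broker.rack in server.properties):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-in-dc1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-demo");             // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Must match the broker.rack of the brokers this consumer should read from;
        // it is sent to the broker as FetchRequest.rackId and feeds findPreferredReadReplica.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "dc1");                      // placeholder rack id
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual; fetches may now be served by an in-sync follower in "dc1"
        }
    }
}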

Why was fetching from followers not supported before 2.4, and why was it added later?

Why it was not supported before: for Kafka, scaling topic partitions horizontally already handles message throughput, and adding brokers lowers per-broker load, so a read/write split offered little benefit for the effort.
Why it is supported now: one scenario does not fit that model, namely fetching across racks or data centers. When one data center has to sync data from another and can only read from the leader replica, all of that traffic crosses expensive inter-DC links; reading from a local follower replica is the smarter choice.
So Kafka added this feature not to reduce broker load or spread message processing, but to save network bandwidth.

III. Determining whether the request is a follower fetch or a consumer fetch

For how followers send fetch requests, see the companion article on how a partition follower in Kafka 3.5 creates a fetcher thread to pull data from the leader.

def fetchRecords(
    fetchParams: FetchParams,
    fetchPartitionData: FetchRequest.PartitionData,
    fetchTimeMs: Long,
    maxBytes: Int,
    minOneMessage: Boolean,
    updateFetchState: Boolean
  ): LogReadInfo = {
    def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
      readRecords(
        log,
        fetchPartitionData.lastFetchedEpoch,
        fetchPartitionData.fetchOffset,
        fetchPartitionData.currentLeaderEpoch,
        maxBytes,
        fetchParams.isolation,
        minOneMessage
      )
    }
    //Check whether the fetch request comes from a follower
    if (fetchParams.isFromFollower) {
      // Check that the request is from a valid replica before doing the read
      val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
        val localLog = localLogWithEpochOrThrow(
          fetchPartitionData.currentLeaderEpoch,
          fetchParams.fetchOnlyLeader
        )
        val replica = followerReplicaOrThrow(
          fetchParams.replicaId,
          fetchPartitionData
        )
        val logReadInfo = readFromLocalLog(localLog)
        (replica, logReadInfo)
      }

      if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
        updateFollowerFetchState(
          replica,
          followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
          followerStartOffset = fetchPartitionData.logStartOffset,
          followerFetchTimeMs = fetchTimeMs,
          leaderEndOffset = logReadInfo.logEndOffset,
          fetchParams.replicaEpoch
        )
      }

      logReadInfo
    } else {
      //The request comes from a consumer client
      inReadLock(`leaderIsrUpdateLock`) {
        val localLog = localLogWithEpochOrThrow(
          fetchPartitionData.currentLeaderEpoch,
          fetchParams.fetchOnlyLeader
        )
        readFromLocalLog(localLog)
      }
    }
  }

1. The read lock on leaderIsrUpdateLock must be acquired before reading

In the method above:

//follower request
 val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) 
//consumer client request
 inReadLock(`leaderIsrUpdateLock`) 
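inReadLock is a thin helper around a Java read/write lock: many fetches may hold leaderIsrUpdateLock as readers concurrently, while leader/ISR changes take the write lock exclusively. A generic sketch of the same pattern (not Kafka code):

import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Generic illustration of the read-lock pattern used around fetchRecords. */
final class ReadLockExample {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
    private volatile long highWatermark = 0L;

    long readUnderLock() {
        leaderIsrUpdateLock.readLock().lock();   // many readers may hold this at once
        try {
            return highWatermark;                // stands in for reading the local log
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
    }

    void updateUnderWriteLock(long newHighWatermark) {
        leaderIsrUpdateLock.writeLock().lock();  // exclusive: blocks all readers
        try {
            highWatermark = newHighWatermark;    // stands in for a leader/ISR state change
        } finally {
            leaderIsrUpdateLock.writeLock().unlock();
        }
    }
}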

2. readFromLocalLog reads the local log data

 def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
      readRecords(
        log,
        fetchPartitionData.lastFetchedEpoch,
        fetchPartitionData.fetchOffset,
        fetchPartitionData.currentLeaderEpoch,
        maxBytes,
        fetchParams.isolation,
        minOneMessage
      )
    }

IV. Reading log data means reading the segment files

1. Capture the local log's basic state (high watermark, offsets, etc.)

private def readRecords(
    localLog: UnifiedLog,
    lastFetchedEpoch: Optional[Integer],
    fetchOffset: Long,
    currentLeaderEpoch: Optional[Integer],
    maxBytes: Int,
    fetchIsolation: FetchIsolation,
    minOneMessage: Boolean
  ): LogReadInfo = {
    //the localLog's high-watermark (initialHighWatermark),
    val initialHighWatermark = localLog.highWatermark
    //log start offset (initialLogStartOffset),
    val initialLogStartOffset = localLog.logStartOffset
    //log end offset (initialLogEndOffset)
    val initialLogEndOffset = localLog.logEndOffset
    //and last stable offset (initialLastStableOffset)
    val initialLastStableOffset = localLog.lastStableOffset

   //code omitted
    //Call localLog.read to read data starting at the requested offset
    val fetchedData = localLog.read(
      fetchOffset,
      maxBytes,
      fetchIsolation,
      minOneMessage
    )
    //Return a LogReadInfo object containing the fetched data.
    new LogReadInfo(
      fetchedData,
      Optional.empty(),
      initialHighWatermark,
      initialLogStartOffset,
      initialLogEndOffset,
      initialLastStableOffset
    )
  }
 def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    checkLogStartOffset(startOffset)
    val maxOffsetMetadata = isolation match {
      case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
      case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
      case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
    }
    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
  }
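The FetchIsolation used above corresponds to the consumer's isolation.level setting: read_uncommitted reads up to the high watermark, read_committed up to the last stable offset, while follower fetches read up to the log end. A minimal client-side sketch (addresses and names are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IsolationLevelExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "txn-reader");               // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "read_committed" -> FetchIsolation.TXN_COMMITTED on the broker (reads stop at the last stable offset);
        // the default "read_uncommitted" -> FetchIsolation.HIGH_WATERMARK.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // poll as usual; records from aborted transactions are filtered out
        }
    }
}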

2. Iterate over segments until one returns data

/*
   *
   * @param startOffset   the offset to start reading from
   * @param maxLength  the maximum number of bytes to read
   * @param minOneMessage  whether to return at least one message even if it exceeds maxLength
   * @param maxOffsetMetadata  the upper-bound offset metadata for this read
   * @param includeAbortedTxns   whether to include aborted transactions
   * @throws
   * @return  a FetchDataInfo object
   */
  def read(startOffset: Long,
           maxLength: Int,
           minOneMessage: Boolean,
           maxOffsetMetadata: LogOffsetMetadata,
           includeAbortedTxns: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
        s"total length ${segments.sizeInBytes} bytes")
      //Get the next-offset metadata (endOffsetMetadata) and its offset (endOffset)
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      //Find the segment whose base offset is the floor of startOffset; from there the segments can be walked forward in order
      var segmentOpt = segments.floorSegment(startOffset)

      // return error on attempt to read beyond the log end offset
      //If the start offset is beyond the log end offset, or no segment is found, throw OffsetOutOfRangeException.
      if (startOffset > endOffset || segmentOpt.isEmpty)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments upto $endOffset.")
      //If the start offset equals the max offset metadata's offset, return an empty FetchDataInfo
      if (startOffset == maxOffsetMetadata.messageOffset)
        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      else if (startOffset > maxOffsetMetadata.messageOffset)
      //If the start offset is greater than the max offset metadata's offset, return an empty FetchDataInfo after converting the start offset to offset metadata
        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
      else {
        //Read from the segment whose base offset is at or below the target offset
        var fetchDataInfo: FetchDataInfo = null
        //Keep looping while nothing has been read yet (fetchDataInfo is still null) and a candidate segment still exists
        while (fetchDataInfo == null && segmentOpt.isDefined) {
          val segment = segmentOpt.get
          val baseOffset = segment.baseOffset
          val maxPosition =
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
            else segment.size

          fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
          if (fetchDataInfo != null) {
            //If includeAbortedTxns is true, call addAbortedTransactions to attach the aborted transactions to fetchDataInfo.
            if (includeAbortedTxns)
              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
          }
          //If fetchDataInfo is null, move segmentOpt to the next segment whose base offset is greater than baseOffset.
          else segmentOpt = segments.higherSegment(baseOffset)
        }
        //Messages were read successfully, so return the FetchDataInfo
        if (fetchDataInfo != null) fetchDataInfo
        else {
          //We went past the end of the last segment without reading any data, so return an empty FetchDataInfo holding the next offset metadata and empty memory records (MemoryRecords.EMPTY)
          new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
        }
      }
    }
  }

First var segmentOpt = segments.floorSegment(startOffset) locates the segment to start from,
then fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) reads the data from that segment.
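segments.floorSegment and segments.higherSegment are essentially floor/higher lookups keyed by each segment's base offset. The following standalone sketch shows the same lookup pattern with a TreeMap; the offsets and file names are toy values, not real segment data:

import java.util.Map;
import java.util.TreeMap;

/** Toy illustration of floorSegment/higherSegment lookups on base offsets. */
final class SegmentLookupExample {
    public static void main(String[] args) {
        // base offset -> segment file name; the real LogSegments holds LogSegment objects
        TreeMap<Long, String> segments = new TreeMap<>();
        segments.put(0L, "00000000000000000000.log");
        segments.put(500L, "00000000000000000500.log");
        segments.put(1200L, "00000000000000001200.log");

        long startOffset = 730L;
        // floorSegment: the segment whose base offset is the greatest one <= startOffset
        Map.Entry<Long, String> floor = segments.floorEntry(startOffset);
        System.out.println("read starts in " + floor.getValue());

        // higherSegment: if that segment yields no data, move on to the next base offset
        Map.Entry<Long, String> next = segments.higherEntry(floor.getKey());
        System.out.println("next candidate is " + (next == null ? "none" : next.getValue()));
    }
}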

V. Creating a FileRecords object backed by a channel positioned at the requested file location

This part is fairly straightforward.

  def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null
    //Build a LogOffsetMetadata from the start offset, the segment's base offset and the start position
    val startPosition = startOffsetAndSize.position
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    //Compute the length of the message set to read: the smaller of (maxPosition - startPosition) and adjustedMaxSize, assigned to fetchSize.
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
    //Build a FetchDataInfo containing the offset metadata, a log slice of the requested size starting at startPosition, and the related flags
  //log.slice(startPosition, fetchSize) is the actual log data
    new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
  }

log.slice reads the file data and produces a FileRecords object.

 public FileRecords slice(int position, int size) throws IOException {
        int availableBytes = availableBytes(position, size);
        int startPosition = this.start + position;
        return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
    }
public class FileRecords extends AbstractRecords implements Closeable {
	private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches;
	//constructor
        FileRecords(File file,
                FileChannel channel,
                int start,
                int end,
                boolean isSlice) throws IOException {
        this.file = file;
        this.channel = channel;
        this.start = start;
        this.end = end;
        this.isSlice = isSlice;
        this.size = new AtomicInteger();
        //This is only a slice view, so skip the file-size check and set size directly to end - start.
        if (isSlice) {
            // don't check the file size if this is just a slice view
            size.set(end - start);
        } else {
            //If isSlice is false this is not a slice, so the file size must be checked; a file larger than Integer.MAX_VALUE triggers a KafkaException.
            if (channel.size() > Integer.MAX_VALUE)
                throw new KafkaException("The size of segment " + file + " (" + channel.size() +
                        ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);
            //Otherwise take the smaller of the file size and end as limit, set size to limit - start, and position the file channel at limit, i.e. the end of the file.
            int limit = Math.min((int) channel.size(), end);
            size.set(limit - start);
            channel.position(limit);
        }
		//This part can be ignored for now; what matters is the channel passed in, which is effectively the kernel-side read channel
        batches = batchesFrom(start);
  }	
  //This is the key zero-copy method: when the response is sent later, it hands the read channel straight over to the send channel, so the data never passes through user space
   @Override
    public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
        long newSize = Math.min(channel.size(), end) - start;
        int oldSize = sizeInBytes();
        if (newSize < oldSize)
            throw new KafkaException(String.format(
                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                    file.getAbsolutePath(), oldSize, newSize));

        long position = start + offset;
        long count = Math.min(length, oldSize - offset);
        return destChannel.transferFrom(channel, position, count);
    }
}  
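writeTo above is where zero-copy happens: the segment file's bytes go from the file channel to the socket channel without entering user space. A self-contained sketch of the underlying JDK mechanism (file path and address are placeholders):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyExample {
    public static void main(String[] args) throws IOException {
        Path segmentFile = Path.of("/tmp/demo-segment.log");           // placeholder file
        try (FileChannel fileChannel = FileChannel.open(segmentFile, StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) { // placeholder
            long position = 0;
            long remaining = fileChannel.size();
            // transferTo lets the kernel move file bytes to the socket directly (sendfile on Linux),
            // which is what FileRecords.writeTo ultimately delegates to via destChannel.transferFrom.
            while (remaining > 0) {
                long sent = fileChannel.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}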

That is as far as we need to go here; next, let's look at how the response is handled.

VI. Handling the sending of the response

1. handleFetchRequest in KafkaApis supplies a callback that handles the data to be sent back to the client

  def handleFetchRequest(request: RequestChannel.Request): Unit = {
   //Handles the fetch response data.
    def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
    //Send the response immediately.
        requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs), Some(updateConversionStats))
    }
//replicaManager.fetchMessages reads messages from the local replica and passes processResponseCallback as the response callback
 replicaManager.fetchMessages(
        params = params,
        fetchInfos = interesting,
        quota = replicationQuota(fetchRequest),
        responseCallback = processResponseCallback,
   )
}

2. Before sending, the Send to be transmitted is assembled

 def sendResponse(
    request: RequestChannel.Request,
    response: AbstractResponse,
    onComplete: Option[Send => Unit]
  ): Unit = {
    updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala)
    sendResponse(new RequestChannel.SendResponse(
      request,
      request.buildResponseSend(response),
      request.responseNode(response),
      onComplete
    ))
  }

request.buildResponseSend(response) is the method that builds the Send.

 def buildResponseSend(abstractResponse: AbstractResponse): Send = {
      envelope match {
          //isForwarded
        case Some(request) =>
         //code omitted
        case None =>
          //Directly call context.buildResponseSend(abstractResponse) to build the response Send.
          context.buildResponseSend(abstractResponse)
      }
    }
   /**
     * Build a {@link Send} for direct transmission of the provided response
     * over the network.
     */
    public Send buildResponseSend(AbstractResponse body) {
        return body.toSend(header.toResponseHeader(), apiVersion());
    }
public abstract class AbstractResponse implements AbstractRequestResponse {
    public static final int DEFAULT_THROTTLE_TIME = 0;

    private final ApiKeys apiKey;

    protected AbstractResponse(ApiKeys apiKey) {
        this.apiKey = apiKey;
    }
    //data() here resolves to FetchResponse's data() method, which returns a FetchResponseData
    public final Send toSend(ResponseHeader header, short version) {
        return SendBuilder.buildResponseSend(header, data(), version);
    }
}

    public static Send buildResponseSend(
        ResponseHeader header,
        Message apiResponse,
        short apiVersion
    ) {
        return buildSend(
            header.data(),
            header.headerVersion(),
            apiResponse,
            apiVersion
        );
    }

    private static Send buildSend(
        Message header,
        short headerVersion,
        Message apiMessage,
        short apiVersion
    ) {
         // Create an ObjectSerializationCache to cache serialized objects
        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
        // Create a MessageSizeAccumulator to accumulate the message size
        MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
        // Compute the sizes of header and apiMessage and add them to messageSize
        header.addSize(messageSize, serializationCache, headerVersion);
        apiMessage.addSize(messageSize, serializationCache, apiVersion);
        // Create a SendBuilder sized to messageSize.sizeExcludingZeroCopy() + 4
        SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
        // Write the total message size into the builder via writeInt
        builder.writeInt(messageSize.totalSize());
        // Then call write on header and apiMessage to write their contents into the builder.
        header.write(builder, serializationCache, headerVersion);
        apiMessage.write(builder, serializationCache, apiVersion);
        // Build and return the Send object
        return builder.build();
    }
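The builder.writeInt(messageSize.totalSize()) call above reflects the standard Kafka wire framing: a 4-byte length prefix, then the response header, then the body. A toy illustration of that framing with a plain ByteBuffer (the header and body bytes here are made up):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Toy illustration of the length-prefixed framing produced by SendBuilder. */
final class FramingExample {
    public static void main(String[] args) {
        byte[] header = "HDR".getBytes(StandardCharsets.UTF_8);   // stands in for the response header
        byte[] body = "BODY".getBytes(StandardCharsets.UTF_8);    // stands in for FetchResponseData

        int totalSize = header.length + body.length;
        ByteBuffer frame = ByteBuffer.allocate(4 + totalSize);
        frame.putInt(totalSize);   // 4-byte size prefix, like builder.writeInt(messageSize.totalSize())
        frame.put(header);         // like header.write(...)
        frame.put(body);           // like apiMessage.write(...); record data would instead be appended zero-copy
        frame.flip();

        System.out.println("frame bytes = " + frame.remaining());
    }
}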

In buildSend, the apiMessage parameter is declared as Message, but what is actually passed in is the Message implementation FetchResponseData; as the data() call in AbstractResponse above shows, it really invokes FetchResponse's data() method.

(1) The write method overridden in the generated FetchResponseData class

In buildSend, apiMessage.write() therefore calls FetchResponseData's write method. FetchResponseData is a generated class, so you won't find it in the source tree, but you can find the generator: in the MessageDataGenerator class, generateClassWriter emits the write method, and generateVariableLengthWriter emits a writeRecords call for fields of type Records, as shown below:

if (type.isString()) {
    buffer.printf("_writable.writeByteArray(_stringBytes);%n");
} else if (type.isBytes()) {
      if (zeroCopy) {
         buffer.printf("_writable.writeByteBuffer(%s);%n", name);
      } else {
         buffer.printf("_writable.writeByteArray(%s);%n", name);
           }
  } else if (type.isRecords()) {
     buffer.printf("_writable.writeRecords(%s);%n", name);
 }

So what ends up running is writeRecords in SendBuilder:

 /**
     * Write a record set. The underlying record data is retained in the result of {@link #build()}. See {@link BaseRecords#toSend()}.
     * @param records the records to write
     *
     */
    @Override
    public void writeRecords(BaseRecords records) {
        if (records instanceof MemoryRecords) {
            flushPendingBuffer();
            addBuffer(((MemoryRecords) records).buffer());
        } else if (records instanceof UnalignedMemoryRecords) {
            flushPendingBuffer();
            addBuffer(((UnalignedMemoryRecords) records).buffer());
        } else {
            //the branch below only runs when records is a FileRecords
            flushPendingSend();
            //toSend here is AbstractRecords.toSend; when the resulting Send's writeTo is called, it is FileRecords.writeTo that runs
            addSend(records.toSend());
        }
    }

(2) The writeTo implementation of the generated Send

records.toSend() actually runs the toSend method of FileRecords' parent class AbstractRecords.
It returns a DefaultRecordsSend, with the FileRecords instance stored in RecordsSend's records field,
so when the Send's writeTo is invoked later, it is RecordsSend's writeTo that executes.

public abstract class AbstractRecords implements Records {
    @Override
    public DefaultRecordsSend<Records> toSend() {
        return new DefaultRecordsSend<>(this);
    }
}    

The AbstractRecords instance passes itself into DefaultRecordsSend's constructor,

public class DefaultRecordsSend<T extends TransferableRecords> extends RecordsSend<T> {
    public DefaultRecordsSend(T records) {
        this(records, records.sizeInBytes());
    }

    public DefaultRecordsSend(T records, int maxBytesToWrite) {
        super(records, maxBytesToWrite);
    }

    @Override
    protected long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException {
    	//records() returns the FileRecords, so this writeTo delegates to FileRecords' writeTo
        return records().writeTo(channel, previouslyWritten, remaining);
    }
}

which in turn stores the AbstractRecords in RecordsSend's records field:

public abstract class RecordsSend<T extends BaseRecords> implements Send {
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);

    private final T records;
    private final int maxBytesToWrite;
    private int remaining;
    private boolean pending = false;

    protected RecordsSend(T records, int maxBytesToWrite) {
        this.records = records;
        this.maxBytesToWrite = maxBytesToWrite;
        this.remaining = maxBytesToWrite;
    }

    @Override
    public boolean completed() {
        return remaining <= 0 && !pending;
    }

    @Override
    public final long writeTo(TransferableChannel channel) throws IOException {
        long written = 0;

        if (remaining > 0) {
        //this calls the subclass DefaultRecordsSend's writeTo, which then calls the records' writeTo
            written = writeTo(channel, size() - remaining, remaining);
            if (written < 0)
                throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
            remaining -= written;
        }

        pending = channel.hasPendingWrites();
        if (remaining <= 0 && pending)
            channel.write(EMPTY_BYTE_BUFFER);

        return written;
    }

    @Override
    public long size() {
        return maxBytesToWrite;
    }

    protected T records() {
        return records;
    }

    /**
     * Write up to `remaining` bytes of records to `channel`. Implementations may be stateful. The contract with the caller is that
     * the first call passes `previouslyWritten` equal to 0 and `remaining` equal to the maximum number of bytes to write to `channel`;
     * `previouslyWritten` and `remaining` are adjusted appropriately on each subsequent call. See {@link #writeTo(TransferableChannel)} for expected usage.
     */
    protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException;
}

In the end, FileRecords' writeTo method runs.

(3) Ultimately FileChannel's transferTo is executed

 @Override
    public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
        long newSize = Math.min(channel.size(), end) - start;
        int oldSize = sizeInBytes();
        if (newSize < oldSize)
            throw new KafkaException(String.format(
                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                    file.getAbsolutePath(), oldSize, newSize));

        long position = start + offset;
        long count = Math.min(length, oldSize - offset);
        return destChannel.transferFrom(channel, position, count);
    }

which ultimately executes FileChannel's transferTo method:

 @Override
    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
        return fileChannel.transferTo(position, count, socketChannel);
    }

3. Sending the Send asynchronously

(1) Enqueueing the response

  //Sends the response back to the client.
  private[network] def sendResponse(response: RequestChannel.Response): Unit = {
    
    response match {
      //We should send only one of the following per request
      case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
        //Update the request's responseCompleteTimeNanos and apiLocalCompleteTimeNanos to record when the request completed.
        val request = response.request
        val timeNanos = time.nanoseconds()
        request.responseCompleteTimeNanos = timeNanos
        if (request.apiLocalCompleteTimeNanos == -1L)
          request.apiLocalCompleteTimeNanos = timeNanos
      //For StartThrottlingResponse and EndThrottlingResponse nothing needs to be done
      case _: StartThrottlingResponse | _: EndThrottlingResponse => ()
    }
    //Look up the Processor that owns this response and put the response onto that processor's response queue.
    val processor = processors.get(response.processor)
    if (processor != null) {
      processor.enqueueResponse(response)
    }
  }
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
    responseQueue.put(response)
    wakeup()
  }
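The response queue is a plain LinkedBlockingDeque drained by the Processor's network thread on its next loop iteration. A generic sketch of the same hand-off pattern (the Response type here is a stand-in, not Kafka's RequestChannel.Response):

import java.util.concurrent.LinkedBlockingDeque;

/** Generic illustration of handing responses to a network thread via a queue. */
final class ResponseQueueExample {
    /** Stand-in for RequestChannel.Response. */
    record Response(String connectionId, byte[] payload) {}

    private final LinkedBlockingDeque<Response> responseQueue = new LinkedBlockingDeque<>();

    /** Called by a request-handler thread, like Processor.enqueueResponse. */
    void enqueueResponse(Response response) throws InterruptedException {
        responseQueue.put(response);
        // the real Processor also calls wakeup() so a blocked selector poll returns promptly
    }

    /** Called inside the network thread's run loop, like processNewResponses(). */
    void drainResponses() {
        Response response;
        while ((response = responseQueue.poll()) != null) {
            // register the response for writing on the connection's channel
            System.out.printf("sending %d bytes on %s%n", response.payload().length, response.connectionId());
        }
    }
}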

(2) The Processor thread processes the response

private[kafka] class Processor(
  val id: Int,
  time: Time,
  maxRequestSize: Int,
  requestChannel: RequestChannel,
  connectionQuotas: ConnectionQuotas,
  connectionsMaxIdleMs: Long,
  failedAuthenticationDelayMs: Int,
  listenerName: ListenerName,
  securityProtocol: SecurityProtocol,
  config: KafkaConfig,
  metrics: Metrics,
  credentialProvider: CredentialProvider,
  memoryPool: MemoryPool,
  logContext: LogContext,
  connectionQueueSize: Int,
  isPrivilegedListener: Boolean,
  apiVersionManager: ApiVersionManager,
  threadName: String
) extends Runnable with Logging {
//background thread that pushes data out to clients
  override def run(): Unit = {
    try {
      while (shouldRun.get()) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()
          //poll the selector
          poll()
          //handle completed receives
          processCompletedReceives()
          //handle completed sends
          processCompletedSends()
          //handle disconnected connections
          processDisconnected()
          //close excess connections
          closeExcessConnections()
        } catch {
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
    }
  }
}  

Processor implements Runnable and provides the run method shown above.

(3) During the send, the file's FileChannel is transferred to the outgoing channel via FileChannel.transferTo, completing the zero-copy path

 private def poll(): Unit = {
    val pollTimeout = if (newConnections.isEmpty) 300 else 0
    try selector.poll(pollTimeout)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        // The exception is not re-thrown and any completed sends/receives/connections/disconnections
        // from this poll will be processed.
        error(s"Processor $id poll failed", e)
    }
  }
@Override
    public void poll(long timeout) throws IOException {
        
     // Poll from channels where the underlying socket has more data
     pollSelectionKeys(readyKeys, false, endSelect);
    // Clear all selected keys so that they are excluded from the ready count for the next select
     readyKeys.clear();

    }
 void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
    try {
         attemptWrite(key, channel, nowNanos);
     } catch (Exception e) {
       sendFailed = true;
        throw e;
     }                        
}
 private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
        if (channel.hasSend()
                && channel.ready()
                && key.isWritable()
                && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
            write(channel);
        }
    }
    void write(KafkaChannel channel) throws IOException {
        String nodeId = channel.id();
        //channel.write() internally calls send.writeTo(transportLayer)
        long bytesSent = channel.write();
    }
  public long write() throws IOException {
        if (send == null)
            return 0;

        midWrite = true;
        return send.writeTo(transportLayer);
    }
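Underneath Kafka's Selector wrapper is the plain java.nio Selector: the connection is registered for write interest and the pending Send is written once the key reports writable. A bare-bones JDK sketch of that pattern (address is a placeholder, error handling omitted):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class WriteReadinessExample {
    public static void main(String[] args) throws IOException {
        // connect in blocking mode for simplicity, then switch to non-blocking for the selector loop
        SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); // placeholder address
        channel.configureBlocking(false);
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_WRITE);

        ByteBuffer pendingSend = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)); // stands in for a pending Send

        while (pendingSend.hasRemaining()) {
            selector.select(300);                           // like Processor.poll()
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isValid() && key.isWritable())
                    channel.write(pendingSend);             // like attemptWrite -> channel.write()
            }
            selector.selectedKeys().clear();                // mirrors readyKeys.clear() above
        }
        channel.close();
        selector.close();
    }
}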

send.writeTo here is the RecordsSend writeTo method described above.
In this way the FileChannel achieves a zero-copy transfer of the log data to the client.
