I. The server-side method that receives consumer fetch requests
The server-side entry point for consumer fetch requests is the handleFetchRequest method in the KafkaApis.scala class.
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
//code omitted
request.header.apiKey match {
//Consumer fetch requests are dispatched to this handler
case ApiKeys.FETCH => handleFetchRequest(request)
//code omitted
}
}
def handleFetchRequest(request: RequestChannel.Request): Unit = {
//Get the API version (versionId) and client ID (clientId) from the request header.
val versionId = request.header.apiVersion
val clientId = request.header.clientId
//Get the Fetch request body from the request
val fetchRequest = request.body[FetchRequest]
//Depending on the request version, resolve the topic-ID-to-name mapping (topicNames): for version 13 and above use metadataCache.topicIdsToNames(), otherwise use an empty map.
val topicNames =
if (fetchRequest.version() >= 13)
metadataCache.topicIdsToNames()
else
Collections.emptyMap[Uuid, String]()
//Using the topic name mapping, extract the fetch data (fetchData) and the topics to forget (forgottenTopics) from the request.
val fetchData = fetchRequest.fetchData(topicNames)
val forgottenTopics = fetchRequest.forgottenTopics(topicNames)
//Create a fetch context (fetchContext) that manages the processing of this fetch request. It carries the request version, session metadata, whether the request comes from a follower replica, the fetch data, the forgotten topics and the topic name mapping.
val fetchContext = fetchManager.newContext(
fetchRequest.version,
fetchRequest.metadata,
fetchRequest.isFromFollower,
fetchData,
forgottenTopics,
topicNames)
//Initialize two mutable buffers: erroneous collects partitions that fail validation, interesting collects the partitions whose data should actually be fetched.
val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
//The fetch request comes from a follower replica,
if (fetchRequest.isFromFollower) {
//so its permissions must be verified. If authorization succeeds:
// The follower must have ClusterAction on ClusterResource in order to fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
//iterate over each partition's data and add it to erroneous or interesting depending on its state
fetchContext.foreachPartition { (topicIdPartition, data) =>
if (topicIdPartition.topic == null)
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += topicIdPartition -> data
}
} else {
//If authorization fails, add every partition to erroneous.
fetchContext.foreachPartition { (topicIdPartition, _) =>
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
}
}
} else {
//The fetch request comes from a regular Kafka consumer
// Regular Kafka consumers need READ permission on each partition they are fetching.
val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
if (topicIdPartition.topic == null)
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
else
partitionDatas += topicIdPartition -> partitionData
}
//READ permission must be checked for each partition; based on the result, add it to erroneous or interesting.
val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
partitionDatas.foreach { case (topicIdPartition, data) =>
if (!authorizedTopics.contains(topicIdPartition.topic))
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += topicIdPartition -> data
}
}
//省略代码
//If none of the requested topics passed validation or exist, call processResponseCallback directly to handle the response
if (interesting.isEmpty) {
processResponseCallback(Seq.empty)
} else {
// for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
// no bytes were recorded in the recent quota window
// trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
//For a follower fetch, maxQuotaWindowBytes is set to Int.MaxValue; otherwise it is derived from the bytes this client recently fetched,
//and is then compared with the request's and the broker config's limits to produce fetchMaxBytes and fetchMinBytes below
val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
Int.MaxValue
else
quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
//Based on the request type and quota limits, compute the maximum (fetchMaxBytes) and minimum (fetchMinBytes) byte counts for this fetch
val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
// Fetch API version 11 added preferred replica logic
Optional.of(new DefaultClientMetadata(
fetchRequest.rackId,
clientId,
request.context.clientAddress,
request.context.principal,
request.context.listenerName.value))
} else {
Optional.empty()
}
//Build a FetchParams object holding the request parameters
val params = new FetchParams(
versionId,
fetchRequest.replicaId,
fetchRequest.replicaEpoch,
fetchRequest.maxWait,
fetchMinBytes,
fetchMaxBytes,
FetchIsolation.of(fetchRequest),
clientMetadata
)
// call the replica manager to fetch messages from the local replica,
//passing processResponseCallback as the callback that handles the response
replicaManager.fetchMessages(
params = params,
fetchInfos = interesting,
quota = replicationQuota(fetchRequest),
responseCallback = processResponseCallback,
)
}
}
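As context for the parameters above: the maxWait, minBytes and maxBytes fields carried by the FetchRequest are driven by consumer-side configuration. Below is a minimal sketch of the relevant client settings (the broker address is made up and the values are only illustrations):
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // made-up address
        // fetch.max.wait.ms -> the request's maxWait
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // fetch.min.bytes -> the request's minBytes (the broker waits until this much data is available or maxWait expires)
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        // fetch.max.bytes -> the request-level maxBytes, further capped by the broker's fetch.max.bytes as shown above
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        // max.partition.fetch.bytes -> the per-partition maxBytes inside PartitionData
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
        // props would then be passed to a KafkaConsumer as usual
    }
}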
replicaManager.fetchMessages
The log data is ultimately obtained through this method:
/**
* Fetch messages from a replica, and wait until enough data can be fetched and return;
* the callback function will be triggered either when timeout or required fetch info is satisfied.
* Consumers may fetch from any replica, but followers can only fetch from the leader.
*/
def fetchMessages(
params: FetchParams,
fetchInfos: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {
// check if this fetch request can be satisfied right away
//Call readFromLocalLog to read messages from the local log and store the results in logReadResults.
val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
var bytesReadable: Long = 0
var errorReadingData = false
var hasDivergingEpoch = false
var hasPreferredReadReplica = false
val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
//Based on the read results, update bytesReadable (bytes available to return), errorReadingData (whether a read error occurred), hasDivergingEpoch (whether a diverging epoch was found) and hasPreferredReadReplica (whether a preferred read replica exists).
logReadResults.foreach { case (topicIdPartition, logReadResult) =>
brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
if (logReadResult.error != Errors.NONE)
errorReadingData = true
if (logReadResult.divergingEpoch.nonEmpty)
hasDivergingEpoch = true
if (logReadResult.preferredReadReplica.nonEmpty)
hasPreferredReadReplica = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicIdPartition, logReadResult)
}
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
// 5) we found a diverging epoch
// 6) has a preferred read replica
if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
hasDivergingEpoch || hasPreferredReadReplica) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
tp -> result.toFetchPartitionData(isReassignmentFetch)
}
responseCallback(fetchPartitionData)
} else {
//Otherwise build a DelayedFetch operation and place it in the delayedFetchPurgatory so the request can be completed once its conditions are met.
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
fetchInfos.foreach { case (topicIdPartition, partitionData) =>
logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
val delayedFetch = new DelayedFetch(
params = params,
fetchPartitionStatus = fetchPartitionStatus,
replicaManager = this,
quota = quota,
responseCallback = responseCallback
)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}
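Before moving on to readFromLocalLog, a brief note on delayedFetchPurgatory. It follows Kafka's generic delayed-operation pattern: try to complete the operation right away, otherwise register it under per-partition keys and re-check it whenever something happens for one of those partitions (or when the timeout fires). The sketch below is not Kafka's actual DelayedOperation/Purgatory API, only a simplified illustration of the idea, with no timers and no locking:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of the "try to complete, else watch" pattern; not Kafka's real classes.
abstract class DelayedOperation {
    private boolean completed = false;
    // Returns true when the completion criteria are met (e.g. enough bytes have accumulated).
    abstract boolean tryComplete();
    // Invoked exactly once when the operation completes (e.g. sends the fetch response).
    abstract void onComplete();
    final boolean maybeComplete() {
        if (!completed && tryComplete()) {
            completed = true;
            onComplete();
            return true;
        }
        return false;
    }
}

class Purgatory<K> {
    private final Map<K, List<DelayedOperation>> watchers = new HashMap<>();

    // Mirrors tryCompleteElseWatch: complete now if possible, otherwise watch the given keys.
    void tryCompleteElseWatch(DelayedOperation op, List<K> keys) {
        if (op.maybeComplete()) return;
        for (K key : keys) {
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
        }
    }

    // Called when something happens for a key (e.g. new data is appended to that partition).
    void checkAndComplete(K key) {
        List<DelayedOperation> ops = watchers.getOrDefault(key, new ArrayList<>());
        ops.removeIf(DelayedOperation::maybeComplete);
    }
}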
The log data is queried through readFromLocalLog.
II. Iterate over the topic partitions requested in the fetch and run the read for each of them
/**
* Read from multiple topic partitions at the given offset up to maxSize bytes
*/
def readFromLocalLog(
params: FetchParams,
readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
readFromPurgatory: Boolean
): Seq[(TopicIdPartition, LogReadResult)] = {
val traceEnabled = isTraceEnabled
def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
//Pull fetchOffset (the fetch offset), maxBytes (the maximum bytes to fetch) and logStartOffset (the follower's log start offset) out of fetchInfo.
val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
//Compute adjustedMaxBytes as the smaller of fetchInfo.maxBytes and limitBytes.
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
try {
if (traceEnabled)
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
//Get the Partition object for this topic partition
val partition = getPartitionOrException(tp.topicPartition)
//Record the current timestamp fetchTimeMs
val fetchTimeMs = time.milliseconds
//Check that the topic ID in the fetch request/session matches the topic ID in the log; throw InconsistentTopicIdException if they differ.
val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
if (!hasConsistentTopicId(topicId, partition.topicId))
throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")
// If we are the leader, determine the preferred read-replica
//Pick a suitable replica (the preferred read replica) for the subsequent fetch, if any.
val preferredReadReplica = params.clientMetadata.asScala.flatMap(
metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
if (preferredReadReplica.isDefined) {
//A preferred read replica exists, so skip the local read and build a LogReadResult that redirects the client to that replica.
replicaSelectorOpt.foreach { selector =>
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
s"${preferredReadReplica.get} for ${params.clientMetadata}")
}
// If a preferred read-replica is set, skip the read
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = -1L,
lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
preferredReadReplica = preferredReadReplica,
exception = None)
} else {
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
//Attempt the read and build a LogReadResult from the outcome.
val readInfo: LogReadInfo = partition.fetchRecords(
fetchParams = params,
fetchPartitionData = fetchInfo,
fetchTimeMs = fetchTimeMs,
maxBytes = adjustedMaxBytes,
minOneMessage = minOneMessage,
updateFetchState = !readFromPurgatory
)
val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
// If the partition is being throttled, simply return an empty set.
new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else {
readInfo.fetchedData
}
//Return the constructed LogReadResult
LogReadResult(info = fetchDataInfo,
divergingEpoch = readInfo.divergingEpoch.asScala,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
lastStableOffset = Some(readInfo.lastStableOffset),
preferredReadReplica = preferredReadReplica,
exception = None
)
}
} catch {
//code omitted
}
}
var limitBytes = params.maxBytes
val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
var minOneMessage = !params.hardMaxBytesLimit
readPartitionInfo.foreach { case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
//Size of the returned record batch in bytes.
val recordBatchSize = readResult.info.records.sizeInBytes
// Once we read from a non-empty partition, we stop ignoring request and partition level size limits
//If recordBatchSize is greater than 0, set minOneMessage to false: once a message has been read from a non-empty partition, request- and partition-level size limits are no longer ignored.
if (recordBatchSize > 0)
minOneMessage = false
limitBytes = math.max(0, limitBytes - recordBatchSize)
//Append (tp -> readResult) to result
result += (tp -> readResult)
}
result
}
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
Each requested topic partition is iterated over and the inner read function performs the query; inside it, partition.fetchRecords does the actual data lookup.
1. Selecting a suitable replica to read the local log (since 2.4, reads can be served from non-leader replicas of a partition)
In the read inner function of readFromLocalLog above:
val preferredReadReplica = params.clientMetadata.asScala.flatMap(
metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
def findPreferredReadReplica(partition: Partition,
clientMetadata: ClientMetadata,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
//partition.leaderIdIfLocal returns an Option[Int]: the ID of the leader replica if this broker is
//the leader of the partition, otherwise None.
partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
// Don't look up preferred for follower fetches via normal replication
//If a leader replica ID (leaderReplicaId) exists, run the block below; otherwise flatMap short-circuits to None.
if (FetchRequest.isValidBrokerId(replicaId))
None
else {
replicaSelectorOpt.flatMap { replicaSelector =>
//Look up the partition replicas' endpoints via metadataCache.getPartitionReplicaEndpoints
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
new ListenerName(clientMetadata.listenerName))
//Create a mutable Set[ReplicaView] named replicaInfoSet to hold the replicas that qualify.
val replicaInfoSet = mutable.Set[ReplicaView]()
//Iterate over the partition's remote replicas (partition.remoteReplicas) and for each one:
//take a snapshot of its state (replica.stateSnapshot);
//if the replica's brokerId is in the ISR and its log range contains fetchOffset, add it to replicaInfoSet.
partition.remoteReplicas.foreach { replica =>
val replicaState = replica.stateSnapshot
if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
replicaState.logEndOffset >= fetchOffset &&
replicaState.logStartOffset <= fetchOffset) {
replicaInfoSet.add(new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replicaState.logEndOffset,
currentTimeMs - replicaState.lastCaughtUpTimeMs
))
}
}
//Build a DefaultReplicaView for the leader replica and add it to replicaInfoSet.
val leaderReplica = new DefaultReplicaView(
replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
partition.localLogOrException.logEndOffset,
0L
)
replicaInfoSet.add(leaderReplica)
//Build a DefaultPartitionView describing the partition: the replica set plus the leader replica.
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
//Call replicaSelector.select to pick a replica according to the configured strategy, then use collect to map the choice to a broker ID.
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
//Exclude the leader replica and return the chosen replica's ID.
case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
}
}
}
}
}
Here replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect picks the replica that will serve the fetch. By default that is the leader replica, but since 2.4 a fetch can also be served from a non-leader (follower) replica.
In the code:
- the guard case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id is what filters the selection and returns the chosen replica's broker ID.
In the configuration (a sketch follows below):
- On the broker side, set the replica.selector.class parameter. Its default behavior is to have consumers read from the leader replica; change it to RackAwareReplicaSelector so consumers read from a replica in the rack given by their rack id. The broker.rack parameter must also be configured to say which machine room (rack) each broker is in.
- On the consumer side, set the client.rack parameter; the consumer will fetch message data from a broker whose broker.rack matches that value.
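A minimal configuration sketch for enabling rack-aware follower fetching (the property names are real Kafka configs; the broker address, rack name and group id are made up):
// Broker side (server.properties), shown here as comments:
//   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
//   broker.rack=rack-a        (the rack/data center this broker lives in)
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // made-up address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // made-up group
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");               // matches broker.rack above
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual; fetches may now be served by a same-rack follower
        }
    }
}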
Why was read-write separation not supported before 2.4, and why is it supported now?
Why it was not supported earlier: for Kafka, horizontal scaling of topic partitions already covers message throughput, and adding brokers lowers system load, so a read-write separation feature offered little payoff.
Why it is supported now: one scenario fits poorly, namely cross-machine-room (cross-data-center) deployments. When one data center needs to sync data from another and reads can only go to the leader replica, the traffic has to cross machine rooms, and that bandwidth is expensive; reading from a local follower replica becomes the smarter choice.
So Kafka introduced this feature not to reduce broker load or spread message processing, but to save network bandwidth.
III. Determining whether the request is a fetch from a partition follower or from a consumer client
For how a follower issues fetch requests, see the companion article on how a Kafka 3.5 partition follower creates a Fetcher thread to pull data from the leader.
def fetchRecords(
fetchParams: FetchParams,
fetchPartitionData: FetchRequest.PartitionData,
fetchTimeMs: Long,
maxBytes: Int,
minOneMessage: Boolean,
updateFetchState: Boolean
): LogReadInfo = {
def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
readRecords(
log,
fetchPartitionData.lastFetchedEpoch,
fetchPartitionData.fetchOffset,
fetchPartitionData.currentLeaderEpoch,
maxBytes,
fetchParams.isolation,
minOneMessage
)
}
//Decide whether the fetch request comes from a follower
if (fetchParams.isFromFollower) {
// Check that the request is from a valid replica before doing the read
val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrThrow(
fetchPartitionData.currentLeaderEpoch,
fetchParams.fetchOnlyLeader
)
val replica = followerReplicaOrThrow(
fetchParams.replicaId,
fetchPartitionData
)
val logReadInfo = readFromLocalLog(localLog)
(replica, logReadInfo)
}
if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
updateFollowerFetchState(
replica,
followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
followerStartOffset = fetchPartitionData.logStartOffset,
followerFetchTimeMs = fetchTimeMs,
leaderEndOffset = logReadInfo.logEndOffset,
fetchParams.replicaEpoch
)
}
logReadInfo
} else {
//Request from a consumer client
inReadLock(`leaderIsrUpdateLock`) {
val localLog = localLogWithEpochOrThrow(
fetchPartitionData.currentLeaderEpoch,
fetchParams.fetchOnlyLeader
)
readFromLocalLog(localLog)
}
}
}
1. Before reading data, the leaderIsrUpdateLock read lock must be acquired
In the method logic above:
//follower request
val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock)
//consumer client request
inReadLock(`leaderIsrUpdateLock`)
2. readFromLocalLog reads the local log data
def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
readRecords(
log,
fetchPartitionData.lastFetchedEpoch,
fetchPartitionData.fetchOffset,
fetchPartitionData.currentLeaderEpoch,
maxBytes,
fetchParams.isolation,
minOneMessage
)
}
IV. Reading log data means reading the segment files
1. Get the basic state of the local log (high watermark, offsets, and so on)
private def readRecords(
localLog: UnifiedLog,
lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
minOneMessage: Boolean
): LogReadInfo = {
//the local log's high watermark (initialHighWatermark),
val initialHighWatermark = localLog.highWatermark
//log start offset (initialLogStartOffset),
val initialLogStartOffset = localLog.logStartOffset
//log end offset (initialLogEndOffset)
val initialLogEndOffset = localLog.logEndOffset
//and last stable offset (initialLastStableOffset)
val initialLastStableOffset = localLog.lastStableOffset
//code omitted
//Call localLog.read to read the data starting at the given offset
val fetchedData = localLog.read(
fetchOffset,
maxBytes,
fetchIsolation,
minOneMessage
)
//Return a LogReadInfo containing the fetched data.
new LogReadInfo(
fetchedData,
Optional.empty(),
initialHighWatermark,
initialLogStartOffset,
initialLogEndOffset,
initialLastStableOffset
)
}
def read(startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean): FetchDataInfo = {
checkLogStartOffset(startOffset)
val maxOffsetMetadata = isolation match {
case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
}
localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
}
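The FetchIsolation chosen here decides how far the read may go: LOG_END for follower fetches, HIGH_WATERMARK for ordinary consumers, and TXN_COMMITTED (the last stable offset) when the consumer asks for read_committed. A hedged consumer-side sketch of the latter (broker address and group id are made up):
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // made-up address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");            // made-up group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // isolation.level=read_committed makes the broker serve data only up to the last stable offset,
        // which corresponds to the FetchIsolation.TXN_COMMITTED branch above.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}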
2. Iterate over the segments until data is read from one of them
/*
* @param startOffset the start offset
* @param maxLength the maximum number of bytes to read
* @param minOneMessage whether to read at least one message
* @param maxOffsetMetadata the maximum offset metadata
* @param includeAbortedTxns whether to include aborted transactions
* @return a FetchDataInfo object
*/
def read(startOffset: Long,
maxLength: Int,
minOneMessage: Boolean,
maxOffsetMetadata: LogOffsetMetadata,
includeAbortedTxns: Boolean): FetchDataInfo = {
maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
s"total length ${segments.sizeInBytes} bytes")
//Take the next offset metadata (endOffsetMetadata) and its offset (endOffset)
val endOffsetMetadata = nextOffsetMetadata
val endOffset = endOffsetMetadata.messageOffset
//Locate the floor segment for startOffset, i.e. the segment with the largest base offset not exceeding it; later segments can then be walked in order
var segmentOpt = segments.floorSegment(startOffset)
// return error on attempt to read beyond the log end offset
//Throw OffsetOutOfRangeException if the start offset is beyond the end offset or no segment can be found.
if (startOffset > endOffset || segmentOpt.isEmpty)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments upto $endOffset.")
//If the start offset equals the offset in maxOffsetMetadata, return an empty FetchDataInfo
if (startOffset == maxOffsetMetadata.messageOffset)
emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
else if (startOffset > maxOffsetMetadata.messageOffset)
//If the start offset is greater than the offset in maxOffsetMetadata, return an empty FetchDataInfo with the start offset converted to offset metadata
emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
else {
//Otherwise read from the segment whose base offset is not larger than the target offset
var fetchDataInfo: FetchDataInfo = null
//Keep looping while fetchDataInfo is still null and a candidate segment exists
while (fetchDataInfo == null && segmentOpt.isDefined) {
val segment = segmentOpt.get
val baseOffset = segment.baseOffset
val maxPosition =
// Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
//如果它在此段上,请使用最大偏移位置;否则,段大小是限制。
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
else segment.size
fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
if (fetchDataInfo != null) {
//If data was read and includeAbortedTxns is true, call addAbortedTransactions to add the aborted transactions to fetchDataInfo.
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
}
//If fetchDataInfo is still null, advance segmentOpt to the next segment above baseOffset.
else segmentOpt = segments.higherSegment(baseOffset)
}
//Messages were read successfully; return the FetchDataInfo
if (fetchDataInfo != null) fetchDataInfo
else {
//We ran past the end of the last segment without reading any data; return an empty FetchDataInfo carrying the next offset metadata and empty memory records (MemoryRecords.EMPTY)
new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
}
}
First the floor segment is obtained with var segmentOpt = segments.floorSegment(startOffset); then fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) reads the data from that segment.
V. Creating the FileRecords object whose channel is positioned at the requested location in the file
This part is fairly simple.
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
//the start position is already past the end of the log, so return null
if (startOffsetAndSize == null)
return null
//Build a LogOffsetMetadata from the start offset, the segment's base offset and the start position
val startPosition = startOffsetAndSize.position
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
//Compute the length of the message set to read: the smaller of (maxPosition - startPosition) and adjustedMaxSize, assigned to fetchSize.
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
//Build a FetchDataInfo containing the offset metadata, a log slice of fetchSize bytes starting at startPosition, and the related flags;
//log.slice(startPosition, fetchSize) is the actual log data
new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
}
log.slice reads the file data and produces a FileRecords object:
public FileRecords slice(int position, int size) throws IOException {
int availableBytes = availableBytes(position, size);
int startPosition = this.start + position;
return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
}
public class FileRecords extends AbstractRecords implements Closeable {
private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches;
//constructor
FileRecords(File file,
FileChannel channel,
int start,
int end,
boolean isSlice) throws IOException {
this.file = file;
this.channel = channel;
this.start = start;
this.end = end;
this.isSlice = isSlice;
this.size = new AtomicInteger();
//This is only a slice view, so skip the file-size check and set size to end - start directly.
if (isSlice) {
// don't check the file size if this is just a slice view
size.set(end - start);
} else {
//If isSlice is false this is not a slice, so the file size must be checked; if it exceeds Integer.MAX_VALUE a KafkaException is thrown.
if (channel.size() > Integer.MAX_VALUE)
throw new KafkaException("The size of segment " + file + " (" + channel.size() +
") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);
//Otherwise set limit to the smaller of the file size and end, set size to limit - start, and position the file channel at limit, i.e. the end of the file.
int limit = Math.min((int) channel.size(), end);
size.set(limit - start);
channel.position(limit);
}
//batches can be ignored for now; what matters is the channel parameter, which acts as the kernel-side read channel
batches = batchesFrom(start);
}
//This is the key method for zero-copy: when the response is sent later, it hands the bytes from the read channel straight to the send channel without passing through user space
@Override
public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
long count = Math.min(length, oldSize - offset);
return destChannel.transferFrom(channel, position, count);
}
}
This is as far as we need to go here; next, let's look at how the response is handled.
VI. Building and sending the response
1. The handleFetchRequest method in KafkaApis has a callback that processes the data to be sent to the client
def handleFetchRequest(request: RequestChannel.Request): Unit = {
//Handles the Fetch response data.
def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
//Send the response immediately.
requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs), Some(updateConversionStats))
}
//replicaManager.fetchMessages fetches messages from the local replica; processResponseCallback is passed in to handle the response
replicaManager.fetchMessages(
params = params,
fetchInfos = interesting,
quota = replicationQuota(fetchRequest),
responseCallback = processResponseCallback,
)
}
2. Before sending, the Send object to transmit must be assembled
def sendResponse(
request: RequestChannel.Request,
response: AbstractResponse,
onComplete: Option[Send => Unit]
): Unit = {
updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala)
sendResponse(new RequestChannel.SendResponse(
request,
request.buildResponseSend(response),
request.responseNode(response),
onComplete
))
}
Here request.buildResponseSend(response) is the method that builds the Send:
def buildResponseSend(abstractResponse: AbstractResponse): Send = {
envelope match {
//isForwarded
case Some(request) =>
//code omitted
case None =>
//Call context.buildResponseSend(abstractResponse) directly to build the response Send.
context.buildResponseSend(abstractResponse)
}
}
/**
* Build a {@link Send} for direct transmission of the provided response
* over the network.
*/
public Send buildResponseSend(AbstractResponse body) {
return body.toSend(header.toResponseHeader(), apiVersion());
}
public abstract class AbstractResponse implements AbstractRequestResponse {
public static final int DEFAULT_THROTTLE_TIME = 0;
private final ApiKeys apiKey;
protected AbstractResponse(ApiKeys apiKey) {
this.apiKey = apiKey;
}
//data() here resolves to FetchResponse's data() method, which returns a FetchResponseData
public final Send toSend(ResponseHeader header, short version) {
return SendBuilder.buildResponseSend(header, data(), version);
}
}
public static Send buildResponseSend(
ResponseHeader header,
Message apiResponse,
short apiVersion
) {
return buildSend(
header.data(),
header.headerVersion(),
apiResponse,
apiVersion
);
}
private static Send buildSend(
Message header,
short headerVersion,
Message apiMessage,
short apiVersion
) {
// Create an ObjectSerializationCache to cache serialized objects
ObjectSerializationCache serializationCache = new ObjectSerializationCache();
// Create a MessageSizeAccumulator to compute the message size
MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
// Compute the sizes of the header and apiMessage and accumulate them into messageSize
header.addSize(messageSize, serializationCache, headerVersion);
apiMessage.addSize(messageSize, serializationCache, apiVersion);
// Create a SendBuilder sized to messageSize.sizeExcludingZeroCopy() + 4
SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
// Write the total message size into the builder via writeInt
builder.writeInt(messageSize.totalSize());
// Call write on the header and on apiMessage to write their contents into the builder
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);
// Build and return the Send object
return builder.build();
}
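The + 4 in the SendBuilder size accounts for the 4-byte length prefix that frames every response on the wire, which is exactly what builder.writeInt(messageSize.totalSize()) writes first. A toy illustration of that framing (not Kafka's real classes):
import java.nio.ByteBuffer;

public class LengthPrefixedFrameExample {
    // Toy illustration of a length-prefixed response frame.
    static ByteBuffer frame(byte[] headerBytes, byte[] bodyBytes) {
        int payloadSize = headerBytes.length + bodyBytes.length;
        ByteBuffer buffer = ByteBuffer.allocate(4 + payloadSize);
        buffer.putInt(payloadSize);   // the role of builder.writeInt(messageSize.totalSize()) above
        buffer.put(headerBytes);      // serialized response header
        buffer.put(bodyBytes);        // serialized response body (zero-copy record data is appended separately)
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer frame = frame(new byte[]{1, 2}, new byte[]{3, 4, 5});
        System.out.println("frame size = " + frame.remaining()); // 4-byte prefix + 5 payload bytes = 9
    }
}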
In buildSend, the apiMessage parameter is declared as Message, but what is actually passed in is Message's implementation class FetchResponseData; as the data() method of AbstractResponse above shows, the call resolves to FetchResponse's data() method.
(1) The write method overridden in the auto-generated FetchResponseData class
buildSend calls apiMessage.write(), so what actually runs is FetchResponseData's write method. FetchResponseData is generated code, so the class cannot be found in the source tree, but the generator can: generateClassWriter in the MessageDataGenerator class emits the write method, and generateVariableLengthWriter emits a writeRecords call for fields whose type is Records, as in the following code:
if (type.isString()) {
buffer.printf("_writable.writeByteArray(_stringBytes);%n");
} else if (type.isBytes()) {
if (zeroCopy) {
buffer.printf("_writable.writeByteBuffer(%s);%n", name);
} else {
buffer.printf("_writable.writeByteArray(%s);%n", name);
}
} else if (type.isRecords()) {
buffer.printf("_writable.writeRecords(%s);%n", name);
}
So what subsequently executes is writeRecords in SendBuilder:
/**
* Write a record set. The underlying record data will be retained in the result of {@link #build()}.
* See {@link BaseRecords#toSend()}.
* @param records the records to write
*/
@Override
public void writeRecords(BaseRecords records) {
if (records instanceof MemoryRecords) {
flushPendingBuffer();
addBuffer(((MemoryRecords) records).buffer());
} else if (records instanceof UnalignedMemoryRecords) {
flushPendingBuffer();
addBuffer(((UnalignedMemoryRecords) records).buffer());
} else {
//the branch below only runs when records instanceof FileRecords
flushPendingSend();
//Calls toSend in AbstractRecords; when writeTo is later invoked on the resulting Send, it ends up calling FileRecords' writeTo
addSend(records.toSend());
}
}
(2) The writeTo implementation of the generated Send
records.toSend() actually executes the toSend method of FileRecords' parent class AbstractRecords. It returns a DefaultRecordsSend, and the FileRecords instance is assigned to the records field of RecordsSend, so when writeTo is later called on the Send, what executes is RecordsSend's writeTo method.
public abstract class AbstractRecords implements Records {
@Override
public DefaultRecordsSend<Records> toSend() {
return new DefaultRecordsSend<>(this);
}
}
The AbstractRecords passes itself as the argument to the DefaultRecordsSend constructor:
public class DefaultRecordsSend<T extends TransferableRecords> extends RecordsSend<T> {
public DefaultRecordsSend(T records) {
this(records, records.sizeInBytes());
}
public DefaultRecordsSend(T records, int maxBytesToWrite) {
super(records, maxBytesToWrite);
}
@Override
protected long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException {
//records() returns the FileRecords, so this writeTo call dispatches to FileRecords
return records().writeTo(channel, previouslyWritten, remaining);
}
}
which in turn stores the AbstractRecords in the records field of RecordsSend:
public abstract class RecordsSend<T extends BaseRecords> implements Send {
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private final T records;
private final int maxBytesToWrite;
private int remaining;
private boolean pending = false;
protected RecordsSend(T records, int maxBytesToWrite) {
this.records = records;
this.maxBytesToWrite = maxBytesToWrite;
this.remaining = maxBytesToWrite;
}
@Override
public boolean completed() {
return remaining <= 0 && !pending;
}
@Override
public final long writeTo(TransferableChannel channel) throws IOException {
long written = 0;
if (remaining > 0) {
//Executes the subclass DefaultRecordsSend's writeTo, which then calls the records' writeTo
written = writeTo(channel, size() - remaining, remaining);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
}
pending = channel.hasPendingWrites();
if (remaining <= 0 && pending)
channel.write(EMPTY_BYTE_BUFFER);
return written;
}
@Override
public long size() {
return maxBytesToWrite;
}
protected T records() {
return records;
}
/**
* Write at most `remaining` bytes of records to `channel`. Implementations are allowed to be stateful. The contract
* with the caller is that the first invocation passes `previouslyWritten` equal to 0 and `remaining` equal to the
* maximum number of bytes we want to write to `channel`; both are adjusted appropriately on every subsequent call.
* See {@link #writeTo} for an example of the expected usage.
*/
protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException;
}
The method finally executed is FileRecords' writeTo.
(3) Ultimately FileChannel's transferTo is called
@Override
public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
long count = Math.min(length, oldSize - offset);
return destChannel.transferFrom(channel, position, count);
}
which ultimately calls FileChannel's transferTo method:
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
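To make the zero-copy path concrete, here is a minimal, self-contained sketch (the file name and destination address are made up) that streams a file to a socket with FileChannel.transferTo, the same JDK call FileRecords relies on; the bytes move inside the kernel without being copied into user space:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopyExample {
    public static void main(String[] args) throws IOException {
        // Hypothetical log file and destination; replace with real values.
        try (FileChannel fileChannel = FileChannel.open(Paths.get("00000000000000000000.log"), StandardOpenOption.READ);
             SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
            long position = 0;
            long remaining = fileChannel.size();
            // transferTo may transfer fewer bytes than requested, so loop until everything is sent.
            while (remaining > 0) {
                long transferred = fileChannel.transferTo(position, remaining, socketChannel);
                position += transferred;
                remaining -= transferred;
            }
        }
    }
}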
3. Sending the Send asynchronously
(1) Add the response to the queue
//Used to send the response back to the client.
private[network] def sendResponse(response: RequestChannel.Response): Unit = {
response match {
//We should only send one of the following per request
case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
//Update the request's responseCompleteTimeNanos and apiLocalCompleteTimeNanos to record when the request completed.
val request = response.request
val timeNanos = time.nanoseconds()
request.responseCompleteTimeNanos = timeNanos
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = timeNanos
//For StartThrottlingResponse and EndThrottlingResponse, nothing is done
case _: StartThrottlingResponse | _: EndThrottlingResponse => ()
}
//Look up the processor this response belongs to and put the response into that processor's response queue.
val processor = processors.get(response.processor)
if (processor != null) {
processor.enqueueResponse(response)
}
}
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
responseQueue.put(response)
wakeup()
}
(2) The Processor thread handles the response data
private[kafka] class Processor(
val id: Int,
time: Time,
maxRequestSize: Int,
requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
failedAuthenticationDelayMs: Int,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
credentialProvider: CredentialProvider,
memoryPool: MemoryPool,
logContext: LogContext,
connectionQueueSize: Int,
isPrivilegedListener: Boolean,
apiVersionManager: ApiVersionManager,
threadName: String
) extends Runnable with Logging {
//Background thread that sends data to clients
override def run(): Unit = {
try {
while (shouldRun.get()) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
//poll the selector
poll()
//process completed receives
processCompletedReceives()
//process completed sends
processCompletedSends()
//process disconnected connections
processDisconnected()
//close any excess connections
closeExcessConnections()
} catch {
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
}
}
}
Processor extends Runnable and implements the run method.
(3) During the send, the bytes are moved from the file's FileChannel to the outgoing channel via FileChannel's transferTo, which completes the zero-copy transfer
private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
catch {
case e @ (_: IllegalStateException | _: IOException) =>
// The exception is not re-thrown and any completed sends/receives/connections/disconnections
// from this poll will be processed.
error(s"Processor $id poll failed", e)
}
}
@Override
public void poll(long timeout) throws IOException {
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are excluded from the ready count for the next select
readyKeys.clear();
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
try {
attemptWrite(key, channel, nowNanos);
} catch (Exception e) {
sendFailed = true;
throw e;
}
}
private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
if (channel.hasSend()
&& channel.ready()
&& key.isWritable()
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
write(channel);
}
}
void write(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
//channel.write executes send.writeTo(transportLayer)
long bytesSent = channel.write();
}
public long write() throws IOException {
if (send == null)
return 0;
midWrite = true;
return send.writeTo(transportLayer);
}
Here send.writeTo executes the RecordsSend writeTo method shown above, and this is how FileChannel achieves zero-copy transfer.
This concludes the walkthrough of how the Kafka 3.5 broker processes consumer fetch requests at the source-code level.