kafka 3.5 主题分区的高水位线HW,低水位线LW,logStartOffset,LogEndOffset什么情况下会更新源码

这篇具有很好参考价值的文章主要介绍了kafka 3.5 主题分区的高水位线HW,低水位线LW,logStartOffset,LogEndOffset什么情况下会更新源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

下面的例子只是各拿一个做举例,不是全部场景,不要以为logStartOffset,LogEndOffset,HW,LW只有三个场景可以修改

前言

这里需要针对logStartOffsetLogEndOffset做特殊说明,要不会让大家脑袋混乱,并且前言后的章节讲的都是主题分区级别

1、logStartOffset

(1)主题分区级别

  1. 对于每个分区中每一个副本(包括Leader和Follower)都有一个独立的值,Kafka服务器记录并管理
  2. 如果是单个segment,那logStartOffset是segment的第一个offset位点,如果是多个segment,则是最旧的segment的第一个offset位点,
  3. 会受到生产者的写入、管理员的管理等因素的影响(删除oldSegment等)

(2)消费分组级别

  1. 针对每个消费者组内的每个分区而言的,对于每个消费者组内的每个分区都有一个独立的值,表示消费者在加入消费组之前已经消费的最大偏移量。
  2. 客户端记录并管理的,它表示该消费者在消费时的起始位置
  3. 会根据消费者消费的情况而不断变化

2、LogEndOffset

(1)主题分区级别

  1. 对于每个分区每一个副本(包括Leader和Follower)都有一个独立的值,Kafka服务器记录并管理
  2. 最新的segemnt的最新的offset位点+1
  3. 会受到生产者的写入、管理员的管理等因素的影响

(2)消费分组级别

  1. 针对每个消费者组内的每个分区而言的,对于每个消费者组内的每个分区都有一个独立的值,表示消费者在加入消费组之前已经消费过的最后一条消息的下一个偏移量。
  2. 客户端记录并管理的,它表示该消费者在消费时的已消费消息的位置
  3. 会根据消费者消费的情况而不断变化

3、HighWatermark(HW)

在Leader副本中的ISR集合中,最小的主题分区级别的LogEndOffset中为HW

4、LowWatermark(LW)

这个数据是虚线的值,不是实际存储的值,可以参考第五章节证明环节,
但是LW和所有副本(AR)中最小的主题分区级别的logStartOffset是一致的

一、定时任务

1、在处理创建分区请求时,会启动定时任务,主要用于把高水位线HW定时写入到文件中

def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
          //todo 启动高水位线定时任务,目的是把每一个分区的高水位线的数据写入到高水位标记文件中
          startHighWatermarkCheckPointThread()
}          
def startHighWatermarkCheckPointThread(): Unit = {
    if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) {
      //() => checkpointHighWatermarks() 是一个runner
      scheduler.schedule("highwatermark-checkpoint", () => checkpointHighWatermarks(), 0L, config.replicaHighWatermarkCheckpointIntervalMs)
    }
  }

定时任务checkpointHighWatermarks

def checkpointHighWatermarks(): Unit = {
    //该函数接受两个参数:logDirToCheckpoints和log。函数的作用是将log的高水位标记(highWatermark)存储到logDirToCheckpoints中。
    def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
              log: UnifiedLog): Unit = {
      val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
        new mutable.AnyRefMap[TopicPartition, Long]())
      checkpoints.put(log.topicPartition, log.highWatermark)
    }
    //它是一个mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]]类型的可变映射。初始化大小为allPartitions.size。
    val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
      allPartitions.size)
   // 使用onlinePartitionsIterator迭代器遍历每个分区,并对每个分区的日志调用putHw函数来更新logDirToHws。
    onlinePartitionsIterator.foreach { partition =>
      partition.log.foreach(putHw(logDirToHws, _))
      partition.futureLog.foreach(putHw(logDirToHws, _))
    }
    //使用logDirToHws中的每个(logDir, hws)对调用highWatermarkCheckpoints.get(logDir),并尝试将hws写入高水位标记文件
    for ((logDir, hws) <- logDirToHws) {
      try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
      catch {
        //如果写入过程中发生KafkaStorageException异常,则打印错误日志
        case e: KafkaStorageException =>
          error(s"Error while writing to highwatermark file in directory $logDir", e)
      }
    }
  }

二、副本Follower在向Leader副本Fetch数据

至于为什么是fetchRecords方法,你可以看一下kakfa 3.5 kafka服务端处理消费者客户端拉取数据请求源码

def fetchRecords(
    fetchParams: FetchParams,
    fetchPartitionData: FetchRequest.PartitionData,
    fetchTimeMs: Long,
    maxBytes: Int,
    minOneMessage: Boolean,
    updateFetchState: Boolean
  ): LogReadInfo = {
    //省略代码
    //判断获取数据的请求是否来自Follower
    if (fetchParams.isFromFollower) {
      // Check that the request is from a valid replica before doing the read
      val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
        //省略代码
        val logReadInfo = readFromLocalLog(localLog)
        (replica, logReadInfo)
      }
      //todo Follower副本在fetch数据后,修改一些信息
      if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
        //如果 fetch 来自 broker 的副本同步,那么就更新相关的 log end offset
        updateFollowerFetchState(
          replica,
          followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
          followerStartOffset = fetchPartitionData.logStartOffset,
          followerFetchTimeMs = fetchTimeMs,
          leaderEndOffset = logReadInfo.logEndOffset,
          fetchParams.replicaEpoch
        )
      }

      logReadInfo
    } 
  //省略代码  
  }
/**
   * Update the follower's state in the leader based on the last fetch request. See
   * [[Replica.updateFetchState()]] for details.
   *
   * This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`)
   */
  def updateFollowerFetchState(
    replica: Replica,
    followerFetchOffsetMetadata: LogOffsetMetadata,
    followerStartOffset: Long,
    followerFetchTimeMs: Long,
    leaderEndOffset: Long,
    brokerEpoch: Long
  ): Unit = {
    // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
    //通过判断是否存在延迟的DeleteRecordsRequest来确定是否需要计算低水位(lowWatermarkIfLeader)。如果没有延迟的DeleteRecordsRequest,则将oldLeaderLW设为-1。
    val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
    //获取副本的先前的跟随者日志结束偏移量
    val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
    //调用replica.updateFetchState方法来更新副本的抓取状态,包括跟随者的抓取偏移量元数据、起始偏移量、抓取时间、领导者的结束偏移量和代理节点的时期。
    replica.updateFetchState(
      followerFetchOffsetMetadata,
      followerStartOffset,
      followerFetchTimeMs,
      leaderEndOffset,
      brokerEpoch
    )
    //再次判断是否存在延迟的DeleteRecordsRequest,如果没有则将newLeaderLW设为-1。
    val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
    // check if the LW of the partition has incremented
    // since the replica's logStartOffset may have incremented
    //检查分区的低水位是否增加,即新的低水位(newLeaderLW)是否大于旧的低水位(oldLeaderLW)。
    val leaderLWIncremented = newLeaderLW > oldLeaderLW

    // Check if this in-sync replica needs to be added to the ISR.
    //调用maybeExpandIsr方法来检查是否需要将该同步副本添加到ISR(In-Sync Replicas)中。
    maybeExpandIsr(replica)

    // check if the HW of the partition can now be incremented
    // since the replica may already be in the ISR and its LEO has just incremented
    //检查分区的高水位是否可以增加。如果副本的日志结束偏移量(replica.stateSnapshot.logEndOffset)发生变化,
    val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
      // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
      // leaderIsrUpdateLock to prevent adding new hw to invalid log.
      //尝试增加高水位(maybeIncrementLeaderHW方法),并在leaderIsrUpdateLock锁的保护下执行该操作。
      inReadLock(leaderIsrUpdateLock) {
        leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
      }
    } else {
      false
    }

    // some delayed operations may be unblocked after HW or LW changed
    //如果低水位或高水位发生变化,则尝试完成延迟请求(tryCompleteDelayedRequests方法)。
    if (leaderLWIncremented || leaderHWIncremented)
      tryCompleteDelayedRequests()

    debug(s"Recorded replica ${replica.brokerId} log end offset (LEO) position " +
      s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
  }

1、尝试升高Leader副本的HW

 /**
   * 检查并可能增加分区的高水位线;
   * 1. Partition ISR changed 1.分区 ISR 已更改
   * 2. Any replica's LEO changed 2。任何副本的 LEO 已更改
   *
   * HW由同步或被视为已捕获的所有副本中的最小日志结束偏移量确定。
   * 这样,如果一个副本被视为已捕获,但其对数结束偏移小于HW,我们将等待此副本赶上HW,然后再推进HW。
   * 这有助于 ISR 仅包含领导者副本且从属者试图赶上的情况。
   * 如果我们在前进HW时不等待跟随者,则跟随者的对数结束偏移量可能会一直落后于HW(由领导者的对数结束偏移量决定),因此永远不会添加到 ISR 中。
   * 随着 AlterPartition 的添加,我们还在推进硬件时将新添加的副本视为 ISR 的一部分。
   * 控制器尚未将这些副本提交到 ISR,因此我们可以恢复到之前提交的 ISR。
   * 但是,向 ISR 添加其他副本会使其更具限制性,因此更安全。我们将此集合称为“最大”ISR。
   */
  private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
    //首先获得Leader分区的LogEndOffset
    val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
    //创建一个新的Hw。不一定用上,
    var newHighWatermark = leaderLogEndOffset
    //首先,代码通过迭代remoteReplicasMap中的每个副本(replica)来确定新的高水位线。对于每个副本,它检查副本的状态快照(replica.stateSnapshot)的日志结束偏移
    remoteReplicasMap.values.foreach { replica =>
      // Note here we are using the "maximal", see explanation above
      val replicaState = replica.stateSnapshot
      //如果副本的日志结束偏移小于新的高水位线,并且副本已经追赶上了领导者日志,或者副本的brokerId包含在最大ISR(in-sync replicas)列表中,则将该日志结束偏移赋值给新的高水位线。
      if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
        (replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs)
          || partitionState.maximalIsr.contains(replica.brokerId))) {
        //则将该副本日志结束偏移赋值给新的高水位线。相当于HW设置为ISR中endOffset最小的那一个
        newHighWatermark = replicaState.logEndOffsetMetadata
      }
    }
    leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
      case Some(oldHighWatermark) =>
        //尝试更新领导者日志的高水位线。如果成功更新了旧的高水位线,则会输出一条调试信息,并返回true
        debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
        true
      //省略代码
    }
  }

leaderLog.maybeIncrementHighWatermark这个是一个方法,有返回值

 /**
   *当且仅当高水位线大于旧值时,才将其更新为新值。更新为大于日志结束偏移量的值是错误的。
   *此方法旨在由领导者在更新追随者提取偏移量后更新高水位线。
   * @return the old high watermark, if updated by the new value
   */
  def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
   //省略代码
    lock.synchronized {
      val oldHighWatermark = fetchHighWatermarkMetadata
	 //省略代码
      if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
        (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
        updateHighWatermarkMetadata(newHighWatermark)
        Some(oldHighWatermark)
      } else {
        None
      }
    }
  }

updateHighWatermarkMetadata修改HW元数据的方法

//更新HW高水位线的方法
  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized {
      if (newHighWatermark.messageOffset < highWatermarkMetadata.messageOffset) {
        warn(s"Non-monotonic update of high watermark from $highWatermarkMetadata to $newHighWatermark")
      }
      //先更新HW缓存,再更新数据
      highWatermarkMetadata = newHighWatermark
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      logOffsetsListener.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      maybeIncrementFirstUnstableOffset()
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

三、生产者把数据推送到服务端

至于生产者推送消息到服务端,可以参考kafka 3.5 kafka服务端接收生产者发送的数据源码

1、logEndOffset升高

private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: MetadataVersion,
                     validateAndAssignOffsets: Boolean,
                     leaderEpoch: Int,
                     requestLocal: Option[RequestLocal],
                     ignoreRecordSize: Boolean): LogAppendInfo = {
             
              //把数据追加到数据文件、索引文件、时间索引文件的方法
              //这里会修改LogEndOffset,保证这个点是最新数据的位点+1
              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
              //修改高水位线最后一个日志的偏移量
              updateHighWatermarkWithLogEndOffset()

  }
 private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
    //给数据文件增加数据,并且根据条件判断是否给索引文件和时间索引文件增加数据
    segments.activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
      shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)
    //更新日志的结束偏移量,并更新恢复点。
    updateLogEndOffset(lastOffset + 1)
  }

其中updateLogEndOffset会修改logEndOffset

/**
   * The offset metadata of the next message that will be appended to the log
   */
  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
  /**
   * The offset of the next message that will be appended to the log
   */
  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
  /**
   * Update end offset of the log, and update the recoveryPoint.
   *更新日志的结束偏移量,并更新恢复点。
   * @param endOffset the new end offset of the log
   */
  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
    nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
    //恢复点设置为上次的endOffset
    if (recoveryPoint > endOffset) {
      updateRecoveryPoint(endOffset)
    }
  }

new LogOffsetMetadata会创建一个新的给logEndOffsetMetadata,覆盖掉旧的,并且logEndOffset变成了新的logEndOffsetMetadata中的messageOffset

  public LogOffsetMetadata(long messageOffset,
                             long segmentBaseOffset,
                             int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

四、segment过期执行删除最早创建的segment

1、logStartOffset升高

执行deleteSegments一般是segment过期执行删除操作,都是从时间最久的segment开始删除,所以LogStartOffset才会递增

private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        var segmentsToDelete = deletable
        if (localLog.segments.numberOfSegments == numToDelete) {
          val newSegment = roll()
          if (deletable.last.baseOffset == newSegment.baseOffset) {
            warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
            segmentsToDelete = deletable.dropRight(1)
          }
        }
        localLog.checkIfMemoryMappedBufferClosed()
        // remove the segments for lookups
        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
        deleteProducerSnapshots(deletable, asyncDelete = true)
        maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion)
      }
      numToDelete
    }
  }

在删除segment时,会调用maybeIncrementLogStartOffset 会尝试修改LogStartOffset

 /**
  
   *如果提供的偏移量较大,则递增日志开始偏移量。
   *如果日志开始偏移量发生更改,则此方法还会更新一些键偏移量,以便“logStartOffset <= logStableOffset <= highWatermark”。
   * 前导纪元缓存也会更新,以便该组件中引用的所有偏移都指向此日志中的有效偏移。
   * @throws OffsetOutOfRangeException if the log start offset is greater than the high watermark
   * @return true if the log start offset was updated; otherwise false
   */
  def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Boolean = {
    var updatedLogStartOffset = false
    maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
      lock synchronized {
        if (newLogStartOffset > highWatermark)
          throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
            s"since it is larger than the high watermark $highWatermark")

        localLog.checkIfMemoryMappedBufferClosed()
        if (newLogStartOffset > logStartOffset) {
         //修改LogStartOffset
          updatedLogStartOffset = true
          updateLogStartOffset(newLogStartOffset)
          _localLogStartOffset = newLogStartOffset
          info(s"Incremented log start offset to $newLogStartOffset due to $reason")
          leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
          producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
          maybeIncrementFirstUnstableOffset()
        }
      }
    }

    updatedLogStartOffset
  }
//修改LogStartOffset
  private def updateLogStartOffset(offset: Long): Unit = {
    logStartOffset = offset

    if (highWatermark < offset) {
      updateHighWatermark(offset)
    }

    if (localLog.recoveryPoint < offset) {
      localLog.updateRecoveryPoint(offset)
    }
  }

五、证明LW在代码中不实际存储

1、UnifiedLog.scala没有字段表示存储的是LW

但是比如HW,logStartOffset、logEndOffset都存在

class UnifiedLog(@volatile var logStartOffset: Long,
                 private val localLog: LocalLog,
                 brokerTopicStats: BrokerTopicStats,
                 val producerIdExpirationCheckIntervalMs: Int,
                 @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                 val producerStateManager: ProducerStateManager,
                 @volatile private var _topicId: Option[Uuid],
                 val keepPartitionMetadataFile: Boolean,
                 val remoteStorageSystemEnable: Boolean = false,
                 remoteLogManager: Option[RemoteLogManager] = None,
                 @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
   //logStartOffset
  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
  //highWatermark            
  def highWatermark: Long = highWatermarkMetadata.messageOffset
  //logEndOffset
  def logEndOffset: Long =  localLog.logEndOffset

}

2、获得lowWatermark方法

基本获得lowWatermark的地方都是用lowWatermarkIfLeader获得LW,而方法内部是通过遍历所有副本中最小的logStartOffset得到的文章来源地址https://www.toymoban.com/news/detail-709432.html

/**
   * 低水位线偏移值,仅当本地副本是分区前导符时才计算 它仅由领导代理用于决定何时满足 DeleteRecordsRequest。
   * 它的值是所有活动副本的最小 logStartOffset 当领导者代理收到 FetchRequest 或 DeleteRecordsRequest 时,低水位线将增加。
   */
  def lowWatermarkIfLeader: Long = {
  	//不是Leader抛异常
    if (!isLeader)
      throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
    //当 DeleteRecordsRequest 未完成时,lowWatermarkIfLeader 可能会被多次调用,已注意避免在此代码中生成不必要的集合
    //首先获得Leader的logStartOffset
    var lowWaterMark = localLogOrException.logStartOffset
    //遍历所有的副本,如果有一个副本的logStartOffset小于当前Leader的logStartOffset,则LW则重新设置为最小的那个
    remoteReplicas.foreach { replica =>
      val logStartOffset = replica.stateSnapshot.logStartOffset
      if (metadataCache.hasAliveBroker(replica.brokerId) && logStartOffset < lowWaterMark) {
        lowWaterMark = logStartOffset
      }
    }
    //如果存在未来的日志(futureLog),则将最小水位线与未来日志的起始偏移量进行比较,取较小值作为最终的最小水位线;如果不存在未来的日志,则直接返回最小水位线。
    futureLog match {
      case Some(partitionFutureLog) =>
        Math.min(lowWaterMark, partitionFutureLog.logStartOffset)
      case None =>
        lowWaterMark
    }
  }

到了这里,关于kafka 3.5 主题分区的高水位线HW,低水位线LW,logStartOffset,LogEndOffset什么情况下会更新源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【FLink】水位线(Watermark)

    目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式水位线生成器(Punc

    2024年02月21日
    浏览(44)
  • Flink-水位线和时间语义

    在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。 在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Fli

    2024年02月04日
    浏览(48)
  • Flink-【时间语义、窗口、水位线】

    🌰:可乐 可乐的生产日期 = 事件时间(可乐产生的时间); 可乐被喝的时间 = 处理时间(可乐被处理【喝掉=处理】的时间)。 机器时间:可能不准确(例如:A可乐厂的时钟比较慢,B可乐厂的时钟比较快,但实际上B产生可乐的时间比A产生可乐的时间慢,却被先处理了)

    2024年02月01日
    浏览(51)
  • Flink-水位线的设置以及传递

    6.2.1 概述 分类 有序流 无序流 判断的时间延迟 延迟时间判定 6.2.2 水位线的设置 分析 DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略 但是WatermarkStrategy是一个接口 有序流 因此调用静态方法forMonotonousT

    2023年04月15日
    浏览(34)
  • Flink之Watermark水印、水位线

    在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。 事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件

    2024年02月08日
    浏览(43)
  • flink水位线传播及任务事件时间

    本文来讲解一下flink的水位线传播及对其对任务事件时间的影响 首先flink是通过从源头生成水位线记录的方式来实现水位线传播的,也就是说水位线是嵌入在正常的记录流中的特殊记录,携带者水位线的时间戳,以下我们就通过图片的方式来讲解下水位线是如何传播以及更新

    2024年02月16日
    浏览(52)
  • 【入门Flink】- 09Flink水位线Watermark

    在 窗口的处理过程 中,基于数据的时间戳,自定义一个 “逻辑时钟” 。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 用来衡量 事件时间 进展的标记,就被称作 “水位线”(Watermark) 。 具体实现上,水位线可以看作一条 特殊的数

    2024年01月17日
    浏览(44)
  • Flink详解系列之五--水位线(watermark)

    1、概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处

    2024年02月13日
    浏览(44)
  • Elasticsearch--解决磁盘使用率超过警戒水位线

    原文网址:Elasticsearch--解决磁盘使用率超过警戒水位线_IT利刃出鞘的博客-CSDN博客 本文介绍如何解决ES磁盘使用率超过警戒水位线的问题。 当客户端向 Elasticsearch 写入文档时候报错: 在 elasticsearch 的日志文件中报错如下: 出现如上问题多半是:磁盘使用量超过警戒水位线,

    2024年02月05日
    浏览(41)
  • 源码解析FlinkKafkaConsumer支持周期性水位线发送

    当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动 2.随后,PeriodicWatermarkEmitter中

    2024年02月08日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包