kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码

这篇具有很好参考价值的文章主要介绍了kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kakfa集群有主题,每一个主题下又有很多分区,为了保证防止丢失数据,在分区下分Leader副本和Follower副本,而kafka的某个分区的Leader和Follower数据如何同步呢?下面就是讲解的这个

首先要知道,Follower的数据是通过Fetch线程异步从Leader拉取的数据,不懂的可以看一下Kafka——副本(Replica)机制

一、Broker接收处理分区的Leader和Follower的API

kafkaApis.scala

     //处理Leader和follower,ISR的请求
     case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
 def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
    //从请求头中获取关联ID
    //从请求体中获取LeaderAndIsrRequest对象。
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
    //对请求进行集群操作的授权验证。
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    //检查broker的代数是否过时。
    if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
    //省略代码
    } else {
      //调用replicaManager.becomeLeaderOrFollower方法处理请求,获取响应
      val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
        RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
      requestHelper.sendResponseExemptThrottle(request, response)
    }
  }

经过一些检验后,调用becomeLeaderOrFollower获得响应结果,

二、针对分区副本的Leader和Follower的处理逻辑

def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    //首先记录了方法开始的时间戳
    val startMs = time.milliseconds()
    //副本状态改变锁
    replicaStateChangeLock synchronized {
      //从leaderAndIsrRequest中获取一些请求信息,包括controller的ID、分区状态等
      val controllerId = leaderAndIsrRequest.controllerId
      val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
   

      val response = {
        //处理过程中,会检查请求的controller epoch是否过时
        if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
       		//省略
        } else {
          val responseMap = new mutable.HashMap[TopicPartition, Errors]
          controllerEpoch = leaderAndIsrRequest.controllerEpoch

          val partitions = new mutable.HashSet[Partition]()
          //要成功Leader分区的集合
          val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
          //要成为Follower分区的集合
          val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
          val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()

          //遍历requestPartitionStates,其中包含了来自控制器(controller)的分区状态请求
          requestPartitionStates.foreach { partitionState =>
            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
            //对于每个分区状态请求,首先检查分区是否存在,如果不存在,则创建一个新的分区。
            val partitionOpt = getPartition(topicPartition) match {
              case HostedPartition.Offline =>
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                  "partition is in an offline log directory")
                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
                None

              case HostedPartition.Online(partition) =>
                Some(partition)

              case HostedPartition.None =>
                val partition = Partition(topicPartition, time, this)
                allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
                Some(partition)
            }

            //接下来,检查分区的主题ID和Leader的epoch(版本号)等信息。
            partitionOpt.foreach { partition =>
              val currentLeaderEpoch = partition.getLeaderEpoch
              val requestLeaderEpoch = partitionState.leaderEpoch
              val requestTopicId = topicIdFromRequest(topicPartition.topic)
              val logTopicId = partition.topicId

              if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
                //如果主题ID不一致,则记录错误并将其添加到响应映射(responseMap)中。
                stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
                  s" match the topic ID for partition $topicPartition received: " +
                  s"${requestTopicId.get}.")
                responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
              } 
              //如果Leader的epoch大于当前的epoch,则记录控制器(controller)的epoch,并将分区添加到要成为Leader或Follower的集合中。 
              else if (requestLeaderEpoch > currentLeaderEpoch) {              
                //分区副本确定是当前broker的,则添加到partitionsToBeLeader或者partitionsToBeFollower
                //如果分区副本是leader并且broker是当前broker,则加入partitionsToBeLeader
                //其他的放入到partitionsToBeFollower
                //这样保证后续操作partitionsToBeLeader或者partitionsToBeFollower只操作当前broker的
                if (partitionState.replicas.contains(localBrokerId)) {
                  partitions += partition
                  if (partitionState.leader == localBrokerId) {
                    partitionsToBeLeader.put(partition, partitionState)
                  } else {
                    partitionsToBeFollower.put(partition, partitionState)
                  }
                } 
                //省略代码.....
          }
          //创建高水位线检查点
          val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
          //如果partitionsToBeLeader非空,则调用makeLeaders方法将指定的分区设置为Leader,并返回这些分区的集合,否则返回空集合
          val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
          //这个是处理Leader的逻辑
            makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
              highWatermarkCheckpoints, topicIdFromRequest)
          else
            Set.empty[Partition]
          //如果partitionsToBeFollower非空,则调用makeFollowers方法将指定的分区副本设置为Follower,并返回这些分区的集合,否则返回空集合。
          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          //这个是处理Follower的逻辑
            makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
              highWatermarkCheckpoints, topicIdFromRequest)
          else
            Set.empty[Partition]
          //根据partitionsBecomeFollower集合获取Follower分区的主题集合,并更新相关指标。
          val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
          updateLeaderAndFollowerMetrics(followerTopicSet)
          //如果topicIdUpdateFollowerPartitions非空,则调用updateTopicIdForFollowers方法更新Follower分区的主题ID。
          if (topicIdUpdateFollowerPartitions.nonEmpty)
            updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)    
          //启动高水位检查点线程。
          startHighWatermarkCheckPointThread()
          //根据参数初始化日志目录获取器
          maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)
          //关闭空闲的副本获取器线程
          //todo FetcherThreads
          replicaFetcherManager.shutdownIdleFetcherThreads()
          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
		//省略代码....		
        }
      }
      //省略代码....		
    }
  }

因为这篇文章主要写Follower如何拉取数据,所以只需要关注上面代码中的makeFollowers就可以了

1、如果是Follower,则准备创建Fetcher线程,好异步执行向Leader拉取数据

/*
    1. 从领导者分区集中删除这些分区。
    2. 将这些 partition 标记为 follower,之后这些 partition 就不会再接收 produce 的请求了
    3. 停止对这些 partition 的副本同步,这样这些副本就不会再有(来自副本请求线程)的数据进行追加了
    4.对这些 partition 的 offset 进行 checkpoint,如果日志需要截断就进行截断操作;
    5.  清空 purgatory 中的 produce 和 fetch 请求
    6.如果代理未关闭,向这些 partition 的新 leader 启动副本同步线程
   * 执行这些步骤的顺序可确保转换中的副本在检查点偏移之前不会再接收任何消息,以便保证检查点之前的所有消息都刷新到磁盘
   *如果此函数中抛出意外错误,它将被传播到 KafkaAPIS,其中将在每个分区上设置错误消息,因为我们不知道是哪个分区导致了它。否则,返回由于此方法而成为追随者的分区集
   */
  private def makeFollowers(controllerId: Int,
                            controllerEpoch: Int,
                            partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                            correlationId: Int,
                            responseMap: mutable.Map[TopicPartition, Errors],
                            highWatermarkCheckpoints: OffsetCheckpoints,
                            topicIds: String => Option[Uuid]) : Set[Partition] = {
    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
   	//省略代码。。。。
    //创建一个可变的Set[Partition]对象partitionsToMakeFollower,用于统计follower的集合
    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
    try {
      partitionStates.forKeyValue { (partition, partitionState) =>
        //遍历partitionStates中的每个分区,根据分区的leader是否可用来改变分区的状态。
        val newLeaderBrokerId = partitionState.leader
        try {
          if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
            //如果分区的leader可用,将分区设置为follower,并将其添加到partitionsToMakeFollower中。
            // Only change partition state when the leader is available
            if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
              partitionsToMakeFollower += partition
            }
          } else {
          //省略代码。。
        } catch {
      		//省略代码。。。
        }
      }
      //删除针对partitionsToMakeFollower中 partition 的副本同步线程
      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
      stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
        s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")
      //对于每个分区,完成延迟的抓取或生产请求。
      partitionsToMakeFollower.foreach { partition =>
        completeDelayedFetchOrProduceRequests(partition.topicPartition)
      }
      //如果正在关闭服务器,跳过添加抓取器的步骤。
      if (isShuttingDown.get()) {
      	//省略代码
      } else {
        //对于每个分区,获取分区的leader和抓取偏移量,并构建partitionsToMakeFollowerWithLeaderAndOffset映射。
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
          val log = partition.localLogOrException
          val fetchOffset = initialFetchOffset(log)
          partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)
        }.toMap
        //添加抓取器以获取partitionsToMakeFollowerWithLeaderAndOffset中的分区。
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      }
    } catch {
     	//省略代码
    }
	 //省略代码。。。
  }

2、遍历分区Follower副本,判断是否有向目标broker现成的Fetcher,如果是则复用,否则创建

之后执行replicaFetcherManager.addFetcherForPartitions把信息添加到指定的Fetcher线程中

 // to be defined in subclass to create a specific fetcher
  def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): T
//主要目的是将分区和偏移量添加到相应的Fetcher线程中
  def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
    lock synchronized {
      //首先对partitionAndOffsets进行分组,按照BrokerAndFetcherId来分组
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
        BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
      }

      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
                                   brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
        //创建Fetcher线程                           
        val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        //把线程放入到fetcherThreadMap
        fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
        //线程启动
        fetcherThread.start()
        fetcherThread
      }

      for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {

        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
        //将启动的FetcherThread线程添加到fetcherThreadMap中
        val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
          // //检查是否已经存在一个与当前broker和fetcher id相匹配的Fetcher线程。
          case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>
            // reuse the fetcher thread
            //如果存在,则重用该线程
            currentFetcherThread
          case Some(f) =>//如果之前有,fetcher线程,则先关闭在创建一个新的Fetcher线程并启动
            f.shutdown()
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
          case None =>//创建一个新的Fetcher线程,并启动
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        }
        //将分区添加到相应的Fetcher线程中
        // failed partitions are removed when added partitions to thread
        addPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)
      }
    }
  }

上面可能有疑问,为什么有重用fetcher线程?
答案:是broker并不一定会为每一个主题分区的Follower都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会分配到哪个 fetcher 线程上,是根据 topic 名和 partition id 进行计算得到,实现所示:

  // Visibility for testing
  private[server] def getFetcherId(topicPartition: TopicPartition): Int = {
    lock synchronized {
      Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition) % numFetchersPerBroker
    }
  }

继续往下,其中createFetcherThread的实现是下面

3、创建Fetcher线程的实现

class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                            protected val replicaManager: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
                            threadNamePrefix: Option[String] = None,
                            quotaManager: ReplicationQuotaManager,
                            metadataVersionSupplier: () => MetadataVersion,
                            brokerEpochSupplier: () => Long)
      extends AbstractFetcherManager[ReplicaFetcherThread](
        name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
        clientId = "Replica",
        numFetchers = brokerConfig.numReplicaFetchers) {

  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
    val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
    val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
    val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
      s"fetcherId=$fetcherId] ")
    val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
    val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
      replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
   // 创建了一个ReplicaFetcherThread对象,它的构造函数接受多个参数,用于副本的获取和管理。
    new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
      quotaManager, logContext.logPrefix, metadataVersionSupplier)
  }

  def shutdown(): Unit = {
    info("shutting down")
    closeAllFetchers()
    info("shutdown completed")
  }
}

其中new ReplicaFetcherThread返回一个创建的线程

class ReplicaFetcherThread(name: String,
                           leader: LeaderEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           quota: ReplicaQuota,
                           logPrefix: String,
                           metadataVersionSupplier: () => MetadataVersion)
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                leader = leader,
                                failedPartitions,
                                fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false,
                                replicaMgr.brokerTopicStats) {

override def doWork(): Unit = {
    super.doWork()
    completeDelayedFetchRequests()
  }




}

ReplicaFetcherThread继承的AbstractFetcherThread类,AbstractFetcherThread又继承自ShutdownableThread类,其中ShutdownableThread中的run方法是线程的执行函数

abstract class AbstractFetcherThread(name: String,
                                     clientId: String,
                                     val leader: LeaderEndPoint,
                                     failedPartitions: FailedPartitions,
                                     val fetchTierStateMachine: TierStateMachine,
                                     fetchBackOffMs: Int = 0,
                                     isInterruptible: Boolean = true,
                                     val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
  extends ShutdownableThread(name, isInterruptible) with Logging {
 override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }
}
public abstract class ShutdownableThread extends Thread {
	//省略代码
 public abstract void doWork();
  public void run() {
        isStarted = true;
        log.info("Starting");
        try {
            while (isRunning())
                doWork();
        } catch (FatalExitError e) {
            shutdownInitiated.countDown();
            shutdownComplete.countDown();
            log.info("Stopped");
            Exit.exit(e.statusCode());
        } catch (Throwable e) {
            if (isRunning())
                log.error("Error due to", e);
        } finally {
            shutdownComplete.countDown();
        }
        log.info("Stopped");
    }
}    

ShutdownableThread中的run函数调用子类的doWork()
而doWork中的执行顺序如下

//是否截断
 maybeTruncate()
 //抓取
 maybeFetch()
 //处理延时抓取请求
 completeDelayedFetchRequests()

其中 maybeFetch(),就是正常拼接fetch请求,并向目标broker发送请求,调用brokercase ApiKeys.FETCH => handleFetchRequest(request)文章来源地址https://www.toymoban.com/news/detail-697005.html

private def maybeFetch(): Unit = {
	//分区映射锁
    val fetchRequestOpt = inLock(partitionMapLock) {
      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)

      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

      if (fetchRequestOpt.isEmpty) {
        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }

      fetchRequestOpt
    }

    fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
      processFetchRequest(sessionPartitions, fetchRequest)
    }
  }

到了这里,关于kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 3.5 主题分区的高水位线HW,低水位线LW,logStartOffset,LogEndOffset什么情况下会更新源码

    下面的例子只是各拿一个做举例,不是全部场景,不要以为logStartOffset,LogEndOffset,HW,LW只有三个场景可以修改 这里需要针对 logStartOffset 和 LogEndOffset 做特殊说明,要不会让大家脑袋混乱,并且前言后的章节讲的都是 主题分区级别 的 (1)主题分区级别 对于每个分区中每一个

    2024年02月09日
    浏览(35)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(78)
  • 05、Kafka ------ 各个功能的作用解释(主题和分区 详解,用命令行和图形界面创建主题和查看主题)

    Kafka 主题虽然也叫 topic,但它和 Pub-Sub 消息模型中 topic 主题及 AMQP 的 topic 都不同(AMQP 的 topic 只是 Exchange 的类型)。 Kafka 的主题只是盛装消息的逻辑容器(注意是逻辑容器),主题之下会分为若干个分区,分区才是盛装消息的物理容器。 ▲ 消息组织方式实际上是三级结构

    2024年02月03日
    浏览(29)
  • kafka中topic的部分分区leader为none,怎样解决?

      (以Hadoop的topic为例) 进入Zookeeper客户端查看kafka存储的信息,/kafka/brokers/topics/hadoop/partitions/1/state get /kafka/brokers/topics/hadoop/partitions/1/state 查看到 {\\\"controller_epoch\\\":33,\\\"leader\\\":-1,\\\"version\\\":1,\\\"leader_epoch\\\":25,\\\"isr\\\":[3]}  leader为-1,固分区的leader为none 修改/kafka/brokers/topics/hadoop/partitions/

    2024年02月03日
    浏览(34)
  • 【基础】Kafka -- 主题与分区

    主题管理包括创建主题、查看主题消息、修改主题以及删除主题等操作,Kafka 提供的 kafka-topics.sh 脚本来执行这些操作,脚本位于 $KAFKA_HOME/bin/ 目录下,该脚本实际上是调用了 kafka.admin.TopicCommand 类来执行主题管理的操作。 简单创建与查看 若 broker 端的配置参数 auto.create.top

    2023年04月22日
    浏览(27)
  • k8s部署zookeeper集群(3节点,1个leader,2个follower)

    环境: centos 7.9 k8s集群 在k8s上面安装zookeeper集群,我们还是按照k8s的官方文档来安装吧,这样比较好,网上有各种各样的安装方式,这里使用 https://kubernetes.io/docs/tutorials/stateful-application/zookeeper/ k8s的官方文档来安装。

    2024年02月13日
    浏览(28)
  • zookeeper4==zookeeper源码阅读,FOLLOWER收到了需要LEADER执行的命令后各节点会执行什么

    上面已经阅读并观察了节点确定自己的身份后会做些什么,大致就是比对双方信息然后完成同步。 本篇阅读, FOLLOWER收到了需要LEADER执行的命令后,怎么同步给LEADER的,并且LEADER会执行什么操作。 源码启动zkCli用于测试 将原本的代码拷贝一份用IDEA打开后,找到org.apache.zook

    2024年02月03日
    浏览(28)
  • 解密Kafka主题的分区策略:提升实时数据处理的关键

    大家好,我是哪吒。 Kafka几乎是当今时代背景下数据管道的首选,无论你是做后端开发、还是大数据开发,对它可能都不陌生。开源软件Kafka的应用越来越广泛。 面对Kafka的普及和学习热潮,哪吒想分享一下自己多年的开发经验,带领读者比较轻松地掌握Kafka的相关知识。 上

    2024年02月05日
    浏览(32)
  • [AIGC_coze] Kafka 的主题分区之间的关系

    在 Kafka 中,主题(Topics)和分区(Partitions)是两个重要的概念,它们之间存在着密切的关系。 主题是 Kafka 中用于数据发布和订阅的逻辑单元。每个主题可以包含多个分区,每个分区都是一个独立的有序数据集。生产者将数据发送到特定的主题,而消费者通过订阅主题来接收

    2024年02月19日
    浏览(25)
  • JAVA实时获取kafka各个主题下分区消息的消费情况

    通过指定 主题 和 消费者组 调用方法,实时查看主题下分区消息的消费情况(消息总数量、消费消息数量、未消费的消息数量)。

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包