kafka 3.5 kafka服务端接收生产者发送的数据源码

这篇具有很好参考价值的文章主要介绍了kafka 3.5 kafka服务端接收生产者发送的数据源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、服务端接收生产者数据的方法

kafka服务端接收生产者数据的API在KafkaApis.scala类中,handleProduceRequest方法

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
	//省略代码
	request.header.apiKey match {
          //生产者生产消息推送到服务端,这个接口进行处理
        case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
        //省略代码
    }    	
        
 }    
 /**
   * Handle a produce request
   */
  def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  	 //遍历请求中的topic,内部嵌套遍历此topic的分区,组装authorizedRequestInfo数据,当入参传入后面的appendRecords
	 val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
	  produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
      val topicPartition = new TopicPartition(topic.name, partition.index)
      // This caller assumes the type is MemoryRecords and that is true on current serialization
      // We cast the type to avoid causing big change to code base.
      // https://issues.apache.org/jira/browse/KAFKA-10698
      val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
     //省略代码
      ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
      authorizedRequestInfo += (topicPartition -> memoryRecords)
      //省略代码
    })
  	 //省略代码
	 // call the replica manager to append messages to the replicas
      //把消息发送到副本中
      replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        origin = AppendOrigin.CLIENT,
        entriesPerPartition = authorizedRequestInfo,
        requestLocal = requestLocal,
        responseCallback = sendResponseCallback,
        recordConversionStatsCallback = processingStatsCallback)

      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
      // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
      produceRequest.clearPartitionRecords()
     //省略代码
  
}

通过调用replicaManager.appendRecords把数据存入副本中(这里的副本指的是Topic分区Leader副本)

二、遍历需要保存数据的topic分区,分别执行保存数据操作到topic分区Leader

def appendRecords(timeout: Long,
                    requiredAcks: Short,
                    internalTopicsAllowed: Boolean,
                    origin: AppendOrigin,
                    entriesPerPartition: Map[TopicPartition, MemoryRecords],
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                    delayedProduceLock: Option[Lock] = None,
                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = time.milliseconds
      //追加到本地日志
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        origin, entriesPerPartition, requiredAcks, requestLocal)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
	  //后面逻辑都是处理响应结果localProduceResults
      //将每个分区的结果转换为ProducePartitionStatus,并存储在produceStatus中。
      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition -> ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(
            result.error,
            result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
            result.info.logAppendTime,
            result.info.logStartOffset,
            result.info.recordErrors,
            result.info.errorMessage
          )
        ) // response status
      }
      //将一些操作添加到actionQueue中,这些操作会根据result.info.leaderHwChange的值执行不同的操作。
      actionQueue.add {
        () =>
          localProduceResults.foreach {
            case (topicPartition, result) =>
              val requestKey = TopicPartitionOperationKey(topicPartition)
              result.info.leaderHwChange match {
                case LeaderHwChange.INCREASED =>
                  // some delayed operations may be unblocked after HW changed
                  delayedProducePurgatory.checkAndComplete(requestKey)
                  delayedFetchPurgatory.checkAndComplete(requestKey)
                  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
                case LeaderHwChange.SAME =>
                  // probably unblock some follower fetch requests since log end offset has been updated
                  delayedFetchPurgatory.checkAndComplete(requestKey)
                case LeaderHwChange.NONE =>
                  // nothing
              }
          }
      }
      //调用recordConversionStatsCallback方法,将每个分区的记录转换统计信息传递给回调函数。
      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

      //通过 delayedProduceRequestRequired 方法判断是否需要等待其它副本完成写入,如果 acks = -1,则需要等待。
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        //根据条件判断是否需要创建延迟的produce操作。如果需要,创建一个DelayedProduce对象,并将它添加到delayedProducePurgatory中。
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        //创建(主题、分区)对的列表,以用作此延迟生成操作的键
        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

        // 再一次尝试完成该延时请求
        //  如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        //如果不需要延迟操作,直接将produce的结果返回给回调函数。
        // we can respond immediately
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        responseCallback(produceResponseStatus)
      }
    } else {
      //每个分区创建一个错误的PartitionResponse对象,并将结果返回给回调函数。
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(
          Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset.map[Long](_.messageOffset).orElse(-1L),
          RecordBatch.NO_TIMESTAMP,
          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
        )
      }
      responseCallback(responseStatus)
    }
  }

上面写入本地日志的方法是appendToLocalLog方法

/**
   * Append the messages to the local replica logs
   */
  private def appendToLocalLog(internalTopicsAllowed: Boolean,
                               origin: AppendOrigin,
                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                               requiredAcks: Short,
                               requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = {
    val traceEnabled = isTraceEnabled
    def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
      val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
      brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
      brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
      error(s"Error processing append operation on partition $topicPartition", t)

      logStartOffset
    }
    //首先,它检查是否启用了跟踪(trace)日志,并根据需要记录跟踪信息。
    if (traceEnabled)
      trace(s"Append [$entriesPerPartition] to local log")
    //遍历请求中需要把数据写入的topic集合
    entriesPerPartition.map { case (topicPartition, records) =>
      brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
      brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
      //如果topicPartition是内部主题且不允许追加记录到内部主题,则返回一个包含错误信息的LogAppendResult。
      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
        (topicPartition, LogAppendResult(
          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
      } else {
        try {
          //尝试将records追加到相应的分区中。
          val partition = getPartitionOrException(topicPartition)
          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal)
          val numAppendedMessages = info.numMessages
          //如果追加成功,更新成功追加的字节数和消息数的统计信息。
          brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
          brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
          brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
      		//省略代码
        } catch {
			//省略代码
   
        }
      }
    }
  }

appendToLocalLog方法中比遍历Topic分区集合,针对Topic分区得到分区对象,再执行保存数据到Topic分区Leader

  val partition = getPartitionOrException(topicPartition)
  val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal)

三、在数据写入分区Leader之前,先获一些锁

1、首先获得leaderIsrUpdateLock的读锁,得到Leader日志读取权限,做一些验证

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
                            requestLocal: RequestLocal): LogAppendInfo = {
    //函数首先获取leaderIsrUpdateLock的读锁,以确保对Leader和ISR(In-Sync Replica)的更新操作是同步的。
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
      //然后检查当前是否有Leader日志,
      leaderLogIfLocal match {
        //如果存在Leader日志,
        case Some(leaderLog) =>
          //则获取最小ISR(MinInSyncReplicas)的配置和ISR的大小。
          val minIsr = leaderLog.config.minInSyncReplicas
          val inSyncSize = partitionState.isr.size

          // Avoid writing to leader if there are not enough insync replicas to make it safe,如果没有足够的不同步副本来使其安全,请避免写入领导者
          //如果ISR的大小小于最小ISR要求,并且requiredAcks的值为-1(表示不需要确认),则抛出NotEnoughReplicasException异常。
          if (inSyncSize < minIsr && requiredAcks == -1) {
            throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
              s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
          }
          //调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。
          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
            interBrokerProtocolVersion, requestLocal)

          // we may need to increment high watermark since ISR could be down to 1,
          // 我们可能需要增加高水位线,因为 ISR 可能降至 1
          (info, maybeIncrementLeaderHW(leaderLog))
        //如果没有,则抛出NotLeaderOrFollowerException异常。
        case None =>
          throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
            .format(topicPartition, localBrokerId))
      }
    }
    //返回追加记录的信息,并根据是否增加了Leader高水位线,将LeaderHwChange.INCREASED或LeaderHwChange.SAME复制给返回信息的副本。
    info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
  }
 def appendAsLeader(records: MemoryRecords,
                     leaderEpoch: Int,
                     origin: AppendOrigin = AppendOrigin.CLIENT,
                     interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
                     requestLocal: RequestLocal = RequestLocal.NoCaching): LogAppendInfo = {
    val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), ignoreRecordSize = false)
  }

2、在把日志写入分区Leader之前先获得同步锁,再执行写入操作

private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: MetadataVersion,
                     validateAndAssignOffsets: Boolean,
                     leaderEpoch: Int,
                     requestLocal: Option[RequestLocal],
                     ignoreRecordSize: Boolean): LogAppendInfo = {
    //调用maybeFlushMetadataFile()确保在写入任何日志数据之前,分区元数据文件被写入日志目录。这样可以确保在发生故障时,可以使用正确的主题ID恢复任何日志数据。
    maybeFlushMetadataFile()
  //会返回一个appendInfo对象,其中包含有关记录的分析和验证结果。
    val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
    //根据appendInfo.shallowCount的值判断是否有有效的消息。如果shallowCount为0,则直接返回appendInfo。
    if (appendInfo.shallowCount == 0) appendInfo
    else {
      //对记录进行修剪以去除无效的字节或部分消息。
      var validRecords = trimInvalidBytes(records, appendInfo)
      //获取锁并在同步块中进行操作:
      lock synchronized {
        maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
          //省略代码
          //可能会滚动日志,如果当前段已满。
          // maybe roll the log if this segment is full
          val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
          //创建一个LogOffsetMetadata对象来存储日志的偏移量信息。
          val logOffsetMetadata = new LogOffsetMetadata(
            appendInfo.firstOrLastOffsetOfFirstBatch,
            segment.baseOffset,
            segment.size)
		//省略代码
          maybeDuplicate match {
            case Some(duplicate) =>
              appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset)))
              appendInfo.setLastOffset(duplicate.lastOffset)
              appendInfo.setLogAppendTime(duplicate.timestamp)
              appendInfo.setLogStartOffset(logStartOffset)
            case None =>
              // Before appending update the first offset metadata to include segment information
              //如果没有重复的消息,则将记录追加到本地日志中,并更新高水位标记。
              appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata =>
                new LogOffsetMetadata(offsetMetadata.messageOffset, segment.baseOffset, segment.size)
              })
              //把数据追加到数据文件、索引文件、时间索引文件的方法
              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
              //修改高水位线最后一个日志的偏移量
              updateHighWatermarkWithLogEndOffset()
              //更新生产者状态
              updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
              //省略代码
                //根据配置文件中刷新间隔判断,是否把所有在文件管道中的数据刷新进磁盘文件
              if (localLog.unflushedMessages >= config.flushInterval) flush(false)
          }
          appendInfo
        }
      }
    }
  }

val segment = maybeRoll(validRecords.sizeInBytes, appendInfo) 获取需要写入到哪个segment
再调用localLog.append执行操作文章来源地址https://www.toymoban.com/news/detail-702742.html

 private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
    //给数据文件增加数据,并且根据条件判断是否给索引文件和时间索引文件增加数据
    segments.activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
      shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)
    //更新日志的结束偏移量,并更新恢复点。
    updateLogEndOffset(lastOffset + 1)
  }

四、把数据写入到数据文件后,再追加索引文件和索引时间文件

 def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      //获取当前日志的物理位置
      val physicalPosition = log.sizeInBytes()
      //如果物理位置为0,则将rollingBasedTimestamp设置为最大时间戳。
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      ensureOffsetInRange(largestOffset)

      // append the messages
      //追加消息到日志中,并返回追加的字节数,即追加到数据文件中
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      //更新内存中的最大时间戳和对应的偏移量
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp)
      }
      // append an entry to the index (if needed)
      //如果距离上一次索引条目的字节数超过了indexIntervalBytes,索引文件中追加一个条目,也可能给时间索引文件增加一个条目
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      //更新bytesSinceLastIndexEntry的值。
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }
/**
     * Append a set of records to the file. This method is not thread-safe and must be
     * protected with a lock.
     * 将一组记录追加到文件中。此方法不是线程安全的,必须使用锁进行保护。
     *
     * @param records The records to append
     * @return the number of bytes written to the underlying file
     */
    public int append(MemoryRecords records) throws IOException {
        //代码检查要追加的记录的大小是否超过了当前文件位置之后的剩余空间大小,如果超过了,则抛出一个IllegalArgumentException异常。
        if (records.sizeInBytes() > Integer.MAX_VALUE - size.get())
            throw new IllegalArgumentException("Append of size " + records.sizeInBytes() +
                    " bytes is too large for segment with current file position at " + size.get());
        //records.writeFullyTo(channel)方法将记录完全写入到指定的channel中,并返回实际写入的字节数。
        int written = records.writeFullyTo(channel);
        //ize.getAndAdd(written)方法将已写入的字节数添加到size变量中,并返回实际写入的字节数。
        size.getAndAdd(written);
        return written;
    }

五、写入操作执行后,执行刷盘和重置恢复点

 if (localLog.unflushedMessages >= config.flushInterval) flush(false)
 private def flush(offset: Long, includingOffset: Boolean): Unit = {
    //flushOffset 此位点之前的数据刷进磁盘中
    val flushOffset = if (includingOffset) offset + 1  else offset
    //offset设置为新的恢复位点,
    val newRecoveryPoint = offset
    val includingOffsetStr =  if (includingOffset) "inclusive" else "exclusive"
    maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset " +
      s"($includingOffsetStr) and recovery point $newRecoveryPoint") {
      //如果flushOffset大于本地文件的恢复位点
      if (flushOffset > localLog.recoveryPoint) {
        debug(s"Flushing log up to offset $offset ($includingOffsetStr)" +
          s"with recovery point $newRecoveryPoint, last flushed: $lastFlushTime,  current time: ${time.milliseconds()}," +
          s"unflushed: ${localLog.unflushedMessages}")
        //则把本地文件flushOffset位点之前的刷新进磁盘
        localLog.flush(flushOffset)
        //增加锁,同步新的恢复位点
        lock synchronized {
          localLog.markFlushed(newRecoveryPoint)
        }
      }
    }
  }

到了这里,关于kafka 3.5 kafka服务端接收生产者发送的数据源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(41)
  • kafka生产者异步发送、同步发送、回调异步发送,是什么情况?

    Kafka是一种分布式流处理平台 ,它是一种高吞吐量、可扩展、可持久化的消息队列系统,用于处理和存储实时流式数据。 Kafka基于发布-订阅模式,采用了分布式、多副本、分区的架构。它允许生产者将数据以消息的形式发送到Kafka集群的一个或多个主题(topic)中,而消费者可以

    2024年02月15日
    浏览(36)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(44)
  • 多图详解 kafka 生产者消息发送过程

    生产者客户端代码 KafkaProducer 通过解析 producer.propeties 文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器、 RecordAccumulator消息累加器 、 元信息更新器 、启动发送请求的后台线程 生产者元信息更新器 我们之前有讲过. 客户端都会保存集群的元信息,例如

    2023年04月09日
    浏览(37)
  • kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码

    Kakfa集群有主题,每一个主题下又有很多分区,为了保证防止丢失数据,在分区下分Leader副本和Follower副本,而kafka的某个分区的Leader和Follower数据如何同步呢?下面就是讲解的这个 首先要知道,Follower的数据是通过Fetch线程异步从Leader拉取的数据,不懂的可以看一下Kafka——副

    2024年02月09日
    浏览(33)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(50)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(46)
  • Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验

    1.1 生产者消息发送流程 1.1.1 发送原理 在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。 batch.size:只有数据积累到bat

    2024年02月09日
    浏览(43)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包