KafkaProducer Sender 线程详解(含详细的执行流程图)

这篇具有很好参考价值的文章主要介绍了KafkaProducer Sender 线程详解(含详细的执行流程图)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

for (ProducerBatch batch : batches) {

List inflightBatchList = inFlightBatches.get(batch.topicPartition);

if (inflightBatchList == null) {

inflightBatchList = new ArrayList<>();

inFlightBatches.put(batch.topicPartition, inflightBatchList);

}

inflightBatchList.add(batch);

}

}

Step5:将抽取的 ProducerBatch 加入到 inFlightBatches 数据结构,该属性的声明如下:Map<TopicPartition, List< ProducerBatch >> inFlightBatches,即按照 topic-分区 为键,存放已抽取的 ProducerBatch,这个属性的含义就是存储待发送的消息批次。可以根据该数据结构得知在消息发送时以分区为维度反馈 Sender 线程的“积压情况”,max.in.flight.requests.per.connection 就是来控制积压的最大数量,如果积压达到这个数值,针对该队列的消息发送会限流。

Sender#sendProducerData

accumulator.resetNextBatchExpiryTime();

List expiredInflightBatches = getExpiredInflightBatches(now);

List expiredBatches = this.accumulator.expiredBatches(now);

expiredBatches.addAll(expiredInflightBatches);

Step6:从 inflightBatches 与 batches 中查找已过期的消息批次(ProducerBatch),判断是否过期的标准是系统当前时间与 ProducerBatch 创建时间之差是否超过120s,过期时间可以通过参数 delivery.timeout.ms 设置。

Sender#sendProducerData

if (!expiredBatches.isEmpty())

log.trace(“Expired {} batches in accumulator”, expiredBatches.size());

for (ProducerBatch expiredBatch : expiredBatches) {

String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition

  • “:” + (now - expiredBatch.createdMs) + " ms has passed since batch creation";

failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);

if (transactionManager != null && expiredBatch.inRetry()) {

// This ensures that no new batches are drained until the current in flight batches are fully resolved.

transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);

}

}

Step7:处理已超时的消息批次,通知该批消息发送失败,即通过设置 KafkaProducer#send 方法返回的凭证中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之调用其 get 方法不会阻塞。

Sender#sendProducerData

sensors.updateProduceRequestMetrics(batches);

Step8:收集统计指标,本文不打算详细分析,但后续会专门对 Kafka 的 Metrics 设计进行一个深入的探讨与学习。

Sender#sendProducerData

long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);

pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);

pollTimeout = Math.max(pollTimeout, 0);

if (!result.readyNodes.isEmpty()) {

log.trace(“Nodes with data ready to send: {}”, result.readyNodes);

pollTimeout = 0;

}

Step9:设置下一次的发送延时,待补充详细分析。

Sender#sendProducerData

sendProduceRequests(batches, now);

private void sendProduceRequests(Map<Integer, List> collated, long now) {

for (Map.Entry<Integer, List> entry : collated.entrySet())

sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());

}

Step10:该步骤按照 brokerId 分别构建发送请求,即每一个 broker 会将多个 ProducerBatch 一起封装成一个请求进行发送,同一时间,每一个 与 broker 连接只会只能发送一个请求,注意,这里只是构建请求,并最终会通过 NetworkClient#send 方法,将该批数据设置到 NetworkClient 的待发送数据中,此时并没有触发真正的网络调用。

sendProducerData 方法就介绍到这里了,既然这里还没有进行真正的网络请求,那在什么时候触发呢?

我们继续回到 runOnce 方法。

1.2.1.2 NetworkClient 的 poll 方法

public List poll(long timeout, long now) {

ensureActive();

if (!abortedSends.isEmpty()) {

// If there are aborted sends because of unsupported version exceptions or disconnects,

// handle them immediately without waiting for Selector#poll.

List responses = new ArrayList<>();

handleAbortedSends(responses);

completeResponses(responses);

return responses;

}

long metadataTimeout = metadataUpdater.maybeUpdate(now); // @1

try {

this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // @2

} catch (IOException e) {

log.error(“Unexpected error during I/O”, e);

}

// process completed actions

long updatedNow = this.time.milliseconds();

List responses = new ArrayList<>(); // @3

handleCompletedSends(responses, updatedNow);

handleCompletedReceives(responses, updatedNow);

handleDisconnections(responses, updatedNow);

handleConnections();

handleInitiateApiVersionRequests(updatedNow);

handleTimedOutRequests(responses, updatedNow);

completeResponses(responses); // @4

return responses;

}

本文并不会详细深入探讨其网络实现部分,Kafka 的 网络通讯后续我会专门详细的介绍,在这里先点出其关键点。

代码@1:尝试更新云数据。

代码@2:触发真正的网络通讯,该方法中会通过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker。

代码@3:然后会消息发送,消息接收、断开连接、API版本,超时等结果进行收集。

代码@4:并依次对结果进行唤醒,此时会将响应结果设置到 KafkaProducer#send 方法返回的凭证中,从而唤醒发送客户端,完成一次完整的消息发送流程。

Sender 发送线程的流程就介绍到这里了,接下来首先给出一张流程图,然后对上述流程中一些关键的方法再补充深入探讨一下。

1.2.2 run 方法流程图

KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows

根据上面的源码分析得出上述流程图,图中对重点步骤也详细标注了其关键点。下面我们对上述流程图中 Sender 线程依赖的相关类的核心方法进行解读,以便加深 Sender 线程的理解。

由于在讲解 Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,故接下来重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析,加强对 Sender 流程的理解。

2、RecordAccumulator 核心方法详解


2.1 RecordAccumulator 的 ready 方法详解

该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。

RecordAccumulator#ready

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

Set readyNodes = new HashSet<>();

long nextReadyCheckDelayMs = Long.MAX_VALUE;

Set unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;

for (Map.Entry<TopicPartition, Deque> entry : this.batches.entrySet()) { // @1

TopicPartition part = entry.getKey();

Deque deque = entry.getValue();

Node leader = cluster.leaderFor(part); // @2

synchronized (deque) {

if (leader == null && !deque.isEmpty()) { // @3

// This is a partition for which leader is not known, but messages are available to send.

// Note that entries are currently not removed from batches when deque is empty.

unknownLeaderTopics.add(part.topic());

} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4

ProducerBatch batch = deque.peekFirst();

if (batch != null) {

long waitedTimeMs = batch.waitedTimeMs(nowMs);

boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;

long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

boolean full = deque.size() > 1 || batch.isFull();

boolean expired = waitedTimeMs >= timeToWaitMs;

boolean sendable = full || expired || exhausted || closed || flushInProgress();

if (sendable && !backingOff) { // @5

readyNodes.add(leader);

} else {

long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);

// Note that this results in a conservative estimate since an un-sendable partition may have

// a leader that will later be found to have sendable data. However, this is good enough

// since we’ll just wake up and then sleep again for the remaining time.

nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);

}

}

}

}

}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);

}

代码@1:对生产者缓存区 ConcurrentHashMap<TopicPartition, Deque< ProducerBatch>> batches 遍历,从中挑选已准备好的消息批次。

代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,如果不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。

代码@4:如果不在 readyNodes 中就需要判断是否满足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。

代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。

  • long waitedTimeMs

该 ProducerBatch 已等待的时长,等于当前时间戳 与 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 创建时或需要重试时会将当前的时间赋值给lastAttemptMs。

  • retryBackoffMs

当发生异常时发起重试之前的等待时间,默认为 100ms,可通过属性 retry.backoff.ms 配置。

  • batch.attempts()

该批次当前已重试的次数。

  • backingOff

后台发送是否关闭,即如果需要重试并且等待时间小于 retryBackoffMs ,则 backingOff = true,也意味着该批次未准备好。

  • timeToWaitMs

send 线程发送消息需要的等待时间,如果 backingOff 为 true,表示该批次是在重试,并且等待时间小于系统设置的需要等待时间,这种情况下 timeToWaitMs = retryBackoffMs 。否则需要等待的时间为 lingerMs。

  • boolean full

该批次是否已满,如果两个条件中的任意一个满足即为 true。

  • Deque< ProducerBatch> 该队列的个数大于1,表示肯定有一个 ProducerBatch 已写满。

  • ProducerBatch 已写满。

  • boolean expired

是否过期,等于已经等待的时间是否大于需要等待的时间,如果把发送看成定时发送的话,expired 为 true 表示定时器已到达触发点,即需要执行。

  • boolean exhausted

当前生产者缓存已不够,创建新的 ProducerBatch 时阻塞在申请缓存空间的线程大于0,此时应立即将缓存区中的消息立即发送到服务器。

  • boolean sendable

是否可发送。其满足下面的任意一个条件即可:

  • 该批次已写满。(full = true)。

  • 已等待系统规定的时长。(expired = true)

  • 发送者内部缓存区已耗尽并且有新的线程需要申请(exhausted = true)。

  • 该发送者的 close 方法被调用(close = true)。

  • 该发送者的 flush 方法被调用。

2.2 RecordAccumulator 的 drain方法详解

RecordAccumulator#drain

public Map<Integer, List> drain(Cluster cluster, Set nodes, int maxSize, long now) { // @1

if (nodes.isEmpty())

return Collections.emptyMap();

Map<Integer, List> batches = new HashMap<>();

for (Node node : nodes) {

List ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2

batches.put(node.id(), ready);

}

return batches;

}

代码@1:我们首先来介绍该方法的参数:

  • Cluster cluster

集群信息。

  • Set< Node> nodes

已准备好的节点集合。

  • int maxSize

一次请求最大的字节数。

  • long now

当前时间。

代码@2:遍历所有节点,调用 drainBatchesForOneNode 方法抽取数据,组装成 Map<Integer /** brokerId */, List< ProducerBatch>> batches。

接下来重点来看一下 drainBatchesForOneNode。

RecordAccumulator#drainBatchesForOneNode

private List drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {

int size = 0;

List parts = cluster.partitionsForNode(node.id()); // @1

List ready = new ArrayList<>();

int start = drainIndex = drainIndex % parts.size(); // @2

do { // @3

PartitionInfo part = parts.get(drainIndex);

TopicPartition tp = new TopicPartition(part.topic(), part.partition());

this.drainIndex = (this.drainIndex + 1) % parts.size();

if (isMuted(tp, now))

continue;

Deque deque = getDeque(tp); // @4

if (deque == null)

continue;

synchronized (deque) {

// invariant: !isMuted(tp,now) && deque != null

ProducerBatch first = deque.peekFirst(); // @5

if (first == null)

continue;

// first != null

boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // @6

// Only drain the batch if it is not during backoff period.

if (backoff)

continue;

if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // @7

break;

} else {

if (shouldStopDrainBatchesForPartition(first, tp))

break;

// 这里省略与事务消息相关的代码,后续会重点学习。

batch.close(); // @8

size += batch.records().sizeInBytes();

ready.add(batch);

batch.drained(now);

}

}

} while (start != drainIndex);

return ready;

}

代码@1:根据 brokerId 获取该 broker 上的所有主分区。

代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。

  • start 当前开始遍历的分区序号。

  • drainIndex 上次抽取的队列索引后,这里主要是为了每个队列都是从零号分区开始抽取。

代码@3:循环从缓存区抽取对应分区中累积的数据。

代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。

代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。

代码@6:如果当前批次是重试,并且还未到阻塞时间,则跳过该分区。

代码@7:如果当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。

代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在允许向该批次中追加消息。

关于消息发送就介绍到这里,NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息通过网络发送到 Broker 服务器,关于网络后面的具体实现,将在后续文章中单独介绍。

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows

言尽于此,完结

无论是一个初级的 coder,高级的程序员,还是顶级的系统架构师,应该都有深刻的领会到设计模式的重要性。

  • 第一,设计模式能让专业人之间交流方便,如下:

程序员A:这里我用了XXX设计模式

程序员B:那我大致了解你程序的设计思路了

  • 第二,易维护

项目经理:今天客户有这样一个需求…

程序员:明白了,这里我使用了XXX设计模式,所以改起来很快

  • 第三,设计模式是编程经验的总结

程序员A:B,你怎么想到要这样去构建你的代码

程序员B:在我学习了XXX设计模式之后,好像自然而然就感觉这样写能避免一些问题

  • 第四,学习设计模式并不是必须的

程序员A:B,你这段代码使用的是XXX设计模式对吗?

程序员B:不好意思,我没有学习过设计模式,但是我的经验告诉我是这样写的

KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows

从设计思想解读开源框架,一步一步到Spring、Spring5、SpringMVC、MyBatis等源码解读,我都已收集整理全套,篇幅有限,这块只是详细的解说了23种设计模式,整理的文件如下图一览无余!

KafkaProducer Sender 线程详解(含详细的执行流程图),2024年程序员学习,流程图,windows

搜集费时费力,能看到此处的都是真爱!

一个人可以走的很快,但一群人才能走的更远。如果你从事以下工作或对以下感兴趣,欢迎戳这里加入程序员的圈子,让我们一起学习成长!

AI人工智能、Android移动开发、AIGC大模型、C C#、Go语言、Java、Linux运维、云计算、MySQL、PMP、网络安全、Python爬虫、UE5、UI设计、Unity3D、Web前端开发、产品经理、车载开发、大数据、鸿蒙、计算机网络、嵌入式物联网、软件测试、数据结构与算法、音视频开发、Flutter、IOS开发、PHP开发、.NET、安卓逆向、云计算文章来源地址https://www.toymoban.com/news/detail-848617.html

程序员A:B,你怎么想到要这样去构建你的代码

程序员B:在我学习了XXX设计模式之后,好像自然而然就感觉这样写能避免一些问题

  • 第四,学习设计模式并不是必须的

程序员A:B,你这段代码使用的是XXX设计模式对吗?

程序员B:不好意思,我没有学习过设计模式,但是我的经验告诉我是这样写的

[外链图片转存中…(img-3oSccB44-1712172979982)]

从设计思想解读开源框架,一步一步到Spring、Spring5、SpringMVC、MyBatis等源码解读,我都已收集整理全套,篇幅有限,这块只是详细的解说了23种设计模式,整理的文件如下图一览无余!

[外链图片转存中…(img-MrpnBr3m-1712172979982)]

搜集费时费力,能看到此处的都是真爱!

一个人可以走的很快,但一群人才能走的更远。如果你从事以下工作或对以下感兴趣,欢迎戳这里加入程序员的圈子,让我们一起学习成长!

AI人工智能、Android移动开发、AIGC大模型、C C#、Go语言、Java、Linux运维、云计算、MySQL、PMP、网络安全、Python爬虫、UE5、UI设计、Unity3D、Web前端开发、产品经理、车载开发、大数据、鸿蒙、计算机网络、嵌入式物联网、软件测试、数据结构与算法、音视频开发、Flutter、IOS开发、PHP开发、.NET、安卓逆向、云计算

到了这里,关于KafkaProducer Sender 线程详解(含详细的执行流程图)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 力扣---LeetCode160. 相交链表(代码详解+流程图)

    “风格相同的人总会相遇 千万个人中万幸得以相逢.” 本章的内容是力扣每日随机一题的部分方法的解析 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点,返回 null 。 160. 相交链表 link 根据题目这是两个问题

    2024年02月02日
    浏览(46)
  • 详解《基于 javascript 的流程图编辑框架LogicFlow》

    1、LogicFlow 是什么 LogicFlow 是一款流程图编辑框架,提供了一系列流程图交互、编辑所必需的功能和灵活的节点自定义、插件等拓展机制。LogicFlow 支持前端研发自定义开发各种逻辑编排场景,如流程图、ER 图、BPMN 流程等。在工作审批配置、机器人逻辑编排、无代码平台流程配

    2024年02月05日
    浏览(52)
  • OD(8)之Mermaid流程图(flowcharts)使用详解

    OD(8)之Mermaid流程图(flowcharts)使用详解 Author: Once Day Date: 2024年2月20日 漫漫长路才刚刚开始… 全系列文章可参考专栏: Linux实践记录_Once_day的博客-CSDN博客 参考文章: 关于 Mermaid | Mermaid 中文网 (nodejs.cn) Mermaid | Diagramming and charting tool ‍‌⁡⁤‍‍⁢‌‬⁡⁤⁢‍‌⁣⁢⁢⁤⁣‌‌

    2024年02月22日
    浏览(51)
  • python实现+leetcode题+合并两个有序列表超详细流程图分析以及代码思路

    给你两个按非递减顺序排列的整数列表nums1和nums2,另有两个整数m和n,分别表示nums1和nums2中的元素数目。请你合并nums2到nums1中,使合并后的数组同样按非递减顺序排列。 注意 :最终,合并后数组不应由函数返回,而是存储在数组nums1中。为了应对这种情况,nums1的初始长度为

    2023年04月09日
    浏览(50)
  • 生成一篇博客,详细讲解springboot的单点登录功能,有流程图,有源码demo

    SpringBoot是目前非常流行的一个Java开发框架,它以简洁的配置和快速的开发效率著称。在实际应用中,单点登录是一个非常重要的功能,它可以让用户在多个应用系统中使用同一个账号登录,提高用户体验和安全性。本文将详细讲解如何在SpringBoot中实现单点登录功能,并提供

    2024年02月08日
    浏览(45)
  • 需求管理全过程流程图及各阶段核心关注点详解

    分析报告指出,多达76%的项目失败是因为差劲的需求管理,这个是项目失败的最主要原因,比落后的技术、进度失控或者混乱的变更管理还要关键。 很多项目往往在开始的时候已经决定了失败,谜底就在谜面上,开始就注定的失败,你后面多努力都很难,注定背锅! 很多P

    2024年02月15日
    浏览(39)
  • 线程池的执行流程

    如果所示,就是线程池的执行过程,可以分为三个主要步骤: 1.提交任务后会首先进行当前工作线程数与核心线程数的比较,如果当前工作线程数小于核心线程数,则直接调用 addWorker() 方法创建一个核心线程去执行任务; 2.如果工作线程数大于核心线程数,即线程池核心线程

    2023年04月09日
    浏览(39)
  • G6框架Dagre流程图第三个自左向右的 Dagre 上对齐改造,对齐结点和边添加样式,并添加修改节点和展示结点详细信息交互

    ​​ 标题修改具体项 设置结点的样式 设置边的样式 添加修改结点名称功能 添加展示结点详细信息功能 参考链接 基本图形:https://g6.antv.vision/zh/examples/net/dagreFlow#lrDagreUL 展示结点详细信息功能:https://g6.antv.vision/zh/examples/tool/tooltip#tooltipClick 修改结点名称功能:https://g6.ant

    2024年02月10日
    浏览(59)
  • springMVC执行流程详解

    springMVC执行流程 主要由model层,view层和controller层组成。 1.1,jsp模型 主要是结构简单,开发这个小型项目的效率高,主要是由这个jsp和javaBean组成。但是jsp同时负责了controller层和view层,因此所有的代码都写在这个jsp里面,导致这个代码的重用性很低,维护不方便,因此这种

    2024年02月16日
    浏览(40)
  • MyBatis 执行流程详解及示例代码

    MyBatis 是一个优秀的持久层框架,它简化了数据库操作的开发过程。本文将详细介绍 MyBatis 的执行流程,并提供相应的示例代码,帮助读者更好地理解和应用 MyBatis。 在 MyBatis 中,首先需要创建一个 SqlSessionFactoryBuilder 对象,并加载 MyBatis 的配置文件。这个对象用于构建 SqlS

    2024年02月11日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包