librdkafka的rdk:broker-1线程cpu百分百问题分析

这篇具有很好参考价值的文章主要介绍了librdkafka的rdk:broker-1线程cpu百分百问题分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题调用栈:

(gdb) bt
#0  0x000000000068307c in rd_kafka_q_pop_serve (rkq=0x1ff31a0, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#1  0x0000000000683130 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#2  0x000000000066abcf in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x1ff28e0, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#3  0x000000000066ac84 in rd_kafka_broker_serve (rkb=rkb@entry=0x1ff28e0, abs_timeout=abs_timeout@entry=71168163411018) at rdkafka_broker.c:2532
#4  0x000000000066b157 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x1ff28e0, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#5  0x000000000066c796 in rd_kafka_broker_thread_main (arg=arg@entry=0x1ff28e0) at rdkafka_broker.c:3571
#6  0x00000000006b8d87 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#7  0x00007f80a07e1eb5 in start_thread () from /lib64/libpthread.so.0
#8  0x00007f80a05098fd in clone () from /lib64/libc.so.6

相关代码(rd_kafka_q_pop):

rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms,
                               int32_t version) {
	return rd_kafka_q_pop_serve(rkq, timeout_ms, version,
                                    RD_KAFKA_Q_CB_RETURN,
                                    NULL, NULL);
}

通过 gdb 观察到 timeout_ms 值为 1,也就是 1 毫秒,这是导致 cpu 百分百的原因:

(gdb) f 1
#1  0x0000000000683130 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
399     in rdkafka_queue.c
(gdb) info args
rkq = <optimized out>
timeout_ms = <optimized out>
version = 0
(gdb) info reg
rax            0x0      0
rbx            0x0      0
rcx            0x64ab3c08       1688943624
rdx            0x7f7f537b4240   140184838160960
rsi            0x1ff31a0        33501600
rdi            0x1ff31c8        33501640
rbp            0x1ff28e0        0x1ff28e0
rsp            0x7f7f537b4288   0x7f7f537b4288
r8             0x7      7
r9             0x4748105a       1195905114
r10            0x7b     123
r11            0x1da063a5132567 8339124156310887
r12            0x7f7f537b42f0   140184838161136
r13            0x40ba2119744a   71168163411018
r14            0x0      0
r15            0x4      4
rip            0x683130 0x683130 <rd_kafka_q_serve>
eflags         0x246    [ PF ZF IF ]
cs             0x33     51
ss             0x2b     43
ds             0x0      0
es             0x0      0
fs             0x0      0
gs             0x0      0
(gdb) p *(rd_kafka_q_t*)0x1ff31c8
$1 = {rkq_lock = {__data = {__lock = 0, __count = 4289904, __owner = 2144952, __nusers = 0, __kind = 2144952, __spins = 0, __elision = 0, __list = {__prev = 0x20bab8, 
        __next = 0x1ff31a0}}, 
    __size = "\000\000\000\000puA\000\270\272 \000\000\000\000\000\270\272 \000\000\000\000\000\270\272 \000\000\000\000\000\240\061\377\001\000\000\000", 
    __align = 18424997382979584}, rkq_cond = {__data = {__lock = 0, __futex = 0, __total_seq = 0, __wakeup_seq = 0, __woken_seq = 33501696, __mutex = 0x0, __nwaiters = 0, 
      __broadcast_seq = 0}, __size = '\000' <repeats 25 times>, "\062\377\001", '\000' <repeats 19 times>, __align = 0}, rkq_fwdq = 0x300000001, rkq_q = {
    tqh_first = 0x1ff1930, tqh_last = 0x0}, rkq_qlen = 0, rkq_qsize = 0, rkq_refcnt = 9087760, rkq_flags = 0, rkq_rk = 0x0, rkq_qio = 0x261, rkq_serve = 0x23, 
  rkq_opaque = 0x0, rkq_name = 0x1 <Address 0x1 out of bounds>}
(gdb) p *(int*)0x1ff31a0
$2 = 1

继续跟踪,问题发生在函数 cnd_timedwait_abs:

(gdb) b cnd_timedwait_abs
Breakpoint 1 at 0x6b92c0: file tinycthread_extra.c, line 95.
(gdb) c
Continuing.

Breakpoint 1, cnd_timedwait_abs (cnd=cnd@entry=0x1ff31c8, mtx=mtx@entry=0x1ff31a0, tspec=tspec@entry=0x7f7f537b4240) at tinycthread_extra.c:95
95      tinycthread_extra.c: No such file or directory.
(gdb) p *tspec
$3 = {tv_sec = 1688943624, tv_nsec = 1000000000}

函数 cnd_timedwait_abs 源码:

int cnd_timedwait_abs (cnd_t *cnd, mtx_t *mtx, const struct timespec *tspec) {
        if (tspec->tv_sec == RD_POLL_INFINITE)
                return cnd_wait(cnd, mtx);
        else if (tspec->tv_sec == RD_POLL_NOWAIT)
                return thrd_timedout;

        return cnd_timedwait(cnd, mtx, tspec); // 走这里来了
}

底层调用的是 Posix 的 pthread_cond_timedwait 函数:

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
      pthread_mutex_t *restrict mutex,
      const struct timespec *restrict abstime);

函数 pthread_cond_timedwait 的参数 abstime 是个绝对时间,不是相对时间。初始化如下:

rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
                                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                                     void *opaque) {
    struct timespec timeout_tspec;

    rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
    
    if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, &timeout_tspec) == thrd_timedout) {
        mtx_unlock(&rkq->rkq_lock);
    }
}

static RD_INLINE void rd_timeout_init_timespec (struct timespec *tspec, int timeout_ms) {
    if (timeout_ms == RD_POLL_INFINITE ||
        timeout_ms == RD_POLL_NOWAIT) {
        tspec->tv_sec = timeout_ms;
        tspec->tv_nsec = 0;
    } else {
        timespec_get(tspec, TIME_UTC); // 这里
        tspec->tv_sec  += timeout_ms / 1000;
        tspec->tv_nsec += (timeout_ms % 1000) * 1000000;
        if (tspec->tv_nsec > 1000000000) {
            tspec->tv_nsec -= 1000000000;
            tspec->tv_sec++;
        }
    }
}

函数 timespec_get:

/* If TIME_UTC is missing, provide it and provide a wrapper for
   timespec_get. */
#ifndef TIME_UTC
#define TIME_UTC 1
#define _TTHREAD_EMULATE_TIMESPEC_GET_

int _tthread_timespec_get(struct timespec *ts, int base);
#define timespec_get _tthread_timespec_get
#endif

#if defined(_TTHREAD_EMULATE_TIMESPEC_GET_)
int _tthread_timespec_get(struct timespec *ts, int base)
{
#if defined(_TTHREAD_WIN32_)
  struct _timeb tb;
#elif !defined(CLOCK_REALTIME)
  struct timeval tv;
#endif

  if (base != TIME_UTC) // 约束为 UTC,即世界统一时间
  {
    return 0;
  }

#if defined(_TTHREAD_WIN32_)
  _ftime_s(&tb);
  ts->tv_sec = (time_t)tb.time;
  ts->tv_nsec = 1000000L * (long)tb.millitm;
#elif defined(CLOCK_REALTIME)
  base = (clock_gettime(CLOCK_REALTIME, ts) == 0) ? base : 0;
#else
  gettimeofday(&tv, NULL);
  ts->tv_sec = (time_t)tv.tv_sec;
  ts->tv_nsec = 1000L * (long)tv.tv_usec;
#endif

  return base;
}
#endif /* _TTHREAD_EMULATE_TIMESPEC_GET_ */

回过头看函数 cnd_timedwait_abs 的参数 tspec 的值:

(gdb) p *tspec
$3 = {tv_sec = 1688943624, tv_nsec = 1000000000}

将 tv_sec 转为可读的值:

时间戳(秒)	1688943624
ISO 8601	2023-07-09T23:00:24.000Z
日期时间(UTC)	2023-07-09 23:00:24
日期时间(本地)	2023-07-10 07:00:24

而本地的实际时间为:

# date +'%Y-%m-%d %H:%M:%S'
2023-07-13 16:15:27

# date +'%s'
1689236277

# expr 1689236277 - 1688943624
292653
# expr 292653 / 3600
81

很明显 tspec 不对,。

准备进一步分析时,遇到 gdb 的 bug 了:

(gdb) b gettimeofday
Breakpoint 1 at 0x7fddd8ba9650 (2 locations)
(gdb) b clock_gettime
Breakpoint 2 at gnu-indirect-function resolver at 0x7fddd8c08800 (3 locations)
(gdb) c
Continuing.
../../gdb/elfread.c:1052: internal-error: elf_gnu_ifunc_resolver_return_stop: Assertion `b->loc->next == NULL' failed.
A problem internal to GDB has been detected,
further debugging may prove unreliable.
Quit this debugging session? (y or n)

进一步验证是发生在 clock_gettime,单独断点 gettimeofday 没有问题,而且不会进入 gettimeofday。

$ man clock_gettime

clock_getres(), clock_gettime(), clock_settime():
      _POSIX_C_SOURCE >= 199309L
      
#  ifdef __USE_POSIX199309
/* Identifier for system-wide realtime clock.  */
#   define CLOCK_REALTIME

找了个正常的,查看:

(gdb) p timeout_tspec
$2 = {tv_sec = 1689237800, tv_nsec = 687268217}

# expr 1689237800 - 1688943624
294176
# expr 294176 / 3600
81

暂时怀疑 clock_gettime 调用出问题了,实现在最新的代码中没有变化:https://github.com/confluentinc/librdkafka/blob/master/src/tinycthread.c。文章来源地址https://www.toymoban.com/news/detail-562385.html

到了这里,关于librdkafka的rdk:broker-1线程cpu百分百问题分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka【问题 03】Connection to node -1 (/IP:9092) could not be established. Broker may not be available.

    此问题仅出现在云服务器上,非云服务器未出现过一下报错: 非云服务器: 云服务器: 云服务器有两个IP,监听IP为云服务器IP,而advertised监听IP为云服务器的外网IP。

    2024年02月05日
    浏览(43)
  • kafka入门,Kafka Broker工作流程、Broker重要参数(十一)

    在zookeeper的服务端存储的Kafka相关信息 1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器 2)/kafka/brokers/topics/first/partitions/0/state 记录谁是leader,有哪些服务器可用 3)/kafka/controller 辅助选举leader 1)broker启动后在zk中注册 2)controller谁先注册,谁说了算 3)由选举出来的Controller监听bro

    2024年02月11日
    浏览(42)
  • Kafka 实战 - Kafka Broker工作流程

    Apache Kafka Broker 在 Kafka 集群中扮演着核心角色,负责接收、存储、复制及分发消息。以下是 Kafka Broker 的工作流程概览: 1. 启动与初始化 加载配置 :Kafka Broker 从 server.properties 文件加载配置参数,包括 Broker ID、监听地址、日志目录、ZooKeeper 连接信息等。 注册到 ZooKeeper :

    2024年04月15日
    浏览(38)
  • 四、Kafka Broker

    4.1.1 Zookeeper 存储的 Kafka 信息 4.1.2 Kafka Broker 总体工作流程 自己的理解:其实就是将kafka的分区,负载到集群中的各个节点上。 1、服役新节点 2、退役旧节点 1、副本的作用 2、Leader的选举流程 选举规则:在isr中存活为前提,按照AR中排在前面的优先。例如ar[1,0,2], isr [1,0,

    2024年02月11日
    浏览(29)
  • Kafka - Broker 详解

    目录 零、前置 一、Kafka Broker 工作流程 1.Zookeeper 存储的 Kafka 信息 2.Kafka Broker 总体工作流程 模拟 Kafka 上下线,Zookeeper 中数据变化 3.Broker 重要参数 二、生产经验 节点服役和退役 1.服役新节点 新节点准备 执行负载均衡操作 生成负载均衡的计划 执行副本存储计划 验证副本存

    2024年01月24日
    浏览(32)
  • 「Kafka」Broker篇

    主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。 Zookeeper 存储的 Kafka 信息 启动 Zookeeper 客户端: 通过 ls 命令可以查看 kafka 相关信息: Kafka Broker 总体工作流程 模拟 Kafka 上下线,Zookeeper 中数据变化: 查看 /kafka/brokers/ids 路径上的节

    2024年01月18日
    浏览(32)
  • 深入Kafka broker

    颗粒度, PRODUCE和FETCH中支持topic,partion等层级的颗粒度; 测试友好, 基于session_id和epoch确定一条拉取链路的fetch session; 全量增量结合, FetchRequest中的全量拉取和增量拉取; 基本结构: header+body。 常见header: api_key, api_version, corelation_id, client_id。与网络协议类似, Kafka本身的协议也是分

    2024年01月22日
    浏览(70)
  • 【kafka】——Broker

    1 /kafka/brokers/dis 存储broker的id,记录有哪些服务器 2 /kafka/brokers/topics 存储topic 相关信息 3 /kafka/consumers Kafka 0.9 版本之前 用于保存offset信息 Kafka 0.9 版本之后offser存储在Kafka主题中 4 /kafka/controller 辅助选举Leader 1 Broker 启动后在Zookeeper中注册 2 每个节点中的Contoller 抢先在Zookeepe

    2024年02月09日
    浏览(32)
  • Kafka及Kafka消费者的消费问题及线程问题

    Topic:是 Kafka 消息发布和订阅的基本单元,同时也是消息的容器。Topic 中的消息被分割成多个分区进行存储和处理。 Partition:是 Topic 分区,将 Topic 细分成多个分区,每个分区可以独立地存储在不同的 Broker 中,从而增加了消息的并发性、可扩展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    浏览(40)
  • Kafka-Broker工作流程

     kafka集群在启动时,会将每个broker节点注册到zookeeper中,每个broker节点都有一个controller,哪个controller先在zookeeper中注册,哪个controller就负责监听brokers节点变化,当有分区的leader挂掉时,controller会监听到节点变化,然后去zookeeper中获取isr,选举新的leader,选举的规则是:在

    2024年02月14日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包