面试官问我:线程锁导致的kafka客户端超时,如何解决?

这篇具有很好参考价值的文章主要介绍了面试官问我:线程锁导致的kafka客户端超时,如何解决?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文分享自华为云社区《线程锁导致的kafka客户端超时问题》,作者: 张俭 。

问题背景

有一个环境的kafka client发送数据有部分超时,拓扑图也非常简单

面试官问我:线程锁导致的kafka客户端超时,如何解决?,技术交流,kafka,分布式,线程锁,JVM,kafka client

定位历程

我们先对客户端的环境及JVM情况进行了排查,从JVM所在的虚拟机到kafka server的网络正常,垃圾回收(GC)时间也在预期范围内,没有出现异常。

紧接着,我们把目光转向了kafka 服务器,进行了一些基础的检查,同时也查看了kafka处理请求的超时日志,其中我们关心的metadata和produce请求都没有超时。

问题就此陷入了僵局,虽然也搜到了一些kafka server会对连上来的client反解导致超时的问题( KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer by dpoldrugo · Pull Request #10059 · apache/kafka · GitHub),但通过一些简单的分析,我们确定这并非是问题所在。

同时,我们在环境上也发现一些异常情况,当时觉得不是核心问题/解释不通,没有深入去看

  • 问题JVM线程数较高,已经超过10000,这个线程数量虽然确实较高,但并不会对1个4U的容器产生什么实质性的影响。
  • 负责指标上报的线程CPU较高,大约占用了1/4 ~ 1/2 的CPU核,这个对于4U的容器来看问题也不大

当排查陷入僵局,我们开始考虑其他可能的调查手段。我们尝试抓包来找线索,这里的抓包是SASL鉴权+SSL加密的,非常难读,只能靠长度和响应时间勉强来推断报文的内容。

在这个过程中,我们发现了一个非常重要的线索,客户端竟然发起了超时断链,并且超时的那条消息,实际服务端是有响应回复的。

随后我们将kafka client的trace级别日志打开,这里不禁感叹kafka client日志打的相对较少,发现的确有log.debug(“Disconnecting from node {} due to request timeout.”, nodeId);的日志打印。

与网络相关的流程:

try {

// 这里发出了请求

client.send(request, time.milliseconds());

while (client.active()) {

List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());

for (ClientResponse response : responses) {

if (response.requestHeader().correlationId() == request.correlationId()) {

if (response.wasDisconnected()) {

throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");

}

if (response.versionMismatch() != null) {

throw response.versionMismatch();

}

return response;

}

}

}

throw new IOException("Client was shutdown before response was read");

} catch (DisconnectException e) {

if (client.active())

throw e;

else

throw new IOException("Client was shutdown before response was read");

}

这个poll方法,不是简单的poll方法,而在poll方法中会进行超时判断,查看poll方法中调用的handleTimedOutRequests方法

@Override

public List<ClientResponse> poll(long timeout, long now) {

ensureActive();

if (!abortedSends.isEmpty()) {

// If there are aborted sends because of unsupported version exceptions or disconnects,

// handle them immediately without waiting for Selector#poll.

List<ClientResponse> responses = new ArrayList<>();

handleAbortedSends(responses);

completeResponses(responses);

return responses;

}

long metadataTimeout = metadataUpdater.maybeUpdate(now);

try {

this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

} catch (IOException e) {

log.error("Unexpected error during I/O", e);

}

// process completed actions

long updatedNow = this.time.milliseconds();

List<ClientResponse> responses = new ArrayList<>();

handleCompletedSends(responses, updatedNow);

handleCompletedReceives(responses, updatedNow);

handleDisconnections(responses, updatedNow);

handleConnections();

handleInitiateApiVersionRequests(updatedNow);

// 关键的超时判断

handleTimedOutRequests(responses, updatedNow);

completeResponses(responses);

return responses;

}

由此我们推断,问题可能在于客户端hang住了一段时间,从而导致超时断链。我们通过工具Arthas深入跟踪了Kafka的相关代码,甚至发现一些简单的操作(如A.field)也需要数秒的时间。这进一步确认了我们的猜想:问题可能出在JVM。JVM可能在某个时刻出现问题,导致系统hang住,但这并非由GC引起。

面试官问我:线程锁导致的kafka客户端超时,如何解决?,技术交流,kafka,分布式,线程锁,JVM,kafka client

为了解决这个问题,我们又检查了监控线程CPU较高的问题。我们发现线程的执行热点是从"sun.management.ThreadImpl"中的"getThreadInfo"方法。

"metrics-1@746" prio=5 tid=0xf nid=NA runnable

java.lang.Thread.State: RUNNABLE

at sun.management.ThreadImpl.getThreadInfo(Native Method)

at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:185)

at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:149)

进一步发现,在某些版本的JDK8中,读取线程信息是需要加锁的。

至此,问题的根源已经清晰明了:过高的线程数以及线程监控时JVM全局锁的存在导致了这个问题。您可以使用如下的demo来复现这个问题

import java.lang.management.ManagementFactory;

import java.lang.management.ThreadInfo;

import java.lang.management.ThreadMXBean;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

public class ThreadLockSimple {

public static void main(String[] args) {

for (int i = 0; i < 15_000; i++) {

new Thread(new Runnable() {

@Override

public void run() {

try {

Thread.sleep(200_000);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

}).start();

}

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

executorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

System.out.println("take " + " " + System.currentTimeMillis());

}

}, 1, 1, TimeUnit.SECONDS);

ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

ScheduledExecutorService metricsService = Executors.newSingleThreadScheduledExecutor();

metricsService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

long start = System.currentTimeMillis();

ThreadInfo[] threadInfoList = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds());

System.out.println("threads count " + threadInfoList.length + " cost :" + (System.currentTimeMillis() - start));

}

}, 1, 1, TimeUnit.SECONDS);

}

}

为了解决这个问题,我们有以下几个可能的方案:

  • 将不合理的线程数往下降,可能存在线程泄露的场景
  • 升级jdk到jdk11或者jdk17(推荐)
  • 将Thread相关的监控临时关闭

这个问题的解决方案应根据实际情况进行选择,希望对你有所帮助。

点击关注,第一时间了解华为云新鲜技术~文章来源地址https://www.toymoban.com/news/detail-771512.html

到了这里,关于面试官问我:线程锁导致的kafka客户端超时,如何解决?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(47)
  • kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(53)
  • kafka客户端工具(Kafka Tool)的安装

    官方下载 根据不同的系统下载对应的版本,点击下载后双击,如何一直下一步,安装 kafka环境搭建请参考:CentOS 搭建Kafka集群 (1)连接kafka (2)简单使用  

    2024年04月23日
    浏览(76)
  • kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(60)
  • rocketmq客户端本地日志文件过大调整配置(导致pod缓存cache过高)

            在使用rocketmq时,发现本地项目中文件越来越大,查找发现在/home/root/logs/rocketmqlog目录下存在大量rocketmq_client.log日志文件。 开启slf4j日志模式,在项目启动项中增加-Drocketmq.client.logUseSlf4j=true 因为配置使用的是System.getProperty获取,所以只能使用系统环境配置。 调整日

    2024年02月15日
    浏览(41)
  • 自定义kafka客户端消费topic

    使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。 后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没

    2024年02月02日
    浏览(54)
  • python-kafka客户端封装

    本文对python的kafka包做简单封装,方便kafka初学者使用。包安装: kafka_helper.py kafka_test.py Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

    2024年02月09日
    浏览(40)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(61)
  • c#客户端Kafka的使用方法

    Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,现在是Apache软件基金会的顶级项目之一。Kafka能够处理大规模的实时数据流,支持高可靠性、高可扩展性、低延迟和高吞吐量。它主要用于构建实时数据管道和流式处理应用程序。 Kafka的核心概念包括:Producer(生产者)

    2024年02月12日
    浏览(62)
  • 【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    kafka客户端是公司内部基于spring-kafka封装的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群认证方式:SASL_PLAINTEXT/SCRAM-SHA-512 经过多年的经验,以及实际验证,配置是没问题的,但是业务方反馈用相同的配置,还是报错! 封装的kafka客户端版本过低,高版本的配置项:secu

    2024年01月17日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包