【ES实战】ES创建Transports客户端时间过长分析

这篇具有很好参考价值的文章主要介绍了【ES实战】ES创建Transports客户端时间过长分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ES创建Transports客户端时间过长分析

2023年10月19日


在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.16</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>5.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>sniffer</artifactId>
            <version>5.4.2</version>
        </dependency>

创建连接代码如下

        final Settings settings = Settings.builder()
                .put("cluster.name", "common-es")
                .put("client.transport.sniff", true).build();
        final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);
        long t1 = System.currentTimeMillis();
        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));
        logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);
        long t2 = System.currentTimeMillis();
        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));
        logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);
        long t3 = System.currentTimeMillis();
        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));
        logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
        int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
        int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
        int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
        int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
        int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        // 链接的超时时间
        builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));
        builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
        builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
        // if we are not master eligible we don't need a dedicated channel to publish the state
        builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
        // if we are not a data-node we don't need any dedicated channels for recovery
        builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
        builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
        return builder.build();
    }
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
    timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }
    
    public TransportClient addTransportAddresses(TransportAddress... transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
        // 竞争对象锁mutex
        synchronized (mutex) {
            if (closed) {
                throw new IllegalStateException("transport client is closed, can't add an address");
            }
            List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
            for (TransportAddress transportAddress : transportAddresses) {
                boolean found = false;
                for (DiscoveryNode otherNode : listedNodes) {
                    // 方式连接地址值重复,会自动过滤
                    if (otherNode.getAddress().equals(transportAddress)) {
                        found = true;
                        logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                        break;
                    }
                }
                if (!found) {
                    filtered.add(transportAddress);
                }
            }
            if (filtered.isEmpty()) {
                return this;
            }
            List<DiscoveryNode> builder = new ArrayList<>(listedNodes);
            for (TransportAddress transportAddress : filtered) {
                DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),
                        transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);
                logger.debug("adding address [{}]", node);
                builder.add(node);
            }
            // listNodes里面存放的是配置的连接节点列表
            listedNodes = Collections.unmodifiableList(builder);
            // 调用不同的节点采集-里面也对mutex锁进行竞争
            nodesSampler.sample();
        }
        return this;
    }

NodeSampler.sample()

		public void sample() {
            synchronized (mutex) {
                if (closed) {
                    return;
                }
                doSample();
            }
        }

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Override
        protected void doSample() {
            Set<DiscoveryNode> nodesToPing = new HashSet<>();
            // 最新要进行连接的一组节点列表
            for (DiscoveryNode node : listedNodes) {
                nodesToPing.add(node);
            }
            // nodes代表已经连接上的节点列表
            for (DiscoveryNode node : nodes) {
                nodesToPing.add(node);
            }
            // 并发控制辅助类
            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
            try {
                for (final DiscoveryNode nodeToPing : nodesToPing) {
                    // 采用线程池的方式去连接节点
                    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
                        Transport.Connection connectionToClose = null;

                        void onDone() {
                            try {
                                IOUtils.closeWhileHandlingException(connectionToClose);
                            } finally {
                                latch.countDown();
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            onDone();
                            ...
                            ...
                        }

                        @Override
                        protected void doRun() throws Exception {
                            Transport.Connection pingConnection = null;
                            if (nodes.contains(nodeToPing)) {
                                try {
                                    pingConnection = transportService.getConnection(nodeToPing);
                                } catch (NodeNotConnectedException e) {
                                    // will use a temp connection
                                }
                            }
                            if (pingConnection == null) {
                                logger.trace("connecting to cluster node [{}]", nodeToPing);
                                // 尝试去连接节点,超时会抛出异常
                                connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                                pingConnection = connectionToClose;
                            }
                            // 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点
                            transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                                Requests.clusterStateRequest().clear().nodes(true).local(true),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                    .withTimeout(pingTimeout).build(),
                                new TransportResponseHandler<ClusterStateResponse>() {

                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }

                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }

                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(nodeToPing, response);
                                        onDone();
                                    }

                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info(
                                            (Supplier<?>) () -> new ParameterizedMessage(
                                                "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                        try {
                                            hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                        } finally {
                                            onDone();
                                        }
                                    }
                                });
                        }
                    });
                }
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
            for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
                if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                    logger.warn("node {} not part of the cluster {}, ignoring...",
                            entry.getValue().getState().nodes().getLocalNode(), clusterName);
                    newFilteredNodes.add(entry.getKey());
                    continue;
                }
                for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                    newNodes.add(cursor.value);
                }
            }
            // 验证新节点是否可连接
            nodes = validateNewNodes(newNodes);
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        }

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,
                                       ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
       ...
       ...
       ...
        this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
    }

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {
    @Override
    public void run() {
        try {
            nodesSampler.sample();
            if (!closed) {
                nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
            }
        } catch (Exception e) {
            logger.warn("failed to sample", e);
        }
    }
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder()
                    .put("cluster.name", "common-es")
                    .put("transport.tcp.connect_timeout", "5s")
                    .put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时文章来源地址https://www.toymoban.com/news/detail-722310.html

到了这里,关于【ES实战】ES创建Transports客户端时间过长分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ES客户端RestHighLevelClient的使用

    默认情况下,ElasticSearch使用两个端口来监听外部TCP流量。 9200端口:用于所有通过HTTP协议进行的API调用。包括搜索、聚合、监控、以及其他任何使用HTTP协议的请求。所有的客户端库都会使用该端口与ElasticSearch进行交互。 9300端口:是一个自定义的二进制协议,用于集群中各

    2024年02月03日
    浏览(62)
  • ES查询客户端初始化(RestHighLevelClient)

    另外,如果说把es当成一个数据库使用,可以看下开源项目easy-es,操作更方便。文档地址:快速开始 | Easy-Es

    2024年02月11日
    浏览(52)
  • es最大相似度检索(原生与java客户端)

    原生rest: 对“不好”进行分词, \\\"operator\\\": \\\"and\\\" 意思是同时满足。 结果: java RestHighLevelClient

    2024年02月11日
    浏览(49)
  • 袁庭新ES系列15节|Elasticsearch客户端基础操作

    上一章节我们介绍了搭建Elasticsearch集群相关的知识。那么又该如何来操作Elasticsearch集群呢?在ES官网中提供了各种语言的客户端,我们在项目开发过程中有多种Elasticsearch版本和连接客户端可以选择,那么他们有什么区别?这一章节袁老师带领大家来学习Elasticsearch客户端相关

    2024年04月25日
    浏览(51)
  • python用websockets创建服务端websocket创建客户端

    服务端 客户端

    2024年02月22日
    浏览(55)
  • Linux时间校准(ntpdate及NTP客户端代码校准示例)

    机器每次机启后时间就会出现异常,因为机器无法访问外网,只能访问局域网的ntp服务,所以需要保证局域网内部有ntp服务,如何安装ntp服务,参考Ubuntu20.04 Ntp服务安装及验证。 网络时间协议Network Time Protocol(NTP) 是一种确保时钟保持准确的方法。如果可以访问互联网,只需安

    2024年02月04日
    浏览(38)
  • kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(61)
  • 【openfeign】OpenFeign的扩展、日志、超时时间、拦截器、客户端组件、压缩

    有时候我们遇到Bug,比如接口调用失败、参数没收到等问题,或者想看看调用性能,就需要配置Feign的日志了,以此让Feign把请求信息输出来。 定义一个配置类,指定日志级别: 通过源码可以看到日志等级有4种,分别是: NONE:不记录任何日志(默认值),性能最佳,适用于

    2024年02月12日
    浏览(50)
  • Netty理论与实践(二) 创建http客户端 服务端

    1. 使用echo服务器模拟http 通过上一篇文章中的echo服务器程序来模拟一次HTTP请求。 接收消息的代码如下: 我们通过postman直接访问echo服务器: 请求成功,echo服务器接收到了本次HTTP请求,控制台打印内容如下: 上面的原理很容易理解,postman通过tcp建立与服务器localhost:8001的连

    2024年02月16日
    浏览(35)
  • TCP四次握手为什么客户端等待的时间是2MSL

    MSL是Maximum Segment Lifetime英文的缩写,中文可以译为“报文最大生存时间”,他是任何报文在网络上存在的最长时间,超过这个时间报文将被丢弃。 第三次握手服务端发送FIN报文段,最长情况下经过MSL的时间可以到达客户端,客户端在收到来自服务端的FIN报文段之后发送ACK报文

    2024年02月15日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包