rabbitmq之java.net.SocketException: Connection reset与MissedHeartbeatException分析

这篇具有很好参考价值的文章主要介绍了rabbitmq之java.net.SocketException: Connection reset与MissedHeartbeatException分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前言

在android前端中接入了rabbitmq消息队列来处理业务,在手机网络环境错综复杂,网络信号不稳定,可能导致mq的频繁断开与连接,在日志中,发现有很多这样的日志,java.net.SocketException: Connection reset,接下来通过源码调试来分析下该错误可能产生的原因。MissedHeartbeatException则是在客户端在多次未收到服务端的消息后,认为服务端已经断开,则抛出该异常。

二、分析

java.net.SocketException: Connection reset在网络搜了一圈,基本上说的是客户端连接着mq,但是服务端已经断开与客户端的连接,此时客户端还在执行接收数据操作,就会发生该错误。

三、MQ的心跳机制

MQ在创建连接的时候则会进行初始化开启心跳服务initializeHeartbeatSender();

  private void initializeHeartbeatSender() {
        this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
    }

在rabbitmq中,客户端会间隔1/2的心跳周期来定时发送心跳

    /**
     * Sets the heartbeat in seconds.
     */
    public void setHeartbeat(int heartbeatSeconds) {
        synchronized(this.monitor) {
            if(this.shutdown) {
                return;
            }

            // cancel any existing heartbeat task
            if(this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }

            if (heartbeatSeconds > 0) {
                // wake every heartbeatSeconds / 2 to avoid the worst case
                // where the last activity comes just after the last heartbeat
                long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
                ScheduledExecutorService executor = createExecutorIfNecessary();
                Runnable task = new HeartbeatRunnable(interval);
                this.future = executor.scheduleAtFixedRate(
                    task, interval, interval, TimeUnit.NANOSECONDS);
            }
        }
    }

发送心跳,此时如果发生IO异常,这边没处理

    private final class HeartbeatRunnable implements Runnable {

        private final long heartbeatNanos;

        private HeartbeatRunnable(long heartbeatNanos) {
            this.heartbeatNanos = heartbeatNanos;
        }

        @Override
        public void run() {
            try {
                LogUtils.log("心跳定时器发送");
                long now = System.nanoTime();

                if (now > (lastActivityTime + this.heartbeatNanos)) {
                    frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                    frameHandler.flush();
                }
            } catch (IOException e) {
                // ignore
            }
        }
    }

结合官方文档和客户端源代码,心跳默认超时时间是60秒,并且每隔30秒进行一次心跳检查,如果超过两次心跳检查都没有确定节点检查,则会关闭连接

3.1测试

测试用例中, 将心跳周期设置为30秒

public static void main(String[] args) {
        String queueName="123456";
        ExecutorService executor= Executors.newFixedThreadPool(10); 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.0.11.211");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setVirtualHost("/");
        factory.setPassword("admin");
        factory.setConnectionTimeout(5000);       
        factory.setAutomaticRecoveryEnabled(false); 
        factory.setTopologyRecoveryEnabled(false);   
        factory.setRequestedHeartbeat(30);
        executor.submit(() -> {
            try {
                Connection connection = factory.newConnection();
                LogUtils.log("连接创建成功");
                connection.addShutdownListener(cause -> {
                    LogUtils.log("断开连接:"+cause.getMessage()+" msg=>:"+cause.getCause());
                });
                Channel channel = connection.createChannel();
                LogUtils.log("创建通道成功:" + channel.getChannelNumber());
                channel.basicQos(30);
                channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                        try {
                            String message = new String(body, "UTF-8");
                            LogUtils.log("消息:"+message);
                            channel.basicReject(envelope.getDeliveryTag(), false);
                        } catch (Exception e) {
                            LogUtils.log("消费者异常,e:" + e.getMessage()+" consumerTag:"+consumerTag);
                        }
                    }
                });
                channel.addShutdownListener(cause -> {
                    LogUtils.log("消费者断开连接:" + cause.getMessage() + " msg=>:" + cause.getCause().toString());
                });
            } catch (Exception e) {
                LogUtils.log("发生异常:"+e);
                e.printStackTrace();
            }
        });
    }

然后将心跳的发送业务关闭

 private final class HeartbeatRunnable implements Runnable {

        private final long heartbeatNanos;

        private HeartbeatRunnable(long heartbeatNanos) {
            this.heartbeatNanos = heartbeatNanos;
        }

        @Override
        public void run() {
            try {
                LogUtils.log("心跳定时器发送");
                long now = System.nanoTime();

                if (now > (lastActivityTime + this.heartbeatNanos)) {
                  //  frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                  //   frameHandler.flush();
                }
            } catch (IOException e) {
                // ignore
            }
        }
    }

运行后,如下:

2023-09-25 09:35:02.976=>连接创建成功
2023-09-25 09:35:02.987=>创建通道成功:1
2023-09-25 09:35:17.948=>心跳定时器发送
2023-09-25 09:35:32.948=>心跳定时器发送
2023-09-25 09:35:47.949=>心跳定时器发送
2023-09-25 09:36:02.949=>心跳定时器发送
2023-09-25 09:36:17.948=>心跳定时器发送
2023-09-25 09:36:32.948=>心跳定时器发送
2023-09-25 09:36:32.960=>消费者断开连接:connection error msg=>:java.net.SocketException: Connection reset
2023-09-25 09:36:32.962=>断开连接:connection error msg=>:java.net.SocketException: Connection reset

结果分析,服务端在3个心跳周期未检测到客户端的心跳后,则会默认客户端已经断线,则将其断开。

四、MissedHeartbeatException分析

在客户端连接MQ成功后,则开始数据服务的读取this._frameHandler.initialize(this);

    private void startIoLoops() {
        if (executorService == null) {
            Thread nioThread = Environment.newThread(
                threadFactory,
                new NioLoop(socketChannelFrameHandlerFactory.nioParams, this),
                "rabbitmq-nio"
            );
            nioThread.start();
        } else {
            this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this));
        }
    }

读取线程业务方法,如果frame不为空,则丢失心跳这边重置为0次,反之则开始计数丢失次数

private void readFrame(Frame frame) throws IOException {
        LogUtils.log("开始读取数据");
        if (frame != null) {
            _missedHeartbeats = 0;
            if (frame.getType() == AMQP.FRAME_HEARTBEAT) {
                LogUtils.log("读取数据:心跳"); 
            } else {
                if (frame.getChannel() == 0) { // the special channel
                    _channel0.handleFrame(frame);
                } else {
                    if (isOpen()) { 
                        ChannelManager cm = _channelManager;
                        if (cm != null) {
                            ChannelN channel;
                            try {
                                channel = cm.getChannel(frame.getChannel());
                            } catch(UnknownChannelException e) { 
                                LOGGER.info("Received a frame on an unknown channel, ignoring it");
                                return;
                            }
                            channel.handleFrame(frame);
                        }
                    }
                }
            }
        } else {    
        	LogUtils.log("开始读取数据frame为空"); 
            handleSocketTimeout();
        }
    }

超时机制,如果进入该业务,则_missedHeartbeats 会自动加1,如果超过一定次数,则会跑出MissedHeartbeatException

    private void handleSocketTimeout() throws SocketTimeoutException {
        if (_inConnectionNegotiation) {
            throw new SocketTimeoutException("Timeout during Connection negotiation");
        }
        if (_heartbeat == 0) { // No heart-beating
            return;
        }
        LogUtils.log("handleSocketTimeout-------_missedHeartbeats心跳:"+_missedHeartbeats);
        if (++_missedHeartbeats > (1)) {
            throw new MissedHeartbeatException("Heartbeat missing with heartbeat = " +
                                               _heartbeat + " seconds, for " + this.getHostAddress());
        }
    }

为了方便测试,将心跳设置为10s,将_missedHeartbeats 判断大于1则抛出MissedHeartbeatException异常

4.1测试
2023-09-25 10:21:16.565=>开始读取数据
2023-09-25 10:21:16.651=>开始读取数据
2023-09-25 10:21:16.658=>开始读取数据
2023-09-25 10:21:16.658=>连接创建成功
2023-09-25 10:21:16.669=>开始读取数据
2023-09-25 10:21:16.670=>创建通道成功:1
2023-09-25 10:21:16.671=>开始读取数据
2023-09-25 10:21:16.675=>开始读取数据
2023-09-25 10:21:19.177=>开始读取数据
2023-09-25 10:21:19.177=>开始读取数据frame为空
2023-09-25 10:21:19.177=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:21.659=>开始读取数据
2023-09-25 10:21:21.659=>读取数据:心跳
2023-09-25 10:21:24.160=>开始读取数据
2023-09-25 10:21:24.160=>开始读取数据frame为空
2023-09-25 10:21:24.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:26.659=>开始读取数据
2023-09-25 10:21:26.660=>读取数据:心跳
2023-09-25 10:21:29.161=>开始读取数据
2023-09-25 10:21:29.161=>开始读取数据frame为空
2023-09-25 10:21:29.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:31.661=>开始读取数据
2023-09-25 10:21:31.661=>读取数据:心跳
2023-09-25 10:21:34.161=>开始读取数据
2023-09-25 10:21:34.161=>开始读取数据frame为空
2023-09-25 10:21:34.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:36.662=>开始读取数据
2023-09-25 10:21:36.662=>开始读取数据frame为空
2023-09-25 10:21:36.662=>handleSocketTimeout-------_missedHeartbeats心跳:1
2023-09-25 10:21:36.668=>消费者断开连接:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
2023-09-25 10:21:36.671=>断开连接:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211

上面可知,服务端会在心跳周期向客户端发送心跳,如果在客户端没收到任何消息时间段内,MissedHeartbeatException超过一定次数,则将跑出该异常,官方默认是2*4=8文章来源地址https://www.toymoban.com/news/detail-810779.html

到了这里,关于rabbitmq之java.net.SocketException: Connection reset与MissedHeartbeatException分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • javax.net.ssl.SSLException: Connection reset

    https://www.cnblogs.com/colder/p/16612582.html 解决NoHttpResponseException问题 .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) = 禁止连接重用,每次请求不再链接复用,而是创建一个新的链接 https://blog.csdn.net/yzf279533105/article/details/126963611 https://blog.csdn.net/enweitech/article/details/79261439 https://

    2024年02月04日
    浏览(29)
  • java.net.SocketException: Broken pipe原因

    服务端日志中发现 若干条异常 在抛出上述异常时可能还有 Connect reset by peer 异常, 在客户端和服务端建立socket连接后,一方突然掉线/关闭通道,一方还在持续读取或者写入数据就会触发这个异常。触发 Connect reset by peer 后继续写入/读取数据则会抛出 Broken pipe 异常 这个异常简

    2024年02月12日
    浏览(77)
  • Unable to open debugger port (127.0.0.1:13928):java.net.SocketException “

    Unable to open debugger port (127.0.0.1:13928):java.net.SocketException “Interrupted function call: accept failed” 一、问题描述 准备debug启动项目时,报标题错误。 二、原因分析 出现这个报错的原因是因为端口被占用导致的。 三、解决办法 解决方法:关闭占用端口的进程。 第一步:cmd打开命令行窗

    2024年02月03日
    浏览(37)
  • java.net.SocketException: No buffer space available (maximum connections reached?): connect

            最近,有项目反馈调用第三方接口失败,查不到数据。现场用soapUI工具试了下,点执行按钮,一闪而过,接口没有任何返回值。一开始以为是第三方服务问题,后来查看日志,发现是自己的问题,报错如下:         Caused by: java.net.SocketException: No buffer space av

    2024年02月16日
    浏览(27)
  • java.io.IOException: Connection reset by peer

    接口返回的时候报错,java.io.IOException: Connection reset by peer,具体报错信息如下: 原因: 接口返回的数据量太大报错, 解决办法: 修改nginx缓存配置信息。 nginx原配置信息: nginx修改后信息: 注意: \\\"proxy_busy_buffers_size\\\"必须等于或大于\\\"proxy_buffer_size\\\"的最大值。

    2024年02月16日
    浏览(35)
  • 有关 java.io.IOException: Connection reset by peer 解决问题方法之一

    有很多大佬已经终结出现这个错误的原因有一下几种 1. 服务器在接受处理用户请求时,自身的cpu、io、内存、线程等资源都是有最大限制的。当并发请求超过服务器的承载量时,服务器会停掉一些请求。(但是要注意如果实际的并发数量没有超过服务器的承载量,可能中了木

    2024年02月22日
    浏览(51)
  • ES启动报错:Caused by: java.net.SocketException: No buffer space available (maximum connections reached?)

    线上服务的es突然挂掉导致网站无法正常访问,看了下是es服务停止了,于是重启后出现了如下报错,对于报错信息的重点关注: Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): connect 看报错信息是连接数太多,耗尽了某种资源。 查看当前所有的连接,

    2024年02月07日
    浏览(35)
  • HTTP接口调用时报错 java.io.IOException: Connection reset by peer解决办法

    在接口调用时,遇到如下报错: Caused by: java.io.IOException: Connection reset by peer         at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:1.8.0_241]         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:1.8.0_241]         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:1.8.0_241]  

    2024年04月26日
    浏览(49)
  • Connection reset(Connection reset具体的解决方案)

    Connection reset的解决方案有如下几个:1、出错了重试;2、客户端和服务器统一使用TCP长连接;3、客户端和服务器统一使用TCP短连接。 首先是Connection reset出错了重试:这种方案可以简单防止“Connection reset”错误,然后如果服务不是“幂等”的则不能使用该方法;比如提交订单

    2024年02月16日
    浏览(34)
  • reset Offset 与connection reset by peer

    某次生产线上,从KafkaManager监控页面,发现还剩几十万未消费数据量,过了几分钟之后,监控页面发现未消费数据量达到了几千万。 定位生产日志,发现消费端 出现日志 reset offset , 结合上下文日志,发现在切换broker leader之后,提交的偏移量在新leader上面找不到,之后根据消

    2024年02月16日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包