【Netty】Netty中的超时处理与心跳机制(十九)

这篇具有很好参考价值的文章主要介绍了【Netty】Netty中的超时处理与心跳机制(十九)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)
  • Promise 源码分析(十七)
  • 一行简单的writeAndFlush都做了哪些事(十八)

一、超时监测

Netty 的超时类型 IdleState 主要分为以下3类:

  • ALL_IDLE : 一段时间内没有数据接收或者发送。
  • READER_IDLE : 一段时间内没有数据接收。
  • WRITER_IDLE : 一段时间内没有数据发送。

针对上面的 3 类超时异常,Netty 提供了 3 类ChannelHandler来进行监测。

  • IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 -IdleStateEvent 事件。
  • ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。
  • WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。

二、IdleStateHandler类

IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this.writeListener = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
            IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
        }
    };
    this.firstReaderIdleEvent = true;
    this.firstWriterIdleEvent = true;
    this.firstAllIdleEvent = true;
    ObjectUtil.checkNotNull(unit, "unit");
    this.observeOutput = observeOutput;
    if (readerIdleTime <= 0L) {
        this.readerIdleTimeNanos = 0L;
    } else {
        this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (writerIdleTime <= 0L) {
        this.writerIdleTimeNanos = 0L;
    } else {
        this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (allIdleTime <= 0L) {
        this.allIdleTimeNanos = 0L;
    } else {
        this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
    }

}

在上述源码中,构造函数可以接收以下参数:

  • readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。

  • writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。

  • allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。

IdleStateHandler 使用示例:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("idleStateHandler",new IdleStateHandler(60,30,0));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

public class MyHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent e = (IdleStateEvent) evt;
            if(e.state() == IdleState.READER_IDLE){
                ctx.close();
            }else if(e.state() == IdleState.WRITER_IDLE){
                ctx.writeAndFlush(new PingMessage());
            }
        }
    }
}

在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。

  • 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
  • 如果 60 秒内没有入站流量(读超时)时,连接关闭。

三、ReadTimeoutHandler类

ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;

    public ReadTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时
    }

    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;//只处理读超时

        this.readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常
            ctx.close();
            this.closed = true;
        }

    }
}

从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。
ReadTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(30));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof ReadTimeoutException){
            //...
        }else {
            super.exceptionCaught(ctx,cause);
        }
    }
}

在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。

四、WriteTimeoutHandler类

WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
    private static final long MIN_TIMEOUT_NANOS;
    private final long timeoutNanos;
    private WriteTimeoutHandler.WriteTimeoutTask lastTask;
    private boolean closed;

    public WriteTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public WriteTimeoutHandler(long timeout, TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");
        if (timeout <= 0L) {
            this.timeoutNanos = 0L;
        } else {
            this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
        }

    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.timeoutNanos > 0L) {
            promise = promise.unvoid();
            this.scheduleTimeout(ctx, promise);
        }

        ctx.write(msg, promise);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;

        WriteTimeoutHandler.WriteTimeoutTask prev;
        for(this.lastTask = null; task != null; task = prev) {
            task.scheduledFuture.cancel(false);
            prev = task.prev;
            task.prev = null;
            task.next = null;
        }

    }

    private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {
        WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);
        if (!task.scheduledFuture.isDone()) {
            this.addWriteTimeoutTask(task);
            promise.addListener(task);
        }

    }

    private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (this.lastTask != null) {
            this.lastTask.next = task;
            task.prev = this.lastTask;
        }

        this.lastTask = task;
    }

    private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (task == this.lastTask) {
            assert task.next == null;

            this.lastTask = this.lastTask.prev;
            if (this.lastTask != null) {
                this.lastTask.next = null;
            }
        } else {
            if (task.prev == null && task.next == null) {
                return;
            }

            if (task.prev == null) {
                task.next.prev = null;
            } else {
                task.prev.next = task.next;
                task.next.prev = task.prev;
            }
        }

        task.prev = null;
        task.next = null;
    }

    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }

    }

  //...
}

从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。
WriteTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("writeTimeoutHandler",new WriteTimeoutHandler(30));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof WriteTimeoutException ){
            //...
        }else {
            super.exceptionCaught(ctx,cause);
        }
    }
}

在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。

五、实现心跳机制

针对超时的解决方案——心跳机制。
在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。

5.1. 定义心跳处理器

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
	
	// (1)心跳内容
	private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
			.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
					CharsetUtil.UTF_8));  

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
			throws Exception {

		// (2)判断超时类型
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			String type = "";
			if (event.state() == IdleState.READER_IDLE) {
				type = "read idle";
			} else if (event.state() == IdleState.WRITER_IDLE) {
				type = "write idle";
			} else if (event.state() == IdleState.ALL_IDLE) {
				type = "all idle";
			}

			// (3)发送心跳
			ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
					ChannelFutureListener.CLOSE_ON_FAILURE);
 
			System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}
}

对上述代码说明:

  • 定义了心跳时,要发送的内容。

  • 判断是不是 IdleStateEvent 事件,是则处理。

  • 将心跳内容发送给客户端。

5.2. 定义 ChannelInitializer

HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:

public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {

	private static final int READ_IDEL_TIME_OUT = 4; // 读超时
	private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
	private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
				WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // (1)
		pipeline.addLast(new HeartbeatServerHandler()); // (2)
	}
}

对上述代码说明如下:

  • 添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。
  • 添加了HeartbeatServerHandler,用来处理超时时,发送心跳。

5.3. 编写服务器

服务器代码比较简单,启动后侦听 8083 端口。

public final class HeartbeatServer {

    static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        // 配置服务器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HeartbeatHandlerInitializer());

            // 启动
            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

5.4. 测试

首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:

telnet 127.0.0.1 8083

可以看到客户端与服务器的交互效果如下图。
【Netty】Netty中的超时处理与心跳机制(十九)

结语

文章如果对你有帮助,看完记得点赞、关注、收藏。文章来源地址https://www.toymoban.com/news/detail-464639.html

到了这里,关于【Netty】Netty中的超时处理与心跳机制(十九)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RPC教程 4.超时处理机制

    对比原教程,这里使用context来处理子协程的泄露问题。 超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加

    2024年01月24日
    浏览(42)
  • websocket超时重连、心跳检测

    在单个TCP连接上进行全双工通信的协议,可以实现服务端和客户端双向推送信息的协议。我们在使用webscoket通信时必须要注意的问题超时重连和心跳检测。 超时重连:当出现错误时客户端尝试重新连接websocket。 心跳检测:客户端长时间没接收到服务端消息,就向服务端发送

    2024年02月11日
    浏览(43)
  • Java IO流(五)Netty实战[TCP|Http|心跳检测|Websocket]

    Server端  Client 运行结果 Server Client 运行结果  注意: 需要调整readerIdleTime|writerIdleTime|allIdleTime参数才会显示对应超时信息 服务端 客户端(浏览器) 效果图

    2024年02月11日
    浏览(35)
  • 《Netty》从零开始学netty源码(五十九)之ServerBootstrapAcceptor

    前面初始化channel的过程中向pipeline中添加了一个channelHandler,即 ServerBootstrapAcceptor ,它的作用主要是将worker组的channel进行注册,它的数据结构如下: 它的属性主要是通过 ServerBootstrap 启动类设置的,它的方法主要是 channelRead() 方法,其过程如下: 在第五十八篇中,当EventLo

    2024年02月05日
    浏览(64)
  • 《Netty》从零开始学netty源码(四十九)之PoolArena

    Netty中分配内存是委托给PoolArena来管理的,它主要有两个实现类: 默认情况下使用的DirectArena,它的数据结构如下: 从属性中我们看到PoolArena主要分配三种类型的内存,小于32KB的分配small类型的PoolSubpage,存储在smallSubpagePools,32KB~4MB的分配normal类型的PoolChunk,根据其利用率的

    2024年02月02日
    浏览(35)
  • 大量删除hdfs历史文件导致全部DataNode心跳汇报超时为死亡状态问题解决

    背景: 由于测试环境的磁盘满了,导致多个NodeManager出现不健康状态,查看了下,基本都是data空间满导致,不是删除日志文件等就能很快解决的,只能删除一些历史没有用的数据。于是从大文件列表中,找出2018年的spark作业的历史中间文件并彻底删除(跳过回收站) 问题产生过

    2024年02月14日
    浏览(44)
  • 【计算机网络】 心跳机制

    应用场景 在长连接下,有可能很长一段时间没有数据往来。理论上说,这个连接是一直保持连接的,但是实际情况中,如果中间节点出现什么故障是难以知道的。更要命的是,有的节点(防火墙)会自动把一定时间之内没有数据交互 的连接给断掉。在这个时候,就需要我们

    2024年02月08日
    浏览(33)
  • WebSocket心跳机制

    WebSocket是HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。 1、创建webSocket 2、websocket事件 事件 事件处理程序 描述 open Socket.onopen 连接建立时触发

    2024年02月15日
    浏览(58)
  • websocket 心跳机制

    WebSocket 是一种在客户端和服务器之间创建持久连接的技术。为了保持连接的稳定性,就需要通过发送心跳消息来维持 WebSocket 连接。 1、创建一个webscoket基本的使用 2、在客户端连接到 WebSocket 服务器之后,通过 setInterval 方法定时发送心跳消息 这边的代码会每隔5秒向服务器发

    2024年02月11日
    浏览(40)
  • WebSocket心跳机制(笔记大全)

    一、WebSocket心跳机制前端 前端实现WebSocket心跳机制的方式主要有两种: 使用setInterval定时发送心跳包。 在前端监听到WebSocket的onclose()事件时,重新创建WebSocket连接。 第一种方式会对服务器造成很大的压力,因为即使WebSocket连接正常,也要定时发送心跳包,从而消耗服务器资

    2024年02月15日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包