Netty学习——源码篇9 Handler其他处理与异步处理

这篇具有很好参考价值的文章主要介绍了Netty学习——源码篇9 Handler其他处理与异步处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 ChannelHandlerContext

        每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:

Netty学习——源码篇9 Handler其他处理与异步处理,Netty,Netty源码,Netty高级,ChannelHandler,Handler,Future异步,Promise异步

2 Channel的声明周期

        Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。

Netty学习——源码篇9 Handler其他处理与异步处理,Netty,Netty源码,Netty高级,ChannelHandler,Handler,Future异步,Promise异步         一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。

Netty学习——源码篇9 Handler其他处理与异步处理,Netty,Netty源码,Netty高级,ChannelHandler,Handler,Future异步,Promise异步

3 ChannelHandler常用的API

        先看一个Netty中整个Handler体系的类关系图。

Netty学习——源码篇9 Handler其他处理与异步处理,Netty,Netty源码,Netty高级,ChannelHandler,Handler,Future异步,Promise异步        Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表

Netty学习——源码篇9 Handler其他处理与异步处理,Netty,Netty源码,Netty高级,ChannelHandler,Handler,Future异步,Promise异步

        Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。

4 ChannelInboundHandler

        ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。

Netty学习——源码篇9 Handler其他处理与异步处理,Netty,Netty源码,Netty高级,ChannelHandler,Handler,Future异步,Promise异步

5 异步处理Future

        java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。

        Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();


    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);


    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;

    
    Future<V> syncUninterruptibly();

    
    Future<V> await() throws InterruptedException;

 
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

  
    boolean await(long timeoutMillis) throws InterruptedException;

   
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

   
    boolean awaitUninterruptibly(long timeoutMillis);


    V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

        ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。

public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

  
    boolean isVoid();
}

6 异步执行Promise

        Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。


/**
 * Special {@link Future} which is writable.
 */
public interface Promise<V> extends Future<V> {

    
    Promise<V> setSuccess(V result);

 
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

        ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     */
    ChannelPromise unvoid();
}

        DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下

    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
        }
    }
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

        从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。

        再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。文章来源地址https://www.toymoban.com/news/detail-850126.html

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

到了这里,关于Netty学习——源码篇9 Handler其他处理与异步处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • FastAPI学习-23.异常处理器 exception_handler

    通常我们可以通过 raise 抛出一个 HTTPException 异常,请求参数不合法会抛出 RequestValidationError 异常,这是最常见的2种异常。 向客户端返回 HTTP 错误响应,可以使用  raise 触发  HTTPException 。 默认情况下返回json格式 查看 HTTPException 异常相关源码 HTTPException 异常是继承的 starle

    2024年02月07日
    浏览(35)
  • 【SA8295P 源码分析】25 - QNX Ethernet MAC 驱动 之 emac_isr_thread_handler 中断处理函数源码分析

    【源码分析】 因为一些原因,本文需要移除, 对于已经购买的兄弟,不用担心,不是跑路, 我会继续持续提供技术支持, 有什么模块想学习的,或者有什么问题有疑问的, 请私聊我,我们 +VX 沟通技术问题,一起学习,一起进步 接下来,我一一私聊已经购买的兄弟添加V

    2024年02月11日
    浏览(27)
  • Handler 同步屏障&异步消息

    简单来说,同步屏障就是一套为了让特殊消息更快执行的一个机制。 这里我们假设一个场景:我们向主线程发送了一个UI绘制操作Message,而此时消息队列中的消息非常多,那么这个Message的处理可能会得到延迟,绘制不及时造成界面卡顿。同步屏障机制的作用,是让这个绘制

    2024年02月09日
    浏览(27)
  • 添加https后反向代理gateway报错io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record

    域名及https绑定在openshift的router上,用route的Edge模式,证书终止在router(证书卸载),转发向后端请求是http的。后端接入nginx做反向代理,所有项目的流量都通过这个nginx。再向后转发是两个nginx,作为本项目的流量入口,有反代和静态文件解析功能。再向后就是服务gateway。

    2024年02月11日
    浏览(42)
  • logstash 开启ssl报错:Caused by: io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record

    背景 :在使用证书的方式让beat和logstash通信传输数据的过程中,生成了证书,logstash的证书配置如下: 错误现象 :在启动logstsah的时候出现报错: 问题原因 :排错的过程比较辛酸,就不赘述了。这里的报错原因是证书中指定的ip在logstash发布服务的时候没有指定该ip,因此抛

    2024年02月12日
    浏览(40)
  • Vue源码学习 - 异步更新队列 和 nextTick原理

    在我们使用Vue的过程中,基本大部分的 watcher 更新都需要经过 异步更新 的处理。而 nextTick 则是异步更新的核心。 官方对其的定义: 在下次 DOM 更新循环结束之后执行延迟回调。在修改数据之后立即使用这个方法,获取更新后的 DOM。 Vue 可以做到 数据驱动视图更新 ,我们简单

    2024年02月15日
    浏览(30)
  • 构建异步高并发服务器:Netty与Spring Boot的完美结合

    「作者主页」 :雪碧有白泡泡 「个人网站」 :雪碧的个人网站 ChatGPT体验地址 在Java基础中,IO流是一个重要操作,先上八股 BIO:传统的IO,同步阻塞,一个连接一个线程。一般不怎么使用 AIO:JDK7引入的,异步非阻塞IO NIO:JDK1.4之后新的API,是多路复用,允许你一次性处理

    2024年02月03日
    浏览(38)
  • 结合源码拆解Handler机制

    作者:Pingred 当初在讲App启动流程的时候,它的整个流程涉及到的类可以汇总成下面这张图: 那时着重讲了AMS、PMS、Binder这些知识点,有一个是没有对它进行详细讲解的,那就是常见的Handler,它不仅在这个流程里作用在ApplicationThread和ActivityThread进行通信,它在整个安卓体系

    2024年02月11日
    浏览(32)
  • 云安全_什么是云,云计算的本质,没想到一个Handler还有中高级几种问法

    是将计算机终端系统进行虚拟化,以达到桌面使用的安全性和灵活性。可以通过任何设备,在任何地点,任何时间通过网络访问属于我们个人的桌面系统。 存储虚拟化 是对存储硬件资源进行抽象化表现。 网络虚拟化 网络虚拟化就是在一个物理网络上模拟出多个逻辑网络来。

    2024年04月13日
    浏览(35)
  • 从源码分析Handler面试问题

    Handler 老生常谈的问题了,非常建议看一下Handler 的源码。刚入行的时候,大佬们就说 阅读源码 是进步很快的方式。 Handler 的 重要组成部分 Message 消息 MessageQueue 消息队列 Lopper 负责处理MessageQueue中的消息 对照着上面的大的逻辑图,我们深入一下,看一下,一个消息 是如何被

    2024年02月15日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包