精密数据工匠:探索 Netty ChannelHandler 的奥秘

这篇具有很好参考价值的文章主要介绍了精密数据工匠:探索 Netty ChannelHandler 的奥秘。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

通过上篇文章(Netty入门 — Channel,把握 Netty 通信的命门),我们知道 Channel 是传输数据的通道,但是有了数据,也有数据通道,没有数据加工也是没有意义的,所以今天学习 Netty 的第四个组件:ChannelHandler ,它是 Netty 的数据加工厂。

精密数据工匠:探索 Netty ChannelHandler 的奥秘,# 死磕 Netty,死磕 Netty,死磕 Java,ChannelHandler

ChannelHandler

在上篇文章(Netty入门 — Channel,把握 Netty 通信的命门)中,大明哥提到:EventLoop 接收到 Channel 的 I/O 事件后会将该事件转交给 Handler 来处理,这个 Handler 就是 ChannelHandler。ChannelHandler 它是对 Netty 输入输出数据进行加工处理的载体,它包含了我们应用程序业务处理的逻辑

在整个 Netty 生产线上,它就是那个埋头苦干,一心干活的工人。

Channel 的状态处理

在介绍 Channel 的时候(Netty入门 — Channel,把握 Netty 通信的命门)我们知道,Channel 是有状态的,而且 Channel 也提供了判断 Channel 当前状态的 API,如下:

  • isOpen():检查 Channel 是否为 open 状态。
  • isRegistered():检查 Channel 是否为 registered 状态。
  • isActive():检查 Channel 是否为 active 状态。

上面三个 API 对应了 Channel 四个状态:

状态 描述
ChannelUnregistered Channel 已经被创建,但还未注册到 EventLoop。此时 isOpen() 返回 true,但 isRegistered() 返回 false。
ChannelRegistered Channel 已经被注册到 EventLoop。此时 isRegistered() 返回 true,但 isActive() 返回 false。
ChannelActive Channel 已经处理活动状态并可以接收与发送数据。此时 isActive() 返回 true。
ChannelInactive Channel 没有连接到远程节点

状态变更如下:

精密数据工匠:探索 Netty ChannelHandler 的奥秘,# 死磕 Netty,死磕 Netty,死磕 Java,ChannelHandler

当 Channel 的状态发生改变时,会生成相对应的事件,这些事件会被转发给 ChannelHandler,而 ChannelHandler 中会有相对应的方法来对其进行响应。在 ChannelHandler 中定义一些与这生命周期相关的 API,如 channelRegistered()channelUnregistered()channelActive()channelInactive()等等,后面大明哥会详细介绍这些 API。

ChannelHandler 的家族

ChannelHandler 的大家族如下图:

精密数据工匠:探索 Netty ChannelHandler 的奥秘,# 死磕 Netty,死磕 Netty,死磕 Java,ChannelHandler

ChannelHandler 是整个家族中的顶层接口,它有两大子接口,ChannelInBoundHandler 和 ChannelOutBoundHandler,其中 ChannelInBoundHandler 用于处理入站数据(读请求),ChannelOutBoundHandler 用于处理出站数据(写请求),他们两个接口都有一个相对应的默认实现,即 XxxxAdapter。

ChannelHandler

ChannelHandler 作为顶层接口,它并不具备太多功能,它仅仅只提供了三个 API:

API 描述
handlerAdded() 当ChannelHandler 添加到 ChannelPipeline 中时被调用
handlerRemoved() 当 ChannelHandler 被从 ChannelPipeline 移除时调用
exceptionCaught() 当 ChannelHandler 在处理过程中出现异常时调用

从 ChannelHandler 提供的 API 中我们可以看出,它并不直接参与 Channel 的数据加工过程,而是用来响应 ChannelPipeline 链和异常处 理的,对于 Channel 的数据加工则由它的子接口处理:

  • ChannelInboundHandler:拦截&处理各种入站的 I/O 事件
  • ChannelOutboundHandler:拦截&处理各种出站的 I/O 事件

这里解释下出站和入站的概念:

  • 接收对方传输的数据并处理,即为入站
  • 向对方写数据时,即为出站
  • 如:客户端 —> 服务端:服务端读取客户端消息为入站,服务端处理消息完后给客户端返回消息为出站

ChannelInboundHandler

ChannelInboundHandler 用来处理和拦截各种入站的 I/O 事件,它可以处理的事件如下:

响应方法 触发事件
channelRegistered Channel 被注册到EventLoop 时
channelUnregistered Channel 从 EventLoop 中取消时
channelActive Channel 处理活跃状态,可以读写时
channelInactive Channel 不再是活动状态且不再连接它的远程节点时
channelReadComplete Channel 从上一个读操作完成时
channelRead Channel 读取数据时
channelWritabilityChanged Channel 的可写状态发生改变时
userEventTriggered ChannelInboundHandler.fireUserEventTriggered()方法被调用时

一般情况我们不直接使用 ChannelInboundHandler,而是使用它的实现类 ChannelInboundHandlerAdapter。我们写业务处理时直接继承 ChannelInboundHandlerAdapter ,然后重写你感兴趣的 I/O 事件响应方法即可,比如你想处理 Channel 读数据,那重写 channelRead() 即可,代码如下:

public class ChannelHandlerTest_1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // do something
    }
}

但是在使用 ChannelInboundHandlerAdapter 的时候,需要注意的是我们需要显示地释放与池化 ByteBuf 实例相关的内存,Netty 为此专门提供了一个方法 ReferenceCountUtil.release(),即我们需要在 ChannelInboundHandler 的链的末尾需要使用该方法来释放内存,如下:

public class ByteBufReleaseHandler extends ChannelInboundHandlerAdapter{
  @Override
  public void channelRead(ChannelHandlerContext ctx,Object msg){
    //释放msg
    ReferenceCountUtil.release(msg);
  }
}

但是有些小伙伴有时候会忘记这点,会带来不必要的麻烦,那有没有更好的方法呢?Netty 提供了一个类来帮助我们简化这个过程: SimpleChannelInboundHandler,对于我们业务处理的类,采用继承 SimpleChannelInboundHandler 而不是 ChannelInboundHandlerAdapter 就可以解决了。

public class ChannelHandlerTest_1 extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // do something
    }
}

使用 SimpleChannelInboundHandler 我们就不需要显示释放资源了,是不是非常人性化。

ChannelOutboundHandler

ChannelOutboundHandler 是拦截和处理出站的各种 I/O 事件的。处理的事件如下:

响应方法 触发事件
bind 请求将 Channel 绑定到本地地址时
connect 请求将 Channel 连接到远程节点时
disconnect 请求将 Channel 从远程节点断开时
close 请求关闭 Channel 时
dderegister 请求将 Channel 从它的 EventLoop 注销时
read 请求从 Channel 中读取数据时
flush 请求通过 Channel 将入队数据刷入远程节点时
write 请求通过 Channel 将数据写入远程节点时

与 ChannelInboundHandler 一样,我们也不直接使用 ChannelOutboundHandler 接口,而是使用它的默认实现类 ChannelOutboundHandlerAdapter,重写我们想处理的 I/O 事件的响应方法就可以了。

ChannelHandler 的示例

ChannelHandler 生命周期

大明哥通过一个示例来告诉你 ChannelHandler 在整个执行过程中的生命周期是怎么样的,响应方法调用的顺序是如何的。我们只有了解了整个生命周期的运行,才能在合适的生命周期响应方法里面扩展我们自己的业务功能。

  • 服务端代码
public class ChannelHandlerTest_1_server extends ChannelInboundHandlerAdapter {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelHandlerTest());
                    }
                })
                .bind(8081);
    }

    private static class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--handlerAdded:handler 被添加到 ChannelPipeline");
            super.handlerAdded(ctx);
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--channelRegistered:Channel 注册到 EventLoop");
            super.channelRegistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--channelActive:Channel 准备就绪");
            super.channelActive(ctx);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(LocalTime.now().toString() + "--channelRead:Channel 中有可读数据");

            super.channelRead(ctx, msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--channelReadComplete:Channel 读取数据完成");
            super.channelReadComplete(ctx);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--channelInactive:Channel 被关闭,不在活跃");
            super.channelInactive(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--channelUnregistered:Channe 从 EventLoop 中被取消");
            super.channelUnregistered(ctx);
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println(LocalTime.now().toString() + "--handlerRemoved:handler 从 ChannelPipeline 中移除");
            super.handlerRemoved(ctx);
        }
    }
}

服务端重写整个生命周期中的各个方法。

  • 客户端
public class ChannelHandlerTest_1_client extends ChannelInboundHandlerAdapter {
    public static void main(String[] args) throws Exception {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("127.0.0.1",8081)
                .sync()
                .channel();

        for (int i = 1 ; i <= 2 ; i++) {
            channel.writeAndFlush("hello,i am ChannelHander client -" + i);
            TimeUnit.SECONDS.sleep(2);
        }

        channel.close();
    }
}

客户端连接服务端后,每个 2 秒向服务端发送一次消息,共发两次。发送完消息后,再过 2 秒关闭连接。

  • 运行结果
11:45:09.456--handlerAdded:handler 被添加到 ChannelPipeline
11:45:09.457--channelRegistered:Channel 注册到 EventLoop
11:45:09.457--channelActive:Channel 准备就绪

11:45:09.474--channelRead:Channel 中有可读数据
11:45:09.475--channelReadComplete:Channel 读取数据完成

11:45:11.397--channelRead:Channel 中有可读数据
11:45:11.397--channelReadComplete:Channel 读取数据完成

11:45:13.405--channelReadComplete:Channel 读取数据完成
11:45:13.407--channelInactive:Channel 被关闭,不在活跃
11:45:13.407--channelUnregistered:Channe 从 EventLoop 中被取消
11:45:13.407--handlerRemoved:handler 从 ChannelPipeline 中移除

结果分析如下:

  • 服务端检测到客户端发起连接后,会将要处理的 Handler 添加到 ChannelPipeline 中,然后将 Channel 注册到 EventLoop,注册完成后,Channel 准备就绪处于活跃状态,可以接收消息了
  • 客户端向服务端发送消息,服务端读取消息
  • 当服务端检测到客户端已关闭连接后,该 Channel 就被关闭了,不再活跃,然后将该 Channel 从 EventLoop 取消,并将 Handler 从 ChannelPipeline 中移除。

在整个生命周期中,响应方法执行顺序如下:

  1. 建立连接handlerAdded() -> channelRegistered() -> channelActive ()
  2. 数据请求channelRead() -> channelReadComplete()
  3. 关闭连接channelReadComplete() -> channelInactive() -> channelUnregistered() -> handlerRemoved()

这里有一点需要注意,为什么关闭连接会响应 channelReadComplete() 呢?这里埋个点,后续大明哥在来说明,有兴趣的小伙伴可以先研究研究。

精密数据工匠:探索 Netty ChannelHandler 的奥秘,# 死磕 Netty,死磕 Netty,死磕 Java,ChannelHandler

这里大明哥对 ChannelHandler 生命周期的方法做一个总结:

  1. handlerAdded():ChannelHandler 被加入到 Pipeline 时触发。当服务端检测到新链接后,会将 ChannelHandler 构建成一个双向链表(下篇文章介绍),该方法被触发表示在当前 Channel 中已经添加了一个 ChannelHandler 业务处理链了》。
  2. channelRegistered():当 Channel 注册到 EventLoop 中时被触发。该方法被触发了,表明当前 Channel 已经绑定到了某一个 EventLoop 中了。
  3. channelActive():Channel 连接就绪时触发。该方法被触发,说明当前 Channel 已经处于活跃状态了,可以进行数据读写了。
  4. channelRead():当 Channel 有数据可读时触发。客户端向服务端发送数据,都会触发该方法,该方法被调用说明有数据可读。而且我们自定义业务 handler 时都是重写该方法。
  5. channelReadComplete():当 Channel 数据读完时触发。服务端每次读完数据后都会触发该方法,表明数据已读取完毕。
  6. channelInactive():当 Channel 断开连接时触发。该方法被触发,说明 Channel 已经不再是活跃状态了,连接已经关闭了。
  7. channelUnregistered():当 Channel 取消注册时触发:连接关闭后,我们就要取消该 Channel 与 EventLoop 的绑定关系了。
  8. handlerRemoved():当 ChannelHandler 被从 ChannelPipeline 中移除时触发。将与该 Channel 绑定的 ChannelPipeline 中的 ChannelHandler 业务处理链全部移除。

ChannelHandler 业务开发

上面只是一个很简单的 ChannelHandler 实例,但是我们在实际项目开发中,要处理的业务复制多了,它肯定是由多个业务组合而成,那我们又该如何去做呢?

一个 ChannelHandler

伪代码如下:

public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Map<String,String> map=(Map<String,String>)msg;
        String code=map.get("code");
        if(code.equals("xxx1")){
            //do something 1
        }else if(code.equals("xxx2")){
            //do something 2
        }else{
            // do something 3
        }
    }
}

这种实现方式简单,非常容易实现,但是如果业务较多的情况下,该 Handler 的处理逻辑会非常臃肿,而且很不好维护,而且这么多 if…else ,看起来就烦,当然我们可以采用相对应的方式来干掉 if…else ,例如利用策略模式:

public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
    HashMap<String,HandlerService) handlerServiceMap = new HashMap();
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Map<String,String> map=(Map<String,String>)msg;
        String code=map.get("code");
        handlerServiceMap.get(code).doSomething(map);
    }
}

这种方式也行,但他把所有的业务都耦合在一起了,终究不是那么优雅。

多个 ChannelHandler

我们可以定义多个 ChannelHandler,根据 code 来判断,如果 code 是自己的则处理,否则流转到下一个节点:

public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Map<String,String> map=(Map<String,String>)msg;
        String code=map.get("code");
        if(code.equals("xxx")){
            //自己的业务,处理
        }else{
            //不是自己的,流转到下一个节点
            super.channelRead(ctx, msg);
        }
    }
}

这种方式将所有业务解耦了,每个 Handler 各司其职,所有业务不再是都耦合在一起了,后期维护更加轻松。这种方式和上面方式的一样,依赖 code,他们都需要服务端和客户端都维护一套 code,而这个 code 如果还不能轻易发生变更。

自定义类型

根据客户端提交的参数类型,自动流转到相对应的 ChannelHandler。

public class ChannelHandlerTest extends SimpleChannelInboundHandler<User> {
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, User user) throws Exception {
        // do something
    }
}

这种方式是最优雅的,也是我们使用最多的方式。他的优点明显,1. 业务解耦,2. 不需要维护码表。

总结

  1. ChannelHandler 是 Netty 中真正做事的组件,EventLoop 将监听到的 I/O 事件转发后,就由 ChannelHandler 来处理,它也是我们编写 Netty 代码最多的地方。
  2. ChannelHandler 作为顶层接口,它一般不会负责具体业务 I/O 事件,具体的业务 I/O 事件由它两个子接口负责:
    1. ChannelInboundHandler:负责拦截响应入站 I/O 事件
    2. ChannelOutboundHandler:负责拦截响应出站 I/O 事件
  3. 我们自定义的业务 Handler ,一般不会直接实现 ChannelInboundHandler 和 ChannelOutboundHandler,而是继承他们两个的默认实现类:ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,这样我们就可以只需要关注我们想关注的 I/O 事件即可。
  4. 在这节内容中,最重要的是理解 ChannelHandler 的生命周期方法,在文章总已多次总结了,这里不再阐述了。

虽然 ChannelHandler 是一个好的打工人,但是它没办法一个人干所有的活啊,他需要有其他的 ChannelHandler 来配合它,这个时候怎么办?大明哥下篇文章揭晓!

示例源码:http://suo.nz/1v8w02文章来源地址https://www.toymoban.com/news/detail-734724.html

到了这里,关于精密数据工匠:探索 Netty ChannelHandler 的奥秘的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 探索数据的奥秘:一份深入浅出的数据分析入门指南

    书籍推荐 入门读物 深入浅出数据分析 啤酒与尿布 数据之美 数学之美 数据分析 Scipy and Numpy Python for Data Analysis Bad Data Handbook 集体智慧编程 Machine Learning in Action 机器学习实战 Building Machine Learning Systems with Python 数据挖掘导论 Machine Learning for Hackers 专业读物 Introduction to Semi-Su

    2024年01月21日
    浏览(35)
  • 流式数据处理与高吞吐消息传递:深入探索Kafka技术的奥秘

    Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由 LinkedIn 公司开发,使用Scala 语言编写,目前是 Apache 的开源项目。 Kafka 概念 Zookeeper 集群是一个基于主从复制的高可用集群,每个服务器承担如下三种角色中的一种 ZooKeeper中常见的角色: 领导者(Leader): 

    2024年02月09日
    浏览(38)
  • 结构化GPT用例,在CSDN私密社区中死磕@ada 探索SpringBoot

    摘要:bbs.csdn.net 有私密社区。csdn的社区里支持@ada (会员权益)评论和连续对话。本文展示了在私密社区层层结构化地死磕@ada,通过构建的方式探索技术问题。【learning by doing】。这个过程是从【learner】为中心出发的,产生了【一个又一个无法预估的思考链条】,learner在这个

    2024年02月10日
    浏览(35)
  • 探索 3D 图形处理的奥秘

    最近一年多来,在 3Dfx、Intel 们的狂轰滥炸中,在 Quake、古墓丽影们的推波助澜下,三维图形已经成为计算机迷眼中的又一个热点。3D 世界到底是怎样的神奇,我们又是怎样享受它的乐趣呢?就让我们来一探究竟吧。 为真正掌握3D图形,必须先打好一个牢固的基础。显然,最

    2024年02月03日
    浏览(31)
  • 探索未来世界,解密区块链奥秘!

    你是否曾好奇,区块链是如何影响着我们的生活与未来?想要轻松了解这个引领着技术革命的概念吗?那么这本令人着迷的新书《区块链导论》绝对值得你拥有! 内容丰富多彩,让你轻松掌握: **1章:区块链概述** 了解令人惊叹的区块链世界,从概念到价值,从发展历程到

    2024年02月11日
    浏览(34)
  • 解锁创意灵感,探索FlutterExampleApps项目的奥秘

    FlutterExampleApps项目是一个包含各种示例应用链接的仓库,旨在演示Flutter应用开发中的各种功能、特性和集成。 项目包含了以下几个部分,每个部分都涵盖了不同的内容和主题: Clones/Apps:包含各种应用的克隆或原创应用示例。 Flutter For Web / Desktop / Responsive:针对Web、桌面和

    2024年02月21日
    浏览(38)
  • 探索 PlanetIX:解读区块链游戏运营的奥秘

    作者: daniel@footprint.network 熊市之中,PlanetIX 成长为最强的 Web3 游戏,在 Polygon 网络上独占鳌头。而其开发团队深度使用了 Footprint Analtics 的零代码数据分析平台和-GameFi 的数据 API 来提升用户的游戏体验。  近日,Footprint 与 PlanetIX 的 CMO Melly 一同讨论了 GameFi 的现状和未来,以

    2024年02月09日
    浏览(41)
  • 【探索排序算法的奥秘】一文初步详解八大排序

    👻作者简介:M malloc,致力于成为嵌入式大牛的男人 👻专栏简介:本文收录于 初阶数据结构 ,本专栏主要内容讲述了初阶的数据结构,如顺序表,链表,栈,队列等等,专为小白打造的文章专栏。 👻相关专栏推荐:LeetCode刷题集,C语言每日一题。 本章我将详细的讲解关于

    2024年02月14日
    浏览(34)
  • 解密键盘输入:探索设备控制器的奥秘

    键盘是我们最常用的输入硬件设备之一。作为程序员,你知道当我们敲击键盘上的字母\\\"A\\\"时,操作系统会发生什么吗?下面我将简要介绍整个过程,以便你更容易理解为什么需要这些组件。 首先,让我们来看看CPU的硬件架构图。 CPU内部的内存接口需要通过系统总线和I/O桥接

    2024年02月09日
    浏览(25)
  • 探索操作系统:内核、启动和系统调用的奥秘

    首先,对于有科班背景的读者,可以跳过本系列文章。这些文章的主要目的是通过简单易懂的汇总,帮助非科班出身的读者理解底层知识,进一步了解为什么在面试中会涉及这些底层问题。否则,某些概念将始终无法理解。这些计算机基础文章将为你打通知识的任督二脉,祝

    2024年02月11日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包