4.netty源码分析

这篇具有很好参考价值的文章主要介绍了4.netty源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.pipeline调用handler的源码

//pipeline得到双向链表的头,next到尾部,

2.心跳源码 主要分析IdleStateHandler3个定时任务内部类
//考虑了网络传输慢导致出站慢的情况
//超时重新发送,然后关闭

ReadTimeoutHandler(继承IdleStateHandler 直接关闭连接)和WriteTimeoutHandler(继承ChannelOutboundHandlerAdapter 使用定时任务来完成)
//NioEventLoop进行事件循环

 @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio; //默认 50
                if (ioRatio == 100) { //如果被占用,就处理事件
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks(); //执行任务
                    }
                } else {  //如果没有被占用
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();  //选择一个key
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);   //ioRatio默认是50 ,计算出来是1.所以定时任务执行了 ioTime没有响应的时间(妙啊,如果执行1s或者是其他时间那可能还是检测不到心跳)
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

//IdleStateHandler

3.eventLoop执行定时任务的源代码(传入并执行队列)

1.select 默认阻塞1秒

4.任务加入异步线程池(处理耗时任务时)(因为执行任务读写 和执行读写后执行Loop任务是同一个线程)
就不会阻塞netty的IO

1.handler加入线程池(自己创建group线程池)
2.context加入线程池

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package source.echo2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.concurrent.Callable;

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    // group 就是充当业务线程池,可以将任务提交到该线程池
    // 这里我们创建了16个线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("EchoServer Handler 的线程是=" + Thread.currentThread().getName());

        //按照原来的方法处理耗时任务

        //解决方案2 用户程序自定义的普通任务

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    //输出线程名
                    System.out.println("EchoServerHandler execute 线程是=" + Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));

                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    //输出线程名
                    System.out.println("EchoServerHandler execute 线程2是=" + Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));

                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        //方式1
//        //将任务提交到 group线程池
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//
//                //接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, "UTF-8");
                //休眠10秒
                Thread.sleep(10 * 1000);
//                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
//                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
//                return null;
//
//            }
//        });
//
//        //将任务提交到 group线程池
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//
//                //接收客户端信息
//                ByteBuf buf = (ByteBuf) msg;
//                byte[] bytes = new byte[buf.readableBytes()];
//                buf.readBytes(bytes);
//                String body = new String(bytes, "UTF-8");
//                //休眠10秒
//                Thread.sleep(10 * 1000);
//                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
//                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
//                return null;
//
//            }
//        });
//
//
//        //将任务提交到 group线程池
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//
//                //接收客户端信息
//                ByteBuf buf = (ByteBuf) msg;
//                byte[] bytes = new byte[buf.readableBytes()];
//                buf.readBytes(bytes);
//                String body = new String(bytes, "UTF-8");
//                //休眠10秒
//                Thread.sleep(10 * 1000);
//                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
//                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
//                return null;
//
//            }
//        });



        //普通方式
        //接收客户端信息
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, "UTF-8");
        //休眠10秒
        Thread.sleep(10 * 1000);
        System.out.println("普通调用方式的 线程是=" + Thread.currentThread().getName());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));

        System.out.println("go on ");

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        //cause.printStackTrace();
        ctx.close();
    }
}

5.netty源码
//面试

1.连接 ServerBootstrap空的构造器 作用为初始化类的成员
//启动bind

6.RPC(remote procedure call) call function like local machine

1.usual framework , ali Dubbo google gRPC ,Go rpcx
apache thrift ,spring cloud
pic 17.rpc procedure

  1. imitate dubbo RPC 通过jdk反射实现

// server handler规定 msg开头是规定的字符串(或者协议)
msg.toString().startsWith(“helloService#hello”)
//callable可以在服务器和客户端之间使用,wait()然后notify() (在同一机器)
//read调用只一次,然后调用call(netty的pipeline自动调用)
//server和client,只需按照需要的名字来调用

//下面是客户端传递参数给服务端,然后服务端调用被代理的实现的接口

//创建消费者,调用远程服务

public class ClientBootstrap {


    //这里定义协议头(调用的方法名)
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //创建一个消费者
        NettyClient customer = new NettyClient();

        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;;) {
            Thread.sleep(2 * 1000);
            //通过代理对象调用服务提供者的方法(服务)
            String res = service.hello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }

    }
}

//创建客户端创建代理对象,和初始化客户端添加handler

public class NettyClient {

    //创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //编写方法使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }
                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    client.setPara(providerName + args[0]);
                    return executor.submit(client).get();

                });
    }

    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//客户端handler,处理来自服务器的请求,服务器有数据通知线程,没有就等待

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数


    //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }

    //收到服务器的数据后,调用方法 (4)
    //
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //服务器返回数据,唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para); //这里的context是client自己的,
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return  result; //服务方返回的结果

    }
    //(2)
    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

//创建服务器端,添加服务器handler

public class NettyServer {


    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //编写一个方法,完成对NettyServer的初始化和启动

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //业务处理器

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务提供方开始提供服务~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
    public static void main(String[] args) {

        //代码代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}
//服务器handler,如果接收到客户端的请求,得到字符,传入参数调用实现类方法
    
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api 时,我们需要定义一个协议
        //比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {
            msg.toString().lastIndexOf("#")
            String substring = msg.toString().substring(msg.toString().lastIndexOf("#") + 1);
            String result = new HelloServiceImpl().hello(substring);
            ctx.writeAndFlush(result); //向客户端写回返回的结果
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

//接口(客户端需要,使用jdk代理可以代理接口,使用cglib可以直接代理类)
//这个是接口,是服务提供方和 服务消费方都需要文章来源地址https://www.toymoban.com/news/detail-628169.html

public interface HelloService {

    String hello(String mes);
}

//实现类
public class HelloServiceImpl implements HelloService {

    private static int count = 0;
    //当有消费方调用该方法时, 就返回一个结果
    @Override
    public String hello(String mes) {
        System.out.println("收到客户端消息=" + mes);
        //根据mes 返回不同的结果
        if(mes != null) {
            return "你好客户端, 我已经收到你的消息 [" + mes + "] 第" + (++count) + " 次";
        } else {
            return "你好客户端, 我已经收到你的消息 ";
        }
    }
}

到了这里,关于4.netty源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 13.Netty源码之Netty中的类与API

    Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中ServerBootstrap 是服务端启动引导类。 java //泛型 AbstractBootstrapB extends AbstractBootstrapB, C, C extends Channel ​ ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel ​

    2024年02月15日
    浏览(52)
  • 《Netty》从零开始学netty源码(五十四)之PoolThreadLocalCache

    前面讲到 PoolThreadCache ,它为线程提供内存缓存,当线程需要分配内存时可快速从其中获取,在Netty中用 PoolThreadLocalCache 来管理 PoolThreadCache ,它的数据结构如下: PoolThreadLocalCache 相当于java的 ThreadLocal ,我们知道 ThreadLocal 中维护的是 ThreadLocalMap ,使用hashcode来做下标,而N

    2024年02月03日
    浏览(32)
  • 《Netty》从零开始学netty源码(五十九)之ServerBootstrapAcceptor

    前面初始化channel的过程中向pipeline中添加了一个channelHandler,即 ServerBootstrapAcceptor ,它的作用主要是将worker组的channel进行注册,它的数据结构如下: 它的属性主要是通过 ServerBootstrap 启动类设置的,它的方法主要是 channelRead() 方法,其过程如下: 在第五十八篇中,当EventLo

    2024年02月05日
    浏览(56)
  • 《Netty》从零开始学netty源码(四十九)之PoolArena

    Netty中分配内存是委托给PoolArena来管理的,它主要有两个实现类: 默认情况下使用的DirectArena,它的数据结构如下: 从属性中我们看到PoolArena主要分配三种类型的内存,小于32KB的分配small类型的PoolSubpage,存储在smallSubpagePools,32KB~4MB的分配normal类型的PoolChunk,根据其利用率的

    2024年02月02日
    浏览(33)
  • 《Netty》从零开始学netty源码(四十六)之PooledByteBuf

    Netty中一大块内存块 PoolChunk 默认大小为4MB,为了尽可能充分利用内存会将它切成很多块 PooledByteBuf , PooledByteBuf 的类关系图如下: PooledUnsafeDirectByteBuf 与 PooledUnsafeHeapByteBuf 直接暴露对象的底层地址。 PooledByteBuf 的创建过程开销很大,高并发情况下进行网络I/O时会创建大量的

    2024年02月01日
    浏览(86)
  • netty源码阅读--服务启动

    netty是一个非常成熟的NIO框架,众多apache的顶级项目底层通信框架都是用的是netty,本系列博客主要是记录自己复习netty源码的过程,重在理解netty的关键如:如何启动,如何接受网络数据、netty的内存管理机制以及编解码器等,废话不多说,直接跟着netty源码中的MQTT的官方示例

    2023年04月22日
    浏览(34)
  • Netty源码解读

    1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写 2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue 3、每个Boss NioEventLoop线程内部循环执行的

    2023年04月23日
    浏览(30)
  • 《Netty》从零开始学netty源码(四十二)之PoolChunk.runsAvailMap

    PoolChunk 中的 runsAvailMap 属性用于存放可用的run的信息, PoolChunk 中每一次分配内存都会更新 runsAvailMap 中可用的run的起始信息及末尾信息,先看下它的数据结构: 我们看下它的构造函数是如何赋值的: PoolChunk 的默认大小为4MB,对应sizeClasses表格中的31,所以array的初始长度为

    2023年04月25日
    浏览(38)
  • 《Netty》从零开始学netty源码(五十)之PoolArena的内存分配

    ​PoolArena根据请求大大小主要分配三中类型的内存,小于28KB的分配PoolSubpage,28KB~4MB的分配池化的PoolChunk,4MB以上的分配非池化的内存​。 如果请求的内存空间小于28KB则分配small类型的空间,即PoolSubpage. 如果请求的大小在28KB~4MB之间则分配normal类型的空间,即池化的PoolChunk,

    2024年02月02日
    浏览(38)
  • 15.Netty源码之EventLoop

    通过将NioServerSocketChannel绑定到了bossGroup。 将NioServerSocketChannel接收到请求创建的SocketChannel放入workerGroup。 将2个不同的SocketChannel绑定到2个不同的Group完成了主从 Reactor 模式。 根据不同策略给Channel分配的规则不同。 1.普通:递增取模 2.高级:executors总数是2的幂次方才会使用位运

    2024年02月15日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包