netty-发起tcp长连接(包含客户端和服务端)

这篇具有很好参考价值的文章主要介绍了netty-发起tcp长连接(包含客户端和服务端)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持。

Netty是对JDK自带的NIO的API进行封装,具有高并发,高性能等优点。

项目中经常用到netty实现服务器与设备的通信,先写服务端代码:

@Slf4j
@Component
public class NettyServerBootstrap {

    private Channel serverChannel;
    private static final int DEFAULT_PORT = 60782;
    //bossGroup只是处理连接请求
    private static EventLoopGroup bossGroup = null;
    //workGroup处理非连接请求,如果牵扯到数据量处理业务非常耗时的可以再单独新建一个eventLoopGroup,并在childHandler初始化的时候添加到pipeline绑定
    private static EventLoopGroup workGroup = null;

    /**
     * 启动Netty服务
     *
     * @return 启动结果
     */
    @PostConstruct
    public boolean start() {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        //创建服务端启动对象
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            //使用链式编程来设置
            bootstrap.group(bossGroup, workGroup)//设置两个线程组
                    //使用NioSocketChannel作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    //设置线程队列得到的连接数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //设置处理器  WorkerGroup 的 EvenLoop 对应的管道设置处理器
                    .childHandler(new ChannelInitializer<Channel>() {

                        @Override
                        protected void initChannel(Channel ch){
                            log.info("--------------有客户端连接");
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            //绑定端口, 同步等待成功;
            ChannelFuture future = bootstrap.bind(DEFAULT_PORT).sync();
            log.info("netty服务启动成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
            serverChannel = future.channel();
            ThreadUtil.execute(() -> {
                //等待服务端监听端口关闭
                try {
                    future.channel().closeFuture().sync();
                    log.info("netty服务正常关闭成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
                } catch (InterruptedException | UnknownHostException e) {
                    e.printStackTrace();
                } finally {
                    shutdown();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("netty服务异常,异常原因:{}", e.getMessage());
            return false;
        }
        return true;
    }


    /**
     * 关闭当前server
     */
    public boolean close() {
        if (serverChannel != null) {
            serverChannel.close();//关闭服务
            try {
                //保险起见
                serverChannel.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            } finally {
                shutdown();
                serverChannel = null;
            }
        }
        return true;
    }

    /**
     * 优雅关闭
     */
    private void shutdown() {
        workGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

}

服务端处理类代码:

@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 处理读取到的msg
     *
     * @param ctx 上下文
     * @param msg 数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,String msg) throws Exception {
        System.out.println("服务端收到的消息--------"+msg);
        ctx.channel().writeAndFlush("ok");
    }

    /**
     * 断开连接
     *
     * @param ctx 傻瓜下文
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ChannelId channelId = ctx.channel().id();
        log.info("客户端id:{},断开连接,ip:{}", channelId, ctx.channel().remoteAddress());
        super.handlerRemoved(ctx);
    }

}

接下来模拟客户端:

@Configuration
@Component
public class TianmiaoClient {


    private static String ip;

    private static int port ;

    @Value("${tianmiao.nettyServer.ip}")
    public void setIp(String ip) {
        this.ip = ip;
    }

    @Value("${tianmiao.nettyServer.port}")
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * 服务类
     */
    private static Bootstrap bootstrap=null;

    /**
     * 初始化  项目启动后自动初始化
     */
    @PostConstruct
    public void init() {

        //worker
        EventLoopGroup worker = new NioEventLoopGroup();

        bootstrap = new Bootstrap();
        //设置线程池
        bootstrap.group(worker);

        //设置socket工厂
        bootstrap.channel(NioSocketChannel.class);

        //设置管道
        bootstrap.handler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new TianmiaoClientHandler());
            }
        });
    }


    /**
     * 获取会话 (获取或者创建一个会话)
     */
    public Channel createChannel() {
        try {
            Channel channel = bootstrap.connect( ip, port).sync().channel();
            return channel;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

客户端处理类代码

public class TianmiaoClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("服务端发过来的消息:"+s);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(".......................tcp断开连接.........................");
        //移除
        Channel channel = ctx.channel();
        channel.close().sync();
        super.channelInactive(ctx);
    }

}

管理客户端channel的一个工具类:

public class TianmiaoChannelManager {

    /**
     * 在线会话(存储注册成功的会话)
     */
    private static final ConcurrentHashMap<String, Channel> onlineChannels = new ConcurrentHashMap<>();


    /**
     * 加入
     *
     * @param mn
     * @param channel
     * @return
     */
    public static boolean putChannel(String mn, Channel channel) {
        if (!onlineChannels.containsKey(mn)) {
            boolean success = onlineChannels.putIfAbsent(mn, channel) == null ? true : false;
            return success;
        }
        return false;
    }

    /**
     * 移除
     *
     * @param mn
     */
    public static Channel removeChannel(String mn) {
        return onlineChannels.remove(mn);
    }

    /**
     * 获取Channel
     *
     * @param mn
     * @return
     */
    public static Channel getChannel(String mn) {
        // 获取一个可用的会话
        Channel channel = onlineChannels.get(mn);
        if (channel != null) {
            // 连接有可能是断开,加入已经断开连接了,我们需要进行尝试重连
            if (!channel.isActive()) {
                //先移除之前的连接
                removeChannel(mn);
                return null;
            }
        }
        return channel;
    }

    /**
     * 发送消息[自定义协议]
     *
     * @param <T>
     * @param mn
     * @param msg
     */
    public static <T> void sendMessage(String mn, String msg) {
        Channel channel = onlineChannels.get(mn);
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(msg);
        }
    }

    /**
     * 发送消息[自定义协议]
     *
     * @param <T>
     * @param msg
     */
    public static <T> void sendChannelMessage(Channel channel, String msg) {
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(msg);
        }
    }

    /**
     * 关闭连接
     *
     * @return
     */
    public static void closeChannel(String mn) {
        onlineChannels.get(mn).close();
    }
}

最后是客户端使用方法:文章来源地址https://www.toymoban.com/news/detail-660890.html

/**
 * 发送数据包
 * @param key
 */
public static void tianmiaoData(String key, String data) {
    Channel channel = TianmiaoChannelManager.getChannel(key);
    //将通道存入
    if(channel==null){
        TianmiaoClient client = new TianmiaoClient();
        channel = client.createChannel();
        TianmiaoChannelManager.putChannel(key, channel);
    }
    if (channel != null && channel.isActive()) {
        //发送数据
        channel.writeAndFlush(data);
        System.out.println("-------------天苗转发数据成功-------------");
    }
}

到了这里,关于netty-发起tcp长连接(包含客户端和服务端)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 32.Netty源码之服务端如何处理客户端新建连接

    Netty 服务端完全启动后,就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢? 主要分为四步: md Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件; ​ 构造 初始化Netty 客户端 NioSocketChannel; ​ 注册 Netty 客户端 NioSocketChannel 到 Worker 工作线程中; ​ 从

    2024年02月12日
    浏览(29)
  • SpringBoot中使用Netty实现TCP通讯,服务器主动向客户端发送数据

    Springboot项目的web服务后台,web服务运行在9100端口。 后台使用netty实现了TCP服务,运行在8000端口。 启动截图如下: 启动类修改: 服务器查看当前所有连接的客户端  服务器获取到所有客户单的ip地址及端口号后,即可通过其给指定客户端发送数据  

    2024年02月11日
    浏览(33)
  • tcp服务器端与多个客户端连接

    如果希望Tcp服务器端可以与多个客户端连接,可以这样写: 相关的槽函数中: 使用sender()来获取对应的QTcpSocket对象。 其实,主要就是QTcpServer进行监听: 客户端的QTcpSocket与服务器端的QTcpSocket进行通信。

    2024年04月28日
    浏览(28)
  • java socket Server TCP服务端向指定客户端发送消息;可查看、断开指定连接的客户端;以及设置客户端最大可连接数量。

    首先需要知道java里如何创建一个Socket服务器端。 提示:注意server.accept()方法调用会阻塞,只有新的客户端连接后才返回一个新的socket对象。如果一直未连接那么会一直处于阻塞状态 了解了如何创建一个socket服务器端后。那么如何实现给指定的连接客户端发送消息呢?首先我

    2024年02月11日
    浏览(46)
  • tcp连接断开分析,判断tcp断开原因是客户端还是服务端?

              当与使用TCP协议对接的硬件设备进行通信时,往往会遇到一些问题,导致一些人难以找到tcp断开的根源,因些无法判定是充电桩设备客户端还是服务器端。我曾经在十多年前对接银行接口的POS机时遇到过类似的情况,现在在对接充电桩时又遇到了相似的问题。经过

    2024年02月03日
    浏览(29)
  • TCP服务器最多支持多少客户端连接

    目录 一、理论数值 二、实际部署  参考         首先知道一个基础概念,对于一个 TCP 连接可以使用四元组(src_ip, src_port, dst_ip, dst_port)进行唯一标识。因为服务端 IP 和 Port 是固定的(如下图中的bind阶段),那么一个TCP服务器支持最多的连接数就是由客户端 IP 和 端口

    2024年01月21日
    浏览(39)
  • Golang实现之TCP长连接-------服务端和客户端

    一、数据包的数据结构 (所有字段采用大端序) 帧头 帧长度(头至尾) 帧类型 帧数据 帧尾 1字节 4字节 2字节 1024字节 1字节 byte int short string byte 0xC8 0xC9 二、Server端 实现代码 1、main.go 2、server.go 3、protocol.go 4、response.go 5、result.go 三、Client端 实现代码

    2024年02月07日
    浏览(38)
  • Python启动TCP服务并监听连接,从客户端发送消息

    下面是一个简单的例子,演示如何在Python中启动TCP服务并监听连接,以及如何从客户端发送消息: TCP服务端代码: TCP客户端代码: 这个例子中,服务端首先创建一个socket对象并绑定地址和端口,然后开始监听连接。当客户端连接到服务器时,服务端接受客户端的连接请求并

    2024年02月13日
    浏览(31)
  • TCP客户端判断与服务端断开连接的几种方法

    目前已知的方法有: 1、 epoll(能检测正常的断开连接,事件触发机制,优点是快速,但是插拔网线是检测不到的) 2、自定义心跳包方式检测 3、keeplive方式检测 4、getsockopt 1、相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,

    2023年04月14日
    浏览(46)
  • Socket实例,实现多个客户端连接同一个服务端代码&TCP网络编程 ServerSocket和Socket实现多客户端聊天

    Java socket(套接字)通常也称作\\\"套接字\\\",用于描述ip地址和端口,是一个通信链的句柄。应用程序通常通过\\\"套接字\\\"向网络发出请求或者应答网络请求。 使用socket实现多个客户端和同一客户端通讯;首先客户端连接服务端发送一条消息,服务端接收到消息后进行处理,完成后再

    2024年02月12日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包