用netty轻松实现一个高效稳定的TCP服务器

这篇具有很好参考价值的文章主要介绍了用netty轻松实现一个高效稳定的TCP服务器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

          随着物联网的发展,很多项目都开始涉及到了tcp连接这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。

关于netty包引用:

 <!-- TCP SERVER -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
            <scope>compile</scope>
        </dependency>

实现TCP服务器代码

依赖netty只需几行代码tcp服务:


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteOrder;

public class TcpServer {

    private Logger log = LoggerFactory.getLogger(getClass());
    //自定义tcp服务端口号
    private int port=9000;

    static TcpServer tcpServer;
    //单例设计模式
    private TcpServer(){

    }

    public static TcpServer getInstance(){
        if(tcpServer==null){
            tcpServer=new TcpServer();
        }
        return tcpServer;
    };


public void run() throws InterruptedException {


    // 创建主线程组(接受连接)
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    // 创建工作线程组(处理连接)
    EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 指定工作线程数量为20

    // 创建ServerBootstrap实例,用于配置服务器
    ServerBootstrap bootstrap = new ServerBootstrap();
    // 配置主、工作线程组
    bootstrap.group(bossGroup, workerGroup);
    // 指定使用NIO进行网络传输
    bootstrap.channel(NioServerSocketChannel.class);
    // 设置子Channel的Socket选项,允许地址重用
    bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

    // 配置子Channel的处理器,这里使用ChannelInitializer
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            // 添加自定义的解码器,这里是处理协议
            ch.pipeline().addLast(new YKCDecoderV1());
            // 添加自定义的服务器处理器
            ch.pipeline().addLast(new TCPServerHandler());
        }
    });

    // 绑定端口并添加监听器,处理绑定操作的结果
    bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
        // 在绑定成功后输出日志信息
        log.info("bind success in port: " + port);
    });

    // 输出服务器启动成功信息
    System.out.println("server started!");
}



}

 业务处理代码(参考)

以下是处理报文业务类可参考,注意代码未优化:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 正则解析版
 */
public class YKCDecoderV1 extends ByteToMessageDecoder {

    final static String reg = "^68.{14,332}";//单指令解析 根据业务协议报文定最短和最长
    final static Pattern pattern1 = Pattern.compile(reg);

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf bufferIn, List<Object> list) throws Exception {

        // 获取可读字节数
        int leng = bufferIn.readableBytes();

        // 如果可读字节数小于8,输出错误信息并跳过这部分数据
        if (leng < 8) {
            System.out.println("err! cmd len < 8 .");
            String s = ByteBufUtil.hexDump(bufferIn);
            System.out.println(s);
            bufferIn.skipBytes(leng);
            return;
        } else {
            
            String s = ByteBufUtil.hexDump(bufferIn);
            Matcher matcher1 = pattern1.matcher(s);
            if (matcher1.find()) {

                String cmd = matcher1.group();
                //单指令
                System.out.println("sign cmd: " + cmd);
                String lenStr = cmd.substring(2, 4);
                int len = (Integer.parseInt(lenStr, 16) + 4) * 2;

                int cmdLen = cmd.length();
                if (cmdLen == len) {
                    JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(cmd);
                    list.add(jfyChargeProtocol);
                    bufferIn.skipBytes(leng);

                } else if (cmdLen > len) {

                    multiHand(cmd, list);
                    bufferIn.skipBytes(leng);

                }


            } else {
                logErr(channelHandlerContext, s);
                System.out.println("err! cmd format invalid: " + s);
                bufferIn.skipBytes(leng);

            }

        }


    }

    private void multiHand(String cmd, List<Object> list) {

        if (cmd.length() < 8) {
            return;
        }
        String lenStr = cmd.substring(2, 4);
        int len = (Integer.parseInt(lenStr, 16) + 4) * 2;

        if (len > cmd.length()) {
            return;
        }

        String newCmd = cmd.substring(0, len);

        if (newCmd.length() == len) {
            System.out.println("multi cmd-> " + newCmd);

            JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(newCmd);
            list.add(jfyChargeProtocol);
        }

        if (cmd.length() > len) {
            System.out.println("multi xxx-> " + cmd);
            String two = cmd.substring(len);
            if(two.startsWith("68")){
                multiHand(two, list);
            }
        }


    }

    private int checkSignCmd(String cmd) {
        int cmd_len = getCmdLen(cmd);
        return cmd.length() - cmd_len;
    }

    private int getCmdLen(String cmd) {
        String leng = cmd.substring(28, 30) + cmd.substring(26, 28);
        int dec_num = Integer.parseInt(leng, 16);
        return (dec_num * 2) + 34;
    }

    private void logErr(ChannelHandlerContext ctx, String msg) {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIP = insocket.getAddress().getHostAddress();
        System.out.println(clientIP + " :: " + msg);
    }
public class JFYChargeProtocol {


    private int length;
    private byte[] raw;
    private String rawStr;

    public JFYChargeProtocol(int length,byte[] raw){
        this.length=length;
        this.raw=raw;
    }
    public JFYChargeProtocol(String raw){
        this.rawStr=raw;
    }

   

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getRaw() {
        return raw;
    }

    public void setRaw(byte[] raw) {
        this.raw = raw;
    }

    public String getRawStr() {
        return rawStr;
    }

    public void setRawStr(String rawStr) {
        this.rawStr = rawStr;
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(TCPServerHandler.class);

    static Map<String,ChannelHandlerContext> inList=new ConcurrentHashMap<String,ChannelHandlerContext>();


    /**
     * 新连接
     * @param ctx
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        String channelName=getChannelName(ctx);
        inList.put(channelName,ctx);
        logger.info("dev new conn > " +channelName);
    }

    private String getChannelName(ChannelHandlerContext ctx) {
        return "ykc".concat(ctx.channel().remoteAddress().toString());

    }

    /**
     * 连接下线
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String channelName=getChannelName(ctx);
        logger.info("dev close conn > " + channelName);
        inList.remove(channelName);
        ctx.fireChannelInactive();
    }




    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        JFYChargeProtocol in = (JFYChargeProtocol) msg;

        String readMsg= in.getRawStr();
        logger.info("read dev <= " + readMsg);
        String channelName=getChannelName(ctx);
        readMsg=channelName+"$$"+readMsg;
        PackageHandlerImpl.getInstance().doHandle(readMsg);
        //ctx.writeAndFlush(in);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         cause.printStackTrace();
         ctx.close();
    }

    /**
     * 回复信息给设备
     * @param hex
     */
    public static boolean RepDev(String hex){

        String[] kv= hex.split("\\$\\$");
        if(kv.length==2){
            String key=kv[0];
            ChannelHandlerContext context=inList.get(key);
            if(context!=null){
                byte[] bytes= ByteUtil.hexString2Bytes(kv[1]);
                ByteBuf byteBuf= Unpooled.copiedBuffer(bytes);
                context.writeAndFlush(byteBuf);
                return true;
            }else{
                logger.error("dev offline="+key);
            }

        }else{
            logger.error("cmd format err");

        }
      return false;
    }

}
import java.util.ArrayList;
import java.util.List;
public  class PackageHandlerImpl implements PackageHandler {

    public static List<PackageHandler> packageHandlers= new ArrayList<PackageHandler>();
    static PackageHandlerImpl packageHandler;
    protected PackageHandlerImpl(){
        super();
        System.out.println("init PackageHandlerImpl");
    }

    public static PackageHandlerImpl getInstance(){
        if(packageHandler==null){
            packageHandler=new PackageHandlerImpl();
        }
        return packageHandler;
    }

    @Override
    public void doHandle(String hex) {
        for(PackageHandler f : packageHandlers){
            f.doHandle(hex);
        }

    }
    public PackageHandlerImpl addHandle(PackageHandler f){
        packageHandlers.add(f);
        return this;
    }


}
/**
 * 包处理
 */
public interface PackageHandler {
     void doHandle(String hex) ;

}
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

@Service
public class TranServiceImpl {

    private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);

    /**
     * 接受服务器 返回数据
     */
    private final String out_name="ykc_out";


    /**
     * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
     */
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 发送消息 采用系统配置类型
     *
     * @param queueName 是发送到的队列名称
     * @param message   是发送到的队列
     */
    public void sendMessage(String queueName, final String message) {
        jmsTemplate.convertAndSend(queueName, message);
    }

    /**
     * 发送消息 采用指定队列类型
     *
     * @param queueName 是发送到的队列
     * @param message   是发送到的队列
     */
    public void sendMessageByQueue(String queueName, final String message) {
        Destination destination = new ActiveMQQueue(queueName);
        jmsTemplate.convertAndSend(destination, message);
    }

    @JmsListener(destination = out_name)
    public void receiveQueue(String text) {
        System.out.println("to dev => "+text);

        if(!TCPServerHandler.RepDev(text)){
            logger.error("write mq fail ==> "+text);
        }
    }
}

 运行

当前是集成到 springboot2框架在这里即可运行,或实现实现 org.springframework.boot.ApplicationRunner 或 org.springframework.boot.CommandLineRunner 的接口,即启动后执行的任务,不用框架的在main方法也可以直接运行。

/**
 * tcp服务在框架启动后 跟着启动即可
 */
@SpringBootApplication
public class DevServiceApplication {
    private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);


    public static void main(String[] args) {


        TcpServer tcpServer = TcpServer.getInstance();
        try {
            tcpServer.run();
            PackageHandlerImpl packageHandler = PackageHandlerImpl.getInstance();
            packageHandler.addHandle(new PackageHandlerByMQ());

        } catch (InterruptedException e) {
            e.printStackTrace();
            logger.error("TCP服务错误", e);
            throw new RuntimeException();
        }


    }

总结

看看服务器上的tcp服务运行情况

运行天数:

netty 高性能 tcp server,服务器,java,数据库

流量状态图:

netty 高性能 tcp server,服务器,java,数据库

netty 高性能 tcp server,服务器,java,数据库

            站在netty巨人的肩膀上,这个tcp服务实现方式简单,运行更是稳定。服务器运行时就部署了一直到今天共运行1235天了,900多个设备同步在线,配了2g的jvm运行内存,cpu占用5.6(top截图等了很久才出来5.6是个峰值,平时不到1)确保某个市的充电桩设备。中间由于客户的充电桩设备协议问题更新过几次,刚时开始是使用netty官网的解码LengthFieldBasedFrameDecoder做处理,可以说非常高效,但随后发现有几个产商的设备报文头部有特殊字符,而且刚好和协议头有些重叠,再考虑到示来的产商协议的不确定性,为了兼容这些产家不得以并以正则的方法去处理。

扩展部分

Netty 官方提供的编解码器
  1. 字符串编解码器:

    • StringEncoder:将字符串编码为字节。
    • StringDecoder:将字节解码为字符串。
  2. 字节流编解码器:

    • ByteArrayEncoder:将字节数组编码为字节。
    • ByteArrayDecoder:将字节解码为字节数组。
  3. 对象序列化编解码器:

    • ObjectEncoder:将对象序列化为字节。
    • ObjectDecoder:将字节反序列化为对象。
  4. 长度字段编解码器:

    • LengthFieldPrepender:在消息头部添加表示消息长度的字段。
    • LengthFieldBasedFrameDecoder:根据长度字段解码消息,用于处理拆包和粘包问题。
  5. 行分隔符编解码器:

    • LineBasedFrameDecoder:按行切分消息,通常用于处理文本协议。
  6. DelimiterBasedFrameDecoder:

    • DelimiterBasedFrameDecoder:按照指定的分隔符切分消息,用于处理自定义分隔符的协议。
  7. Protobuf 编解码器:

    • ProtobufEncoder:将 Protobuf 对象编码为字节。
    • ProtobufDecoder:将字节解码为 Protobuf 对象。
  8. HTTP 编解码器:

    • HttpRequestEncoder:将 HTTP 请求编码为字节。
    • HttpResponseDecoder:将字节解码为 HTTP 响应。
    • HttpRequestDecoder:将字节解码为 HTTP 请求。
    • HttpResponseEncoder:将 HTTP 响应编码为字节。
  9. WebSocket 编解码器:文章来源地址https://www.toymoban.com/news/detail-835821.html

    • WebSocketServerProtocolHandler:处理 WebSocket 握手以及帧的编解码。

到了这里,关于用netty轻松实现一个高效稳定的TCP服务器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java TCP长连接详解:实现稳定、高效的网络通信

    在现代互联网应用中,稳定而高效的网络通信是至关重要的。而TCP长连接作为一种常见的通信机制,允许客户端和服务器之间保持长时间的连接,有效地传输数据。本文将详细介绍Java中TCP长连接的概念、优点、适用场景,并结合实际案例进行分析。 TCP长连接是一种建立在T

    2024年02月03日
    浏览(60)
  • 【TCP服务器的演变过程】编写第一个TCP服务器:实现一对一的连接通信

    手把手教你从0开始编写TCP服务器程序,体验 开局一块砖,大厦全靠垒 。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 函数原型: 这个函数建立一个协议族、协议类型、协议编号的socket文件描述符。如果函数调用成功,

    2024年02月03日
    浏览(54)
  • 使用Netty构建TCP和UDP服务器和客户端

    Netty是一个基于Java NIO实现的网络通信框架,提供了高性能、低延迟的网络通信能力。使用Netty构建TCP和UDP服务器和客户端非常简单,下面是一个简单的示例代码: 构建TCP服务器 构建TCP客户端 构建UDP服务器 构建UDP客户端   上述示例代码中,分别定义了一个TCP服务器、TCP客户

    2024年02月16日
    浏览(48)
  • 【网络编程】实现一个简单多线程版本TCP服务器(附源码)

    accept 函数是在服务器端用于接受客户端连接请求的函数,它在监听套接字上等待客户端的连接,并在有新的连接请求到来时创建一个新的套接字用于与该客户端通信。 下面是 accept 函数的详细介绍以及各个参数的意义: sockfd: 是服务器监听套接字的文件描述符,通常是使用

    2024年02月13日
    浏览(53)
  • 使用NETTY实现TCP的服务端

    Netty是一个异步事件驱动的网络应用框架,可快速开发可维护的高性能协议服务器和客户端。基于NIO实现的高性能网络IO框架,极大简化基于常用网络协议的编程(TCP、UDP等)。

    2024年02月16日
    浏览(37)
  • 网络通信(13)-C#TCP服务器和客户端同时在一个进程实现的实例

    有时项目需求中需要服务器和客户端同时在一个进程实现,一边需要现场接收多个客户端的数据,一边需要将数据汇总后发送给远程服务器。下面通过实例演示此项需求。 C#TCP服务器和客户端同时在一个进程实现的实例如下: 界面设计 UI文件代码

    2024年01月22日
    浏览(65)
  • netty的TCP服务端和客户端实现

    2024年02月21日
    浏览(47)
  • 轻松地搭建一个反向代理OpenAI服务器

    在 Node.js 中,您可以使用 http-proxy-middleware 这个库来轻松地搭建一个反向代理服务器。以下是一个简单的示例: 首先,确保您已经安装了 Node.js。 在项目目录中运行以下命令来初始化一个新的 Node.js 项目: 接下来,安装 http-proxy-middleware 和 express : 在项目目录中创建一个名为

    2024年02月05日
    浏览(41)
  • 用Rust设计一个并发的Web服务:常用Rust库如Tokio、Hyper等,基于TCP/IP协议栈,实现了一个简单的并发Web服务器,并结合具体的代码讲解如何编写并发Web服务器的程序

    作者:禅与计算机程序设计艺术 1994年,互联网泡沫破裂,一批优秀的程序员、工程师纷纷加入到web开发领域。而其中的Rust语言却备受瞩目,它是一种现代系统编程语言,专注于安全和并发。因此,Rust在当下成为最流行的编程语言之一,很多框架也开始使用Rust重构,这使得

    2024年02月06日
    浏览(61)
  • netty学习(3):SpringBoot整合netty实现多个客户端与服务器通信

    创建一个SpringBoot工程,然后创建三个子模块 整体工程目录:一个server服务(netty服务器),两个client服务(netty客户端) pom文件引入netty依赖,springboot依赖 NettySpringBootApplication NettyServiceHandler SocketInitializer NettyServer NettyStartListener application.yml Client1 NettyClientHandler SocketInitializ

    2024年02月11日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包