随着物联网的发展,很多项目都开始涉及到了tcp连接这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。
关于netty包引用:
<!-- TCP SERVER -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
<scope>compile</scope>
</dependency>
实现TCP服务器代码
依赖netty只需几行代码tcp服务:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteOrder;
public class TcpServer {
private Logger log = LoggerFactory.getLogger(getClass());
//自定义tcp服务端口号
private int port=9000;
static TcpServer tcpServer;
//单例设计模式
private TcpServer(){
}
public static TcpServer getInstance(){
if(tcpServer==null){
tcpServer=new TcpServer();
}
return tcpServer;
};
public void run() throws InterruptedException {
// 创建主线程组(接受连接)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建工作线程组(处理连接)
EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 指定工作线程数量为20
// 创建ServerBootstrap实例,用于配置服务器
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置主、工作线程组
bootstrap.group(bossGroup, workerGroup);
// 指定使用NIO进行网络传输
bootstrap.channel(NioServerSocketChannel.class);
// 设置子Channel的Socket选项,允许地址重用
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
// 配置子Channel的处理器,这里使用ChannelInitializer
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 添加自定义的解码器,这里是处理协议
ch.pipeline().addLast(new YKCDecoderV1());
// 添加自定义的服务器处理器
ch.pipeline().addLast(new TCPServerHandler());
}
});
// 绑定端口并添加监听器,处理绑定操作的结果
bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
// 在绑定成功后输出日志信息
log.info("bind success in port: " + port);
});
// 输出服务器启动成功信息
System.out.println("server started!");
}
}
业务处理代码(参考)
以下是处理报文业务类可参考,注意代码未优化:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 正则解析版
*/
public class YKCDecoderV1 extends ByteToMessageDecoder {
final static String reg = "^68.{14,332}";//单指令解析 根据业务协议报文定最短和最长
final static Pattern pattern1 = Pattern.compile(reg);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf bufferIn, List<Object> list) throws Exception {
// 获取可读字节数
int leng = bufferIn.readableBytes();
// 如果可读字节数小于8,输出错误信息并跳过这部分数据
if (leng < 8) {
System.out.println("err! cmd len < 8 .");
String s = ByteBufUtil.hexDump(bufferIn);
System.out.println(s);
bufferIn.skipBytes(leng);
return;
} else {
String s = ByteBufUtil.hexDump(bufferIn);
Matcher matcher1 = pattern1.matcher(s);
if (matcher1.find()) {
String cmd = matcher1.group();
//单指令
System.out.println("sign cmd: " + cmd);
String lenStr = cmd.substring(2, 4);
int len = (Integer.parseInt(lenStr, 16) + 4) * 2;
int cmdLen = cmd.length();
if (cmdLen == len) {
JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(cmd);
list.add(jfyChargeProtocol);
bufferIn.skipBytes(leng);
} else if (cmdLen > len) {
multiHand(cmd, list);
bufferIn.skipBytes(leng);
}
} else {
logErr(channelHandlerContext, s);
System.out.println("err! cmd format invalid: " + s);
bufferIn.skipBytes(leng);
}
}
}
private void multiHand(String cmd, List<Object> list) {
if (cmd.length() < 8) {
return;
}
String lenStr = cmd.substring(2, 4);
int len = (Integer.parseInt(lenStr, 16) + 4) * 2;
if (len > cmd.length()) {
return;
}
String newCmd = cmd.substring(0, len);
if (newCmd.length() == len) {
System.out.println("multi cmd-> " + newCmd);
JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(newCmd);
list.add(jfyChargeProtocol);
}
if (cmd.length() > len) {
System.out.println("multi xxx-> " + cmd);
String two = cmd.substring(len);
if(two.startsWith("68")){
multiHand(two, list);
}
}
}
private int checkSignCmd(String cmd) {
int cmd_len = getCmdLen(cmd);
return cmd.length() - cmd_len;
}
private int getCmdLen(String cmd) {
String leng = cmd.substring(28, 30) + cmd.substring(26, 28);
int dec_num = Integer.parseInt(leng, 16);
return (dec_num * 2) + 34;
}
private void logErr(ChannelHandlerContext ctx, String msg) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
System.out.println(clientIP + " :: " + msg);
}
public class JFYChargeProtocol {
private int length;
private byte[] raw;
private String rawStr;
public JFYChargeProtocol(int length,byte[] raw){
this.length=length;
this.raw=raw;
}
public JFYChargeProtocol(String raw){
this.rawStr=raw;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getRaw() {
return raw;
}
public void setRaw(byte[] raw) {
this.raw = raw;
}
public String getRawStr() {
return rawStr;
}
public void setRawStr(String rawStr) {
this.rawStr = rawStr;
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(TCPServerHandler.class);
static Map<String,ChannelHandlerContext> inList=new ConcurrentHashMap<String,ChannelHandlerContext>();
/**
* 新连接
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
String channelName=getChannelName(ctx);
inList.put(channelName,ctx);
logger.info("dev new conn > " +channelName);
}
private String getChannelName(ChannelHandlerContext ctx) {
return "ykc".concat(ctx.channel().remoteAddress().toString());
}
/**
* 连接下线
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String channelName=getChannelName(ctx);
logger.info("dev close conn > " + channelName);
inList.remove(channelName);
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
JFYChargeProtocol in = (JFYChargeProtocol) msg;
String readMsg= in.getRawStr();
logger.info("read dev <= " + readMsg);
String channelName=getChannelName(ctx);
readMsg=channelName+"$$"+readMsg;
PackageHandlerImpl.getInstance().doHandle(readMsg);
//ctx.writeAndFlush(in);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
/**
* 回复信息给设备
* @param hex
*/
public static boolean RepDev(String hex){
String[] kv= hex.split("\\$\\$");
if(kv.length==2){
String key=kv[0];
ChannelHandlerContext context=inList.get(key);
if(context!=null){
byte[] bytes= ByteUtil.hexString2Bytes(kv[1]);
ByteBuf byteBuf= Unpooled.copiedBuffer(bytes);
context.writeAndFlush(byteBuf);
return true;
}else{
logger.error("dev offline="+key);
}
}else{
logger.error("cmd format err");
}
return false;
}
}
import java.util.ArrayList;
import java.util.List;
public class PackageHandlerImpl implements PackageHandler {
public static List<PackageHandler> packageHandlers= new ArrayList<PackageHandler>();
static PackageHandlerImpl packageHandler;
protected PackageHandlerImpl(){
super();
System.out.println("init PackageHandlerImpl");
}
public static PackageHandlerImpl getInstance(){
if(packageHandler==null){
packageHandler=new PackageHandlerImpl();
}
return packageHandler;
}
@Override
public void doHandle(String hex) {
for(PackageHandler f : packageHandlers){
f.doHandle(hex);
}
}
public PackageHandlerImpl addHandle(PackageHandler f){
packageHandlers.add(f);
return this;
}
}
/**
* 包处理
*/
public interface PackageHandler {
void doHandle(String hex) ;
}
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Destination;
@Service
public class TranServiceImpl {
private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);
/**
* 接受服务器 返回数据
*/
private final String out_name="ykc_out";
/**
* 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
*/
@Autowired
private JmsMessagingTemplate jmsTemplate;
/**
* 发送消息 采用系统配置类型
*
* @param queueName 是发送到的队列名称
* @param message 是发送到的队列
*/
public void sendMessage(String queueName, final String message) {
jmsTemplate.convertAndSend(queueName, message);
}
/**
* 发送消息 采用指定队列类型
*
* @param queueName 是发送到的队列
* @param message 是发送到的队列
*/
public void sendMessageByQueue(String queueName, final String message) {
Destination destination = new ActiveMQQueue(queueName);
jmsTemplate.convertAndSend(destination, message);
}
@JmsListener(destination = out_name)
public void receiveQueue(String text) {
System.out.println("to dev => "+text);
if(!TCPServerHandler.RepDev(text)){
logger.error("write mq fail ==> "+text);
}
}
}
运行
当前是集成到 springboot2框架在这里即可运行,或实现实现 org.springframework.boot.ApplicationRunner
或 org.springframework.boot.CommandLineRunner
的接口,即启动后执行的任务,不用框架的在main方法也可以直接运行。
/**
* tcp服务在框架启动后 跟着启动即可
*/
@SpringBootApplication
public class DevServiceApplication {
private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);
public static void main(String[] args) {
TcpServer tcpServer = TcpServer.getInstance();
try {
tcpServer.run();
PackageHandlerImpl packageHandler = PackageHandlerImpl.getInstance();
packageHandler.addHandle(new PackageHandlerByMQ());
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("TCP服务错误", e);
throw new RuntimeException();
}
}
总结
看看服务器上的tcp服务运行情况
运行天数:
流量状态图:
站在netty巨人的肩膀上,这个tcp服务实现方式简单,运行更是稳定。服务器运行时就部署了一直到今天共运行1235天了,900多个设备同步在线,配了2g的jvm运行内存,cpu占用5.6(top截图等了很久才出来5.6是个峰值,平时不到1)确保某个市的充电桩设备。中间由于客户的充电桩设备协议问题更新过几次,刚时开始是使用netty官网的解码LengthFieldBasedFrameDecoder做处理,可以说非常高效,但随后发现有几个产商的设备报文头部有特殊字符,而且刚好和协议头有些重叠,再考虑到示来的产商协议的不确定性,为了兼容这些产家不得以并以正则的方法去处理。
扩展部分
Netty 官方提供的编解码器
-
字符串编解码器:
-
StringEncoder
:将字符串编码为字节。 -
StringDecoder
:将字节解码为字符串。
-
-
字节流编解码器:
-
ByteArrayEncoder
:将字节数组编码为字节。 -
ByteArrayDecoder
:将字节解码为字节数组。
-
-
对象序列化编解码器:
-
ObjectEncoder
:将对象序列化为字节。 -
ObjectDecoder
:将字节反序列化为对象。
-
-
长度字段编解码器:
-
LengthFieldPrepender
:在消息头部添加表示消息长度的字段。 -
LengthFieldBasedFrameDecoder
:根据长度字段解码消息,用于处理拆包和粘包问题。
-
-
行分隔符编解码器:
-
LineBasedFrameDecoder
:按行切分消息,通常用于处理文本协议。
-
-
DelimiterBasedFrameDecoder:
-
DelimiterBasedFrameDecoder
:按照指定的分隔符切分消息,用于处理自定义分隔符的协议。
-
-
Protobuf 编解码器:
-
ProtobufEncoder
:将 Protobuf 对象编码为字节。 -
ProtobufDecoder
:将字节解码为 Protobuf 对象。
-
-
HTTP 编解码器:文章来源:https://www.toymoban.com/news/detail-835821.html
-
HttpRequestEncoder
:将 HTTP 请求编码为字节。 -
HttpResponseDecoder
:将字节解码为 HTTP 响应。 -
HttpRequestDecoder
:将字节解码为 HTTP 请求。 -
HttpResponseEncoder
:将 HTTP 响应编码为字节。
-
-
WebSocket 编解码器:文章来源地址https://www.toymoban.com/news/detail-835821.html
-
WebSocketServerProtocolHandler
:处理 WebSocket 握手以及帧的编解码。
-
到了这里,关于用netty轻松实现一个高效稳定的TCP服务器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!