一: 服务端
1: 启动类
package com.idc.config.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* @description: netty服务启动类
**/
@Slf4j
@Component
public class NettyServer {
public void start(InetSocketAddress address) {
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // 绑定线程池
.channel(NioServerSocketChannel.class)
.localAddress(address)
.childHandler(new NettyServerChannelInitializer())//编码解码
.option(ChannelOption.SO_BACKLOG, 128); //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
// .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制
// 绑定端口,开始接收进来的连接
ChannelFuture future = bootstrap.bind(address).sync();
log.info("ODF-Socket------netty服务器开始监听端口:" + address.getPort());
//关闭channel和块,直到它被关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2: 处理程序
package com.idc.config.netty;
import com.idc.config.udpsocket.UdpServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* @description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
**/
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
channel.pipeline().addLast(new NettyServerHandler());
}
}
package com.idc.config.netty;
import com.idc.common.exception.CommonRuntimeException;
import com.idc.entity.odf.dto.LightingStatus;
import com.idc.mapper.OdfAlarmMapper;
import com.idc.mapper.OdfMapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author wcybaonier
* @description: netty服务端处理类
**/
@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Resource
private OdfMapper odfMapper;
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* @param ctx
* @DESCRIPTION: 有客户端连接服务器会触发此函数
* @return: void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
System.out.println();
//如果map中不包含此连接,就保存连接
if (CHANNEL_MAP.containsKey(channelId)) {
log.info("ODF-Socket------客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
} else {
//保存连接
CHANNEL_MAP.put(channelId, ctx);
log.info("ODF-Socket------客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size());
}
}
/**
* @param ctx
* @DESCRIPTION: 有客户端终止连接服务器会触发此函数
* @return: void
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
if (CHANNEL_MAP.containsKey(channelId)) {
//删除连接
CHANNEL_MAP.remove(channelId);
System.out.println();
log.info("ODF-Socket------客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size());
}
}
/**
* @param ctx
* @DESCRIPTION: 有客户端发消息会触发此函数
* @return: void
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null){
throw new CommonRuntimeException("ODF-Socket------加载客户端报文为空,请联系厂商!");
}
log.info("ODF-Socket------加载客户端报文......【" + ctx.channel().id() + "】" + " :" + msg);
/**
* 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
* 在这里可以设置异步执行 提交任务到该channel的taskQueue 中
*/
ctx.channel().eventLoop().execute(() -> {
String msgStr = String.valueOf(msg);
// 如果不包含逗号, 那么格式不对 约定格式为 : 序列号,deviceid,shelfNo,moduleNo,termNo,state
if (!msgStr.contains(",")){
throw new CommonRuntimeException("ODF-Socket------加载客户端报文格式不正确,请联系厂商!");
}
try {
String[] split = msgStr.split(",");
if (split.length != 6){
throw new CommonRuntimeException("ODF-Socket------加载客户端报文长度不正确,请联系厂商!");
}
//开始修改 admin
LightingStatus lightingStatus = new LightingStatus();
lightingStatus.setSerialNumber(split[0]);
lightingStatus.setDeviceId(split[1]);
lightingStatus.setShelfNo(split[2]);
lightingStatus.setModuleNo(split[3]);
lightingStatus.setTermNo(split[4]);
lightingStatus.setState(split[5]);
int i = odfMapper.updateTermStatus(lightingStatus);
log.info("ODF-Socket------亮灯状态更新条数......【" + i + "】" );
} catch (Exception e) {
e.printStackTrace();
}
});
/**
* 可以设置多个异步任务
* 但是这个会在上面异步任务执行完之后才执行
*/
/*ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10*1000);
log.info(">>>>>>>>>休眠二十秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});*/
//响应客户端
log.info("ODF-Socket------服务端端返回报文......【" + ctx.channel().id() + "】" + " :" + msg);
this.channelWrite(ctx.channel().id(), msg);
}
/**
* @param msg 需要发送的消息内容
* @param channelId 连接通道唯一id
* @DESCRIPTION: 服务端给客户端发送消息
* @return: void
*/
public void channelWrite(ChannelId channelId, Object msg) throws Exception {
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
if (ctx == null) {
log.info("ODF-Socket------通道【" + channelId + "】不存在");
return;
}
if (msg == null || msg == "") {
log.info("ODF-Socket------服务端响应空的消息");
return;
}
//将客户端的信息直接返回写入ctx
ctx.write(msg);
//刷新缓存区
ctx.flush();
}
public static void main(String[] args) {
System.out.println("序列号,deviceid,shelfNo,moduleNo,termNo,state".split(",").length);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("ODF-Socket------Client: " + socketString + " READER_IDLE 读超时");
ctx.disconnect();
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("ODF-Socket------Client: " + socketString + " WRITER_IDLE 写超时");
ctx.disconnect();
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("ODF-Socket------Client: " + socketString + " ALL_IDLE 总超时");
ctx.disconnect();
}
}
}
/**
* @param ctx
* @DESCRIPTION: 发生异常会触发此函数
* @return: void
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println();
ctx.close();
log.info("ODF-Socket------"+ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
//cause.printStackTrace();
}
}
3: 项目启动类
package com.idc;
import com.idc.config.udpsocket.UdpServer;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.Resource;
/**
* @author wcybaonier
*/
@MapperScan("com.idc.mapper")
@SpringBootApplication
@Slf4j
@EnableScheduling
public class IdcPduApplication implements CommandLineRunner {
@Value("${netty.host}")
private String host;
@Value("${netty.port}")
private Integer port;
@Resource
private NettyServer nettyServer;
public static void main(String[] args) {
SpringApplication.run(IdcPduApplication.class, args);
log.info("IdcPduApplication 启动成功!");
}
/**
* netty服务启动
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
//tcp实现
InetSocketAddress address = new InetSocketAddress(host,port);
log.info("neety服务器启动地址: "+host+":"+ port);
nettyServer.start(address);
}
}
yml配置:
# 配置Netty通信IP和端口
netty:
port: 7101
host: 127.0.0.1
完成,启动项目即可自动监听对应端口
二: 客户端
1: 启动类
这里为了测试,写了Main方法,可以参考服务端,配置启动类 ,实现跟随项目启动
package com.ws.aa;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
*
* 核心文件(与服务端进行数据交互)
*
* 客户端
*
* @author 小辰哥哥
*/
public class SocketClient {
// 服务端IP
static final String HOST = System.getProperty("host", "134.95.3.134");
// 服务端开放端口
static final int PORT = Integer.parseInt(System.getProperty("port", "7101"));
// 数据包大小
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
// 日志打印
private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class);
// 主函数启动
public static void main(String[] args) throws InterruptedException {
sendMessage("1,deviceid123,shelfNo123,moduleNo123,termNo123,state123");
}
/**
* 核心方法(处理:服务端向客户端发送的数据、客户端向服务端发送的数据)
*
* @param content
* @throws InterruptedException
* @author 小辰哥哥
*/
public static void sendMessage(String content) throws InterruptedException {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new SocketHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.debug("####接收服务端发送过来的消息####");
LOGGER.debug("服务端发送过来的数据:" + msg);
// 主动与服务端断开连接(客户端触发)
//ctx.channel().close();
}
});
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
future.channel().writeAndFlush(content);
// 程序阻塞
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
2: 处理程序
package com.ws.aa;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.logging.SocketHandler;
/**
* Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
*
* 设置出站和入站的编码器和解码器(该方法在SocketClientConfig.java中被重写)
*
* 客户端
*
* @author wcybaonier
*/
public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
p.addLast((ChannelHandler) new SocketHandler());
}
}
package com.ws.aa;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
*
* 初始化操作、接受服务端发送过来的消息(该方法在SocketClient.java中被重写)
*
* 客户端
*
* @author wcybaonier
*/
public class SocketHandler extends ChannelInboundHandlerAdapter {
// 日志打印
private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) {
LOGGER.debug("SocketHandler Active(客户端)");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.debug("####接收服务端发送过来的消息####");
LOGGER.debug("SocketHandler read Message:" + msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.debug("####客户端断开连接####");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3: 项目启动类
......想写就参考服务端......文章来源:https://www.toymoban.com/news/detail-606057.html
三: 测试,完成
有测试的,,,但是忘记截图了................文章来源地址https://www.toymoban.com/news/detail-606057.html
到了这里,关于SpringBoot搭建Netty+Socket+Tcp服务端和客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!