nginx配置
在nginx.conf文件中,events,http同级添加配置
# Layer-4 (TCP) load balancing. This block sits at the same level as the
# events and http blocks in nginx.conf.
stream {
# Backend pool: two local Netty instances with equal weight.
upstream tcp {
server 127.0.0.1:8888 weight=1;
server 127.0.0.1:8889 weight=1;
}
server {
# Clients connect to this port; connections are forwarded to the pool above.
listen 8880;
proxy_pass tcp;
proxy_protocol on; # The one essential line: lets the backend determine the client's real IP
}
}
启动nginx服务
netty代码
package com.alexyang.nettyandthread.netty2;
/**
* @author yqc
* @version 1.0
* @description netty+nginx
* @date 2023-06-30 9:22
*/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Netty TCP server that is started by Spring Boot through {@link ApplicationRunner}.
 *
 * <p>Each accepted connection gets a {@link DecodeProxy} (strips the PROXY protocol
 * header sent by nginx and records the real client IP) followed by a
 * {@link ServerHandler}.
 *
 * <p>The bind address and port default to {@code 127.0.0.1:8888} and can be overridden
 * with the {@code netty.ip} / {@code netty.port} properties, so several instances can
 * run side by side behind the load balancer without code changes.
 *
 * <p>Note: {@link #start()} blocks until the server channel is closed, so
 * {@link #run(ApplicationArguments)} does not return while the server is running.
 */
@Component
public class ServerNetty implements ApplicationRunner {
    static final Logger log = LogManager.getLogger(ServerNetty.class);

    /** Listening port; the field initializer keeps the default when the class is created without Spring. */
    @Value("${netty.port:8888}")
    private int port = 8888;

    /** Local address to bind to. */
    @Value("${netty.ip:127.0.0.1}")
    private String ip = "127.0.0.1";

    /**
     * Starts the server and blocks until its channel is closed.
     *
     * <p>Failures are logged rather than propagated, matching the previous best-effort
     * behaviour. If the calling thread is interrupted, the interrupt flag is restored.
     * Both event loop groups are always shut down on exit.
     *
     * @throws InterruptedException declared for backward compatibility; interruption is
     *     handled internally
     */
    public void start() throws InterruptedException {
        NioEventLoopGroup boss = null;
        NioEventLoopGroup worker = null;
        try {
            boss = new NioEventLoopGroup();
            worker = new NioEventLoopGroup();

            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker);
            b.channel(NioServerSocketChannel.class);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            // TCP keep-alive applies to accepted connections, so it must be a child option;
            // setting it on the listening channel has no effect.
            b.childOption(ChannelOption.SO_KEEPALIVE, true);
            b.childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    // Must come first: consumes the PROXY protocol header added by nginx.
                    channel.pipeline().addLast("decoder", new DecodeProxy());
                    channel.pipeline().addLast(new ServerHandler());
                }
            });

            log.info("启动 netty 服务端");
            ChannelFuture future = b.bind(ip, port).sync();
            log.info("服务器启动成功,监听端口{}", future.channel().localAddress());
            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Netty server thread was interrupted", ex);
        } catch (Exception ex) {
            log.error("Netty server failed to start or terminated abnormally", ex);
        } finally {
            // Either group may still be null if its construction failed.
            if (worker != null) {
                worker.shutdownGracefully();
            }
            if (boss != null) {
                boss.shutdownGracefully();
            }
        }
    }

    /**
     * Spring Boot callback invoked after the application context has started.
     *
     * @param args application arguments (unused)
     * @throws Exception if {@link #start()} fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("执行.............");
        start();
    }
}
package com.alexyang.nettyandthread.netty2;
/**
* @author yqc
* @version 1.0
* @description nginx代理netty tcp服务端负载均衡,nginx stream要打开 proxy_protocol on; 配置
* @date 2023-06-30 10:09
*/
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.nio.charset.Charset;
import java.util.List;
/**
 * Inbound decoder for a Netty TCP server that sits behind nginx with
 * {@code proxy_protocol on;} in the {@code stream} block.
 *
 * <p>nginx then prefixes every connection with one human-readable PROXY protocol v1
 * line, for example {@code PROXY TCP4 192.168.12.52 192.168.12.52 1096 5680\r\n}.
 * This decoder consumes that line once per connection, stores the source address in
 * the channel attribute {@link #key}, and passes all remaining bytes downstream
 * unchanged. Connections that do not start with {@code "PROXY "} are passed through
 * untouched.
 *
 * <p>Only the text (v1) format is recognised; the binary v2 format is not handled.
 * Because the header is trusted as-is, the server port should only be reachable by
 * the proxy.
 */
public class DecodeProxy extends ByteToMessageDecoder {
    /** Channel attribute key under which the real client IP is stored. */
    public static final AttributeKey<String> key = AttributeKey.valueOf("IP");

    /** Every PROXY protocol v1 header starts with these bytes. */
    private static final byte[] PROXY_PREFIX = {'P', 'R', 'O', 'X', 'Y', ' '};

    /** Maximum length of a v1 header line including the trailing CRLF, per the specification. */
    private static final int MAX_V1_HEADER_LENGTH = 107;

    private static final Charset ASCII = Charset.forName("US-ASCII");

    /** Set once the start of the connection has been inspected; the header can only appear there. */
    private boolean headerChecked;

    /**
     * Strips the PROXY header (first call(s) only) and forwards the remaining bytes.
     *
     * <p>When the header is still incomplete nothing is consumed, so the framework
     * calls this method again once more data has arrived.
     *
     * @param ctx channel context
     * @param byteBuf cumulated inbound bytes
     * @param out receives the payload for the next handler
     * @throws Exception never thrown directly; declared by the superclass
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        if (!headerChecked && !consumeProxyHeader(ctx, byteBuf)) {
            return;
        }
        if (byteBuf.isReadable()) {
            out.add(byteBuf.readBytes(byteBuf.readableBytes()));
        }
    }

    /**
     * Inspects the start of the stream for a PROXY v1 header and consumes it if present.
     *
     * @return {@code true} when the decision is final (header consumed or absent),
     *     {@code false} when more bytes are needed
     */
    private boolean consumeProxyHeader(ChannelHandlerContext ctx, ByteBuf byteBuf) {
        int readable = byteBuf.readableBytes();
        if (readable == 0) {
            return false;
        }
        int start = byteBuf.readerIndex();

        int prefixBytes = Math.min(readable, PROXY_PREFIX.length);
        for (int i = 0; i < prefixBytes; i++) {
            if (byteBuf.getByte(start + i) != PROXY_PREFIX[i]) {
                // Not a proxied connection: everything is payload.
                headerChecked = true;
                return true;
            }
        }
        if (readable < PROXY_PREFIX.length) {
            return false;
        }

        int scanEnd = start + Math.min(readable, MAX_V1_HEADER_LENGTH);
        int lineFeed = byteBuf.indexOf(start, scanEnd, (byte) '\n');
        if (lineFeed < 0) {
            if (readable < MAX_V1_HEADER_LENGTH) {
                return false;
            }
            // No line end within the maximum header size: not a valid header, treat as payload.
            headerChecked = true;
            return true;
        }

        int headerLength = lineFeed - start + 1;
        String headerLine = byteBuf.toString(start, headerLength, ASCII).trim();
        // Skip only the header; payload received in the same read must be kept.
        byteBuf.skipBytes(headerLength);
        headerChecked = true;
        recordClientAddress(ctx, headerLine);
        return true;
    }

    /** Stores the source address from a header line such as {@code PROXY TCP4 src dst sport dport}. */
    private void recordClientAddress(ChannelHandlerContext ctx, String headerLine) {
        System.out.println("PROXY MSG: " + headerLine);
        String[] fields = headerLine.split(" ");
        // "PROXY UNKNOWN" carries no addresses.
        if (fields.length < 3) {
            return;
        }
        System.out.println("Real Client IP: " + fields[2]);
        Attribute<String> channelAttr = ctx.channel().attr(key);
        if (null == channelAttr.get()) {
            channelAttr.set(fields[2]);
        }
    }

    /**
     * Returns a copy of the readable bytes of the given buffer without changing its
     * reader or writer index. Intended for debugging output.
     *
     * @param newBuf buffer to read from
     * @return the readable bytes as a new array
     */
    public byte[] printSz(ByteBuf newBuf) {
        // Read straight into the array: no intermediate buffer that would need releasing.
        byte[] bytes = new byte[newBuf.readableBytes()];
        newBuf.getBytes(newBuf.readerIndex(), bytes);
        return bytes;
    }
}
package com.alexyang.nettyandthread.netty2;
/**
* @description ServerHandler
* @author yqc
* @date 2023-06-30 9:23
* @version 1.0
*/
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.Attribute;
import java.nio.charset.Charset;
/**
 * Terminal inbound handler: prints each received message together with the real client
 * IP that {@link DecodeProxy} stored on the channel.
 *
 * <p>Only the events that do real work are overridden. All other inbound events use the
 * {@link ChannelInboundHandlerAdapter} defaults, which forward the event to the next
 * handler exactly as the previous explicit pass-through overrides did.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Prints the peer address of a newly connected channel and forwards the event.
     * Behind nginx this is the proxy's address, not the client's.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("收到链接:" + ctx.channel().remoteAddress());
        ctx.fireChannelActive();
    }

    /**
     * Prints the real client IP (when known) and the message content.
     *
     * <p>The message is consumed here rather than forwarded, so this handler releases
     * the buffer. Messages that are not a {@link ByteBuf} are forwarded untouched.
     *
     * @param ctx channel context
     * @param msg inbound message, expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf in = (ByteBuf) msg;
        try {
            Attribute<String> channelAttr = ctx.channel().attr(DecodeProxy.key);
            String realIp = channelAttr.get();
            if (realIp != null) {
                System.out.println("IP地址--------------- :" + realIp);
            }
            // String concatenation copes with a null remote address.
            System.out.println("收到客户端" + ctx.channel().remoteAddress() + "内容:" + in.toString(UTF_8));
        } finally {
            // Last consumer of the buffer: release it, otherwise pooled memory leaks.
            in.release();
        }
    }
}
启动2个服务netty服务设置nginx中8888,8889端口。
使用tcp工具连接并发送数据测试文章来源:https://www.toymoban.com/news/detail-736409.html
参考博客
参考链接1
参考链接2文章来源地址https://www.toymoban.com/news/detail-736409.html
到了这里,关于Nginx+netty实现tcp负载均衡,获取客户端真实ip的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!