接上文
二、注册OP_WRITE写数据
服务端代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
* 服务端接收到数据后,缓存,注册OP_WRITE事件,收到状态转发数据
*
*/
class NIOSelectorOpWriteServer1 {
Selector selector;
public static void main(String[] args) throws IOException {
NIOSelectorOpWriteServer1 server = new NIOSelectorOpWriteServer1();
server.start(); // 开启监听和事件处理
}
public void start() {
initServer();
// selector非阻塞轮询有哪些感兴趣的事件到了
doService();
}
private void doService() {
if (selector == null) {
System.out.println("server init failed, without doing read/write");
return;
}
try {
while (true) {
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys(); // 感兴趣且准备好的事件
Iterator<SelectionKey> iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 删除当前元素,防止重复处理
// 下面根据事件进行分别处理
if (key.isAcceptable()) {
// 客户端连接事件
acceptHandler(key);
} else if (key.isReadable()) {
// 读取客户端数据
readHandler(key);
} else if (key.isWritable()) {
// 为了避免重复写,需要先去除OP_WRITE注册状态
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
writeHandler(key);
}
}
}
}
} catch (IOException exception) {
exception.printStackTrace();
}
}
private void initServer() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
// 此时在selector上注册感兴趣的事件
// 这里先注册OP_ACCEPT: 客户端连接事件
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server init success");
} catch (IOException exception) {
exception.printStackTrace();
System.out.println("server init failied");
}
}
public void acceptHandler(SelectionKey key) {
ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
try {
SocketChannel client = server.accept();
client.configureBlocking(false); // 设置client非阻塞
System.out.println("server receive a client :" + client);
// 注册OP_READ事件,用于从客户端读取数据
// 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
client.register(key.selector(), SelectionKey.OP_READ, buffer);
} catch (IOException exception) {
exception.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
System.out.println("read handler");
SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
buffer.clear(); // 使用前clear
// 防止数据分包,需要while循环读取
try {
while (true) {
int readLen = client.read(buffer);
if (readLen > 0) {
// 读取到数据了
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println("server read data from " + client + ", data is :" + new String(data));
// 给其他客户端注册OP_WRITE;
registerWrite(client, data);
} else if (readLen == 0) {
// 没读到数据
System.out.println(client + " : no data");
break;
} else if (readLen < 0) {
// client 关闭连接
// 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
System.out.println(client + " close");
client.close();
break;
}
}
} catch (IOException exception) {
exception.printStackTrace();
// client 关闭连接
System.out.println(client + " disconnect");
// todo:disconnect 导致一直有read事件,怎么办?
try {
client.close();
} catch (IOException ex) {
System.out.println("close ex");
}
}
}
public void writeHandler(SelectionKey key) {
System.out.println("write handler");
SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer,此时处于读模式
try {
while (buffer.hasRemaining()) {
client.write(buffer);
}
} catch (IOException exception) {
System.out.println("write error");
exception.printStackTrace();
}
}
private void registerWrite(SocketChannel myself, byte[] data) throws IOException {
Set<SelectionKey> keys = selector.keys();
// read/write 对应同一个key,同一个client不会发送两遍
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
if (channel instanceof SocketChannel && channel != myself) {
key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
ByteBuffer attachment = (ByteBuffer) key.attachment();
attachment.clear();
attachment.put(data);
attachment.flip();
}
}
}
}
这里有几个注意项:
1.在注册OP_WRITE时,需要给所有其他客户端注册;
2.注册OP_WRITE时:是使用key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);避免对原来的OP_READ事件进行覆盖;在OP_WRITE事件来的时候,要把先把OP_WRITE事件去掉,key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 防止重复写事件状态发生;
3.注册OP_WRITE时,要写的数据,直接给到了原来channel对应的attachment里了;在OP_WRITE事件来的时候,可以直接用;
到此,我们一定有个疑问:既然服务端关不关注OP_WRITE事件,都可以给客户端发送数据,意义何在?
那我们就要看下OP_WRITE事件的具备条件:send-queue是否有空间
而服务端要写数据要关注:服务端数据是否准备好了 + send-queue是否有空间
服务端一般都是在自己数据准备好了后,再注册对客户端的OP_WRITE事件。
但是上面的代码中,在给客户端写数据时,是一直写,直到数据写完,但是 send-queue空间有限,当 send-queue写满后,写操作就会阻塞,导致单线程下业务阻塞。。。
此时,OP_WRITE的优势就体现出来了
我们可以对OP_WRITE的使用方式稍微调整,就可以解决上面的问题:文章来源:https://www.toymoban.com/news/detail-857524.html
当我们收到OP_WRITE事件,开始给客户端写数据后,当我们发现该客户端对应的send-queue写满,SocketChannel.write(buffer)会返回已经写出去的字节数,此时为0;我们根据此标志知道,此时send-queue满,不能再写了,此时我们记录下没有写的数据,再次给该客户端注册一个OP_WRITE事件,结束本次写过程;让业务线程继续处理其他事件,等到该客户端对应的send-queue有空闲的时候,会再次收到收到OP_WRITE事件,我们就可以继续写数据了;这样就是解决了写数据满导致业务阻塞的问题了。文章来源地址https://www.toymoban.com/news/detail-857524.html
到了这里,关于Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(二)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!