概述
Socket难的地方是服务端的编写,首先要合理地管理客户端连接,能让客户端持续不断地连接进来。其次每个连接的读写不能互相干扰,不能因为一个连接在传输数据,别的连接就得挂着。搞定了这两点,基本上就解决了Socket编程80%的问题。
以下是根据个人经验,写了几个示例,希望对看官有所帮助。
开发环境搭建,请点这里!!
纯Socket的实现
java的Socket有两个版本,一个服务端的(ServerSocket),一个客户端的(Socket )。服务端版本可以启用监听,接受客户端连接;客户端版本只能发起连接,概念上应该不难理解。
下边是官方服务端及客户端,分别启动后,客户端里打上几个字,两边在控制台里都会产生相应的输出。为了让程序代码能看得明白一点,我加了一些输出代码,有兴趣的可以运行起来看一下。
注:为节省篇幅,本文从头至尾只写了一个客户端代码。所以后边示例代码只例出服务端的。
初始代码
以下代码可能是大多数学习Socket编程的入门代码,也就是创建一个服务端和客户端,然后启动服务端,启动客户端,如果两端不报错,就意味着这个代码运行成功了。由于入门代码只演示了如何监听,建立连接以及简单的读写,所以这种代码在实现生产中没有一点用处。
有点编程基础的人,我想下边的代码还是比较容易看懂的,如果看不懂,就调试模式运行起,单步走一走,就知道每行代码是怎么触发的了。
服务端
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class AppServer {
public static void main(String[] args) throws IOException {
System.out.println("服务启动");
try (ServerSocket serverSocket = new ServerSocket(8888);
Socket clientSocket = serverSocket.accept();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
System.out.println("客户端连接");
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println("消息:" + inputLine);
out.println(inputLine);
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + 8888 + " or listening for a connection");
System.out.println(e.getMessage());
}
System.out.println("服务退出");
}
}
客户端
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class AppClient {
public static void main(String[] args) throws UnknownHostException, IOException {
String hostName = "localhost";
int portNumber = 8888;
try (Socket echoSocket = new Socket(hostName, portNumber);
PrintWriter out = new PrintWriter(echoSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(echoSocket.getInputStream()));
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
System.out.println("已连接,请输入内容");
String userInput;
while (!(userInput = stdIn.readLine()).equals("exit")) {
out.println(userInput);
System.out.println("回显: " + in.readLine());
}
} catch (UnknownHostException e) {
System.err.println("Don't know about host " + hostName);
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn't get I/O for the connection to " + hostName);
System.exit(1);
}
System.out.println("客户端退出");
}
}
遗憾的是,这个服务端是一次性的,随着客户端退出,服务器端就挂了。其原因是只调用accept()了一次,为了解决这个问题,我们要让服务端不停地accept()。可以在accept()之上,套一层无限循环,这样就可以不停地接受客户端了。
第一次改进
这里改进其实是为了更深入理解服务端是如何接受客户端连接的。服务端起动监听后,还要显式地调用accept()方法,表明这个服务开始接受客户端的连接。不过,每次accept()只能接收一个客户端socket,而且在没有新的客户端连接之前,这个方法就会一直挂起在那里。为了能够不停地接受客户端,那么就要尽快地将连接过来的socket给接收下来,然后进入下一个等待。
因为调用会挂起(专业术语叫阻塞),所以不用考虑递归调用,使用一个无限循环就可以了。
注:
一个完整的应用程序,应该考虑程序的初始化,关闭,异常等逻辑。这里仅演示方便,并未考虑这些细节。所以,尽量避免将这样的代码直接应用到你的生产项目中去。
服务端改进1
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class AppServer {
public static void main(String[] args) throws IOException {
System.out.println("服务启动");
try (ServerSocket serverSocket = new ServerSocket(8888);
) {
//增加一个无限循环
while (true) {
Socket clientSocket = serverSocket.accept();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
System.out.println("客户端连接");
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println("消息:" + inputLine);
out.println(inputLine);
}
in.close();
out.close();
clientSocket.close();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + 8888 + " or listening for a connection");
System.out.println(e.getMessage());
}
System.out.println("服务退出");
}
}
但这个服务端还是有点问题,每次只能连接一个客户端,只有等到前一个客户端退出了,后边的才能正常连接。为了能够迸发连接,就会用到另一个技术——线程。我们可以将读写部分放到一个单独线程中,这样主线程就可以立即接受下一个连接了。
二次改进
在读取客户端连接的时候也是阻塞的,因而前边的代码就会出现一个问题。在你没放弃读取当前连接的数据时,其它客户端依然是连不进来的。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class AppServer {
public static void main(String[] args) throws IOException {
System.out.println("服务启动");
try (ServerSocket serverSocket = new ServerSocket(8888);) {
// 增加一个无限循环
while (true) {
Socket clientSocket = serverSocket.accept();
Thread clientThread = new Thread(new Runnable() {
@Override
public void run() {
PrintWriter out;
try {
out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
System.out.println("客户端连接");
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println("消息:" + inputLine);
out.println(inputLine);
}
in.close();
out.close();
clientSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
clientThread.start();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + 8888 + " or listening for a connection");
System.out.println(e.getMessage());
}
System.out.println("服务退出");
}
}
不过,据说不停地new Thread()比较奢侈,具体原因跟操作系统线程原理有关,具体这里不展开讲,总之线程多了即降低性能,又浪费内存。这时考虑使用线程池的方式来实现,线程池可以用JDK的现成类库。
三次改进
这次改进的重点是性能上的优化。因为接受客户端,以及读取客户端数据都是阻塞的,为了使每个客户端能并行执行,不可避免地要用到线程。但线程的使用要有个度,适当地使用线程能提升性能,将单线程的阻塞,变成并行的阻塞,确实是提升了CPU的复用率。但这里有个风险,就是线程数量超过某个阀值的时候,反而会降低性能。具体原理,网上有很多大神已经讲解得非常细致了,这里不赘述。有个原则就是在不阻塞的情况下,尽量少地去使用线程。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
public class AppServer {
public static void main(String[] args) throws IOException {
System.out.println("Service starting...");
try (ServerSocket serverSocket = new ServerSocket(8888);) {
// 创建线程池,数量跟CPU的线程数相当
var pool = Executors.newFixedThreadPool(8);
// 增加一个无限循环
while (true) {
Socket clientSocket = serverSocket.accept();
pool.submit(new Runnable() {
@Override
public void run() {
PrintWriter out;
try {
out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
System.out.println("Client connected...");
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println("Message:" + inputLine);
out.println(inputLine);
}
in.close();
out.close();
clientSocket.close();
System.out.println("Client disconnected...");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + 8888 + " or listening for a connection");
System.out.println(e.getMessage());
}
System.out.println("服务退出");
}
}
至此服务端已经改进不少,但有新的问题出现,如果长连接的客户端足够多,那么线程必然又要增加。我们的目标是提升服务器的性能,而不是无谓的增加线程数量。可以考虑使用非阻塞的方式来接收客户端的数据。
再次改进
代码改进如下:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
public class AppServer {
public static void main(String[] args) throws IOException {
System.out.println("Service starting...");
try (ServerSocket serverSocket = new ServerSocket(8888);) {
// 创建线程池
var pool = Executors.newFixedThreadPool(8);
List<Socket> clients = Collections.synchronizedList(new ArrayList<Socket>(0));
pool.submit(new Runnable() {
@Override
public void run() {
while (true) {
var iterator = clients.iterator();
while (iterator.hasNext()) {
var socket = iterator.next();
if (socket.isClosed()) {
System.out.println("Client disconnected...");
iterator.remove();
continue;
}
try {
//如果此处有更为复杂的逻辑要处理,则可以创建一个Runnalbe对象,塞进线程池中
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
if (socket.getInputStream().available() > 0) {
System.out.println("Message received...");
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
String inputLine = in.readLine();
System.out.println("Message:" + inputLine);
out.println(inputLine);
if (inputLine.equals("exit"))
socket.close();
} else {
socket.getOutputStream().write(0);
}
} catch (IOException e) {
e.printStackTrace();
iterator.remove();
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
// 增加一个无限循环
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("Client connected...");
clients.add(clientSocket);
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + 8888 + " or listening for a connection");
System.out.println(e.getMessage());
}
System.out.println("服务退出");
}
}
至此,纯Socket的写法已经有相当强的处理能力了。当然,这还是示例,并不能解决实际的业务,不过,方向是对的,实际项目中往这个思路上靠就是了。
非阻塞NIO的实现
NIO的实现大同小异,只是Accecpt()换成了select(),至于读消息是不是也可以使用select()就不得而知了,笔者才疏学浅没找到相关的API。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
public class AppServer {
public static void main(String[] args) throws IOException {
System.out.println("Service starting...");
try {
// 创建线程池
var pool = Executors.newFixedThreadPool(8);
List<Socket> clients = Collections.synchronizedList(new ArrayList<Socket>(0));
pool.submit(new Runnable() {
@Override
public void run() {
while (true) {
var iterator = clients.iterator();
while (iterator.hasNext()) {
var socket = iterator.next();
if (socket.isClosed()) {
System.out.println("Client disconnected...");
iterator.remove();
continue;
}
try {
// 如果此处有更为复杂的逻辑要处理,则可以创建一个Runnalbe对象,塞进线程池中
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
if (socket.getInputStream().available() > 0) {
System.out.println("Message received...");
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
String inputLine = in.readLine();
System.out.println("Message:" + inputLine);
out.println(inputLine);
if (inputLine.equals("exit"))
socket.close();
} else {
socket.getOutputStream().write(0);
}
} catch (IOException e) {
e.printStackTrace();
iterator.remove();
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
// Selector for incoming time requests
Selector acceptSelector = SelectorProvider.provider().openSelector();
// Create a new server socket and set to non blocking mode
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// Bind the server socket to the local host and port
InetSocketAddress isa = new InetSocketAddress(8888);
ssc.socket().bind(isa);
// Register accepts on the server socket with the selector. This
// step tells the selector that the socket wants to be put on the
// ready list when accept operations occur, so allowing multiplexed
// non-blocking I/O to take place.
SelectionKey acceptKey = ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);
int keysAdded = 0;
// Here's where everything happens. The select method will
// return when any operations registered above have occurred, the
// thread has been interrupted, etc.
while ((keysAdded = acceptSelector.select()) > 0) {
// Someone is ready for I/O, get the ready keys
Set<SelectionKey> readyKeys = acceptSelector.selectedKeys();
Iterator<SelectionKey> i = readyKeys.iterator();
// Walk through the ready keys collection and process date requests.
while (i.hasNext()) {
SelectionKey sk = (SelectionKey) i.next();
i.remove();
// The key indexes into the selector so you
// can retrieve the socket that's ready for I/O
ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
// Accept the date request and send back the date string
Socket s = nextReady.accept().socket();
System.out.println("Client connected...");
clients.add(s);
// Write the current time to the socket
}
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + 8888 + " or listening for a connection");
System.out.println(e.getMessage());
}
System.out.println("Service exited");
}
}
Netty实现
使用Netty后,代码结构清爽了不少。不用自己写线程,数据读写方面只要会用ByteBuf就可以了。文章来源:https://www.toymoban.com/news/detail-464105.html
import java.io.IOException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class AppServer extends ChannelInboundHandlerAdapter {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client disconnected...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = ((ByteBuf) msg).copy();
ctx.write(msg); // (1)
ctx.flush(); // (2)
String str = "";
while (in.isReadable()) { // (1)
str += (char) in.readByte();
}
System.out.print("message:"+str);
}
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("Service starting...");
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
System.out.println("Client connected...");
ch.pipeline().addLast(new AppServer());
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(8888).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
System.out.println("Service exited");
}
}
Vert.x 实现
Vert.x是建立在Netty之上的,但对程序员更加友好。相同的功能,明显代码少了不少。文章来源地址https://www.toymoban.com/news/detail-464105.html
import java.io.IOException;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
public class AppServer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("Service starting...");
Vertx vertx = Vertx.vertx();
NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
System.out.println("Client connected!");
socket.handler(buffer -> {
System.out.println("Message received!");
var c = buffer.getString(0, buffer.length());
socket.write(c);
System.out.print("Message:"+c);
});
});
server.listen(8888, res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
System.out.println("Service exited");
}
}
到了这里,关于Java Socket几个简单的入门示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!