前言
要手动编码,和k8s做shell交互,我们需要弄清以下三个问题:
1、Java如何与k8s做shell交互
2、前端界面如何与Java后台交互
3、多个用户并发访问如何实现
问题1:
- k8s官方及社区提供了各种语言的Kubernetes API客户端;对于Java语言来说,采用fabric8的KubernetesClient即可在容器中执行shell命令。
- 需要pod的容器的基础镜像本身支持bash、sh等终端
问题2:
- 为了交互的实时性,我们与前端的交互采用长连接(WebSocket)。
问题3:
- 为各个用户分配独立的窗口线程
综上,我绘制了以下架构图:
一、后台代码
我们假设crm-publiccloud-5fcdb4749b-rlr8s这个pod中,有一个容器crm,其基础镜像支持bash。
1、通过Netty建立websocket-server:
import io.fabric8.kubernetes.client.KubernetesClient;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * Spring-managed Netty server that exposes a WebSocket endpoint at {@code /ws}.
 *
 * <p>The server is started after bean construction and shut down before bean
 * destruction. Every accepted connection gets its own {@link WebSocketHandler},
 * which is handed the injected {@link KubernetesClient}.
 */
@Slf4j
@Component
public class NettyWebSocketServer {
    /** URI path on which WebSocket upgrade requests are accepted. */
    private static final String WEBSOCKET_PATH = "/ws";
    /** Upper bound, in bytes, for an aggregated HTTP message. */
    private static final int MAX_HTTP_CONTENT_LENGTH = 64 * 1024;

    @Autowired
    private KubernetesClient k8sClient;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;
    /** Listening port; read from {@code netty.websocket.port}, defaulting to 6698. */
    @Value("${netty.websocket.port:6698}")
    private int port;

    /**
     * Creates the event loop groups, binds the server to {@link #port} and waits
     * for the bind to complete. On failure the error is logged and everything
     * that was already created is released via {@link #stop()}.
     */
    @PostConstruct
    public void startNettyServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // WebSocket is negotiated over HTTP, so the HTTP codec is required first.
                        pipeline.addLast(new HttpServerCodec());
                        // Handler for writing data in chunks.
                        pipeline.addLast(new ChunkedWriteHandler());
                        // Aggregates the HTTP message fragments into one full message;
                        // the argument is the maximum aggregated length in bytes.
                        pipeline.addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
                        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH));
                        pipeline.addLast(new WebSocketHandler(k8sClient));
                    }
                });
            serverChannel = bootstrap.bind(port).sync().channel();
            log.info("Netty Ws started on port {}", port);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            log.error("startNettyServer error : {}", ex.getMessage(), ex);
            stop();
        } catch (Exception ex) {
            log.error("startNettyServer error : {}", ex.getMessage(), ex);
            stop();
        }
    }

    /**
     * Closes the server channel and shuts both event loop groups down.
     *
     * <p>Every resource is null-checked, so the method is safe to call even if
     * {@link #startNettyServer()} never ran or failed part-way.
     */
    @PreDestroy
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        log.info("Netty Ws stopped");
    }
}
2、WebSocketHandler
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static final Map<String, ShellTerminal> SHELL_WINDOWS = new ConcurrentHashMap<>();
private KubernetesClient client;
public WebSocketHandler(KubernetesClient k8sClient) {
client = k8sClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 处理WebSocket消息
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
String inputText = msg.text();
log.info("[{}] Receive ws msg:{}, {}", id, inputText, remoteAddress);
ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
if (shellTerminal != null) {
shellTerminal.input(inputText);
} else {
log.error("[{}] there is no shell terminal:{}", id, remoteAddress);
}
}
private ExecWatch getOpsExecWatch() {
String podName = "crm-publiccloud-5fcdb4749b-rlr8s";
String namespace = "default";
String containerName = "crm";
String command = "bash";
return client.pods()
.inNamespace(namespace)
.withName(podName)
.inContainer(containerName)
.redirectingInput()
.redirectingOutput()
.redirectingError()
.withTTY()
.exec(command);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
log.info("[{}] Ws client connected {}", id, remoteAddress);
try {
ExecWatch opsExecWatch = getOpsExecWatch();
ShellTerminal shellTerminal = new ShellTerminal(id, opsExecWatch, ctx);
SHELL_WINDOWS.put(id, shellTerminal);
} catch (Exception e) {
log.error("[{}] init ops exec error:{}", e.getMessage(), e);
ctx.close();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
if (shellTerminal != null) {
shellTerminal.closeExecWatch();
SHELL_WINDOWS.remove(id);
ctx.close();
}
log.info("[{}] Ws client disconnected: {}", id, remoteAddress);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
if (shellTerminal != null) {
shellTerminal.closeExecWatch();
SHELL_WINDOWS.remove(id);
}
ctx.close();
log.error("[{}] Ws exceptionCaught {}:{}", id, remoteAddress, e.getMessage(), e);
}
}
3、ShellTerminal类
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.nio.charset.StandardCharsets;
@Data
@Slf4j
public class ShellTerminal {
private static final String EXIT_CODE = "exit";
private static final String CLOSED = "closed";
private String id;
private String remoteAddress;
private ExecWatch execWatch;
private ChannelHandlerContext ctx;
public ShellTerminal(String id, ExecWatch opsExecWatch, ChannelHandlerContext ctx) {
this.id = id;
this.execWatch = opsExecWatch;
this.ctx = ctx;
initListenThread(id, execWatch, ctx);
}
public void input(String inputText) throws IOException {
if (EXIT_CODE.equals(inputText)) {
closeExecWatch();
this.ctx.close();
return;
}
inputText += "\n";
OutputStream stdin = execWatch.getInput();
if (stdin != null) {
stdin.write(inputText.getBytes(StandardCharsets.UTF_8));
stdin.flush();
}
}
public void closeExecWatch() {
if (execWatch == null) {
log.info("[{}] execWatch is null {}", id, remoteAddress);
return;
}
try {
OutputStream stdin = execWatch.getInput();
InputStream stdout = execWatch.getOutput();
InputStream stderror = execWatch.getError();
if (stdin != null) {
stdin.close();
}
if (stdout != null) {
stdout.close();
}
if (stderror != null) {
stderror.close();
}
} catch (Exception ex) {
log.error("[{}] close execWatch error {}:{}", id, remoteAddress, ex.getMessage(), ex);
}
}
public void initListenThread(String id, ExecWatch execWatch, ChannelHandlerContext ctx) {
// 创建一个线程用于持续读取输出并打印
Thread outputThread = new ShellOutputThread(id, execWatch, ctx);
outputThread.start();
}
public static class ShellOutputThread extends Thread {
private final String id;
private final ChannelHandlerContext ctx;
private final ExecWatch execWatch;
public ShellOutputThread(String id, ExecWatch execWatch, ChannelHandlerContext ctx) {
this.id = id;
this.execWatch = execWatch;
this.ctx = ctx;
}
@Override
public void run() {
try {
InputStream output = execWatch.getOutput();
BufferedReader stdout = new BufferedReader(new InputStreamReader(output));
String line;
while ((line = stdout.readLine()) != null) {
System.out.println(line + "\n");
ctx.channel().writeAndFlush(new TextWebSocketFrame(line + "\n"));
}
} catch (Exception e) {
if ((e instanceof IOException) && CLOSED.equals(e.getMessage())) {
log.error("[" + id + "] shell output stream closed");
} else {
log.error("[" + id + "] shell output read error:{}", e.getMessage(), e);
}
}
}
}
}
二、测试
我们可以通过一些在线工具来测试:
三、前端
采用xterm.js库来实现。
文章来源:https://www.toymoban.com/news/detail-596725.html
<!DOCTYPE html>
<html>
<head>
<!-- The page contains non-ASCII text, so the encoding must be declared. -->
<meta charset="utf-8" />
<title>交互式终端</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/xterm@5.2.1/css/xterm.min.css" />
<style>
  body,
  html {
    height: 100%;
    width: 100%;
    margin: 0;
    padding: 0;
  }
  #terminal-container {
    height: 100vh;
    background-color: black;
  }
  #terminal {
    width: 100%;
    height: 100%;
  }
</style>
</head>
<body>
<div id="terminal-container">
  <div id="terminal"></div>
</div>
<script src="https://cdn.jsdelivr.net/npm/xterm@5.2.1/lib/xterm.js" referrerpolicy="no-referrer"></script>
<script>
  // Browser terminal for the WebSocket shell backend.
  // Protocol: the page sends one text frame per entered line (the server
  // appends the newline) and receives shell output one line per frame.

  // Key codes that the input handler treats specially.
  const KEY_ENTER = 13;
  const KEY_BACKSPACE = 8;
  const KEY_C = 67;

  const terminalElement = document.getElementById('terminal');
  const terminal = new Terminal({
    // Row count derived from the container height.
    rows: Math.ceil((terminalElement.clientHeight - 50) / 20),
    convertEol: true, // a line feed also moves the cursor to the start of the line
    scrollback: 10, // number of scrollback lines kept
    disableStdin: false, // keep keyboard input enabled
    cursorStyle: 'underline',
    cursorBlink: true,
    theme: {
      foreground: '#FFF', // text colour
      background: '#060101', // background colour
      cursor: '#FFF', // cursor colour (must be a colour value)
    }
  });
  terminal.open(terminalElement);

  const socket = new WebSocket("ws://127.0.0.1:6698/ws");

  // Text typed since the last Enter; sent to the server as one line.
  let currentInput = '';
  // Last text written to the terminal from a server message.
  let lastPrompt = '';
  // Number of messages seen since the last prompt request; message 0 is skipped.
  let messageIndex = 0;
  let writeLine = false;
  // The skipped message, kept to recognise the prompt line when it comes back.
  let userAndHostName = '';

  // Sends text only while the socket is open; returns whether it was sent.
  function sendToServer(text) {
    if (socket.readyState !== WebSocket.OPEN) {
      return false;
    }
    socket.send(text);
    return true;
  }

  socket.onopen = () => {
    terminal.writeln("连接到服务器");
    // An empty line makes the shell print its prompt.
    sendToServer('');
    messageIndex = 1;
    writeLine = false;
  };

  socket.onclose = () => {
    terminal.writeln("");
    terminal.writeln("连接已断开");
  };

  // Server -> terminal
  socket.onmessage = (event) => {
    const message = event.data;
    if (messageIndex != 0) {
      lastPrompt = `${message.trim()}`;
      terminal.write(lastPrompt);
      const isEnd = userAndHostName.split(':')[0].startsWith(message.split(':')[0]);
      if (writeLine && !isEnd) {
        terminal.write("\n");
      }
    } else {
      // The first message after a prompt request echoes "user@host# command";
      // it is skipped to avoid showing the command twice.
      writeLine = true;
      userAndHostName = message;
    }
    messageIndex++;
  };

  // Terminal -> server
  terminal.onKey((event) => {
    const ev = event.domEvent;
    const printable = !ev.altKey && !ev.altGraphKey && !ev.ctrlKey && !ev.metaKey;
    if (ev.keyCode === KEY_ENTER) {
      if (currentInput == 'clear') {
        // "clear" is handled locally: erase the typed word and wipe the screen.
        const typedLength = currentInput.length;
        currentInput = '';
        for (let n = 0; n < typedLength; n++) {
          terminal.write('\b \b');
        }
        terminal.clear();
        return;
      }
      sendToServer(currentInput);
      currentInput = '';
      writeLine = false;
      reqPrompt();
    } else if (ev.keyCode === KEY_BACKSPACE) {
      // Only erase characters the user typed, never the prompt itself.
      if (currentInput.length > 0) {
        terminal.write('\b \b');
        currentInput = currentInput.slice(0, -1);
      }
    } else if (ev.ctrlKey && ev.keyCode === KEY_C) {
      // Ctrl + C: forward the interrupt character.
      sendToServer('\x03');
    } else if (printable) {
      currentInput += event.key;
      terminal.write(event.key);
    }
  });

  // Moves to a new line and asks the server for a fresh prompt.
  function reqPrompt() {
    terminal.write("\n");
    sendToServer('');
    messageIndex = 0;
    writeLine = false;
  }
</script>
</body>
</html>
效果
文章来源地址https://www.toymoban.com/news/detail-596725.html
到了这里,关于Java手动编码实现与k8s交互式shell的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!