配置websocket:
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * WebSocket server settings, bound from configuration properties under the {@code ws} prefix.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "ws")
public class WsConfig {

    /** Port the WebSocket server listens on; must not be the same port as the web server. */
    private Integer port = 8779;

    /** Heartbeat timeout, in seconds. */
    private Integer heartTimeout = 60;

    /** Default path that connections are matched against. */
    private String url = "/";
}
返回实体载体:
import com.alibaba.fastjson2.JSON;
import lombok.Data;
import java.io.Serializable;
/**
 * Chat message payload that can be serialised to JSON.
 */
@Data
public class MsgBody implements Serializable {

    /** Kind of content carried by the message. */
    public enum MsgType {
        /** Plain text message. */
        text,
        /** Image message. */
        img,
        /** File. */
        file,
    }

    /** Whose message this is. */
    public enum Type {
        /** Oneself. */
        self,
        /** Someone else. */
        other,
    }

    private Type type;

    private MsgType msgType;

    /** Message body. */
    private String msgContent;

    /**
     * Serialises this message to a JSON string.
     *
     * @return the JSON representation of this object
     */
    public String toJson() {
        final String json = JSON.toJSONString(this);
        return json;
    }
}
import com.alibaba.fastjson2.JSON;
import java.io.Serializable;
import java.util.LinkedHashMap;
/**
 * WebSocket response envelope: an insertion-ordered map that is initialised with a
 * {@code code} entry (the callback name) and a {@code data} entry.
 */
public class WsBean extends LinkedHashMap<String, Object> implements Serializable {

    /** Names the front-end callback function to invoke. */
    public enum CallbackEm {
        /** Notification callback. */
        notice,
        /** Callback for a received message. */
        receive_msg,
    }

    /**
     * Creates an envelope for the given callback.
     *
     * @param callbackEm callback whose name is stored under {@code code}
     * @param data       value stored under {@code data}
     */
    public WsBean(CallbackEm callbackEm, Object data) {
        put("code", callbackEm.name());
        put("data", data);
    }

    /**
     * Creates an envelope whose {@code data} entry is the empty string.
     *
     * @param callbackEm callback whose name is stored under {@code code}
     * @return a new envelope
     */
    public static WsBean get(CallbackEm callbackEm) {
        return get(callbackEm, "");
    }

    /**
     * Creates an envelope for the given callback and data.
     *
     * @param callbackEm callback whose name is stored under {@code code}
     * @param data       value stored under {@code data}
     * @return a new envelope
     */
    public static WsBean get(CallbackEm callbackEm, Object data) {
        return new WsBean(callbackEm, data);
    }

    /**
     * Replaces the {@code data} entry.
     *
     * @param data new value for {@code data}
     * @return this envelope, for chaining
     */
    public WsBean setData(Object data) {
        put("data", data);
        return this;
    }

    /**
     * Stores an arbitrary entry.
     *
     * @param key   entry key
     * @param value entry value
     * @return this envelope, for chaining
     */
    public WsBean set(String key, Object value) {
        put(key, value);
        return this;
    }

    /**
     * Serialises this envelope to a JSON string.
     *
     * @return the JSON representation of this map
     */
    public String toJson() {
        return JSON.toJSONString(this);
    }
}
netty通道:
import cn.hutool.core.map.BiMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Tracks the connected client channels and the association between user ids and channels.
 */
@Slf4j
@Component
public class NioWebSocketChannelPool {

    /** Channels of the clients that are currently connected. */
    private final DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Two-way mapping between a user id and the id of the channel bound to that user. */
    private final BiMap<String, ChannelId> bindUserMap = new BiMap<>(new ConcurrentHashMap<>());

    /**
     * Registers a client channel.
     *
     * @param channel the channel to track
     */
    public void addChannel(Channel channel) {
        channels.add(channel);
    }

    /**
     * Unregisters a client channel and drops the user binding that points at it, if any.
     *
     * @param channel the channel to forget
     */
    public void removeChannel(Channel channel) {
        String mapKey = bindUserMap.getKey(channel.id());
        if (mapKey != null) {
            bindUserMap.remove(mapKey);
        }
        channels.remove(channel);
    }

    /**
     * Binds a user id to a channel.
     *
     * @param userId  the user id
     * @param channel the channel that belongs to the user
     */
    public void bindUser(String userId, Channel channel) {
        bindUserMap.put(userId, channel.id());
    }

    /**
     * Pushes a message to one user. Does nothing when the user has no live channel.
     *
     * @param userId the target user id
     * @param data   the envelope to send as a text frame
     */
    public void sendToUser(String userId, WsBean data) {
        Channel channel = findChannel(userId);
        if (channel != null) {
            channel.writeAndFlush(new TextWebSocketFrame(data.toJson()));
        }
    }

    /**
     * Pushes a chat message to one user. Does nothing when the user has no live channel.
     *
     * @param userId the target user id
     * @param data   the message to send as a text frame
     */
    public void sendToUser(String userId, MsgBody data) {
        Channel channel = findChannel(userId);
        if (channel != null) {
            channel.writeAndFlush(new TextWebSocketFrame(data.toJson()));
        }
    }

    /**
     * Exposes the live user-to-channel mapping.
     *
     * @return the internal binding map (not a copy)
     */
    public BiMap<String, ChannelId> getBindUserMap() {
        return bindUserMap;
    }

    /**
     * Broadcasts a message to every bound user.
     *
     * @param data the envelope to send as a text frame
     */
    public void writeAndFlush(WsBean data) {
        // Serialise at most once; each recipient still gets its own frame instance.
        String json = null;
        for (String userId : getBindUserMap().keySet()) {
            Channel channel = findChannel(userId);
            if (channel == null) {
                continue;
            }
            if (json == null) {
                json = data.toJson();
            }
            channel.writeAndFlush(new TextWebSocketFrame(json));
        }
    }

    /**
     * Resolves the channel bound to a user.
     *
     * @param userId the user id
     * @return the channel, or {@code null} when the user is not bound or the bound channel
     *         is no longer in the group
     */
    private Channel findChannel(String userId) {
        ChannelId channelId = bindUserMap.get(userId);
        if (channelId == null) {
            return null;
        }
        // find() returns null when the group no longer holds the channel (e.g. it was
        // closed between the binding lookup and this call), so callers must null-check.
        Channel channel = channels.find(channelId);
        if (channel == null) {
            log.debug("no live channel {} for user {}", channelId, userId);
        }
        return channel;
    }
}
netty处理类:
import cn.hutool.core.util.StrUtil;
import com.xx.framework.config.WsConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * Channel handler that validates the WebSocket handshake request, binds the connecting
 * user to the channel, and dispatches WebSocket frames.
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    @Resource
    private WsConfig wsConfig;

    @Resource
    private NioWebSocketChannelPool webSocketChannelPool;

    /** Registers the new channel with the pool. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端连接:{}", ctx.channel().id());
        webSocketChannelPool.addChannel(ctx.channel());
        super.channelActive(ctx);
    }

    /** Removes the channel (and any user binding) from the pool. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端断开连接:{}", ctx.channel().id());
        webSocketChannelPool.removeChannel(ctx.channel());
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().flush();
        super.channelReadComplete(ctx);
    }

    /**
     * Intercepts the HTTP handshake request before normal processing; everything else
     * (and accepted handshakes) goes through {@link SimpleChannelInboundHandler}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端请求数据类型:{}", msg.getClass());
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            if (!fullHttpRequestHandler(ctx, request)) {
                // Rejected: the channel is closing, so the request is not forwarded and
                // must be released here.
                request.release();
                return;
            }
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Handles the connection request that a client sends as its WebSocket handshake.
     *
     * <p>NOTE(review): the user id is taken from the {@code user_id} query parameter as-is;
     * nothing here authenticates it - confirm this is acceptable.
     *
     * @param ctx     channel context
     * @param request the handshake HTTP request
     * @return {@code false} when the request path does not match the configured URL and the
     *         channel was closed; {@code true} otherwise
     */
    private boolean fullHttpRequestHandler(ChannelHandlerContext ctx, FullHttpRequest request) {
        String uri = request.getUri();
        Map<String, String> params = getParams(uri);
        log.info("客户端请求参数:{}", params);
        // Reject requests whose path differs from the configured one, and stop here so a
        // rejected connection can never be bound to (or displace) a user.
        if (!wsConfig.getUrl().equals(getBasePath(uri))) {
            ctx.close();
            return false;
        }
        // The query string may prevent the handshake response from being produced, so once
        // the path is validated the URI is reset to the bare configured path.
        request.setUri(wsConfig.getUrl());
        String userId = params.get("user_id");
        if (StrUtil.isBlank(userId)) {
            log.info("用户ID为空,无法登录");
            return true;
        }
        webSocketChannelPool.bindUser(userId, ctx.channel());
        return true;
    }

    /**
     * Returns the part of the URI that precedes the query string.
     *
     * @param uri request URI
     * @return the path without parameters, or {@code null} when {@code uri} is null or empty
     */
    private String getBasePath(String uri) {
        if (uri == null || uri.isEmpty()) {
            return null;
        }
        int idx = uri.indexOf("?");
        if (idx == -1) {
            return uri;
        }
        return uri.substring(0, idx);
    }

    /**
     * Converts the query parameters of a URI into a map. When a name is repeated the last
     * value wins. A token without {@code =} is stored with an empty value; empty tokens
     * are skipped. Values are not URL-decoded.
     *
     * @param uri request URI
     * @return the parameters, possibly empty
     */
    private Map<String, String> getParams(String uri) {
        Map<String, String> params = new HashMap<>(10);
        int queryStart = uri.indexOf("?");
        if (queryStart == -1) {
            return params;
        }
        for (String param : uri.substring(queryStart + 1).split("&")) {
            if (param.isEmpty()) {
                continue;
            }
            int eq = param.indexOf("=");
            if (eq == -1) {
                params.put(param, "");
            } else {
                params.put(param.substring(0, eq), param.substring(eq + 1));
            }
        }
        return params;
    }

    /** Handles a close request from the client by closing the channel. */
    private void closeWebSocketFrameHandler(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {
        ctx.close();
    }

    /**
     * Receives text frames sent by the client after the connection is established.
     * Intentionally a no-op: client text is not processed.
     */
    private void textWebSocketFrameHandler(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        // Enable for debugging:
        // log.info("收到客户端信息-channelId: {}, 消息内容: {}", ctx.channel().id(), frame.text());
    }

    /** Answers a client heartbeat (ping) with a pong carrying the same payload. */
    private void pingWebSocketFrameHandler(ChannelHandlerContext ctx, PingWebSocketFrame frame) {
        // retain(): the inbound frame is released by SimpleChannelInboundHandler after
        // channelRead0 returns, while the pong still needs the content.
        ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    }

    /** Dispatches a frame to the handler for its type; other frame types are ignored. */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {
        if (webSocketFrame instanceof PingWebSocketFrame) {
            pingWebSocketFrameHandler(channelHandlerContext, (PingWebSocketFrame) webSocketFrame);
        } else if (webSocketFrame instanceof TextWebSocketFrame) {
            textWebSocketFrameHandler(channelHandlerContext, (TextWebSocketFrame) webSocketFrame);
        } else if (webSocketFrame instanceof CloseWebSocketFrame) {
            closeWebSocketFrameHandler(channelHandlerContext, (CloseWebSocketFrame) webSocketFrame);
        }
    }
}
netty服务类:
package com.xx.framework.ws;
import com.xx.framework.config.WsConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 服务端
*/
@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean , DisposableBean, Ordered {
@Resource
private WsConfig wsConfig;
@Resource
private NioWebSocketHandler nioWebSocketHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
private ChannelFuture channelFuture;
@Override
public void destroy() throws Exception {
log.info("shutting down netty server....");
if (bossGroup != null){
bossGroup.shutdownGracefully().sync();
}
if (workGroup != null){
workGroup.shutdownGracefully().sync();
}
if (channelFuture != null){
channelFuture.channel().closeFuture().syncUninterruptibly();
}
log.info("netty server shutdown");
}
@Override
public void afterPropertiesSet() throws Exception {
try{
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.option(ChannelOption.SO_BACKLOG,1024)
.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.localAddress(wsConfig.getPort())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(8192))
.addLast(nioWebSocketHandler)
.addLast(new WebSocketServerProtocolHandler(wsConfig.getUrl(),null,
true));
}
});
channelFuture = serverBootstrap.bind().sync();
}finally {
if (channelFuture != null && channelFuture.isSuccess()){
log.info("netty server startup on port:{} (websockt)with context path '{}'",
wsConfig.getPort(),"/");
}else{
log.info("netty server startup failed");
if (bossGroup != null){
bossGroup.shutdownGracefully().sync();
}
if (workGroup != null){
workGroup.shutdownGracefully().sync();
}
}
}
}
@Override
public int getOrder() {
return 0;
}
}
以上代码serviceImpl应用:
/**
 * Inserts a chat record and, when the insert succeeds, pushes the message over WebSocket
 * to the sender (marked {@code self}) and to the receiver (marked {@code other}).
 *
 * @param chatRecord the chat record; its message content is replaced by the JSON form of
 *                   the message body, and its id and create time are assigned here
 * @return the value returned by {@code chatRecordMapper.insertChatRecord}
 */
@Override
public int insertChatRecord(ChatRecord chatRecord) {
    MsgBody msgBody = new MsgBody();
    if (StrUtil.isNotBlank(chatRecord.getMsgType())) {
        if (chatRecord.getMsgType().equals(MsgTypeEnum.TEXT.getMsgType())) {
            msgBody.setMsgType(MsgBody.MsgType.text);
        } else if (chatRecord.getMsgType().equals(MsgTypeEnum.IMG.getMsgType())) {
            msgBody.setMsgType(MsgBody.MsgType.img);
        }
    }
    msgBody.setMsgContent(chatRecord.getMsgContent());
    // The stored content is the JSON message body, serialised before any type is set.
    chatRecord.setMsgContent(JSON.toJSONString(msgBody));
    chatRecord.setId(IdUtils.getLongId());
    chatRecord.setCreateTime(DateUtils.getNowDate());
    int insert = chatRecordMapper.insertChatRecord(chatRecord);
    if (insert > 0) {
        // Notify the sender's browser. This call had been swallowed by the preceding
        // line comment, leaving setType(self) without effect.
        msgBody.setType(MsgBody.Type.self);
        webSocketChannelPool.sendToUser(chatRecord.getSendUserId().toString(), msgBody);
        // Notify the receiver.
        msgBody.setType(MsgBody.Type.other);
        webSocketChannelPool.sendToUser(chatRecord.getReceiveUserId().toString(), msgBody);
    }
    return insert;
}
文章来源地址https://www.toymoban.com/news/detail-678318.html
文章来源:https://www.toymoban.com/news/detail-678318.html
到了这里,关于netty与websocket实现聊天的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!