netty与websockt实现聊天

这篇具有很好参考价值的文章主要介绍了netty与websockt实现聊天。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

配置websockt:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * websocket配置
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "ws")
public class WsConfig {
    /**
     * websockt服务端口,不可和web服务同一个端口
     */
    private Integer port=8779;
    /**
     * 心跳超时时间-单位-秒
     */
    private Integer heartTimeout = 60;
    /**
     * 默认匹配的路径
     */
    private String url="/";
}
返回实体载体:


import com.alibaba.fastjson2.JSON;
import lombok.Data;

import java.io.Serializable;

@Data
public class MsgBody implements Serializable {
    public enum MsgType{
        /**
         * 普通文字消息
         */
        text,
        /**
         * 图片消息
         */
        img,
        /**
         * 文件
         */
        file,
    }
    public enum Type{
        /**
         * 自己
         */
        self,
        /**
         * 别人
         */
        other,
    }
    private Type type;
    private MsgType msgType;
    /**
     * 消息主体
     */
    private String msgContent;
    public String toJson(){
        return JSON.toJSONString(this);
    }
}


import com.alibaba.fastjson2.JSON;

import java.io.Serializable;
import java.util.LinkedHashMap;

/**
 * websocket返回载体
 */
public class WsBean extends LinkedHashMap<String,Object> implements Serializable {
    /**
     * 指定调用前端的回调函数
     */
    public enum CallbackEm{
        /**
         * 通知回调函数
         */
        notice,
        /**
         * 收到消息的回调
         */
        receive_msg,
    }

    public WsBean(CallbackEm callbackEm,Object data) {
        super();
        this.put("code",callbackEm.name());
        this.put("data",data);
    }
    public static WsBean get(CallbackEm callbackEm){
        return new WsBean(callbackEm,"");
    }
    public static WsBean get(CallbackEm callbackEm,Object data){
        return new WsBean(callbackEm,data);
    }
    public WsBean setData(Object data){
        this.put("data",data);
        return this;
    }
    public WsBean set(String key,Object value){
        this.put(key,value);
        return this;
    }
    public String toJson(){
        return JSON.toJSONString(this);
    }

}
netty通道:


import cn.hutool.core.map.BiMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 这个类用来处理用户和连接的关联关系
 */
@Slf4j
@Component
public class NioWebSocketChannelPool {
    /**
     * 用户保持连接
     */
    private final DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * 保持连接用户对应的长连接id,此为双向绑定
     */
    private final BiMap<String, ChannelId> bindUserMap = new BiMap<>(new ConcurrentHashMap<>());
    /**
     * 新增一个客户端通道
     */
    public void addChannel(Channel channel){
        channels.add(channel);
    }
    /**
     * 移除一个客户端通道
     * @param channel
     */
    public void removeChannel(Channel channel){
        String mapKey = bindUserMap.getKey(channel.id());
        if (mapKey != null){
            bindUserMap.remove(mapKey);
        }
        channels.remove(channel);
    }
    /**
     * 绑定用户
     */
    public void bindUser(String userId,Channel channel){
        bindUserMap.put(userId,channel.id());
    }
    /**
     * 向用户推送消息
     */
    public void sendToUser(String userId,WsBean data){
        ChannelId channelId = bindUserMap.get(userId);
        if (channelId != null){
            channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));
        }
    }
    public void sendToUser(String userId,MsgBody data){
        ChannelId channelId = bindUserMap.get(userId);
        if (channelId != null){
            channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));
        }
    }
    public BiMap<String,ChannelId> getBindUserMap(){
        return bindUserMap;
    }
    /**
     * 群发推送消息
     */
    public void writeAndFlush(WsBean data){
        Set<String> onlineIds = getBindUserMap().keySet();
        onlineIds.forEach(userId->{
            ChannelId channelId = bindUserMap.get(userId);
            if (channelId != null){
                channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));
            }
        });
    }


}
netty处理类:

import cn.hutool.core.util.StrUtil;
import com.xx.framework.config.WsConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * 处理端
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    @Resource
    private WsConfig wsConfig;
    @Resource
    private NioWebSocketChannelPool webSocketChannelPool;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端连接:{}",ctx.channel().id());
        webSocketChannelPool.addChannel(ctx.channel());
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端端口连接:{}",ctx.channel().id());
        webSocketChannelPool.removeChannel(ctx.channel());
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().flush();
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端请求数据类型:{}",msg.getClass());
        if (msg instanceof FullHttpRequest){
            fullHttpRequestHandler(ctx,(FullHttpRequest)msg);
        }
        super.channelRead(ctx, msg);
    }

    /**
     * 处理连接请求,客户端websockt发送握手包时会执行第一次请求
     * @param ctx
     * @param request
     */
    private void fullHttpRequestHandler(ChannelHandlerContext ctx, FullHttpRequest request) {
        String uri = request.getUri();
        Map<String,String> params = getParams(uri);
        log.info("客户端请求参数:{}",params);
        /**
         * 判断请求路径是否跟配置中的一致
         */
        if (wsConfig.getUrl().equals(getBasePath(uri))){
            /**
             * 因为有可能携带了参数,导致客户端一致无法返回握手包,因此在校验通过后,重置请求路径
             */
            request.setUri(wsConfig.getUrl());
        }else{
            ctx.close();
        }
        String userId = params.get("user_id");
        if (StrUtil.isBlank(userId)){
            log.info("用户ID为空,无法登录");
            return;
        }
        webSocketChannelPool.bindUser(userId,ctx.channel());
    }

    /**
     * 获取URI中参数以外部分路径
     * @param uri
     * @return
     */
    private String getBasePath(String uri) {
        if (uri == null || uri.isEmpty()){
            return null;
        }
        int idx = uri.indexOf("?");
        if (idx  == -1){
            return uri;
        }
        return uri.substring(0,idx);
    }

    /**
     * 请路径参数转换成Map对象,如果路径参数出现重复参数名,将以最后的参数值为准
     * @param uri
     * @return
     */
    private Map<String, String> getParams(String uri) {
        Map<String,String> params = new HashMap<>(10);
        int idx = uri.indexOf("?");
        if (idx != -1){
            String[] paramsArr = uri.substring(idx+1).split("&");
            for (String param:paramsArr){
                idx = param.indexOf("=");
                params.put(param.substring(0,idx),param.substring(idx+1));
            }
        }
        return params;
    }

    /**
     * 客户端发送断开请求处理
     */
    private void closeWebSocketFrameHandler(ChannelHandlerContext ctx, CloseWebSocketFrame frame){
                ctx.close();
    }

    /**
     * 创建连接之后,客户端发送的消息都会在这里处理
     */
    private void textWebSocketFrameHandler(ChannelHandlerContext ctx, TextWebSocketFrame frame){
        //客户端发送过来的内容不进行业务处理,原样返回,一般不做处理
//        log.info("收到客户端信息-channelId: {}, 消息内容: {}", ctx.channel().id(), frame.text());
    }

    /**
     * 处理客户端心跳包
     */
    private void  pingWebSocketFrameHandler(ChannelHandlerContext ctx, PingWebSocketFrame frame){
        ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {
        //根据请求数量类型进厂分发处理
        if (webSocketFrame instanceof PingWebSocketFrame){
            PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) webSocketFrame;
            pingWebSocketFrameHandler(channelHandlerContext,pingWebSocketFrame);
        }else if (webSocketFrame instanceof TextWebSocketFrame){
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
            textWebSocketFrameHandler(channelHandlerContext,textWebSocketFrame);
        }else if (webSocketFrame instanceof CloseWebSocketFrame){
            CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) webSocketFrame;
            closeWebSocketFrameHandler(channelHandlerContext,closeWebSocketFrame);
        }
    }
}
netty服务类:
package com.xx.framework.ws;

import com.xx.framework.config.WsConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 服务端
 */
@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean , DisposableBean, Ordered {
    @Resource
    private WsConfig wsConfig;
    @Resource
    private NioWebSocketHandler nioWebSocketHandler;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;
    private ChannelFuture channelFuture;

    @Override
    public void destroy() throws Exception {
        log.info("shutting down netty server....");
        if (bossGroup != null){
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null){
            workGroup.shutdownGracefully().sync();
        }
        if (channelFuture != null){
            channelFuture.channel().closeFuture().syncUninterruptibly();
        }
        log.info("netty server shutdown");
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        try{
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.option(ChannelOption.SO_BACKLOG,1024)
            .group(bossGroup,workGroup)
            .channel(NioServerSocketChannel.class)
            .localAddress(wsConfig.getPort())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new HttpServerCodec())
                            .addLast(new ChunkedWriteHandler())
                            .addLast(new HttpObjectAggregator(8192))
                            .addLast(nioWebSocketHandler)
                            .addLast(new WebSocketServerProtocolHandler(wsConfig.getUrl(),null,
                                    true));
                }
            });
            channelFuture = serverBootstrap.bind().sync();
        }finally {
            if (channelFuture != null && channelFuture.isSuccess()){
                log.info("netty server startup on port:{} (websockt)with context path '{}'",
                        wsConfig.getPort(),"/");
            }else{
                log.info("netty server startup failed");
                if (bossGroup != null){
                    bossGroup.shutdownGracefully().sync();
                }
                if (workGroup != null){
                    workGroup.shutdownGracefully().sync();
                }
            }
        }

    }

    @Override
    public int getOrder() {
        return 0;
    }
}
以上代码serviceImpl应用:
/**
     * 新增聊天记录
     *
     * @param chatRecord 聊天记录
     * @return 结果
     */
    @Override
    public int insertChatRecord(ChatRecord chatRecord) {
        MsgBody msgBody = new MsgBody();
        if (StrUtil.isNotBlank(chatRecord.getMsgType())){
            if (chatRecord.getMsgType().equals(MsgTypeEnum.TEXT.getMsgType())){
                msgBody.setMsgType(MsgBody.MsgType.text);
            }else if (chatRecord.getMsgType().equals(MsgTypeEnum.IMG.getMsgType())){
                msgBody.setMsgType(MsgBody.MsgType.img);
            }
        }
        msgBody.setMsgContent(chatRecord.getMsgContent());
        chatRecord.setMsgContent(JSON.toJSONString(msgBody));
        chatRecord.setId(IdUtils.getLongId());
        chatRecord.setCreateTime(DateUtils.getNowDate());
        int insert = chatRecordMapper.insertChatRecord(chatRecord);
        if(insert > 0){
            msgBody.setType(MsgBody.Type.self);
            //通知浏览器用户            webSocketChannelPool.sendToUser(chatRecord.getSendUserId().toString(),msgBody);
            msgBody.setType(MsgBody.Type.other);
            webSocketChannelPool.sendToUser(chatRecord.getReceiveUserId().toString(),msgBody);
        }
        return insert;
    }

文章来源地址https://www.toymoban.com/news/detail-678318.html

到了这里,关于netty与websockt实现聊天的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot+Netty+Vue+Websocket实现在线推送/聊天系统

    ok,那么今天的话也是带来这个非常常用的一个技术,那就是咱们完成nutty的一个应用,今天的话,我会介绍地很详细,这样的话,拿到这个博文的代码就基本上可以按照自己的想法去构建自己的一个在线应用了。比如聊天,在线消息推送之类的。其实一开始我原来的想法做在

    2024年02月03日
    浏览(39)
  • 【 基于Netty实现聊天室聊天业务学习】第4节.什么是BIO与NIO

    IO在读写的时候是阻塞的,无法做其他操作,并发处理能力的非常低,线程之间访问资源通信时候也是非常耗时久,依赖我们的网速,带宽。 我们看一下他的白话原理 我们来看一下这张图那么这张图的话它里面有一个server还有三个客户端那么客户端的话它可以有很多,那么我

    2024年04月26日
    浏览(49)
  • 基于Springboot+WebSocket+Netty实现在线聊天、群聊系统

    此文主要实现在好友添加、建群、聊天对话、群聊功能,使用Java作为后端语言进行支持,界面友好,开发简单。 2.1、下载安装IntelliJ IDEA(后端语言开发工具),Mysql数据库,微信Web开发者工具。 1.创建maven project 先创建一个名为SpringBootDemo的项目,选择【New Project】 然后在弹出

    2024年02月14日
    浏览(41)
  • 基于netty+springCloudGateWay+nacos 实现的聊天室(可支持集群)

    1.基于需要实现一个IM 就记录一下 。 1.将websocket 注册到 nacos 服务上 也是IM 注册到nacos 上  借助rabbitmq 实现集群以及redis 实现记录离线消息具体看代码文字懒得写(里面自己还尝试换了 zookeeper 作为注册中心 仅供学习)IM 和 网关的 目录: 整个逻辑是启动服务将websocket,网关

    2023年04月08日
    浏览(75)
  • Spring boot 项目(二十三)——用 Netty+Websocket实现聊天室

    Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程,但是你仍然可以使用底层的 API。 Netty 的内部实现是很复杂的,但是 Netty 提供了简单易用的API从网络处理代码中解耦业务逻辑。

    2023年04月15日
    浏览(56)
  • 如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?

    本文将详细介绍如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能。 WebSocket是一种基于TCP的协议,它允许客户端和服务器之间进行双向通信,而不需要像HTTP那样进行请求和响应。Netty是一个Java网络编程框架,它提供了强大的异步事件驱动网络编程能

    2024年02月16日
    浏览(42)
  • 【设计模式——学习笔记】23种设计模式——状态模式State(原理讲解+应用场景介绍+案例介绍+Java代码实现)

    请编写程序完成APP抽奖活动具体要求如下: 假如每参加一次这个活动要扣除用户50积分,中奖概率是10% 奖品数量固定,抽完就不能抽奖 活动有四个状态: 可以抽奖、不能抽奖、发放奖品和奖品领完,活动的四个状态转换关系图如下 一开始的状态为“不能抽奖”,当扣除50积分

    2024年02月12日
    浏览(45)
  • SpringBoot整合websockt实现消息对话

    WebSocket是一种在Web应用程序中实现实时双向通信的技术。Web应用程序通常是基于HTTP协议的,HTTP是一种请求/响应式的协议,客户端发起请求,服务器响应请求并发送响应,客户端收到响应后关闭连接。这意味着,如果客户端需要不断地从服务器获取更新,它必须定期发送请求

    2024年02月09日
    浏览(37)
  • Netty简易聊天室

    通过一个简易的聊天室案例,讲述Netty的基本使用。同时分享案例代码。 项目中用到了log4j2,junit5,同时分享这些基础组件的使用。 项目中用到了awt,属于古董技术,只是用来做界面。非重点不用关注。 开发工具:idea2023,jdk:1.8,Maven:3.6.3 maven依赖 日志配置 src/main/resou

    2024年02月11日
    浏览(48)
  • Java设计模式-状态模式

    在软件开发领域,设计模式是一组经过验证的、被广泛接受的解决问题的方案。其中之一是状态模式,它提供了一种优雅的方式来管理对象的不同状态。 状态模式是一种行为型设计模式,它允许对象在内部状态发生改变时改变其行为。状态模式将对象的行为封装在不同的状态

    2024年02月06日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包