netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

这篇具有很好参考价值的文章主要介绍了netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

自己想法和实现,如果有说错的或者有更好的简单的实现方式可以私信交流一下(主要是实现握手时鉴权)

需求实现

  1. 握手鉴权是基于前台请求头 Sec-WebSocket-Protocol的
  2. 本身socket并没有提供自定义请求头,只能自定义 Sec-WebSocket-Protocol的自协议

问题描述

socket握手请求是基于http的,握手成功后会升级为ws

前台传输了 token作为Sec-WebSocket-Protocol的值,后台接收到后总是断开连接,后来网上看了很多博客说的都是大同小异,然后就看了他的源码一步步走的(倔脾气哈哈),终于我看到了端倪,这个问题是因为前后台的Sec-WebSocket-Protocol值不一致,所以会断开,但是我记得websocket好像是不用自己设置请求头的,但是netty我看了源码,好像没有预留设置websocket的response的响应头(这只是我的个人理解)

具体实现

yaml

netty:
  websocket:
    enable: true
    port: 8483
    websocket-path: /ws
    sub-protocols: websocket
    allow-extensions: true
    max-frame-size: 655360
    reader-idle-time: 3600
    # 基于协议头方式获取token,参数方式token过长不能传输
    token-header: Sec-WebSocket-Protocol #${token.header}
    # 开启token鉴权
    has-token: true

NettyWebSocketProperties

@ConfigurationProperties(prefix = "netty.websocket")
@Configuration
@Validated
@Data
public class NettyWebSocketProperties implements Serializable {

        private boolean enable = false;

        private int port;

        private  String websocketPath = "/ws";

        private  String subProtocols = "websocket";

        private  Boolean allowExtensions = true;

        private  Integer maxFrameSize = 65536 * 10;

        private  Integer readerIdleTime = 5;

        private  String tokenHeader = "Sec-WebSocket-Protocol";

        private  Boolean hasToken = false;


}

CustomWebSocketProtocolHandler

解释: 自定义替换WebSocketProtocolHandler,复制WebSocketProtocolHandler的内容即可,因为主要是WebSocketServerProtocolHandler自定义会用到

abstract class CustomWebSocketProtocolHandler extends MessageToMessageDecoder<WebSocketFrame> {
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
        if (frame instanceof PingWebSocketFrame) {
            frame.content().retain();
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
            return;
        }
        if (frame instanceof PongWebSocketFrame) {
            // Pong frames need to get ignored
            return;
        }

        out.add(frame.retain());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
        ctx.close();
    }
}

CustomWebSocketServerProtocolHandler

解释: 自定义WebSocketServerProtocolHandler,实现上面自定义的WebSocketProtocolHandler,具体内容和WebSocketServerProtocolHandler保持一致,只需要将handlerAdded中的类ProtocolHandler改为自己定义的即可
注意:后面监听读写的自定义业务的handler需要实现相应的方法:异常或者事件监听,因为比如异常,如果抛出异常了,是不会有控制器去管的,因为当前的业务控制器就是最后一层,因为上面已经把默认实现改成了自己的实现(其他的控制器都是基于默认handler实现的,如果改了后,去初始化自己改后的handler那便是最后一层),所以要手动去关闭

ublic class CustomWebSocketServerProtocolHandler extends CustomWebSocketProtocolHandler {

    /**
     * Events that are fired to notify about handshake status
     */
    public enum ServerHandshakeStateEvent {
        /**
         * The Handshake was completed successfully and the channel was upgraded to websockets.
         *
         * @deprecated in favor of {@link WebSocketServerProtocolHandler.HandshakeComplete} class,
         * it provides extra information about the handshake
         */
        @Deprecated
        HANDSHAKE_COMPLETE
    }

    /**
     * The Handshake was completed successfully and the channel was upgraded to websockets.
     */
    public static final class HandshakeComplete {
        private final String requestUri;
        private final HttpHeaders requestHeaders;
        private final String selectedSubprotocol;

       public HandshakeComplete(String requestUri, HttpHeaders requestHeaders, String selectedSubprotocol) {
            this.requestUri = requestUri;
            this.requestHeaders = requestHeaders;
            this.selectedSubprotocol = selectedSubprotocol;
        }

        public String requestUri() {
            return requestUri;
        }

        public HttpHeaders requestHeaders() {
            return requestHeaders;
        }

        public String selectedSubprotocol() {
            return selectedSubprotocol;
        }
    }

    private static final AttributeKey<WebSocketServerHandshaker> HANDSHAKER_ATTR_KEY =
            AttributeKey.valueOf(WebSocketServerHandshaker.class, "HANDSHAKER");

    private final String websocketPath;
    private final String subprotocols;
    private final boolean allowExtensions;
    private final int maxFramePayloadLength;
    private final boolean allowMaskMismatch;
    private final boolean checkStartsWith;

    public CustomWebSocketServerProtocolHandler(String websocketPath) {
        this(websocketPath, null, false);
    }

    public CustomWebSocketServerProtocolHandler(String websocketPath, boolean checkStartsWith) {
        this(websocketPath, null, false, 65536, false, checkStartsWith);
    }

    public CustomWebSocketServerProtocolHandler(String websocketPath, String subprotocols) {
        this(websocketPath, subprotocols, false);
    }

    public CustomWebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions) {
        this(websocketPath, subprotocols, allowExtensions, 65536);
    }

    public CustomWebSocketServerProtocolHandler(String websocketPath, String subprotocols,
                                          boolean allowExtensions, int maxFrameSize) {
        this(websocketPath, subprotocols, allowExtensions, maxFrameSize, false);
    }

    public CustomWebSocketServerProtocolHandler(String websocketPath, String subprotocols,
                                          boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch) {
        this(websocketPath, subprotocols, allowExtensions, maxFrameSize, allowMaskMismatch, false);
    }

    public CustomWebSocketServerProtocolHandler(String websocketPath, String subprotocols,
                                          boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, boolean checkStartsWith) {
        this.websocketPath = websocketPath;
        this.subprotocols = subprotocols;
        this.allowExtensions = allowExtensions;
        maxFramePayloadLength = maxFrameSize;
        this.allowMaskMismatch = allowMaskMismatch;
        this.checkStartsWith = checkStartsWith;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(CustomWebSocketServerProtocolHandler.class) == null) {
            // Add the WebSocketHandshakeHandler before this one.
            ctx.pipeline().addBefore(ctx.name(), CustomWebSocketServerProtocolHandler.class.getName(),
                    new CustomWebSocketServerProtocolHandler(websocketPath, subprotocols,
                            allowExtensions, maxFramePayloadLength, allowMaskMismatch, checkStartsWith));
        }
        if (cp.get(Utf8FrameValidator.class) == null) {
            // Add the UFT8 checking before this one.
            ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                    new Utf8FrameValidator());
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker = getHandshaker(ctx.channel());
            if (handshaker != null) {
                frame.retain();
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
            } else {
                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            }
            return;
        }
        super.decode(ctx, frame, out);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof WebSocketHandshakeException) {
            FullHttpResponse response = new DefaultFullHttpResponse(
                    HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(cause.getMessage().getBytes()));
            ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        } else {
            ctx.fireExceptionCaught(cause);
            ctx.close();
        }
    }

    static WebSocketServerHandshaker getHandshaker(Channel channel) {
        return channel.attr(HANDSHAKER_ATTR_KEY).get();
    }

    public static void setHandshaker(Channel channel, WebSocketServerHandshaker handshaker) {
        channel.attr(HANDSHAKER_ATTR_KEY).set(handshaker);
    }

    public static ChannelHandler forbiddenHttpRequestResponder() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof FullHttpRequest) {
                    ((FullHttpRequest) msg).release();
                    FullHttpResponse response =
                            new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN);
                    ctx.channel().writeAndFlush(response);
                } else {
                    ctx.fireChannelRead(msg);
                }
            }
        };
    }
}

SecurityServerHandler

用SecurityServerHandler自定义的入站控制器替换原有默认的控制器WebSocketServerProtocolHandshakeHandler
这一步最关键了,因为在这一步就要将头设置进去,前面两步只是为这一步做铺垫,因为netty包中的类不能外部引用也没有提供修改方法,所以才有了上面的自定义类,此类中需要调整握手逻辑,添加握手响应头,然后将WebSocketServerProtocolHandler改为CustomWebSocketServerProtocolHandler,其他的实现类也是一样的去改

public class SecurityServerHandler extends ChannelInboundHandlerAdapter {

    private final String websocketPath;
    private final String subprotocols;
    private final boolean allowExtensions;
    private final int maxFramePayloadSize;
    private final boolean allowMaskMismatch;
    private final boolean checkStartsWith;
	
	  /**
     * 自定义属性 token头key
     */
    private final String tokenHeader;
	/**
     * 自定义属性 token
     */
    private final boolean hasToken;


    public SecurityServerHandler(String websocketPath, String subprotocols,
                                 boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, String tokenHeader, boolean hasToken) {
        this(websocketPath, subprotocols, allowExtensions, maxFrameSize, allowMaskMismatch, false,tokenHeader,hasToken);
    }

    SecurityServerHandler(String websocketPath, String subprotocols,
                                            boolean allowExtensions, int maxFrameSize,
                          boolean allowMaskMismatch,
                          boolean checkStartsWith,
                          String tokenHeader,
                          boolean hasToken) {
        this.websocketPath = websocketPath;
        this.subprotocols = subprotocols;
        this.allowExtensions = allowExtensions;
        maxFramePayloadSize = maxFrameSize;
        this.allowMaskMismatch = allowMaskMismatch;
        this.checkStartsWith = checkStartsWith;
        this.tokenHeader = tokenHeader;
        this.hasToken = hasToken;
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        final FullHttpRequest req = (FullHttpRequest) msg;
        if (isNotWebSocketPath(req)) {
            ctx.fireChannelRead(msg);
            return;
        }
        try {
        	// 具体的鉴权逻辑
            HttpHeaders headers = req.headers();
            String token = Objects.requireNonNull(headers.get(tokenHeader));
            if(hasToken){
                // 开启鉴权 认证
                //extracts device information headers
                LoginUser loginUser = SecurityUtils.getLoginUser(token);
                if(null == loginUser){
                    refuseChannel(ctx);
                    return;
                }
                Long userId = loginUser.getUserId();
                //check ......
                SecurityCheckComplete complete = new SecurityCheckComplete(String.valueOf(userId),tokenHeader,hasToken);
                ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete);
                ctx.fireUserEventTriggered(complete);
            }else {
                // 不开启鉴权 / 认证
                SecurityCheckComplete complete = new SecurityCheckComplete(null,tokenHeader,hasToken);
                ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete);
            }
            if (req.method() != GET) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
                return;
            }
            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols,
                    allowExtensions, maxFramePayloadSize, allowExtensions);
            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
            	// 此处将具体的头加入http中,因为这个头会传递个netty底层设置响应头的方法中,默认实现是传的null
                HttpHeaders httpHeaders = new DefaultHttpHeaders().add(tokenHeader,token);
                // 此处便是构造握手相应头的关键步骤
                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req,httpHeaders,ctx.channel().newPromise());
                handshakeFuture.addListener((ChannelFutureListener) future -> {
                    if (!future.isSuccess()) {
                        ctx.fireExceptionCaught(future.cause());
                    } else {
                        // Kept for compatibility
                        ctx.fireUserEventTriggered(
                                CustomWebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                        ctx.fireUserEventTriggered(
                                new CustomWebSocketServerProtocolHandler.HandshakeComplete(
                                        req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                    }
                });
                CustomWebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
                ctx.pipeline().replace(this, "WS403Responder",
                        CustomWebSocketServerProtocolHandler.forbiddenHttpRequestResponder());
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            req.release();
        }
    }

    public static final class HandshakeComplete {
        private final String requestUri;
        private final HttpHeaders requestHeaders;
        private final String selectedSubprotocol;

        HandshakeComplete(String requestUri, HttpHeaders requestHeaders, String selectedSubprotocol) {
            this.requestUri = requestUri;
            this.requestHeaders = requestHeaders;
            this.selectedSubprotocol = selectedSubprotocol;
        }

        public String requestUri() {
            return requestUri;
        }

        public HttpHeaders requestHeaders() {
            return requestHeaders;
        }

        public String selectedSubprotocol() {
            return selectedSubprotocol;
        }
    }



    private boolean isNotWebSocketPath(FullHttpRequest req) {
        return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
    }


    private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) {
        String protocol = "ws";
        if (cp.get(SslHandler.class) != null) {
            // SSL in use so use Secure WebSockets
            protocol = "wss";
        }
        String host = req.headers().get(HttpHeaderNames.HOST);
        return protocol + "://" + host + path;
    }

    private void refuseChannel(ChannelHandlerContext ctx) {
        ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED));
        ctx.channel().close();
    }

    private static void send100Continue(ChannelHandlerContext ctx,String tokenHeader,String token) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        response.headers().set(tokenHeader,token);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("channel 捕获到异常了,关闭了");
        super.exceptionCaught(ctx, cause);
    }
    @Getter
    @AllArgsConstructor
    public static final class SecurityCheckComplete {

        private String userId;

        private String tokenHeader;

        private Boolean hasToken;

    }
}

initChannel方法去初始化自己的实现类

其他的类需要自己实现或者引用,其他的就是无关紧要的,不用去处理的类


 @Override
    protected void initChannel(SocketChannel ch){
        log.info("有新的连接");
        //获取工人所要做的工程(管道器==管道器对应的便是管道channel)
        ChannelPipeline pipeline = ch.pipeline();
        //为工人的工程按顺序添加工序/材料 (为管道器设置对应的handler也就是控制器)
        //1.设置心跳机制
        pipeline.addLast("idle-state",new IdleStateHandler(
                nettyWebSocketProperties.getReaderIdleTime(),
                0,
                0,
                TimeUnit.SECONDS));
        //2.出入站时的控制器,大部分用于针对心跳机制
        pipeline.addLast("change-duple",new WsChannelDupleHandler(nettyWebSocketProperties.getReaderIdleTime()));
        //3.加解码
        pipeline.addLast("http-codec",new HttpServerCodec());
        //3.打印控制器,为工人提供明显可见的操作结果的样式
        pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO));
        pipeline.addLast("aggregator",new HttpObjectAggregator(8192));
        // 将自己的授权handler替换原有的handler
        pipeline.addLast("auth",new SecurityServerHandler(
        		// 此处我是用的yaml配置的,换成自己的即可
                nettyWebSocketProperties.getWebsocketPath(),
                nettyWebSocketProperties.getSubProtocols(),
                nettyWebSocketProperties.getAllowExtensions(),
                nettyWebSocketProperties.getMaxFrameSize(),
                //todo
                false,
                nettyWebSocketProperties.getTokenHeader(),
                nettyWebSocketProperties.getHasToken()
        ));
        pipeline.addLast("http-chunked",new ChunkedWriteHandler());
        // 将自己的协议控制器替换原有的协议控制器
        pipeline.addLast("websocket",
                new CustomWebSocketServerProtocolHandler(
                nettyWebSocketProperties.getWebsocketPath(),
                nettyWebSocketProperties.getSubProtocols(),
                nettyWebSocketProperties.getAllowExtensions(),
                nettyWebSocketProperties.getMaxFrameSize())
        );
        //7.自定义的handler针对业务
        pipeline.addLast("chat-handler",new ChatHandler());
    }

附加:我自己的业务实现类(chatHandler)及相应工具类

chatHandler

/**
 * @author qb
 * @version 1.0
 * @since 2023/3/7 11:56
 */
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 连接时
     * @param ctx 上下文
     * @throws Exception /
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("与客户端建立连接,通道开启!");
        // 添加到channelGroup通道组
        ChannelHandlerPool.pool().addChannel(ctx.channel());
    }

    /**
     * 断开连接时
     * @param ctx 上下文
     * @throws Exception /
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("与客户端断开连接,通道关闭!");
        // 从channelGroup通道组移除
        ChannelHandlerPool.pool().removeChannel(ctx.channel());
        String useridQuit = ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).get().getUserId();
        ChannelHandlerPool.pool().removeChannelId(useridQuit);
        log.info("断开的用户id为:{}",useridQuit);
    }

    /**
     * 获取消息时
     * @param ctx 上下文
     * @param msg 消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        log.info("msg.text():{}",msg.text());
        Message message = JSON.parseObject(msg.text(),Message.class);
        if("0".equals(message.getType())){
            log.info("消息接收的类型是绑定channel,userId:{}",message.getFromUserId());
            Boolean hasToken = ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).get().getHasToken();
            log.info("hasToken: {}",hasToken);
            if (!hasToken){
                log.info("binding channel...");
                // 没有鉴权就使用消息方式绑定channel
                binding(ctx,message);
            }
        }else{
            if(StringUtils.isNotBlank(message.getToUserId())){
                //私聊
                sendMsg(message);
            }else{
                // 发送给除了自己的其他人
                sendOther(ctx,message);
            }
        }
    }

    /**
     * 添加channel 回调方法
     * @param ctx /
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        //打印出channel唯一值,asLongText方法是channel的id的全名
        log.info("handlerAdded :{}",ctx.channel().id().asLongText());
    }

    /**
     * 删除channel 回调方法
     * @param ctx /
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("handlerRemoved :{}",ctx.channel().id().asLongText());
    }

    /**
     * 时间监听器
     * @param ctx       /
     * @param evt       /
     * @throws Exception    /
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof SecurityServerHandler.SecurityCheckComplete){
            log.info("Security check has passed");
            // 鉴权成功后的逻辑 暂不添加
        }
        else if (evt instanceof CustomWebSocketServerProtocolHandler.HandshakeComplete) {
            log.info("Handshake has completed");
            SecurityServerHandler.SecurityCheckComplete securityCheckComplete = ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).get();
            Boolean hasToken = securityCheckComplete.getHasToken();
            log.info("Handshake has completed after check hasToken:{}",hasToken);
            // 握手成功后的逻辑  如果鉴权了就绑定channel
            if(hasToken){
                log.info("Handshake has completed after binding channel");
                binding(ctx,securityCheckComplete.getUserId());
            }

        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("exceptionCaught 异常:{}",cause.getMessage());
        cause.printStackTrace();
        Channel channel = ctx.channel();
        //……
        if(channel.isActive()){
            log.info("手动关闭通道");
            ctx.close();
        };
    }

    /**
     * 群发所有人
     * @param message 消息
     */
    private void sendAllMessage(Message message){
        //收到信息后,群发给所有channel
        ChannelHandlerPool.pool().writeAndFlush(message);
    }

    /**
     * 发送消息
     * @param ctx       上下文
     * @param message   消息
     */
    private void sendMsg(ChannelHandlerContext ctx, Message message){
        //给自己发自己的消息
        ChannelHandlerPool.pool().writeAndFlush(ctx.channel().id(),message);
    }

    /**
     * 绑定channel与userid
     * @param ctx       上下文
     * @param message   消息
     */
    public void binding(ChannelHandlerContext ctx,Message message){
        ChannelId channelId = ctx.channel().id();
        Channel channel = ChannelHandlerPool.pool().getChannel(channelId);
        // 查看是否存在当前channel,不存在便重新插入
        if(null == channel){
            ChannelHandlerPool.pool().addChannel(ctx.channel());
        }
        try {
            //绑定userid 与 channel
            ChannelHandlerPool.pool().putChannelId(message.getFromUserId(), channelId);
        }catch (Exception e){
            log.info("主动断开");
            e.printStackTrace();
            // 发生异常断开连接
            ctx.close();
        }
    }

    /**
     * 绑定channel与userid
     * @param ctx       上下文
     * @param userId   用户id
     */
    public void binding(ChannelHandlerContext ctx,String userId){
        ChannelId channelId = ctx.channel().id();
        Channel channel =  ChannelHandlerPool.pool().getChannel(channelId);
        // 查看是否存在当前channel,不存在便重新插入
        if(null == channel){
            ChannelHandlerPool.pool().addChannel(ctx.channel());
        }
        try {
            SecurityServerHandler.SecurityCheckComplete complete = ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).get();
            //绑定userid 与 channel
            ChannelHandlerPool.pool().putChannelId(complete.getHasToken() ? complete.getUserId() : userId, channelId);
        }catch (Exception e){
            log.info("主动断开");
            e.printStackTrace();
            // 发生异常断开连接
            ctx.close();
        }
    }

    public void sendMsg(Message message){
        //私聊
        ChannelId channelId = ChannelHandlerPool.pool().getChannelId(message.getToUserId());
        if(null == channelId){
            log.info("用户: {},已经下线!",message.getToUserId());
            //下线操作 存库
            return;
        }
        Channel channel = ChannelHandlerPool.pool().getChannel(channelId);
        if(null == channel){
            log.info("清除用户:{}在mapper存的channelId",message.getToUserId());
            //特殊下线两个静态变量值不对称处理
            ChannelHandlerPool.pool().removeChannelId(message.getFromUserId());
            return;
        }
        ChannelHandlerPool.pool().writeAndFlush(channel,message);
        log.info("channel中的userId:{}",channel.attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).get().getUserId());
    }

    /**
     * 发送给除了自己的其他人
     * @param ctx       上下文
     * @param message   消息
     */
    public void sendOther(ChannelHandlerContext ctx, Message message){
        for (Channel channel : ChannelHandlerPool.pool().getChannelGroup()) {
            //给除自己外的人发消息
            if(channel != ctx.channel()){
                log.info("发送消息:{}",message);
                channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(message)));
            }
        }
    }

AttributeKeyUtils

public class AttributeKeyUtils {

    /**
     * 为channel添加属性  将userid设置为属性,避免客户端特殊情况退出时获取不到userid
     */
    public static final AttributeKey<String> USER_ID = AttributeKey.valueOf("userid");

    public static final AttributeKey<SecurityServerHandler.SecurityCheckComplete> SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY =
            AttributeKey.valueOf("SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY");


}

ChannelHandlerPool

package com.edu.message.handler.pool;

import com.alibaba.fastjson2.JSONObject;
import com.edu.common.utils.spring.SpringUtils;
import com.edu.message.converter.MessageConverter;
import com.edu.message.domain.vo.Message;
import com.edu.message.service.IMessageSocketService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.edu.common.constant.ThreadPoolConstants.POOL_NAME;

/**
 * 保存用户及相关信息
 * @author qb
 * @version 1.0
 * @since 2023/3/7 11:53
 */
@Slf4j
public class ChannelHandlerPool {

    private final IMessageSocketService iSocketService = SpringUtils.getBean(IMessageSocketService.class);

    private final MessageConverter messageConverter = SpringUtils.getBean(MessageConverter.class);
    private ChannelHandlerPool(){}

    private ThreadPoolTaskExecutor executor = SpringUtils.getBean(POOL_NAME);

    private static final ChannelHandlerPool POOL = new ChannelHandlerPool();

    public static ChannelHandlerPool pool(){
        return POOL;
    }

    public void execute(Message message){
        executor.submit(() -> {
            String name = Thread.currentThread().getName();
            Long id = Thread.currentThread().getId();
            log.info("message:{},线程名称:{},id:{}",message,name,id);
            iSocketService.save(messageConverter.messageToMsgSocket(message));
        });
    }

    /**
     *
     * map: userId,ChannelId
     */
    private static final Map<String, ChannelId> CHANNEL_ID_MAP = new ConcurrentHashMap<>();

    /**
     * 管道组
     */
    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public Map<String, ChannelId> getChannelIdMap(){
        return CHANNEL_ID_MAP;
    }

    public ChannelGroup getChannelGroup(){
        return CHANNEL_GROUP;
    }


    /**
     * 获取channelId
     * @param userId 用户id
     * @return  /
     */
    public ChannelId getChannelId(String userId){
        return CHANNEL_ID_MAP.get(userId);
    }


    /**
     * 获取channel
     * @param channelId /
     * @return /
     */
    public Channel getChannel(ChannelId channelId){
        return CHANNEL_GROUP.find(channelId);
    }

    public Channel getChannel(String userId){
        return CHANNEL_GROUP.find(getChannelId(userId));
    }

    public void removeChannelId(String userid){
        if(StringUtils.isNotBlank(userid)){
            CHANNEL_ID_MAP.remove(userid);
        }
    }

    public void removeChannel(Channel channel){
        CHANNEL_GROUP.remove(channel);
    }

    public void addChannel(Channel channel){
        CHANNEL_GROUP.add(channel);
    }

    /**
     * 群发
     * @param message 消息内容
     */
    public void writeAndFlush(Message message){
        saveBase(message);
        CHANNEL_GROUP.writeAndFlush( new TextWebSocketFrame(JSONObject.toJSONString(message)));
    }

    /**
     * 私发
     * @param channel   channel
     * @param message   内容
     */
    public void writeAndFlush(Channel channel,Message message){
        saveBase(message);
        channel.writeAndFlush( new TextWebSocketFrame(JSONObject.toJSONString(message)));
    }

    /**
     * 私发
     * @param channelId channelId
     * @param message   消息
     */
    public void writeAndFlush(ChannelId channelId,Message message){
        saveBase(message);
        findChannel(channelId).writeAndFlush( new TextWebSocketFrame(JSONObject.toJSONString(message)));
    }

    public Channel findChannel(ChannelId channelId){
        return CHANNEL_GROUP.find(channelId);
    }

    public void putChannelId(String userid,ChannelId channelId){
        CHANNEL_ID_MAP.put(userid,channelId);
    }

    private void saveBase(Message message){
        ChannelHandlerPool.pool().execute(message);
    }
}

效果截图

netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

源码跟踪

SecurityServerHandler 调整

调整为自定义请求头解析,但不去替换其他handler

package com.edu.message.handler.security;

import com.edu.common.utils.SecurityUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaders;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;

import static com.edu.message.handler.attributeKey.AttributeKeyUtils.SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY;

/**
 * @author Administrator
 */
@Slf4j
public class SecurityServerHandler extends ChannelInboundHandlerAdapter {

    private String tokenHeader;

    private Boolean hasToken;

    public SecurityServerHandler(String tokenHeader,Boolean hasToken){
        this.tokenHeader = tokenHeader;
        this.hasToken = hasToken;
    }

    private SecurityServerHandler(){}

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof FullHttpMessage){
            FullHttpMessage httpMessage = (FullHttpMessage) msg;
            HttpHeaders headers = httpMessage.headers();
            String token = Objects.requireNonNull(headers.get(tokenHeader));
            if(hasToken){
                // 开启鉴权 认证
                //extracts device information headers
                Long userId = 12345L;//SecurityUtils.getLoginUser(token).getUserId();
                //check ......
                SecurityCheckComplete complete = new SecurityCheckComplete(String.valueOf(userId),tokenHeader,hasToken);
                ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete);
                ctx.fireUserEventTriggered(complete);
            }else {
                // 不开启鉴权 / 认证
                SecurityCheckComplete complete = new SecurityCheckComplete(null,tokenHeader,hasToken);
                ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete);
            }
        }
        //other protocols
        super.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("channel 捕获到异常了,关闭了");
        super.exceptionCaught(ctx, cause);
    }
    @Getter
    @AllArgsConstructor
    public static final class SecurityCheckComplete {

        private String userId;

        private String tokenHeader;

        private Boolean hasToken;

    }
}

initChannel方法调整

改为使用默认实现

@Override
    protected void initChannel(SocketChannel ch){
        log.info("有新的连接");
        //获取工人所要做的工程(管道器==管道器对应的便是管道channel)
        ChannelPipeline pipeline = ch.pipeline();
        //为工人的工程按顺序添加工序/材料 (为管道器设置对应的handler也就是控制器)
        //1.设置心跳机制
        pipeline.addLast("idle-state",new IdleStateHandler(
                nettyWebSocketProperties.getReaderIdleTime(),
                0,
                0,
                TimeUnit.SECONDS));
        //2.出入站时的控制器,大部分用于针对心跳机制
        pipeline.addLast("change-duple",new WsChannelDupleHandler(nettyWebSocketProperties.getReaderIdleTime()));
        //3.加解码
        pipeline.addLast("http-codec",new HttpServerCodec());
        //3.打印控制器,为工人提供明显可见的操作结果的样式
        pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO));
        pipeline.addLast("aggregator",new HttpObjectAggregator(8192));
        pipeline.addLast("auth",new SecurityServerHandler(
                nettyWebSocketProperties.getTokenHeader(),
                nettyWebSocketProperties.getHasToken()
        ));
        pipeline.addLast("http-chunked",new ChunkedWriteHandler());
//        pipeline.addLast("websocket",
//                new CustomWebSocketServerProtocolHandler(
//                nettyWebSocketProperties.getWebsocketPath(),
//                nettyWebSocketProperties.getSubProtocols(),
//                nettyWebSocketProperties.getAllowExtensions(),
//                nettyWebSocketProperties.getMaxFrameSize())
//        );
        pipeline.addLast("websocket",
                new WebSocketServerProtocolHandler(
                nettyWebSocketProperties.getWebsocketPath(),
                nettyWebSocketProperties.getSubProtocols(),
                nettyWebSocketProperties.getAllowExtensions(),
                nettyWebSocketProperties.getMaxFrameSize())
        );
        //7.自定义的handler针对业务
        pipeline.addLast("chat-handler",new ChatHandler());
    }

启动项目–流程截图

断点截图

netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

1. SecurityServerHandler

第一步走到了自己定义的鉴权控制器(入站控制器),执行channelRead方法
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

2.userEventTriggered

自定义业务handler中的事件方法
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

3.WebSocketServerProtocolHandshakeHandler

此处便是走到了默认协议控制器的channelRead方法,需要注意handshaker.handshake(ctx.channel(), req) 这个方法,这是处理握手的方法,打个断点进去
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

4.WebSocketServerHandshaker

可以看到handshake 方法传的 HttpHeaders是null这里就是核心的握手逻辑可以看到并没有提供相应的头处理器
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

5. WebSocketServerHandshaker

newHandshakeResponse(req, responseHeaders) 就是构建响应结果,可以看到头是null
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)

6. 最后的封装返回

可以看到有回到了自定义handler的业务控制器 中的时间监听方法
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)
此时只要放行这一步便会在控制台打印出响应头,可以看出并没有设置我们自己的响应头,还是null
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)
最后统一返回,连接中断,自协议头不一致所导致
netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)文章来源地址https://www.toymoban.com/news/detail-445339.html

到了这里,关于netty-websocket 鉴权token及统一请求和响应头(鉴权控制器)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • WebSocket请求获取请求头Sec-WebSocket-Protocol,以及正确返回响应

    当WebSocket请求获取请求头Sec-WebSocket-Protocol不为空时,需要返回给前端相同的响应,所以就需要处理 解决:在 WebSocketConfig 中新增 modifyHandshake方法,用来获取请求头和为返回响应赋值   modifyHandshake方法 关键点:需要在websocketserver的@ServerEndpoint注解中加上configurator = WebSocketCon

    2024年02月15日
    浏览(46)
  • http 请求报文响应报文的格式以及Token cookie session 区别

    HTTP 请求报文和响应报文的格式如下: HTTP 请求报文格式: 方法 : 请求方法,例如 GET、POST、PUT、DELETE 等。 路径 : 请求的路径,表示需要访问的资源。 协议版本 : 使用的协议版本,通常是 HTTP/1.1 或 HTTP/2。 请求头部字段:包含了关于请求的附加信息,每个字段由字段名和对

    2024年02月16日
    浏览(54)
  • Selenium中如何抓取网络请求响应及WebSocket信息

    目录 获取Chrome性能日志 获取请求及响应信息 我们在使用Selenium测试Web或Electronjs/Cef框架应用时,有时候操作一个元素需要判断是否发送了请求以及请求的参数是否正确 我们可以通过,开启Chrome的性能日志来然后配合driver.get_log(\\\"performance\\\")来查看请求,然后对Network相关的日子

    2024年02月07日
    浏览(37)
  • Selenium自动化测试中如何抓取网络请求响应及WebSocket信息

    我们在使用Selenium测试Web或Electronjs/Cef框架应用时,有时候操作一个元素需要判断是否发送了请求以及请求的参数是否正确 我们可以通过,开启Chrome的性能日志来然后配合driver.get_log(\\\"performance\\\")来查看请求,然后对Network相关的日子进行过滤, 实现如下: 运行结果如下: 由于日

    2024年02月16日
    浏览(57)
  • vue报错:Uncaught SyntaxError: Unexpected token <;也就是前端的js请求响应数据是html格式的原因和解决方法

    “Uncaught SyntaxError: Unexpected token lt;” 错误通常出现在浏览器的开发者工具(console)中,它表示在解析 JavaScript 代码时遇到了意外的 字符。这个错误通常是由以下几种情况引起的: 代码中的 被错误地识别为 HTML 标签的开始:这通常发生在在引用外部 JavaScript 文件时,浏览器

    2024年02月07日
    浏览(47)
  • 前端刷新token,判断token是否过期(jwt鉴权)

    4.1 什么是 JWT JWT 是 Auth0 提出的通过 对 JSON 进行加密签名来实现授权验证的方案; 就是登录成功后将相关用户信息组成 JSON 对象,然后对这个对象进行某种方式的加密,返回给客户端; 客户端在下次请求时带上这个 Token; 服务端再收到请求时校验 token 合法性,其实也就是在

    2024年02月03日
    浏览(62)
  • Flask_实现token鉴权

    目录 1、安装依赖 2、实现代码 3、测试 源码等资料获取方法 流程图 请求接口不传token 请求接口传有效token 请求接口传失效token 各位想获取源码等教程资料的朋友请 点赞 + 评论 + 收藏 ,三连! 三连 之后我会在评论区挨个私信发给你们~

    2024年02月17日
    浏览(44)
  • 登录业务实现 - token登录鉴权

     登录业务实现: 登录成功/失败实现  -  pinia管理用户数据及数据持久化  -  不同登录状态的模板适配   -  请求拦截器携带token( 登录鉴权 )   -  退出登录实现  -  token失效(401响应拦截)   当 表单校验通过 时, 封装登录接口,调用登录接口 ,分别处理 登录成功和

    2024年02月07日
    浏览(42)
  • satoken+ gateway网关统一鉴权 初版

    本博客内容 参考了satoken官网实现,satoken官网地址: https://sa-token.cc/doc.html#/micro/gateway-auth jinyi-gateway 网关服务 jinyi-user-service 用户服务 2.1 jinyi-user-api 2.2 jinyi-user-client 2.3 jinyi-user-provider jinyi-common 通用服务,定义了一些统一返回类,全局常量(R等) 项目层级关系截图: 3.1jinyi-

    2023年04月20日
    浏览(38)
  • 接口测试中的Token鉴权(Postman中Token的获取和引用)

    (我的公众号“墨石测试攻略”,分享测试技能和实战项目,欢迎关注!) 【什么是Token鉴权?】 鉴权是指验证用户是否有权访问系统的行为。 Token 鉴权是其中一种鉴权方式,其他的鉴权方式还有HTTP Basic Authentication、session+cookie、OAuth Token是一个令牌,通俗地说就是“暗号”

    2024年01月15日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包