【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理

这篇具有很好参考价值的文章主要介绍了【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景

项目中需要建立客户端与服务端之间的长链接,首先就考虑用WebSocket,再来SpringBoot原来整合WebSocket方式并不高效,因此找到了netty-websocket-spring-boot-starter 这款脚手架,它能让我们在SpringBoot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单

二、netty-websocket-spring-boot-starter 入门介绍

2.1 核心注解

2.1.1 @ServerEndpoint

当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类 被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint(“/ws”) )

2.1.2 @OnOpen

当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders…

2.1.3 @OnClose

当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session

2.1.4 @OnError

当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable

2.1.5 @OnMessage

当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String

2.2 核心配置

属性 属性说明
path WebSocket的path,也可以用value来设置
host WebSocket的host,"0.0.0.0"即是所有本地地址
port WebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务)
maxFramePayloadLength 最大允许帧载荷长度
allIdleTimeSeconds 与IdleStateHandler中的allIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler

三、实践netty-websocket-spring-boot-starter

3.1引入POM文件

主要添加包括以下依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.yeauty</groupId>
    <artifactId>netty-websocket-spring-boot-starter</artifactId>
    <version>0.9.5</version>
</dependency>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.4.6</version>
</dependency>

3.2 在主程序类中排除数据库使用

/**
 * 主程序启动类
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class WebsocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebsocketApplication.class, args);
    }

}

3.3 开启WebSocket支持

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.4 定义WebSocketServer服务器(核心代码)

在端点类上加上@ServerEndpoint注解,并在相应的方法上加上@OnOpen、@OnMessage、@OnError、@OnClose注解, 代码如下:

@ServerEndpoint(port = "${ws.port}", path = "/demo/{version}", maxFramePayloadLength = "6553600", allIdleTimeSeconds = "300")
public class WebSocketServer {

    private static Log LOGGER = LogFactory.get();

    // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    // 接收用户ID
    protected StringBuilder userInfo = new StringBuilder();

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,
                       HttpHeaders headers,
                       @RequestParam String req,
                       @RequestParam MultiValueMap<String, Object> reqMap,
                       @PathVariable String arg,
                       @PathVariable Map<String, Object> pathMap) {
        this.session = session;
        // 加入set中
        webSocketSet.add(this);
        // 在线数加1
        addOnlineCount();
        LOGGER.debug("UserId = {}, 通道ID={}, 当前连接人数={}", userInfo.toString(), getSessionId(session), getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        JSONObject jsonData = JSONUtil.parseObj(message);
        if (!jsonData.containsKey("command")) {
            LOGGER.debug("UserId = {}, 通道ID={}, 上行内容={}, 上行请求非法,缺少command参数, 处理结束",
                    userInfo.toString(), getSessionId(session), message);
            return;
        }
        String userId = jsonData.getStr("userId");
        this.userInfo = new StringBuilder(userId);
        String command = jsonData.getStr("command");
        Class<?> service = Command.getService(command);
        if (Objects.isNull(service)) {
            errorMessage(command);
            LOGGER.error("UserId = {}, 通道ID={}, 解析指令执行出错!", userInfo.toString(), getSessionId(session));
            return;
        }
        LOGGER.info("UserId = {}, 通道ID={}, 处理类={}, 开始处理,请求内容={}",
                userInfo.toString(), getSessionId(session), service, jsonData.toString());
        BaseMessage baseMessage = getBaseMessage(service, session, command);
        if (baseMessage == null) {
            return;
        }
        try {
            jsonData.set("SessionId", getSessionId(session));
            JSON resp = baseMessage.handlerMessage(userInfo, jsonData);
            resp.putByPath("command", command);
            resp.putByPath("userId", userId);
            String value = resp.toString();
            //将结果写回客户端, 实现服务器主动推送
            ChannelFuture future = sendMessage(value);
            LOGGER.info("UserId = {}, 通道ID = {}, 返回内容 = {}, future = {}, 处理结束",
                    userInfo.toString(), getSessionId(session), value, future.toString());
        } catch (Exception e) {
            LOGGER.error("UserId = {}, 通道ID={}, 解析执行出错信息={}", userInfo.toString(), getSessionId(session), e.getMessage());
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        // 从set中删除
        webSocketSet.remove(this);
        // 在线数减1
        subOnlineCount();
        String userId = this.userInfo.toString();
        LOGGER.warn("UserId = {}, 通道ID = {}, 有一连接关闭!当前在线人数={}", userId, getSessionId(session), getOnlineCount());
        userInfo.delete(0, userInfo.length());
        if (ObjectUtil.isNotNull(userId)) {
            String keyStr = ConstDef.ONLINE_USER_TYPE + userId;
            redisTemplate.delete(keyStr);
        }
        session.close();
    }
    /**
     * 出错方法
     */
    @OnError
    public void onError(Session session, Throwable cause) {
        if (Objects.nonNull(this.session) && Objects.nonNull(cause) && !(cause instanceof EOFException)) {
            LOGGER.error("UserId = {}, 通道ID={}, 出错信息={}", userInfo.toString(), this.session.id(), cause.toString());
        }
        if (Objects.nonNull(session) && session.isOpen()) {
            session.close();
        }
    }
    /**
     * 通过class获取Bean
     */
    private BaseMessage getBaseMessage(Class<?> service, Session session, String command) {
        BaseMessage baseMessage;
        try {
            baseMessage = (BaseMessage) SpringUtils.getBean(service);
        } catch (Exception e) {
            LOGGER.error("UserId = {}, 通道ID = {}, 未找到协议头 = {} 的处理类", userInfo.toString(), getSessionId(session), service);
            errorMessage(command);
            return null;
        }
        return baseMessage;
    }
    /**
     * 获取通道ID
     */
    private String getSessionId(Session session) {
        return session.id().asShortText();
    }
    /**
     * 协议错误
     */
    public void errorMessage(String command) {
        JSONObject retObj = new JSONObject();
        retObj.set("code", ConstDef.ERROR_CODE_10001);
        retObj.set("msg", ConstDef.ERROR_CODE_10001_DESP);
        retObj.set("command", command);
        try {
            sendMessage(retObj.toString());
        } catch (IOException e) {
            LOGGER.error("UserId = {}, 通道ID={}, 解析执行出错信息={}", userInfo.toString(), getSessionId(session), e.getMessage());
        }
    }
    /**
     * 实现服务器主动推送
     */
    public ChannelFuture sendMessage(String message) throws IOException {
        return this.session.sendText(message);
    }
    /**
     * 在线用户数
     */
    public long getOnlineCount() {
        String onlineCountValue = redisTemplate.opsForValue().get(ConstDef.ONLINE_COUNT_KEY);
        if (StrUtil.isBlank(onlineCountValue) || !NumberUtil.isNumber(onlineCountValue)) {
            return 0L;
        }
        return Long.parseLong(onlineCountValue);
    }
    /**
     * 在线数+1
     */
    private void addOnlineCount() {
        redisTemplate.opsForValue().increment(ConstDef.ONLINE_COUNT_KEY);
    }
    /**
     * 在线数-1
     */
    private void subOnlineCount() {
        redisTemplate.opsForValue().decrement(ConstDef.ONLINE_COUNT_KEY);
    }
}

3.5 定义接口

/**
 * 消息处理接口
 */
public interface BaseMessage {
    Log LOGGER = LogFactory.get();
    /**
     * 处理类、处理方法
     */
    JSON handlerMessage(StringBuilder vin, JSONObject jsonData);
}

3.6 定义接口实现类 (业务处理逻辑)

该类是各业务的处理逻辑类,是接口类的具体实现。

@Component
@Configuration
public class QueryAllActivityListMessage implements BaseMessage {
    @Override
    public JSON handlerMessage(StringBuilder userId, JSONObject jsonData) {
        LOGGER.debug("开始处理QueryAllActivityListMessage请求, 参数={}", JSONUtil.toJsonStr(jsonData));
        String resp = "我是服务器端返回的处理结果!";
        LOGGER.info("UserId = {}, param={}, QueryAllActivityListMessage回复 = {}", userId.toString(), jsonData, resp);
        JSONObject respStr = new JSONObject();
        return respStr.set("handleResult", resp);
    }
}

3.7 定义枚举Command

每增加一个业务接口的实现,就需要在这个枚举类注册一下。

/**
 * 指令-服务 枚举
 */
public enum Command {
    /**
     * 业务1处理逻辑
     */
    queryAllActivityList("queryAllActivityList", QueryAllActivityListMessage.class, "业务1处理逻辑");
     /**
     * 业务2处理逻辑
     */
     //略
     /**
     * 业务3处理逻辑
     */
     //略
    /**
     * 服务编码
     */
    private String processCode;
    /**
     * 服务接口类
     */
    private Class<?> service;
    /**
     * 接口描述
     */
    private String desc;

    Command(String processCode, Class<?> service, String desc) {
        this.processCode = processCode;
        this.service = service;
        this.desc = desc;
    }

    public Class<?> getService() {
        return service;
    }

    public static Class<?> getService(String processCode) {
        for (Command command : Command.values()) {
            if (command.processCode.equals(processCode)) {
                return command.getService();
            }
        }
        return null;
    }
}

3.8 编写SpringUtils 工具类

用于搜索Bean, 通过class获取Bean

/**
 * SpringUtils 工具类,用于搜索
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtils.applicationContext == null) {
            SpringUtils.applicationContext = applicationContext;
        }
    }

    /**
     * 获取applicationContext
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过class获取Bean
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name获取 Bean.
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

3.9 定义常量定义类 + 返回码

/**
 * 常量定义类 + 返回码
 */
public class ConstDef {
    /**
     * 返回码
     */
    public static final int ERROR_CODE_10001 = 10001;
    public static final String ERROR_CODE_10001_DESP = "请求参数不合法";
    /**
     * 按UserId决定,用户在线类型,车机或者手机
     */
    public static final String ONLINE_USER_TYPE = "ONLINE_USER_TYPE_";
    /**
     * 在线用户数
     */
    public static final String ONLINE_COUNT_KEY = "ONLINE_COUNT_KEY";
}

四、功能验证

打开WebSocket客户端,连接到ws://127.0.0.1:9095/demo/1

从截图来看,WebSocket服务端能正常接受并处理来自客户端的请求,验证成功!

【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理,002 - 联网协议与网络通信编程,websocket,服务器,spring boot,postman,后端文章来源地址https://www.toymoban.com/news/detail-526719.html

到了这里,关于【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Springboot+WebSocket+Netty实现在线聊天、群聊系统

    此文主要实现在好友添加、建群、聊天对话、群聊功能,使用Java作为后端语言进行支持,界面友好,开发简单。 2.1、下载安装IntelliJ IDEA(后端语言开发工具),Mysql数据库,微信Web开发者工具。 1.创建maven project 先创建一个名为SpringBootDemo的项目,选择【New Project】 然后在弹出

    2024年02月14日
    浏览(33)
  • 基于netty框架不使用SSL证书,实现websocket数据加密传输

    1、简介 2、实现方式 3、服务端主要代码 4、客户端主要代码 5、调用方式 为什么不使用SSL证书? 1、服务器运行在专网环境,不能访问互联网。证书有有效期,CA机构规定,证书有效期最多2年。在客户的专网环境里更新和维护证书就会增加运营成本。 实现逻辑? 参照SSL的实

    2024年02月04日
    浏览(43)
  • 【Vue项目实战】之WebSocket消息监听

    前言 哈喽!CSDN! 很久以前有位好朋友就建议来CSDN做一些笔记,直到最近又被提醒了一次,这次终于想起来了, 好习惯还是需要一个好的开始 ╭(●`∀´●)╯,感谢这位好朋友让我拥有这个好习惯 ╭(′▽ )╭(′▽ )╯ 这位好朋友的博客链接如下: 点击进入 由于业务需求

    2024年01月17日
    浏览(26)
  • 【项目实战】如何使用Postman调用WebSocket程序

    项目中需要使用WebSocket进行通信,开发完了WebSocket接口,总得测试吧,以下是Postman调用WebSocket程序的方法。 最近都在用ApiFox做接口调用,但是目前版本的ApiFox(2.2.26)竟然不支持调用WebSocket程序。好吧,只能另外找一个支持调用的工具, 那就是强大的Postman啦。 Postman v8.5以

    2024年02月03日
    浏览(25)
  • Netty和Websocket的区别

    Netty 和 WebSocket 没有直接可比性,因为它们在网络编程环境中具有不同的用途。 Netty: Netty 是一个高性能、事件驱动的网络框架,用于用 Java 构建网络应用程序。 它提供了一组用于处理各种网络协议(例如 TCP 和 UDP)的工具和抽象。 Netty 通常用于构建需要低延迟、高吞吐量

    2024年01月22日
    浏览(28)
  • Netty 教程 – 实现WebSocket通讯

    WebSocket 协议是基于 TCP 的一种新的网络协议,它实现了浏览器与服务器 全双工(full-duplex)通信 ,允许 服务器主动发送信息给客户端 优点及作用 Http协议的弊端: Http协议为半双工协议。(半双工:同一时刻,数据只能在客户端和服务端一个方向上传输) Http协议冗长且繁琐 易

    2024年02月09日
    浏览(24)
  • netty整合websocket支持自签证书出现netty websocket ssl Received fatal alert: certificate_unknown

    win+r cmd 生成自己jks文件,指向自己要生成jks的文件位置下,我直接生成到项目resources下 2.生成证书 3.迁移到行业标志 成功生成证书 将jks文件考入项目resources下 yaml配置: netty证书加载 这里我就只上关键代码了 不添加信任netty websocket ssl Received fatal alert: certificate_unknown。 错误原

    2024年02月02日
    浏览(33)
  • netty对websocket协议的实现

    1. websocket协议 websocket协议是对http协议的扩充, 也是使用的TCP协议可以全双工通信的应用层协议。 websocket协议允许服务端向客户端推送消息。 浏览器和服务端只需要进行一次握手,不必像http协议一样,每次连接都要新建立连接,两者之间创建持久性的连接,并进行双向的数

    2024年01月20日
    浏览(30)
  • netty websocket uri 连接时 传参

    在URL后面加上参数 new WebSocket(\\\"ws://127.0.0.1:20683/ws/serialPort?name=value\\\") 然后自己解析参数 在客户端设置 连接成功回调 ,一旦连接成功发送参数:

    2024年02月16日
    浏览(25)
  • SpringBoot+Netty+Websocket实现消息推送

    这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。 添加依赖 定义netty端口号 netty服务器 Netty配置 管理全局Channel以及用户对应的channel(推送消息) 管道配置 自定义CustomChannelHandler 推送消息接口及

    2024年02月04日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包