SpringBoot+Netty+Websocket实现消息推送

这篇具有很好参考价值的文章主要介绍了SpringBoot+Netty+Websocket实现消息推送。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。

添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

定义netty端口号

websocket:
  netty:
    port: 8888
  path: /websocket

netty服务器

@Slf4j
@Component
public class NettyServer {
    /**
     * netty服务端口号
     */
    @Value("${websocket.netty.port}")
    private int port;
    /**
     * netty事件辅助组
     */
    private EventLoopGroup bossGroup;
    /**
     * netty事件工作组
     */
    private EventLoopGroup workGroup;

    /**
     * 管道配置
     */
    private final CustomChannelInitializer channelInitializer;

    public NettyServer(CustomChannelInitializer channelInitializer) {
        this.channelInitializer = channelInitializer;
    }


    /**
     * netty服务初始化
     */
    @PostConstruct
    public void start() {
        new Thread(() -> {

            bossGroup = new NioEventLoopGroup();

            workGroup = new NioEventLoopGroup();

            ServerBootstrap bootstrap = new ServerBootstrap();
            //bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            //设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            //设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            //设置管道
            bootstrap.childHandler(channelInitializer);
            try {
                ChannelFuture channelFuture = bootstrap.bind().sync();
                log.info("Netty服务启动成功,开启监听:{}", channelFuture.channel().localAddress());
                //对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("Netty服务启动失败!", e);
                throw new RuntimeException(e);
            }

        }).start();
    }

}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @version 1.0.0
 * @description 业务类
 */
public class NettyConfig {
    /**
     * 定义全局单利channel组 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;

    /**
     * 存放请求ID与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    /**
     * 定义两把锁
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();


    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

管道配置

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * @version 1.0.0
 * @description Netty管道配置类
 */
@Component
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
    /**
     * websocket服务地址
     */
    @Value("${websocket.path:/websocket}")
    private String websocketPath;
    private final CustomChannelHandler channelHandler;

    public CustomChannelInitializer(CustomChannelHandler channelHandler) {
        this.channelHandler = channelHandler;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(channelHandler);
    }
}

自定义CustomChannelHandler

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ruoyi.common.utils.StringUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @version 1.0.0
 * @description Netty管道handler类
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class CustomChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        if(StringUtils.isNotEmpty(uid)){
            NettyConfig.getChannelMap().put(uid, ctx.channel());
            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        super.exceptionCaught(ctx,cause);
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        if(StringUtils.isNotEmpty(userId)){
            NettyConfig.getChannelMap().remove(userId);
        }
    }
}

推送消息接口及实现类

public interface PushMsgService {
    /**
     * 推送给指定用户
     */
    void pushMsgToOne(String group, String msg);

    /**
     * 推送给所有用户
     */
    void pushMsgToAll(String msg);
}

实现接口

@Service
public class PushMsgServiceImpl implements PushMsgService {

    @Override
    public void pushMsgToOne(String group, String msg) {
        Channel channel = NettyConfig.getChannel(group);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }
    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

具体的controller层接口

   /**
     * 获取弹框网关状态
     */
    @GetMapping("/upKnxNetworkLink/{uid}")
    public void upKnxNetworkLink(@PathVariable String uid){
        KnxNetworkLinkInfo knxNetworkLinkInfo =new KnxNetworkLinkInfo();
        knxNetworkLinkInfo.setStatus("0");
       List<KnxNetworkLinkInfo>knxNetworkLinkInfoList=knxNetworkLinkInfoService.queryList(knxNetworkLinkInfo);
        JSONArray array= JSONArray.parseArray(JSON.toJSONString(knxNetworkLinkInfoList));
        pushMsgService.pushMsgToOne(uid,array.toJSONString());
    }

使用postman测试Websocket推送
SpringBoot+Netty+Websocket实现消息推送,Springboot,spring boot,websocket,后端
连接Websocket
SpringBoot+Netty+Websocket实现消息推送,Springboot,spring boot,websocket,后端
在开一个窗口测试发送消息的接口
SpringBoot+Netty+Websocket实现消息推送,Springboot,spring boot,websocket,后端
发送过后在回到连接Websocket窗口
SpringBoot+Netty+Websocket实现消息推送,Springboot,spring boot,websocket,后端
前端需要做一个定时访问发送消息的接口,每发一次就会往前端推送一次数据。
参考:Springboot + netty +websocket 实现推送消息文章来源地址https://www.toymoban.com/news/detail-764707.html

到了这里,关于SpringBoot+Netty+Websocket实现消息推送的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot集成websocket实现消息推送和在线用户统计

    在启动类上添加一个bean 核心代码 实现消息推送只要在业务代码中调用sendMessageSpecial()方法即可。 然后调用刚才的业务接口测试:http://localhost:8080/websocket/t1 调用成功后可以看到三个窗口中都收到了消息

    2023年04月08日
    浏览(46)
  • SpringBoot集成WebSocket实现消息实时推送(提供Gitee源码)

    前言:在最近的工作当中,客户反应需要实时接收消息提醒,这个功能虽然不大,但不过也用到了一些新的技术,于是我这边写一个关于我如何实现这个功能、编写、测试到部署服务器,归纳到这篇博客中进行总结。 目录 一、什么是WebSocket 二、后端实现 2.1、引入pom.xml依赖

    2024年02月11日
    浏览(40)
  • Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

    思路: 后端通过websocket向前端推送消息,前端统一使用http协议接口向后端发送数据 本文仅放一部分重要的代码,完整代码可参看github仓库 websocket 前端测试 :http://www.easyswoole.com/wstool.html 依赖 项目目录 完整依赖 配置 WebSocketServer.java 前端页面 websocket.html 前端逻辑 index.js 参

    2024年02月04日
    浏览(46)
  • Springboot+Netty+WebSocket搭建简单的消息通知

    Springboot+Netty+WebSocket搭建简单的消息通知 一、快速开始 1、添加依赖 2、添加配置 3、添加启动类 二、添加WebSocket部分代码 1、WebSocketServer 2、WebSocketConfig 3、DemoController 6、添加templates/index.html 三、添加Netty部分 1、NettyServer 2、WSChannelHandlerPool 3、WSWebSocketHandler 四、启动服务 ht

    2024年02月11日
    浏览(35)
  • SpringBoot集成WebSocket(实时消息推送)

    🍓 简介:java系列技术分享(👉持续更新中…🔥) 🍓 初衷:一起学习、一起进步、坚持不懈 🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏 🍓 希望这篇文章对你有所帮助,欢迎点赞 👍 收藏 ⭐留言 📝 🍓 更多文章请点击 调试工具 :http://coolaf.com/tool/chatt

    2024年04月29日
    浏览(36)
  • SpringBoot + WebSocket+STOMP指定推送消息

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 本文将简单的描述SpringBoot + WebSocket+STOMP指定推送消息场景,不包含信息安全加密等,请勿用在生产环境。 JDK:11+ Maven: 3.5+ SpringBoot: 2.6+ stompjs@7.0.0 STOMP 是面向简

    2024年02月14日
    浏览(41)
  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(39)
  • SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等

            WebSocket PING-PONG心跳机制,只需要服务端发送PING,客户端会自动回应PONG,本文中使用了两个@OnMassage注解一个用于接收Text消息,一个用于接收PONG响应消息,此外还有二进制格式( InputStream  ,byte[], ByteBuffer  等)。            说明:               记录

    2024年02月11日
    浏览(37)
  • Springboot中使用netty 实现 WebSocket 服务

    依赖 创建启动类 创建WebSocket 服务 WsServerInitialzer 初始化 创建信息ChatHandler 处理类

    2024年02月14日
    浏览(32)
  • Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

          假设有这样一个场景:服务端的资源经常在更新,客户端需要尽量及时地了解到这些更新发生后展示给用户,如果是 HTTP 1.1,通常会开启 ajax 请求询问服务端是否有更新,通过定时器反复轮询服务端响应的资源是否有更新。                         在长时间不更新

    2024年02月16日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包