消息推送(websocket)集群化解决方案

这篇具有很好参考价值的文章主要介绍了消息推送(websocket)集群化解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

消息推送(websocket)集群化解决方案,单应用多租户SaaS平台实践,websocket,网络协议,网络,消息推送,集群化


需求分析

  1. 及时信息传递:消息推送功能能够确保网站向用户发送及时的重要信息,包括新闻更新、促销活动、账户状态变更等。这样可以增强用户体验,同时也提高用户对网站的参与度。
  2. 个性化定制:消息推送功能可以根据用户的偏好和兴趣来定制推送内容,使用户能够接收到与其相关和感兴趣的信息。这样可以提高用户满意度和参与度。
  3. 提高用户参与度:通过定期发送通知和提醒,消息推送功能可以吸引用户回归网站,并参与到活动中去。
  4. 提醒和警示功能:消息推送可以用作提醒和警示的工具,例如,提醒用户交易状态、密码更改请求、订单跟踪等。这样可以确保用户及时了解和处理相关事务。
  5. 营销和推广:消息推送还可以用于营销和推广活动,向用户发送促销信息、新产品推介等,从而增加销售和推广效果。

解决方案

  1. 实时性:使用WebSocket可以实现双向通信,而不需要客户端主动发送请求获取消息。一旦有新消息需要推送,服务器可以立即将消息推送给客户端,实现实时的消息传递。
  2. 高效性:相比传统的轮询机制,WebSocket的长连接机制可以减少不必要的网络请求和服务器负载。而使用Redis作为消息推送的中间件,可以利用其高性能的内存数据库特性,快速读写消息数据,提高消息推送的效率和响应速度。
  3. 可扩展性:WebSocket和Redis具有良好的可扩展性。WebSocket协议在现代浏览器中得到广泛支持,可以与各种后端框架和技术进行集成。而Redis作为高性能的分布式内存数据库,具备分布式部署和数据持久化等特性,可以满足大规模应用的消息推送需求。
  4. 灵活性:WebSocket和Redis的结合可以实现灵活的消息推送方式。通过使用Redis的发布/订阅功能,可以支持多种消息推送模式,如一对一推送、一对多推送和广播推送。根据实际需求,可以选择合适的推送模式,提供个性化的消息推送服务。
  5. 实现简单:WebSocket和Redis都有成熟的实现和丰富的文档资源,使用它们来实现消息推送相对来说比较简单。WebSocket提供基于事件的API,可方便地处理连接和消息的发送和接收。而Redis提供了易于使用的Pub/Sub功能,可以轻松地进行消息的发布和订阅。

实现步骤

架构图

消息推送(websocket)集群化解决方案,单应用多租户SaaS平台实践,websocket,网络协议,网络,消息推送,集群化文章来源地址https://www.toymoban.com/news/detail-575143.html

配置websocket请求地址

@Configuration
@EnableWebSocket
public class WebsocketConfiguration implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // webSocket通道
        // 指定处理器和路径
        registry.addHandler(new WebSocketHandler(), "/websocket")
                // 指定自定义拦截器
                .addInterceptors(new WebSocketInterceptor())
                // 允许跨域
                .setAllowedOrigins("*");
        // sockJs通道
        registry.addHandler(new WebSocketHandler(), "/sock-js")
                .addInterceptors(new WebSocketInterceptor())
                .setAllowedOrigins("*")
                // 开启sockJs支持
                .withSockJS();
    }
}

配置websocket连接前置和连接关闭监听

public class WebSocketInterceptor implements HandshakeInterceptor {

    private final RedisUtil redisUtil = GetBeanUtils.getBean(RedisUtil.class);
    private final JwtTokenUtils jwtTokenUtils = GetBeanUtils.getBean(JwtTokenUtils.class);

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
            // TODO: 2022/12/15 握手前置监听 可以验证token是否有效
            String authorization = serverHttpRequest.getServletRequest().getParameter("Authorization");
            Claims allClaimsFromToken = jwtTokenUtils.getAllClaimsFromToken(authorization);
            if (authorization != null
                    && jwtTokenUtils.validateToken(authorization, allClaimsFromToken.getSubject())
                    && !redisUtil.hasKey(RedisConstant.EXPIRATION_TOKEN.concat(CommonConstant.COLON).concat(authorization))) {
                map.put(UserInfo.ID, allClaimsFromToken.get(UserInfo.ID, String.class));
                return true;
            }
        }
        return false;
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
        // TODO: 2022/12/15 断开连接监听
    }
}

配置websocket处理程序

@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {
    /**
     * 存储sessionId和webSocketSession
     * 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储
     * 在分布式系统中,要想别的办法实现webSocketSession共享(解决方案:采用redis发布订阅功能实现)
     */
    private static final Map<String, WebSocketSession> SESSION_MAP = new ConcurrentHashMap<>();
    private static final Map<String, String> USERID_MAP = new ConcurrentHashMap<>();

    private final StringRedisTemplate stringRedisTemplate = GetBeanUtils.getBean(StringRedisTemplate.class);

    /**
     * webSocket连接创建后调用
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 获取参数
        String userId = String.valueOf(session.getAttributes().get(UserInfo.ID));
        USERID_MAP.put(userId, session.getId());
        SESSION_MAP.put(session.getId(), session);
    }

    /**
     * 接收到消息会调用
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        try {
            //登录消息不返回信息
            if (message.getPayload().equals(NewsEnum.LOGIN.getCode())){
                return;
            }
            JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());
            MessageBo messageBo = jsonObject.toJavaObject(MessageBo.class);
            sendMessage(messageBo);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * 连接出错会调用
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        SESSION_MAP.remove(session.getId());
        USERID_MAP.values().remove(session.getId());
    }

    /**
     * 连接关闭会调用
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        SESSION_MAP.remove(session.getId());
        USERID_MAP.values().remove(session.getId());
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 其他模块调用该方法
     * @param messageDto 消息模型
     */
    public void sendMessage(MessageDto messageDto){
        MessageBo messageBo = new MessageBo();
        BeanUtils.copyProperties(messageDto, messageBo);
        sendMessage(messageBo);
    }

    /**
     * 后端发送消息
     */
    public void sendMessage(MessageBo messageBo) {
        if (messageBo.isPublish()) {
            //发送消息需要通过redis发布出去
            publish(messageBo);
        } else if (messageBo.getUserIdList().isEmpty()) {
            //未指定用户,发送给所有在线的客户
            USERID_MAP.forEach((username, sessionId) -> {
                send(sessionId, messageBo.getMessage());
            });
        } else {
            //根据指定客户筛选出客户对应的sessionId集合
            List<String> sessionIdList = USERID_MAP.entrySet().stream()
                    .filter(entry -> messageBo.getUserIdList().contains(entry.getKey()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
            //发布消息
            sessionIdList.forEach(sessionId -> send(sessionId, messageBo.getMessage()));
        }
    }

    /**
     * 指定用户发送消息
     *
     * @param sessionId 消息
     */
    @SneakyThrows
    private void send(String sessionId, String msg) {
        WebSocketSession session = SESSION_MAP.get(sessionId);
        if (session != null) {
            session.sendMessage(new TextMessage(msg));
        }
    }

    /**
     * redis发布
     *
     * @param messageBo 消息
     */
    private void publish(MessageBo messageBo) {
        // 每个消息只需要发布一次
        messageBo.setPublish(false);
        stringRedisTemplate.convertAndSend(ConstantConfiguration.CHANNEL_1, JSON.toJSONString(messageBo));
    }
}

配置redis交换机

public class ConstantConfiguration {

    public static final String CHANNEL_1 = "CHANNEL_1";
}

配置redis订阅监听

@Slf4j
@Component
public class MessageSubListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        log.info("消费消息:".concat(message.toString()));
        WebSocketHandler webSocketHandler = new WebSocketHandler();
        MessageBo messageBo = JSONObject.toJavaObject(JSON.parseObject(message.toString()), MessageBo.class);
        webSocketHandler.sendMessage(messageBo);
    }
}

配置redis发布监听

@Configuration
public class RedisPubListenerConfig {

    public static final String ONMESSAGE = "ONMESSAGE";

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 可以添加多个 messageListener,配置不同的交换机
        container.addMessageListener(listenerAdapter, new PatternTopic(ConstantConfiguration.CHANNEL_1));
        return container;
    }

    /**
     * 消息适配器
     *
     * @param receiver 接收者
     * @return {@link MessageListenerAdapter}
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageSubListener receiver) {
        return new MessageListenerAdapter(receiver, ONMESSAGE);
    }
}

到了这里,关于消息推送(websocket)集群化解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

    有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了, 今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket 。 在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进

    2024年01月23日
    浏览(40)
  • 抖音直播间websocket礼物和弹幕消息推送可能出现重复的情况,解决办法

    在抖音直播间里,通过websocket收到的礼物消息数据格式如下: 根据字段名称可以看到送礼物的人和送的礼物是什么,并且这个礼物的traceId是唯一的,所以可以通过这个traceId进行去重。 判断这个礼物是否在监控列表中并且是否已经在全局id中: 消息和礼物等数据也有可能会出

    2024年02月01日
    浏览(214)
  • 分布式websocket解决方案

    websocket基础请自行学习,本文章是解决在分布式环境下websocket通讯问题。 在单体环境下,所有web客户端都是连接到某一个微服务上,这样消息都是到达统一服务端,并且也是由一个服务端进行响应,所以不会出现问题。 但是在分布式环境下,我们很容易发现,客户端连接的

    2024年02月13日
    浏览(39)
  • 服务器给前端实时推送数据轻量化解决方案eventSource+Springboot

    body代码 js代码 WebFlux 框架依赖jar包 控制器代码 Flux.interval(Duration.ZERO,Duration.ofSeconds(1)),等待0秒开始,间隔1秒,Flux流数据里面的数字加1 三、效果展示 时间和数字一直在增加,后端在不断推送,前端订阅到数据更新到页面 相对于websocket简单很多,只需要很少的代码就实现前

    2024年04月11日
    浏览(43)
  • 消息队列解决方案

    什么是消息队列 消息队列是在消息的传输过程中保存消息的容器,简单点理解就是传递消息的队列,具备先进先出的特点,一般用于异步、解耦、流量 削锋等问题,实现高性能、高可用、高扩展的架构 主要概念 Producer:消息生产者,负责产生和发送消息到 Broker。 Broker:消

    2024年02月10日
    浏览(33)
  • RabbitMQ--消息堆积--解决方案

    原文网址:RabbitMQ--消息堆积--解决方案_IT利刃出鞘的博客-CSDN博客 本文介绍如何处理RabbitMQ消息堆积(积压)。 对于消息队列(MQ)来说,消息丢失/消息重复/消费顺序/消息堆积(积压)是比较常见的问题,都属于消息异常,这几个问题比较重要,面试中也会经常问到。 消息堆积即

    2023年04月08日
    浏览(44)
  • kafka消息丢失解决方案

    目录 一、生产端数据丢失 二、存储端消息丢失 三、消费端数据丢失 四、小结 一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。 存储

    2023年04月26日
    浏览(36)
  • git中git push origin master推送远程操作失败,报错解决方案

    报错图片如下所示: 解决方案: 使用下面代码进行本地与远程仓库的链接: 链接完成之后就会输出: fatal: remote origin already exists. 链接完成之后就需要使用 git branch 查看一下你所处是哪个分支上面 查看是否是要你要合并的那个分支使用 git merge 分支 进行合并 查看完成,这个时候不

    2024年02月05日
    浏览(49)
  • RabbitMQ消息丢失的场景,MQ消息丢失解决方案

    第一种 : (生产者) 生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种 : (服务端) RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种 : (消费者) 消费端弄丢了数据。刚消费到,还没处理

    2024年02月08日
    浏览(44)
  • git 推送出现fatal: The remote end hung up unexpectedly解决方案

    在使用git更新或提交项目时候出现 \\\"fatal: The remote end hung up unexpectedly \\\" 的报错; 报错的原因原因是推送的文件太大。 下面给出解决方法 方法一: 修改提交缓存大小为500M,或者更大的数字 git config --global http.postBuffer 524288000 git config --global http.postBuffer 1048576000 或者在克隆/创建

    2024年02月04日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包