Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

这篇具有很好参考价值的文章主要介绍了Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.配置RocketMQ连接信息:在application.propertiesapplication.yml中配置RocketMQ的连接信息,包括Name Server地址等:

spring:
  application:
    name: ${sn.publish}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
        bindings:
          output:
            producer:
              group: testSocket
              sync: true
      bindings:
        output:
          destination: test-topic
          content-type: application/json

3.消息发布组件

@Component
public class MqSourceComponent {
    @Resource
    Source source;

    public void publishNotify(SampleNotifyDTO notify) {
        source.output().send(MessageBuilder.withPayload(notify).build());
    }
}

4.消息发布控制器

@RestController
@Api(tags = "rocketmq")
public class MqController {
    @Resource
    MqSourceComponent mq;

    @ApiOperation(value = "测试发布消息")
    @PostMapping("test-publish")
    public JsonVO<String> testSend(SampleNotifyDTO notify) {
        mq.publishNotify(notify);
        return JsonVO.success("消息已发送");
    }
}

项目结构:

Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅,中间件,微服务,rocketmq

接下来是websocket模块的搭建

1. 依赖添加

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.application.yml配置文件

server:
  port: ${sp.ws}
spring:
  application:
    name: ${sn.ws}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
      bindings:
        input:
          destination: test-topic
          content-type: application/json
          group: testSocket

3.将应用程序绑定到消息代理

@EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {

    public static void main(String[] args) {
        SpringApplication.run(WsApplication.class, args);
    }

}

4.消息订阅组件

监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。

@Component
@Slf4j
public class MqListenComponent {
    @Resource
    ChatService chat;

    @StreamListener(Sink.INPUT)
    public void listenNotify(SampleNotifyDTO notify) {
        log.info(notify.toString());
        chat.sendMessage(notify.getClientId(), notify);
    }
}

5.消息通知服务

package com.zeroone.star.ws.service;

import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


@Component
@ServerEndpoint("/chat")
public class ChatService {
    /**
     * 连接会话池
     */
    private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session) throws IOException {
        // 判断客户端对象是否存在
        if (SESSION_POOL.containsKey(session.getQueryString())) {
            CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");
            session.getUserProperties().put("reason", closeReason);
            session.close();
            return;
        }
        // 将客户端对象存储到会话池
        SESSION_POOL.put(session.getQueryString(), session);
        System.out.println("客户端(" + session.getQueryString() + "):开启了连接");
    }

    @OnMessage
    public String onMessage(String msg, Session session) throws IOException {
        // 解析消息 ==> ID::消息内容
        String[] msgArr = msg.split("::", 2);
        // 处理群发消息,ID==all表示群发
        if ("all".equalsIgnoreCase(msgArr[0])) {
            for (Session one : SESSION_POOL.values()) {
                // 排除自己
                if (one == session) {
                    continue;
                }
                // 发送消息
                one.getBasicRemote().sendText(msgArr[1]);
            }
        }
        // 指定发送
        else {
            // 获取接收方
            Session target = SESSION_POOL.get(msgArr[0]);
            if (target != null) {
                target.getBasicRemote().sendText(msgArr[1]);
            }
        }
        return session.getQueryString() + ":消息发送成功";
    }

    @OnClose
    public void onClose(Session session) {
        // 连接拒绝关闭会话
        Object reason = session.getUserProperties().get("reason");
        if (reason instanceof CloseReason) {
            CloseReason creason = (CloseReason) reason;
            if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {
                System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");
                return;
            }
        }
        // 从会话池中移除会话
        SESSION_POOL.remove(session.getQueryString());
        System.out.println("客户端(" + session.getQueryString() + "):关闭连接");
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());
    }

    @SneakyThrows
    public void sendMessage(String id, Object message) {
        // 群发
        if ("all".equalsIgnoreCase(id)) {
            for (Session one : SESSION_POOL.values()) {
                // 发送消息
                one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
        // 指定发送
        else {
            // 获取接收方
            Session target = SESSION_POOL.get(id);
            if (target != null) {
                target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
    }
}

项目结构:

Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅,中间件,微服务,rocketmq文章来源地址https://www.toymoban.com/news/detail-715115.html

到了这里,关于Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合!

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年03月10日
    浏览(39)
  • Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

    思路: 后端通过websocket向前端推送消息,前端统一使用http协议接口向后端发送数据 本文仅放一部分重要的代码,完整代码可参看github仓库 websocket 前端测试 :http://www.easyswoole.com/wstool.html 依赖 项目目录 完整依赖 配置 WebSocketServer.java 前端页面 websocket.html 前端逻辑 index.js 参

    2024年02月04日
    浏览(36)
  • Spring Cloud【消息驱动(什么是Spring Cloud Stream、SpringCloud Stream核心概念、入门案例之消息消费者 )】(十一)

      目录 消息驱动_什么是Spring Cloud Stream 消息驱动_SpringCloud Stream核心概念

    2024年02月15日
    浏览(32)
  • 【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    @[TOC](【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)) 2.1 消息发送者 2.1.1 使用 StreamBridge streamBridge; 往指定信道发送消息 2.1.2 通过隐式绑定信道, 注册 Bean 发送消息 2.2 消息接收者 注意: 多个方法之间可以使用 “|” 间隔, 但是绑定时 多个需要按顺序写. 其中

    2024年02月03日
    浏览(28)
  • Spring Cloud Alibaba整合RocketMQ架构原理分析

    关于RocketMQ的原理,本文就不做详细分析了,这里就重点关注Spring Cloud Alibaba是如何整合RocketrMQ的。 RocketMQ提供了RocketMQ Client SDK,开发者可以直接依赖这个SDK,就可以完成消息的生产和消费。 1.生产消息 RocketMQ Client SDK提供了生产消息的API接口DefaultMQProducer,开发者可以直接使

    2024年01月22日
    浏览(34)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(34)
  • Spring Boot 3 + Vue 3 整合 WebSocket (STOMP协议) 实现广播和点对点实时消息

    🚀 作者主页: 有来技术 🔥 开源项目: youlai-mall 🍃 vue3-element-admin 🍃 youlai-boot 🌺 仓库主页: Gitee 💫 Github 💫 GitCode 💖 欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请纠正! WebSocket是一种在Web浏览器与Web服务器之间建立双向通信的协议,而Spring Boot提供了便捷的WebSocket支持

    2024年02月02日
    浏览(40)
  • rocketMq消息队列详细使用与实践整合spring

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月13日
    浏览(30)
  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(30)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包