消息驱动 —— SpringCloud Stream

这篇具有很好参考价值的文章主要介绍了消息驱动 —— SpringCloud Stream。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Stream 简介

Spring Cloud Stream 是用于构建消息驱动的微服务应用程序的框架,提供了多种中间件的合理配置

Spring Cloud Stream 包含以下核心概念:

  • Destination Binders:目标绑定器,目标指的是 Kafka 或者 RabbitMQ,绑定器就是封装了目标中间件的包,如果操作的是 Kafka,就使用 Kafka Binder,如果操作的是 RabbitMQ,就使用 RabbitMO Binder
  • Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
  • Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信

应用程序通过 inputs 或者 outpus 与 Spring Cloud Stream 的 Binder 交互,Binder 层负责和中间件的通信,通过配置来 binding。通过定义 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离,应用程序不需要再考虑各种不同的消息中间件实现。当需要升级消息中间件或是更换其他消息中间件产品时,只需要更换对应的 Binder 绑定器

消息驱动 —— SpringCloud Stream


Stream 整合 kafka

以 Kafka 为例,确保安装 Kafka 并启动

分别创建生产者和消费者项目,分别添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1. 创建生产者

开发 MqSource 接口

public interface MqSource {

    @Output("test-topic")
    MessageChannel testTopic();

    @Output("test-topic-2")
    MessageChannel testTopic2();
}

通过 @Output@Input 注解定义消息输入和输出通道的名称定义,输出通道需要返回 MessageChannel 接口对象,它定义了向消息通道发送消息的方法。默认情况下,通道的名称就是注解的方法的名称,也能自己定义通道名称,只需要给 @Input@Output 注解传入 String 类型参数通道名称即可,这里指定两个通道分别为 test-topictest-topic-2

开发 MsgProducer 类

@Slf4j
@EnableBinding(MqSource.class)
public class MsgProducer {

    @Autowired
    private MqSource mqSource;

    public void sendTestTopicMessage(String msg) {
        try {
            mqSource.testTopic().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            log.error("sendTestTopicMessage error", e);
        }
    }

    public void sendTestTopic2Message(String msg) {
        try {
            mqSource.testTopic2().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            log.error("sendTestTopic2Message error", e);
        }
    }
}

使用 @EnableBinding 创建和绑定通道,绑定通道是指将通道和 Binder 进行绑定,比如 Kafka、RabbiMQ 等。如果类路径下只有一种 Binder,那么 Spring Cloud Stream 会找到并绑定它,不需要进行配置。如果有多个就需要明确配置

调用 MqSource 接口方法获取输出通道对象,接着调用 send 方法发送数据。send 方法接收一个 Message 对象,这个对象不能直接新建,需要使用 MessageBuilder 获取

2. 创建消费者

public interface MqSink {

    @Input("test-topic")
    MessageChannel testTopic();

    @Input("test-topic-2")
    MessageChannel testTopic2();
}

与生产者的 MqSource 同理

开发 MsgReceiver 类,@StreamLisiener 接收的参数是要处理的通道名,所注解的方法就是处理从通道获取数据的方法,方法的参数就是获取到的数据文章来源地址https://www.toymoban.com/news/detail-711940.html

@Slf4j
@EnableBinding(MqSink.class)
public class MsgReceiver {

    @StreamListener("test-topic")
    public void testTopicMessageListen(String msg) {
        log.info("testTopicMessageListen: {}", msg);
    }

    @StreamListener("test-topic-2")
    public void testTopic2MessageListen(String msg) {
        log.info("testTopic2MessageListen: {}", msg);
    }
}

到了这里,关于消息驱动 —— SpringCloud Stream的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(31)
  • Spring Cloud学习笔记【消息总线-SpringCloud Bus】

    Spring Cloud Bus是Spring Cloud生态系统中的一个组件,用于实现微服务架构中的消息总线。它利用了轻量级消息代理(如RabbitMQ或Kafka)作为通信中间件,实现了在分布式系统中的消息传递和事件广播。 Spring Cloud Bus旨在简化微服务架构中的配置管理和状态同步。它允许将配置更改或

    2024年02月09日
    浏览(44)
  • 实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq

    前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多

    2024年02月08日
    浏览(39)
  • 在Spring Cloud中使用RabbitMQ完成一个消息驱动的微服务

    Spring Cloud系列目前已经有了Spring Cloud五大核心组件:分别是,Eureka注册中心,Zuul网关,Hystrix熔断降级,openFeign声明式远程调用,ribbon负载均衡。这五个模块,对了,有没有发现,其实我这五个模块中ribbon好像还没有案例例举,目前只有一个Ribbon模块的搭建,后边我会完善的

    2024年02月04日
    浏览(35)
  • Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

    1. 引入RocketMQ依赖 :首先,在 pom.xml 文件中添加RocketMQ的依赖: 2. 配置RocketMQ连接信息 :在 application.properties 或 application.yml 中配置RocketMQ的连接信息,包括Name Server地址等: 3.消息发布组件 4.消息发布控制器 项目结构: 接下来是websocket模块的搭建 1. 依赖添加 2.application.yml配

    2024年02月08日
    浏览(28)
  • Spring Cloud Stream集成Kafka

    Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)、inputs/outputs Channel完成应用程序和MQ的解耦。 Binder 负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交

    2024年02月13日
    浏览(39)
  • Spring cloud stream 结合 rabbitMq使用

    之前的开发主要是底层开发,没有深入涉及到消息方面。现在面对的是一个这样的场景: 假设公司项目A用了RabbitMQ,而项目B用了Kafka。这时候就会出现有两个消息框架,这两个消息框架可能编码有所不同,且结构也有所不同,而且之前甚至可能使用的是别的框架,造成了一个

    2024年02月04日
    浏览(35)
  • spring Cloud Stream 实战应用深度讲解

    Spring Cloud Stream 是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。 该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。 核心模块 Dest

    2024年01月24日
    浏览(30)
  • SpringBoot 如何使用 Spring Cloud Stream 处理事件

    在分布式系统中,事件驱动架构(Event-Driven Architecture,EDA)已经成为一种非常流行的架构模式。事件驱动架构将系统中的各个组件连接在一起,以便它们可以相互协作,响应事件并执行相应的操作。SpringBoot 也提供了一种方便的方式来处理事件——使用 Spring Cloud Stream。 Spr

    2024年02月10日
    浏览(35)
  • 《微服务实战》 第十六章 Spring cloud stream应用

    第十六章 Spring cloud stream应用 第十五章 RabbitMQ 延迟队列 第十四章 RabbitMQ应用 https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来bindin

    2024年02月06日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包