spring Cloud Stream 实战应用深度讲解

这篇具有很好参考价值的文章主要介绍了spring Cloud Stream 实战应用深度讲解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

springCloudStream

简介

Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。

该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。

核心模块

  • Destination Binders: 负责提供与外部消息系统集成的组件
  • Destination Bindings: 外部消息系统和用户程序代码之间的桥梁(生产者-使用者之间的桥梁)
  • Message:生产者和消费者用于与Destination Binders(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

历史

Spring 的数据集成之旅始于 Spring Integration。通过其编程模型,它提供了一致的开发人员体验来构建应用程序,这些应用程序可以采用企业集成模式来连接外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,可以无缝开发独立的、基于 Spring 的生产级微服务。

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放在一个新项目中。Spring Cloud Stream 诞生了。

架构模型

spring Cloud Stream 实战应用深度讲解,java,java,spring,spring cloud,java-rabbitmq

这张图是spring-stream官网的,里面的Middleware指的就是RabbitMQ或者KafKa这些消息队列。

下图是我们原来和消息队列通信的方式。我们的程序直接发送数据给MQ或者监听到MQ的数据。

spring Cloud Stream 实战应用深度讲解,java,java,spring,spring cloud,java-rabbitmq

通过spring stream来做的话,就增加了Binder层来做统一调度,我们的程序只需要和Binder层通信,不需要关注底层的MQ是RabbitMQ还是Kafka

目前官方提供了两个Binder,分别是RabbitMQ的和Kafka的,其余队列的有一些第三方维护的。同时我们也可以自己实现Binder

一开始图中的InputOutput是对于spring stream来说的,input就是输入消息到stream中,output就是输出消息到我们的程序中。

简单介绍一下Binder,其实就是策略模式,统一接口实现,比如MQ1里面发送消息到MQ的方法叫Publish,MQ2里面发送消息到MQ的方法叫Release,但是在Binder接口里面提供了一个方法,就叫做add。也只需要提供一个Message消息。

public interface Binder{
    function add(Message msg);
}

// 连接MQ1的Binder
public class Binder1 implements Binder{
    public function add(Message msg){
        // 消息处理
        // 发送到MQ1
        publish(msg);
    }
}

// 连接MQ2的Binder
public class Binder2 implements Binder{
    public function add(Message msg){
        // 消息处理
        // 发送到MQ2
        release(msg);
    }
}

当我们使用的时候只需要自己决定使用哪个Binder就可以了。就是就和连接数据库一样,不需要关心连接的是Mysql还是PostgreSql。

public class main{
    public static function main() {
        Binder binder = new Binder1();
        Message msg = new Message();
        binder.add(msg);
    }
}
Bindings

Bindings作为一个桥梁,负责连接MQ和用户代码。比如绑定一个代码作为input往某一个Queue里面输入信息,绑定一个代码作为output从某个Queue里面接收信息。然后我们使用Binder来实现推送消息到MQ和消费消息。

这里是官网原文:The application communicates with the outside world by establishing bindings between destinations exposed by the external brokers and input/output arguments in your code. Broker specific details necessary to establish bindings are handled by middleware-specific Binder implementations.

下图为Bindings和Binder的关系

spring Cloud Stream 实战应用深度讲解,java,java,spring,spring cloud,java-rabbitmq

source 和 sink

source其实就是发送方的发送的Message. sink就是接收方接受的Message

注解实现

注解的实现已经被彻底删除,只有之前低版本的还能使用

函数式编程实现示例

依赖引入

将下面的代码加入pom文件,然后使用maven导入相关依赖即可。

// 引入spring cloud stream依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
// 引入spring cloud stream的rabbit binder依赖
// 如果是kafka,那么把这个换成kafka的binder
// 在这个binder里面已经引入了 rabbit MQ依赖,所以不需要再单独引入rabbit MQ了
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream: # stream的配置
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

生产者

配置文件修改

对于函数式编程来说,spring cloud stream有一些约定或者说规定。比如我们注册了一个logPubBean,那么它对应的bindings配置的名称就是logPub-in-0或者logPub-out-0,前面是我们的方法名,中间表示生产者或消费者,in表示消费者,out表示生产者。这里的in or out是对于我们的代码来说的。后面的0就是一个序号。

写生产者之前我们需要加上对应的bindings配置。如果注册了多个Bean作为生产者或消费者,那么还需要配置哪些Bean是生产者和消费者。


spring:
  cloud:
    function: # 配置哪些Bean是Stream可以用的
        definition: log;logPub;sendLog
    stream: # stream的配置
        bindings: # 服务的整合处理
            logPub-out-0:
                destination: log # 表示要使用的Exchange名称定义,不存在会自动创建
                content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
写代码

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logProducer {

}

然后开始编写生产者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值只能是Supplier函数接口类型。不能是其他的。

方法里面可以写生产者的具体代码。会注册一个名为logPubBean作为生产者。

@Component
public class logListener {
    @Bean
    public Supplier<logListener.Person> logPub() {
        return () -> {
            Person person = new Person();
            person.setName("张三");
            System.out.println("生产者:"+person);
            return person;
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return this.name;
        }
    }
}

关于Supplier,这个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Supplier的注释和定义

//Represents a supplier of results.
//There is no requirement that a new or distinct result be returned each time the supplier is invoked.
//This is a functional interface whose functional method is get().

public interface Supplier<T>

翻译过来大概就是:一个结果的提供者或者一个结果的生产者。正好对应我们的生产者。该接口只有一个方法T get(),没有参数并且仅返回一个结果。

运行

运行的话会发现控制台一直在打印。我们的队列里面也一直在新增。

spring Cloud Stream 实战应用深度讲解,java,java,spring,spring cloud,java-rabbitmq

StreamBridge

当前的运行方式是当写完生产者以后,spring cloud stream会1/s次来调用我们的生产者,但是我们一般是自己来控制生产者的调用。就可以使用下面的方法。

我们可以通过StreamBridge来做到这一点。他有四个send方法。

  • public boolean send(String bindingName, Object data):第一个参数是bindingName,我们输入的是sendLog,就需要增加sendLog的配置,我们也可以用之前的logPub-out-0。第二个参数是发送的数据。
  • public boolean send(String bindingName, Object data, MimeType outputContentType):比上面的多了一个数据类型。
  • public boolean send(String bindingName, @Nullable String binderName, Object data):还可以指定Binder的name
  • public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType): 四个参数放在一起了。
@RestController
public class logController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/sendLog")
    public void sendLog() {
        logListener.Person person = new logListener.Person();
        person.setName("李四");
        System.out.println("生产者发送消息"+person);
        streamBridge.send("sendLog", person);
    }
}

消费者

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logListener {

}

然后开始编写消费者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值可以是Consumer,也可以是Function。不能是其他的。

方法里面就可以写消费的具体代码了。

@Component
public class logListener {
    @Bean
    public Consumer<logListener.Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return this.name;
        }
    }
}

关于ConsumerFunction,这两个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Consumer接口的注释和接口的定义。

//Represents an operation that accepts a single input argument and returns no result. Unlike most other functional interfaces, Consumer is expected to operate via side-effects.
//This is a functional interface whose functional method is accept(Object).

public interface Consumer<T>

翻译过来大概就是说Consumer接口仅接收一个参数并且没有返回值,我们的代码里面也可以看到,接收了一个person参数,没有return。

该接口只有一个方法void accept(T t),T类型就是我们的Person类型。

下面是Function接口的注释和定义

//Represents a function that accepts one argument and produces a result.
//This is a functional interface whose functional method is apply(Object).
public interface Function<T, R>

翻译过来大概就是说Function接口仅接收一个参数并且返回一个结果。该接口只有一个方法R apply(T t),接收一个T类型的参数,返回一个R类型的结果。

spring Cloud Stream 实战应用深度讲解,java,java,spring,spring cloud,java-rabbitmq

手动ACK

通过禁止使用死信队列来执行手动的ACK,这个时候如果抛出异常,则会重试。如果开启了死信队列,那么抛出异常以后则会进入死信队列。

log-in-0:
    consumer:
        auto-bind-dlq: false

队列持久化

上面可以看出来,创建的都是匿名队列,当程序启动的时候自动创建,当程序关闭的时候自动删除。

但是正常开发中,很少使用这种,都会指定一个持久化的队列,不管程序是否运行,队列都存在。

我们可以在bindings的配置里面增加group配置来显式指定哪个队列,我们指定log123队列。

log-in-0:
    destination: log
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    group: log123
sendLog:
    destination: log
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    group: log123

再次运行程序,可以看到该队列被创建。接下来停止程序,可以看到队列还存在那里。

bindings重命名

默认约定的名称为log-in-0这种形式

但是我们也可以将它重命名。通过配置文件可以将log-in-0重命名为input,不过这样的话,所有的log-in-0的bindings配置都需要修改成input,使用上也是。注意官方并不推荐这种做法,他们认为在大多数情况下,这有点矫枉过正。

spring:
    cloud:
        stream:
            function:
                bindings:
                    log-in-0: input

显式绑定创建

默认约定的是log-in-0负责输入,log-out-0负责输出,我们也可以显式的创建这些。

通过配置文件

spring:
    cloud:
        stream:
            input-bindings: login;fooin
            output-bindings: logout;fooout

轮询配置属性

spring:
    integration:
        poller:
            # 全局配置
            fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L 
            maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L
            cron: none # Cron 触发器的 Cron 表达式值。默认 none
            initialDelay: 0 # 周期性触发的初始延迟。 默认0
            timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

也可以单独为某个bindings来配置

spring:
    cloud:
        stream:
            bindings:
                log-out-0:
                    producer:
                        poller:
                            # log-out-0的单独配置
                            fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L 
                            maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L
                            cron: none # Cron 触发器的 Cron 表达式值。默认 none
                            initialDelay: 0 # 周期性触发的初始延迟。 默认0
                            timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

函数组合

假设我们有两个处理Bean,enrich负责检查header,如果缺少foo,就添加为foo,bar。然后第二个echo则负责检查是否包含foo这个Header然后输出消息内容。

@Bean
public Function<Message<String>, Message<String>> enrich() {
    return message -> {
        Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
        return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
    };
}

@Bean
public Function<Message<String>, Message<String>> echo() {
    return message -> {
        Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
        System.out.println("Incoming message " + message);
        return message;
    };
}

通过配置将这两个bean组合起来,组合之后,这个bean名称就编程了enrich|echo,后续的配置都需要这种冗长的名称,所以这里官方推荐使用重命名的方式将它变成简单的名称。文章来源地址https://www.toymoban.com/news/detail-821851.html

spring:
    cloud:
        function:
            definition: enrich|echo # 函数组合
        stream:
            function: 
                bindings:
                    enrich|echo-in-0: input # 重命名

到了这里,关于spring Cloud Stream 实战应用深度讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 云原生微服务 Spring Cloud Hystrix 降级、熔断实战应用

    第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Spring Cloud Netflix 之 Hystrix 多个微服务之间调用的时候,假如微服

    2024年02月08日
    浏览(42)
  • Spring Cloud【消息驱动(什么是Spring Cloud Stream、SpringCloud Stream核心概念、入门案例之消息消费者 )】(十一)

      目录 消息驱动_什么是Spring Cloud Stream 消息驱动_SpringCloud Stream核心概念

    2024年02月15日
    浏览(41)
  • Spring Cloud Stream集成Kafka

    Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)、inputs/outputs Channel完成应用程序和MQ的解耦。 Binder 负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交

    2024年02月13日
    浏览(46)
  • Spring cloud stream 结合 rabbitMq使用

    之前的开发主要是底层开发,没有深入涉及到消息方面。现在面对的是一个这样的场景: 假设公司项目A用了RabbitMQ,而项目B用了Kafka。这时候就会出现有两个消息框架,这两个消息框架可能编码有所不同,且结构也有所不同,而且之前甚至可能使用的是别的框架,造成了一个

    2024年02月04日
    浏览(47)
  • SpringBoot 如何使用 Spring Cloud Stream 处理事件

    在分布式系统中,事件驱动架构(Event-Driven Architecture,EDA)已经成为一种非常流行的架构模式。事件驱动架构将系统中的各个组件连接在一起,以便它们可以相互协作,响应事件并执行相应的操作。SpringBoot 也提供了一种方便的方式来处理事件——使用 Spring Cloud Stream。 Spr

    2024年02月10日
    浏览(48)
  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(43)
  • Spring Cloud Stream解密:流式数据在微服务中的魔力

    欢迎来到我的博客,代码的世界里,每一行都是一个故事 在微服务的大舞台上,数据流就像一曲美妙的交响乐,而Spring Cloud Stream正是指挥家,将音符有序地传递给每个微服务。在这篇文章中,我们将揭开Spring Cloud Stream的神秘面纱,一起探索在微服务体系结构中如何通过流式

    2024年02月20日
    浏览(39)
  • 【微服务架构】Spring Cloud入门概念讲解

    目录 一、单体架构VS微服务架构 1.1 单体应用 单体架构的优点 单体应用的缺点 1.2 微服务“定义” 微服务的特性 微服务的缺点 微服务的适用场景 二、微服务常见概念与核心模块 三、Spring Cloud 工作流程         一个归档包(如war包)包含所有功能的应用程序通常称为单体应

    2024年02月03日
    浏览(55)
  • 【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合!

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年03月10日
    浏览(49)
  • Spring Cloud + Spring Boot 项目搭建结构层次示例讲解

    以下是我搭建Spring cloud项目架构的经验,我将以图片的形式和大家进行分享;至于Spring Boot的搭建经验,我会在图后以文字描述的方式和大家分享,请往下看: Spring Boot的搭建经验,我会以文字描述的方式和大家分享,请往下看: Controller 层 :Controller 层负责处理客户端的请

    2024年02月11日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包