《微服务实战》 第十六章 Spring cloud stream应用

这篇具有很好参考价值的文章主要介绍了《微服务实战》 第十六章 Spring cloud stream应用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

第十六章 Spring cloud stream应用
第十五章 RabbitMQ 延迟队列
第十四章 RabbitMQ应用

《微服务实战》 第十六章 Spring cloud stream应用


前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

《微服务实战》 第十六章 Spring cloud stream应用
《微服务实战》 第十六章 Spring cloud stream应用

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

《微服务实战》 第十六章 Spring cloud stream应用

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){

    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct文章来源地址https://www.toymoban.com/news/detail-460127.html

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: '''orderRoutingKey'''
        exchangeType: direct

到了这里,关于《微服务实战》 第十六章 Spring cloud stream应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《微服务实战》 第十三章 JWT

    【项目实战】Spring boot整合JWT、Vue案例展示用户鉴权 【微服务实战】JWT JSON Web Token(JWT)是目前最流行的跨域身份验证解决方案。 基于JSON的开发标准 用户信息加密到token里,服务器不保存任何用户信息 在传统的用户登录认证中,因为http是无状态的,所以都是采用session方式

    2024年02月06日
    浏览(58)
  • 《微服务实战》 第十七章 Redis下载与安装

    第二十八章 分布式锁框架-Redisson 第二十四章 Spring boot 操作 Redis 第二十三章 Redis RDB AOF 第二十一、二十二章 Redis发布订阅、事务;HyperLoglog基数统计 第二十章 Redis连接指令 客户端指令 服务器指令 第十九章 Redis key 第十八章 Redis查看配置文件和数据类型 第十七章 Redis下载与安

    2024年02月06日
    浏览(27)
  • 【送书福利-第十六期】Spring Cloud Alibaba 核心技术宝典

    大家好,我是洲洲,欢迎关注,一个爱听周杰伦的程序员。关注公众号【程序员洲洲】即可获得10G学习资料、面试笔记、大厂独家学习体系路线等…还可以加入技术交流群欢迎大家在CSDN后台私信我! 今天洲洲给大家推荐一本开发实战好书:《Spring Cloud Alibaba核心技术宝典》,

    2024年02月12日
    浏览(26)
  • 第十六章 Redies

    2024年04月09日
    浏览(29)
  • 第十六章:变量、流程控制与游标

    ​ 在 MySQL 数据库的存储过程和函数中,可以使用变量来存储查询或计算的中间结果数据,或输出最终的结果数据。变量分为 系统变量 和 用户自定义变量 。 系统变量 ​ 变量由系统定义,不是用户定义,输入服务器层面。启动 MySQL 服务,生成 MySQL 服务实例期间, MySQL 将为

    2024年02月08日
    浏览(39)
  • 第十六章 脚手架文件介绍

    react项目脚手架文件目录 public/index.html public/index.html 文件是 React 应用程序的入口点,负责提供 HTML 文档的基本结构,并包含一个 id 为 root 的 div 元素,这是 React 应用程序将呈现的地方。 src/index.js src/index.js 文件是React应用程序的主要入口点,负责将应用程序呈现到 DOM 中。

    2023年04月09日
    浏览(27)
  • 第十六章 预制件prefab(上)

    本章节我们介绍一下“预制件”,也有人叫“预制体”,也就是Prefab。在游戏世界中,那些自然环境的游戏对象,我们可以提前创建在场景中,这个大家能够理解。但是,有些游戏对象,需要根据游戏逻辑来通过代码生成,例如刷新怪物,触发机关等等。Unity 的预制件系统允许

    2024年02月02日
    浏览(32)
  • spring Cloud Stream 实战应用深度讲解

    Spring Cloud Stream 是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。 该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。 核心模块 Dest

    2024年01月24日
    浏览(30)
  • 《TCP IP网络编程》第十六章

              「分离 I/O 流」是一种常用表达。有 I/O 工具可区分二者,无论采用哪种方法,都可以认为是分离了 I/O 流。 2次 I/O 流分离: 第一种是第 10 章的「TCP I/O 过程」分离。通 过调用 fork 函数复制出一个文件描述符,以区分输入和输出中使用的文件描述符。虽然文件描

    2024年02月13日
    浏览(32)
  • 【Rust】Rust学习 第十六章无畏并发

    安全且高效的处理并发编程是 Rust 的另一个主要目标。 并发编程( Concurrent programming ),代表程序的不同部分相互独立的执行,而 并行编程( parallel programming )代表程序不同部分于同时执行 ,这两个概念随着计算机越来越多的利用多处理器的优势时显得愈发重要。由于历

    2024年02月12日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包