Spring cloud stream 结合 rabbitMq使用

这篇具有很好参考价值的文章主要介绍了Spring cloud stream 结合 rabbitMq使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

序言

之前的开发主要是底层开发,没有深入涉及到消息方面。现在面对的是一个这样的场景:
假设公司项目A用了RabbitMQ,而项目B用了Kafka。这时候就会出现有两个消息框架,这两个消息框架可能编码有所不同,且结构也有所不同,而且之前甚至可能使用的是别的框架,造成了一个不易管理的局面。目前我的需求是不改动或者说少量代码完成两个消息队列之间的切换。我要屏蔽掉切换的成本。

spring cloud stream官方文档

PS:如有英文,是作者纯纯的懒,懒得翻译

消息队列

市面上大部分消息队列的格局应该是 生产者 -》 broker -》消费者
采用的是发布-订阅的模式,大概的元素有如下几个:
Message:生产者/消费者之间靠消息媒介传递信息内容
MessageChannel:消息必须走特定的通道
队列:假如发消息会先发到消息队列当中

cloud Stream设计

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

用Stream链接rabbitMq

我们需要选用一个mq,我们使用一个测试环境的mq用于学习demo,我们需要引入一些依赖。

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>3.2.5</version>
        </dependency>

Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kakfa中就是Topic。

核心组件

Binder: 很方便的连接中间件,屏蔽差异
Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channe对队列进行配置
Source(源:发送者)和Sink(水槽:接受者): 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

注解
@EnableBinding

通过将@EnableBinding注释应用于应用程序的一个配置类,可以将Spring应用程序转换为Spring Cloud Stream应用程序。@EnableBinding注释本身使用@Configuration进行元注释,并触发Spring Cloud Stream基础设施的配置:

...
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}

@EnableBinding注释可以将一个或多个接口类作为参数,这些接口类包含表示可绑定组件(通常是消息通道)的方法。

使用案例

@EnableBinding({XXXStreamClient.class})
@Input and @Output

Spring Cloud Stream应用程序可以在接口中定义任意数量的输入和输出通道,分别为@input和@output方法:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

使用此接口作为@EnableBinding的参数将触发创建三个绑定通道,分别命名为orders、hotDrinks和coldDrinks。

@EnableBinding(Barista.class)
public class CafeConfiguration {

   ...
}
Customizing Channel Names

使用@Input和@Output注释,可以为通道指定自定义通道名称,如以下示例所示:
public interface Barista {

@Input(“inboundOrders”)
SubscribableChannel orders();
}

In this example, the created bound channel will be named inboundOrders.

Source, Sink, and Processor

为了简单地解决最常见的用例(包括输入通道、输出通道或两者),Spring Cloud Stream提供了三个开箱即用的预定义接口。

Source

Source 可以用于具有单个出站通道的应用程序。

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}
Sink

Sink can be used for an application which has a single inbound channel.

public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();

}
Processor

Processor can be used for an application which has both an inbound channel and an outbound channel.

public interface Processor extends Source, Sink {
}
Accessing Bound Channels

每个绑定的接口,Spring Cloud Stream将生成一个实现该接口的bean。调用其中一个bean的@Input 或@Output 方法将返回相关的绑定通道。
以下示例中的bean在调用其hello方法时在输出通道上发送消息。它在注入的源bean上调用output()来检索目标通道。

@Component
public class SendingBean {

    private Source source;

    @Autowired
    public SendingBean(Source source) {
        this.source = source;
    }

    public void sayHello(String name) {
         source.output().send(MessageBuilder.withPayload(name).build());
    }
}

Bound channels can be also injected directly:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         output.send(MessageBuilder.withPayload(name).build());
    }
}

If the name of the channel is customized on the declaring annotation, that name should be used instead of the method name. Given the following declaration:

public interface CustomSource {
    ...
    @Output("customOutput")
    MessageChannel output();
}

The channel will be injected as shown in the following example:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(@Qualifier("customOutput") MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         this.output.send(MessageBuilder.withPayload(name).build());
    }
}
Using @StreamListener for Automatic Content Type Handling

作为对Spring Integration支持的补充,Spring Cloud Stream提供了自己的@StreamListener注释,该注释以其他Spring消息传递注释(例如,@MessageMapping、@JmsListener、@RabbitListener等)为模型。@StreamListener注释为处理入站消息提供了一个更简单的模型,尤其是在处理涉及内容类型管理和类型强制的用例时。
Spring Cloud Stream提供了一个可扩展的MessageConverter机制,用于处理绑定通道的数据转换,在本例中,用于向用@StreamListener注释的方法进行调度。以下是处理外部投票事件的应用程序示例:

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

当考虑具有String有效载荷和application/json的contentType标头的入站消息时,可以看到@StreamListener和Spring Integration@ServiceActivator之间的区别。在@StreamListener的情况下,MessageConverter机制将使用contentType标头将String有效载荷解析为Vote对象。
与其他Spring Messaging方法一样,方法参数可以用@Payload、@Headers和@Header进行注释。
对于返回数据的方法,必须使用@SendTo注释为该方法返回的数据指定输出绑定目标:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

使用@StreamListener将消息调度到多个方法
在本例中,所有带有值为foo的头类型的消息都将被调度到receiveFoo方法,而所有带有值bar的头类型消息都将调度到receive bar方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")
    public void receiveFoo(@Payload FooPojo fooPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")
    public void receiveBar(@Payload BarPojo barPojo) {
       // handle the message
    }
}

demo

先安装rabbitMq

过程 略

代码部分

配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
          test: # 表示定义的名称,用于于binding整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关的环境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
      bindings: # 服务的整合处理
        testOutput: #生产者消息输出通道 ---> 消息输出通道 = 生产者相关的定义:Exchange & Queue
          destination: exchange-test          #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输出通道绑定到RabbitMQ的exchange-test交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit       #设置要绑定的消息服务的具体设置,默认绑定RabbitMQ
          group: testGroup                    #分组=Queue名称,如果不设置会使用默认的组流水号
        testInput: #消费者消息输入通道 ---> 消息输入通道 = 消费者相关的定义:Exchange & Queue
            destination: exchange-test          #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输入通道绑定到RabbitMQ的exchange-test交换器。
            content-type: application/json
            default-binder: defaultRabbit
            group: testGroup
        testOutput1: #生产者消息输出通道 ---> 消息输出通道 = 生产者相关的定义:Exchange & Queue
          destination: exchange-test          #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输出通道绑定到RabbitMQ的exchange-test交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: test       #设置要绑定的消息服务的具体设置,默认绑定RabbitMQ
          group: testGroup1                    #分组=Queue名称,如果不设置会使用默认的组流水号
        testInput1: #消费者消息输入通道 ---> 消息输入通道 = 消费者相关的定义:Exchange & Queue
          destination: exchange-test          #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输入通道绑定到RabbitMQ的exchange-test交换器。
          content-type: application/json
          default-binder: test
          group: testGroup1


TestChannelProcessor

package com.anxin.rabbitmq_scz.provider;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface TestChannelProcessor {
    /**
     * 生产者消息输出通道(需要与配置文件中的保持一致)
     */
    String TEST_OUTPUT = "testOutput";

    /**
     * 消息生产
     *
     * @return
     */
    @Output(TEST_OUTPUT)
    MessageChannel testOutput();

    /**
     * 消费者消息输入通道(需要与配置文件中的保持一致)
     */
    String TEST_INPUT = "testInput";

    /**
     * 消息消费
     *
     * @return
     */
    @Input(TEST_INPUT)
    SubscribableChannel testInput();
}

TestMessageProducer

package com.anxin.rabbitmq_scz.provider;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

import java.util.HashMap;
import java.util.Map;

@EnableBinding(value = {TestChannelProcessor.class})
public class TestMessageProducer {
    @Autowired
    private BinderAwareChannelResolver channelResolver;

    /**
     * 生产消息
     *
     * @param msg
     */
    public void testSendMessage(String msg) {
        Map<String, Object> headers = new HashMap<>();
        Map<String, Object> payload = new HashMap<>();
        payload.put("msg", msg);
        System.err.println("生产者发送消息:" + JSON.toJSONString(payload));
        channelResolver.resolveDestination(TestChannelProcessor.TEST_OUTPUT).send(
                MessageBuilder.createMessage(payload, new MessageHeaders(headers))
        );
    }

    /**
     * 生产消息
     *
     * @param msg
     */
    public void testSendMessage1(String msg) {
        Map<String, Object> headers = new HashMap<>();
        Map<String, Object> payload = new HashMap<>();
        payload.put("msg", msg);
        System.err.println("生产者发送消息:" + JSON.toJSONString(payload));
        channelResolver.resolveDestination(TestChannelProcessor1.TEST_OUTPUT).send(
                MessageBuilder.createMessage(payload, new MessageHeaders(headers))
        );
    }
}

TestMessageConsumer

package com.anxin.rabbitmq_scz.consumer;

import com.anxin.rabbitmq_scz.provider.TestChannelProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@EnableBinding(TestChannelProcessor.class)
public class TestMessageConsumer {
    @StreamListener(TestChannelProcessor.TEST_INPUT)
    public void testConsumeMessage(Message<String> message) {
        System.err.println("消费者消费消息:" + message.getPayload());
    }
}

SwaggerConfig

package com.anxin.rabbitmq_scz.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.async.DeferredResult;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {

        return new Docket(DocumentationType.SWAGGER_2)
                .genericModelSubstitutes(DeferredResult.class)
                .select()
                .paths(PathSelectors.any())
                .build().apiInfo(apiInfo());

    }

    private ApiInfo apiInfo() {

        return new ApiInfoBuilder().title("Stream server")
                .description("测试SpringCloudStream")
                .termsOfServiceUrl("https://spring.io/projects/spring-cloud-stream")
                .version("1.0").build();
    }
}

TestController

package com.anxin.rabbitmq_scz.controller;

import com.anxin.rabbitmq_scz.provider.TestMessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private TestMessageProducer testMessageProducer;

    /**
     * 发送保存订单消息
     *
     * @param message
     */
    @GetMapping(value = "sendTestMessage")
    public void sendTestMessage(@RequestParam("message") String message) {
        //发送消息
        testMessageProducer.testSendMessage(message);
    }

    /**
     * 发送保存订单消息
     *
     * @param message
     */
    @GetMapping(value = "sendTestMessage1")
    public void sendTestMessage1(@RequestParam("message") String message) {
        //发送消息
        testMessageProducer.testSendMessage1(message);
    }
}

启动即可

结束语

新入一门技术最简单最快的办法就是观看它的官方文档,学会api的调用,代价是极低的,但是如果要深入一门技术,必须需要阅读其源码且结合其设计模式。文章来源地址https://www.toymoban.com/news/detail-755929.html

到了这里,关于Spring cloud stream 结合 rabbitMq使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(42)
  • SpringBoot 如何使用 Spring Cloud Stream 处理事件

    在分布式系统中,事件驱动架构(Event-Driven Architecture,EDA)已经成为一种非常流行的架构模式。事件驱动架构将系统中的各个组件连接在一起,以便它们可以相互协作,响应事件并执行相应的操作。SpringBoot 也提供了一种方便的方式来处理事件——使用 Spring Cloud Stream。 Spr

    2024年02月10日
    浏览(48)
  • 【springcloud 微服务】Spring Cloud Alibaba Sentinel使用详解

    目录 一、前言 二、分布式系统遇到的问题 2.1 服务可用性问题 2.1.1  单点故障

    2024年01月16日
    浏览(48)
  • 【springcloud 微服务】Spring Cloud Alibaba Nacos使用详解

    目录 一、前言 二、nacos介绍 2.1  什么是 Nacos 2.2 nacos 核心能力 2.2.1 服务发现和服务健康监测

    2024年01月22日
    浏览(50)
  • 【springcloud 微服务】Spring Cloud Ribbon 负载均衡使用策略详解

    目录 一、前言 二、什么是Ribbon 2.1 ribbon简介 2.1.1  ribbon在负载均衡中的角色

    2024年02月02日
    浏览(63)
  • 【springcloud 微服务】Spring Cloud 微服务网关Gateway使用详解

    目录 一、微服务网关简介 1.1 网关的作用 1.2 常用网关 1.2.1 传统网关 1.2.2 云原生网关

    2023年04月16日
    浏览(50)
  • 【SpringCloud】11、Spring Cloud Gateway使用Sentinel实现服务限流

    1、关于 Sentinel Sentinel 是阿里巴巴开源的一个流量防卫防护组件,可以为微服务架构提供强大的流量防卫能力,包括流量控制、熔断降级等功能。Spring Cloud Gateway 与 Sentinel 结合,可以实现强大的限流功能。 Sentinel 具有以下特性: 丰富的应用场景:Sentinel 承接了阿里巴巴近

    2024年02月01日
    浏览(54)
  • 【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    @[TOC](【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)) 2.1 消息发送者 2.1.1 使用 StreamBridge streamBridge; 往指定信道发送消息 2.1.2 通过隐式绑定信道, 注册 Bean 发送消息 2.2 消息接收者 注意: 多个方法之间可以使用 “|” 间隔, 但是绑定时 多个需要按顺序写. 其中

    2024年02月03日
    浏览(37)
  • 【RabbitMQ】RabbitMQ安装与使用详解以及Spring集成

    🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的专栏《RabbitMQ实战》。🎯🎯 👉点击这里,就可以查看我的主页啦!👇👇 Java方文山的个人主页 🎁如果感觉还不错的话请给我点赞吧!🎁🎁 💖期待你的加入,一

    2024年01月20日
    浏览(40)
  • 在Spring Cloud中使用RabbitMQ完成一个消息驱动的微服务

    Spring Cloud系列目前已经有了Spring Cloud五大核心组件:分别是,Eureka注册中心,Zuul网关,Hystrix熔断降级,openFeign声明式远程调用,ribbon负载均衡。这五个模块,对了,有没有发现,其实我这五个模块中ribbon好像还没有案例例举,目前只有一个Ribbon模块的搭建,后边我会完善的

    2024年02月04日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包