SpringCloud集成RocketMQ
-
pom
<!--boot web--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--spring-cloud-starter-stream-rocketmq 2.2.5.RELEASE--> <!--根据自己部署的mq版本来选择对应的客户端版本--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <exclusions> <exclusion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> </exclusion> <exclusion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.8.0</version> </dependency> <!-- actuator 不引入会报错哦--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
生产者
-
生产者yaml
spring: cloud: stream: #RocketMQ 通用配置 rocketmq: binder: #客户端接入点,必填 rocketMQ的连接地址,binder高度抽象 name-server: localhost:9876 bindings: # bindings 具体生产消息、消费消息的桥梁 # destination: 指定发送的topic # content-type: #默认是application/json # group: 组 # gis_group_out: @Output @Input绑定 @Output绑定就是生产者 @Input绑就是消费者 # consumer:concurrency: 设置消费线程数 gis_group_out: destination: godown_gis content-type: application/plain group: gis_group_out #consumer: # concurrency: 20
-
自定义Source
package com.dist.ytgz.approve.rocketmq; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义Source * * @author <a href="mailto:zhangxiao@dist.com.cn">Zhang Xiao</a> * @since */ public interface MySource { String GIS_GROUP_OUT = "gis_group_out"; @Output(MySource.GIS_GROUP_OUT) MessageChannel gisGroupOut(); }
-
发送消息
// spring cloud stream里面发消息通过 Source 发送 @Autowired private MySource source; // 发送消息的方法 withPayload(T payload) 所以是传任意类型, 把对应的CONTENT_TYPE, APPLICATION_JSON设置好就行 source.gisGroupOut().send( MessageBuilder .withPayload(messagePayload) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build() );
-
启动类
/** * 记得添加@EnableBinding注解指定消息信道 * Sink.class 表示信道 input , Source.class 表示信道 output * 这两个分别与application.yml中的spring.cloud.stream.bindings.input和spring.cloud.stream.bindings.output对应 * 如果信道名称换成是sender和receiver,就得配置spring.cloud.stream.bindings.sender和spring.cloud.stream.bindings.receiver * * * 而我这里是gis_group_out 所以 spring.cloud.stream.bindings.gis_group_out */ // @EnableBinding(value = {Sink.class, Source.class}) @EnableBinding(value = Source.class) @SpringBootApplication public class ApproveApplication { public static void main(String[] args) { SpringApplication.run(ApproveApplication.class, args); } }
消费者
-
消费者yaml
spring: cloud: stream: #RocketMQ 通用配置 rocketmq: binder: #客户端接入点,必填 rocketMQ的连接地址,binder高度抽象 name-server: localhost:9876 bindings: gis_group_in: consumer: # 消息失败直接放入死信队列, 消息不会被重复消费 delayLevelWhenNextConsume: -1 bindings: # bindings 具体生产消息、消费消息的桥梁 # destination: 指定发送的topic # content-type: #默认是application/json # group: 组 # gis_group_out: @Output @Input绑定 @Output绑定就是生产者 @Input绑就是消费者 # consumer.maxAttempts: 1 #默认是3,1表示不重试 gis_group_in: destination: godown_gis content-type: application/plain group: gis_group_in consumer: concurrency: 20 maxAttempts: 1
-
自定义Sink
package com.dist.ic.rocketmq; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义Sink * * @author <a href="mailto:zhangxiao@dist.com.cn">Zhang Xiao</a> * @since */ public interface MySink { String GIS_GROUP_IN = "gis_group_in"; @Input(MySink.GIS_GROUP_IN) SubscribableChannel gis_group_in(); }
-
接受消息
package com.dist.ic.rocketmq; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.dist.common.security.vo.UserVO; import com.dist.ic.service.ICService; import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; import java.util.Map; /** * TODO * * @author <a href="mailto:zhangxiao@dist.com.cn">Zhang Xiao</a> * @since */ @Component public class RocketMessageListener { private final Logger log = LoggerFactory.getLogger(RocketMessageListener.class); @Autowired private ICService icService; @StreamListener(MySink.GIS_GROUP_IN) // 指定对应的Input管道 public void handlerMessage(MessagePayload message) { log.info("接收到rocketmq的消息========>{}", JSON.toJSONString(message)); } }
-
启动类文章来源:https://www.toymoban.com/news/detail-499133.html
/** * 记得添加@EnableBinding注解指定消息信道 * Sink.class 表示信道 input , Source.class 表示信道 output * 这两个分别与application.yml中的spring.cloud.stream.bindings.input和spring.cloud.stream.bindings.output对应 * 如果信道名称换成是sender和receiver,就得配置spring.cloud.stream.bindings.sender和spring.cloud.stream.bindings.receiver * * * 而我这里是gis_group_out 所以 spring.cloud.stream.bindings.gis_group_out */ // @EnableBinding(value = {Sink.class, Source.class}) @EnableBinding(value = Sink.class) @SpringBootApplication public class ICApplication { public static void main(String[] args) { SpringApplication.run(ICApplication.class, args); } }
重点: RocketMQ相关配置看https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit文章来源地址https://www.toymoban.com/news/detail-499133.html
到了这里,关于SpringCloud集成RocketMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!