RocketMQ集成Springboot 三种消息发送方式
生产者
引入依赖
<!--⽗⼯程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>
/**
*生产者的代码书写demo
**/
@SpringBootTest
public class RockerMQTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void sendMsg(){
Message msg = MessageBuilder.withPayload("发送同步消息1").build();
rocketMQTemplate.send("helloTopicBoot",msg);
}
/**
* 异步发送消息,成功或者失败之后进行回调
*/
@Test
public void sendASYNCMsg() throws InterruptedException {
System.out.println("发送前");
Message msg = MessageBuilder.withPayload("boot发送异步消息").build();
rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送状态:"+sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送失败");
}
});
System.out.println("发送完毕");
//睡五秒,不睡的话整个方法就结束了不能进行回调了
TimeUnit.SECONDS.sleep(5);
}
/**
* 一次性消息无论消息的结果是什么,通常用于日志等丢失一小部分数据无关紧要的情况下使用
*/
@Test
public void sendOnewayMsg(){
Message msg = MessageBuilder.withPayload("boot发送一次性消息").build();
rocketMQTemplate.sendOneWay("helloTopicBoot",msg);
}
}
//application.yml配置文件
rocketmq:
name-server: 10.0.0.130:9876
producer:
group: my-group
消费者
引入依赖
<!--⽗⼯程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
</dependencies>
实现一个监听器对象,重写其中的消费消息的方法。使用注解@RocketMQMessageListener(consumerGroup = “htpConsumerGroup”,topic = “helloTopicBoot”)
consumerGroup组必须是唯一的,helloTopicBoot表示要监听的主题文章来源:https://www.toymoban.com/news/detail-599686.html
@Component
@RocketMQMessageListener(consumerGroup = "htpConsumerGroup",topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("收到的消息:"+new String(messageExt.getBody(), Charset.defaultCharset()));
}
}
最后生产者启动测试类发送消息,消费者运行主程序一直运行即可.文章来源地址https://www.toymoban.com/news/detail-599686.html
到了这里,关于RocketMQ集成Springboot --Chapter1的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!