RocketMQ不同的类型消息

这篇具有很好参考价值的文章主要介绍了RocketMQ不同的类型消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

普通消息

可靠同步发送

可靠异步发送

单向发送

三种发送方式的对比

顺序消息

事物消息

 两个概念

 事务消息发送步骤

事务消息回查步骤

消息消费要注意的细节

RocketMQ支持两种消息模式:


普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。

可靠同步发送

同步发送是指消息发送放发出数据后,会在收到接收方 响应之后才发下一个数据包的通讯方式。

这种方式应用场景非常广泛,列如重要通知邮件、报名短信通知、营销短信系统等。

//同步消息
   @Test
   public void testSyncSend() {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
      SendResult sendResult =
              rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");                         
      System.out.println(sendResult);
   }

可靠异步发送

异步发送是指发送放  发出数据后,不等接收方 发回响应,接着发送下一个数据包的通讯方式。发送方 通过回调接口接收服务器响应,并对响应结果进行处理。

异步发送一般用于 链路耗时较长,对RT响应时间较为敏感的业务场景,列如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

//异步消息
   @Test
   public void testAsyncSend() throws InterruptedException { 
        public void testSyncSendMsg() {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
          rocketMQTemplate.asyncSend(
                "test-topic-1", 
                "这是一条异步消息",
          new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {             
                 System.out.println(sendResult);
              }
              @Override
              public void onException(Throwable throwable) {             
                 System.out.println(throwable);
              }
        });
//让线程不要终止
       Thread.sleep(30000000);
   }

单向发送

单向发送是指发送方 只负责发送消息,不等待服务器回应 没有 回调函数触发,只发送请求不等待应答。

适合用于某些耗时非常短,但对可靠性要求并不高的场景,不如说日志收集。

//单向消息
      @Test
      public void testOneWay() {
         rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
      }

三种发送方式的对比

RocketMQ不同的类型消息

 

顺序消息

RocketMQ不同的类型消息

//同步顺序消息[异步顺序 单向顺序写法类似]
        public void testSyncSendOrderly() {
//第三个参数用于队列的选择
            rocketMQTemplate.syncSendOrderly(
                "test-topic-1", 
                "这是一条异步顺序消息",
                "xxxx");
        }

事物消息

 RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

RocketMQ不同的类型消息

 两个概念

        半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到 RocketMQ服务,但是服务端 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息 为半事务消息。

         消息回查:由于网络闪断、生产者应用重启等原因,导致的某条事务消息的二次确认丢失,

 RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或者 Rollback),该询问过程 消息回查。

 事务消息发送步骤

   1.发送方 将半事务消息发送至PocketMQ服务端。

    2.RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。

    3.发送方开始执行本地事务逻辑

    4.发送方根据本地事务执行结果向服务端提交二次确认(Commit 或者 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息。服务端说到R0llback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤

     1.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该数据发起消息回查

     2.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

     3.发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍然按照步骤4对半事务消息  进行操作。

//事物日志
@Entity(name = "shop_txlog")

@Data
public class TxLog {

        @Id
        private String txLogId;

        private String content;

        private Date date;
}


@Service
public class OrderServiceImpl4 {
    @Autowired

    private OrderDao orderDao;


    @Autowired
    private TxLogDao txLogDao;


    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    public void createOrderBefore(Order order) {

         String txId =  UUID.randomUUID().toString();

        //发送半事务消息

        rocketMQTemplate.sendMessageInTransaction(
                                        "tx_producer_group",

                                        "tx_topic",
                                        MessageBuilder.withPayload(order).setHeader(

                                                                        "txId",
                                                                        txId).build(),

                                        order
                                        );
    }


     //本地事物

@Transactional

public void createOrder(String txId, Order order) {
        //本地事物代码

        orderDao.save(order);

        //记录日志到数据库,回查使用

        TxLog txLog = new TxLog();
        txLog.setTxLogId(txId);

        txLog.setContent("事物测试");

        txLog.setDate(new Date());
        txLogDao.save(txLog);
    }
}

@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")

public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {

        @Autowired
         private TxLogDao txLogDao;


        @Autowired
        private OrderServiceImpl4 orderServiceImpl4;


        //执行本地事物

    @Override

    public RocketMQLocalTransactionState executeLocalTransaction(

        Message msg, Object arg) {
        try {
        //本地事物
            orderServiceImpl4.createOrder((String) msg.getHeaders().get("txId"), (Order) arg);
           

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    //消息回查

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //查询日志记录
        TxLog txLog = txLogDao.findById((String) msg.getHeaders().get("txId")).get();

        if (txLog == null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

消息消费要注意的细节

@RocketMQMessageListener(
        consumerGroup = "shop",//消费者分组

        topic = "order-topic",//要消费的主题
        consumeMode = ConsumeMode.CONCURRENTLY, //消费模式:无序和有序                messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群
)
public class SmsService implements RocketMQListener<Order> {

}

RocketMQ支持两种消息模式:

  • 广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;

  • 集群消费: 一条消息只能被一个消费者实例消费文章来源地址https://www.toymoban.com/news/detail-436266.html

到了这里,关于RocketMQ不同的类型消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

    我们先看一串代码,并思考一下为什么要先入库然后发MQ: 如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。 ① 消息从生产者发送到Broker 生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接

    2024年02月11日
    浏览(52)
  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(53)
  • RabbitMQ:可靠消息传递的强大消息中间件

     消息中间件在现代分布式系统中起着关键作用,它们提供了一种可靠且高效的方法来进行异步通信和解耦。在这篇博客中,我们将重点介绍 RabbitMQ,一个广泛使用的开源消息中间件。我们将深入探讨 RabbitMQ 的特性、工作原理以及如何在应用程序中使用它来实现可靠的消息传

    2024年02月12日
    浏览(74)
  • 消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

    官方网址 源码:https://kafka.apache.org/downloads 快速开始:https://kafka.apache.org/documentation/#gettingStarted springcloud整合 发送消息流程 主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。 Map集合:存储了主线程的消息。 Sender线程:真正的

    2024年03月10日
    浏览(44)
  • RabbitMQ --- 消息可靠性

    消息队列在使用过程中,面临着很多实际问题需要思考:      消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 co

    2024年02月14日
    浏览(46)
  • RabbitMQ消息可靠性(一)-- 生产者消息确认

    目录 前言 一、消息确认流程图 二、生产者消息确认 1、publisher-confirm(发送者确认) 2、publisher-return(发送者回执) 三、代码实现 1、修改application.yml 配置 2、ConfirmCallback函数和ReturnCallback函数 在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一

    2024年02月04日
    浏览(70)
  • 解析 RocketMQ 业务消息 - “顺序消息”

    Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案

    2024年01月16日
    浏览(38)
  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(70)
  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(48)
  • RocketMQ 发送批量消息、过滤消息和事务消息

    前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。 消息的生产者 消息的消费者 消息分割 如果

    2023年04月21日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包