Pulsar消息发送、消费架构概述

这篇具有很好参考价值的文章主要介绍了Pulsar消息发送、消费架构概述。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,我是威哥,《RocketMQ技术内幕》、《RocketMQ实战》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。

1、订阅与发布

Pulsar基于发布-订阅模式,消息发送者向主题发送消息,而消费消费者订阅主题,消息从Pulsar Broker中获取消息,处理成功后需要向Pulsar发送ACK,表示消息处理成功。与RocketMQ/Kafka不同的是,**Pulsar只有当消费者确认消息都成功被处理后才能去删除消息。**如果Consumer在处理一批消息失败后,可以再次请求Broker重新下发该批消息,以便进行重试。

pulsar 发布订阅,Pulsar实战与进阶,架构,分布式,pulsar,云原生

2、消息(Messages)

消息(Message)是Pulsar中最基本的抽象单位,一条Pulsar消息中的属性如下所示:

组件名称 描述
Value / data payload 消息体,字节数组
Key 消息键或者分区键,主题的压缩机制将依赖消息的key
Properties 消息属性,类比RocketMQ的消息属性、Kafka中的Header,通常用于存放消息的扩展属性
Producer name 消息的生产者
Topic name 消息所属的主题
Schema version 消息生产者的Schema版本号
Sequence ID 消息在主题中所有消息的序号,默认由发送者产生,也可以由用户自定义,可以用于在一次消息发送调用API中消息去重。
如果服务端brokerDeduplicationEnabled设置为true,则服务端会进行唯一性校验
Message ID 消息ID,消息持久化到服务端后生成,里面包含了消息的特定存储位置,并且在集群内是全局唯一的
Publish time 消息的发送时间,消息发送者自动生成
Event time 事件时间,由应用程序附加到消息的可选时间时间戳,通常是业务发生的时间,例如订单的下单时间等

在Pulsar中,消息的默认最大消息大小为5M,我们可以有如下两种方式进行更改:

  • 在broker.conf中maxMessageSize=5242880
  • 在bookkeeper.conf中nettyMaxFrameSizeBytes=5253120

3、生产者(Producers)

Producer生产者,消息发送客户端。

3.1发送模式

Produder有两种消息发送模式:

  • 同步发送
  • 异步发送

3.2 访问模式

在Pulsar中,生产者访问Broker中主题提供了多种访问模式,详细如下表所示:

组件名称 描述
Shared 多个生产者都可以向一个主题发送消息,默认模式
Exclusive 独占模式,一个主题只能被一个生产者连接,如果另外一个生产者试图连接,则会立即收到一个错误,但如果老的生产者宕机,会选举产生一个新的生产者
ExclusiveWithFencing 只允许一个生产者往该主题发送消息,相比Exclusive,没有备选机制。
WaitForExclusive 支持多个生产者通过选举机制成为Leader后发送消息,Leader宕机后,重新竞争选举出新的Leader,只有Leader可以发送消息

3.3 压缩算法

Pulsar目前支持LZ4、ZLIB、ZSTD、SNAPP四种压缩机制,可以通过如下代码指定压缩算法:

client.newProducer()
    .topic("topic-name")
    .compressionType(CompressionType.LZ4)
    .create();

3.4 批量发送

Pulsar支持批量消息发送。如果开启批量发送,消息发送者会将多条消息累积到一个批次中进行一次发送。一个批次的消息大小由最大的消息条数+最大的发送延迟两个参数共同决定(参考kafka中的batch.size、linger.ms),如果开启了批处理,backlogSize表示的一个批次中消息的条数。

批处理示例图如下:

pulsar 发布订阅,Pulsar实战与进阶,架构,分布式,pulsar,云原生

Pulsar会把一个批次作为一个整体存储到Broker中,消费者接到一个批次后再解绑成一条一条的消息。但即使开启批处理,但调度类消息(设置了deliverAt或者deliverAfter)会单独一条消息进行发送。

默认情况下如果一个批次中的消息出现部分消费失败,消费端在消费重试时会再次收到这个批次中所有的消息,为了避免这种情况,Pulsar在2.6.0版本中引入了批量索引确认机制。一个批次中所有消息被确认后会删除。那pulsar是如何支持消息回溯的呢?[答案在介绍Consumer的时候会介绍]

默认情况下,批索引确认机制是关闭的。如果要开启,需要在broker端配置acknowledgmentAtBatchIndexLevelEnabled=true。同样在消费端也需要设置acknowledgmentAtBatchIndexLevelEnabled=true。

消费端开启批索引确认示例代码:

Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .subscriptionType(subType)
        .enableBatchIndexAcknowledgment(true)
        .subscribe();

3.5 消息分块

Pulsar能够在消息发送端将一个大的消息体分割成多个小块的消息,并且在消息消费端聚合成一条完整消息再消费。

如果开启分块机制,当需要发送的消息超过允许的最大消息大小(maxMessageSize)时,其工作流程如下:

  • 生产者将原始消息拆分成分块消息,并将他们与分块元数据按顺序发送给Broker。
  • Broker将分块消息当成普通消息进行存储,并且使用chunkedMessageRate来记录分块消息的速率。
  • 消息消费端首先将缓存分块消息,直到一个消息所有分库都被接收,然后整合成一条完整的消息,传输到接收Queue中,客户端从接收队列中获取一条完整的消息。

限制:

  • 分块消息只适用于持久化主题
  • 分块只对 exclusive与failover subscription types 两种订阅模式生效
  • 分块不能与批处理同时开启

关于分块的工作机制,官方文档如下所示:

pulsar 发布订阅,Pulsar实战与进阶,架构,分布式,pulsar,云原生

两个生产者发送多条消息的分块,在服务端,一条消息的多个分块会被顺序存储,但一条消息的多个分块并不是连续存储的,然后消费者在接收时,会利用缓存对分块信息进行聚合。

**注意:**一旦开启了消息分块,消费时需要在消费端聚合成一条完整的消息,必须为每一条大消息创建独立的缓冲区,会对消费端的内存带来压力,有内存溢出的风险。

故为了保护消费端,消费端采取了两个措施:

  • 引入了一个maxPendingChunkedMessage参数,设置可以缓存的最大chunk数,当缓存的chunk数量达到这个值后,pulsar会drop掉部分chunk,先保证一条消息顺利合并,其他丢弃的消息再在合适的时候重新从Broker拉取。
  • 引入了expireTimeOfIncompleteChunkedMessage参数,如果一个消息的所有块在指定时间内没有全部到达,这些分块将在消费端全部被移除,默认的过期时间为1min。

要开启消息分块的一个前提条件是需要关闭批量发送,具体做法是将生产者的enableBatching设置为false。

默认情况下消息分块是禁用的,如果需要开启,需要将生产者的chunkingEnabled设置为true。

4、消费组(Consumers)

消费者,通过订阅主题,从而从Broker端接受消息,消息发送,消息消费的核心示意图如下:

pulsar 发布订阅,Pulsar实战与进阶,架构,分布式,pulsar,云原生

消费端会使用一个队列来接受Broker端的消息,这个缓存可以通过receiverQueueSize来配置,默认为1000。

4.1 消息接受模式

消费端接收消息支持同步接收与异步接收两种模式:

  • Sync receive:同步接收模式,如果Broker没有需要消费的消息,接受线程将阻塞
  • An async receive:异步接收模式,将立马返回,使用了Feture模式,消息真正到达后可用。

4.2 消费监听器(Listeners)

消息消费监听器,当从Broker中收到消息后,将调用消费监听器,从而触发业务代码的执行。

4.3 消费确认机制(Acknowledgment)

消费者在成功消费完一条消息后需要告知Broker已成功消费,俗称ACK确认信息。然后这条消息会被持久化存储,并且在所有订阅组都成功确认后才会删除这条消息。如果希望Broker继续存储已被所有订阅确认的消息,则需要设置消息的持久策略(本文后面会详细介绍)。

如果发送端启用了批处理,则Pulsar可以引入批索引确认机制,避免一个批次的消息重复下发给消费者。

在Pulsar中,确认一条消息有如下两种方式:

  • 单条消息独立确认。消费者每一条消息都会发送ACK给Broker,消费端通过调用consumer.acknowledge(msg)对单条消息进行确认。
  • 累积确认,消费者只确认接收到的最后一条消息。消费端通过调用consumer.acknowledgeCumulative(msg)进行累积消息确认。
4.3.1 Negative acknowledgment(取消确认)

消费者可以通过发送neagative ack请求到broker,告知broker并未成功消费该条消息,broker收到该请求后,会触发broker将这条消息重新下发给消费者进行消费。

如果消费者订阅模式为Exclusive或者Failover subscription类型时,消费者只能否认收到的最后一条消息。

如果消费者订阅模式为Shared或者Key_Shared类型时,消费者可以否认单独一条消息。

值得注意的是,Negative acknowledgment机制将对顺序性语义带来破坏,在顺序消费场景,请慎重考虑。

如果要对消息使用否定确认,请确保在消息确认超时之前进行发起。

我们可以使用如下API来进行否定确认,代码如下:

Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub-negative-ack")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 //设置当客户端调用negativeAcknowledge方法后,服务端进行再投递的延迟时间
                .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
                .subscribe();

Message<byte[]> message = consumer.receive();

// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);

message = consumer.receive();
consumer.acknowledge(message);

当客户端调用negativeAcknowledge后,但服务端如果一直未收到这条消息的再次ACK,会在服务端进行重推,并且可以设置阶梯延迟投递,启用类似阶梯投递机制的代码如下所示:

Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("sub-negative-ack")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
         // 启用阶梯延迟重推
        .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
            .minDelayMs(1000)  // 最小时间,机第一次推送的延迟时间,然后按multiplier倍数增长,
            .maxDelayMs(60 * 1000) // 最大延迟推送等待时间
            .multiplier(2) // 一次延迟推送的倍数,这里设置为,则重试时间如下:1s 2s 4s 8s 16s 32s 60s 60s 
            .build())
        .subscribe();

温馨提示:如果发送端启用了批处理,Broker是按批的维度重推这一批消息。

4.3.2 (确认超时)Acknowledgment timeout

默认情况下,并不会开启ACK超时确认,也就是意味着Broker将一条消息传递给消费者后并不会再次投递,除非消费者崩溃退出。

确认超时机制允许您设置一个时间范围,在此期间客户端跟踪未确认的消息。在设置的超时(ackTimeout)时间过期后,客户端可以向Broker发送redeliver unacknowledged messages 请求,然后Broker会将未确认消息再次投递给消费者。

客户端在ackTimeout超时后,有两种机制向服务端发送redeliver unacknowledged messages:

  • 第一种是以固定频率定时发送,主要是通过设置消费者的ackTimeoutTickTime参数,示例如下:

    PulsarClient pulsarClient = PulsarClient.builder().build();
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
             .ackTimeout(10, TimeUnit.SECONDS) // 开启超时确认机制
             .ackTimeoutTickTime(1000, TimeUnit.SECONDS) // 设置定时发送频率
             // 省略其他属性
             .subscribe();
    
  • 第二种是延迟梯度的方式进行发送,具体代码如下:

    consumer.ackTimeout(10, TimeUnit.SECOND)
            .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                .minDelayMs(1000) // 最小延迟时间
                .maxDelayMs(60 * 1000) // 最大延迟时间
                .multiplier(2) // 递增倍数
                .build());
    

温馨提示:

  • 如果启用了批处理,确认超时后,Broker会将一个批次作为一个整体重推,而不是重推这个批次中的部分消息。
  • negative acknowledgment确认比超时确认拥有更高的优先级。

4.4 重试主题(Retry letter topic)

Pulsar支持消息消费重试,消费者在消费消息的过程中如果处理失败,可以将这些消息存储在消费者对应的重试主题中,以便后续再次重新消费,消费者会自动订阅重试主题。达到最大消费重试次数后如果还是失败,则会将消息存储在死信队列,死信队列中的消息需要人工手动去处理。

重试主题的工作机制如下图所示:

pulsar 发布订阅,Pulsar实战与进阶,架构,分布式,pulsar,云原生

**消息消费失败重试机制默认是禁用的,**可以通过设置enableRetry为true开启消费消费失败重试,可以通过maxRedeliverCount设置最大重试次数,开启消息消费重试机制的示例代码如下:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("dw_test_consumer_022000")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true) // 开启消费重试
                .deadLetterPolicy(DeadLetterPolicy.builder() 
                        .maxRedeliverCount(maxRedeliveryCount)  // 最大重试次数
                        .retryLetterTopic("my-retry-letter-topic-name")// 可以自定义重试主题
                        .build())
                .subscribe();

重试主题的默认命名规则:topicName-subscriptionname-RETRY

重试主题中的消息包含一些特殊的属性:

  • REAL_TOPIC 消息原始主题
  • ORIGIN_MESSAGE_ID 消息原始ID
  • RECONSUMETIMES 当前重试次数
  • DELAY_TIME 消息重试间隔(毫秒)

如果使用消息重试,客户端需要调用如下API将消息持久化到消息队列中:

consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);

//并且该方法还有一个重载方法,支持自定义消息属性
Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom-key-1", "custom-value-1");
customProperties.put("custom-key-2", "custom-value-2");
consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);

温馨提示:

  • 目前,只在Shared 订阅模式中启用了消息消费重试机制
  • 与否定确认(Negative acknowledgment)相比,消息消费重试机制更加适合类似需要大量重试并且重试间隔可配置的场景,因为消息重试主题是持久到BookKeeper中,而否定确认是缓存在客户端。

4.5 死信队列(Dead letter topic)

如果消费重试次数达到指定的最大值后还是未成功消费,Pulsar会将消息发送到消费者对应的死信队列,一旦消息进入到死信队列,Pulsar不会主动对这些消息进行任何处理,需要要消费者自己决定如何处理这些消息。

死信队列默认的主题名称为:topicname-subscriptionname-DLQ。

我们也可以通过如下代码自定义死信队列的名称:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                      .maxRedeliverCount(maxRedeliveryCount)
                      .deadLetterTopic("my-dead-letter-topic-name")
                      .build())
                .subscribe();

需要特别注意的是消费者默认并不会订阅死信队列,也就是意外着如果有消息进入到了死信队列,说明有部分消息没有被成功消费。

如果需要自动为DLQ创建订阅,可以通过initialSubscriptionName来设置订阅组,但如果服务端将allowAutoSubscriptionCreation设置为false,则无法成功创建DLQ producer。

文章首发:https://www.codingw.net/article?id=786


见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技术内幕》、《RocketMQ实战》两本技术书籍,在CSDN中记录了我的成长历程,欢迎大家关注,私信,一起交流进步。文章来源地址https://www.toymoban.com/news/detail-698053.html

到了这里,关于Pulsar消息发送、消费架构概述的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列如何选择?Kafka、Pulsar、RabbitMQ还是...

    公众号: MCNU云原生 ,欢迎搜索关注,更多干货,第一时间掌握! 消息队列是当代分布式系统架构中非常重要的一部分,在应用解耦、流量削峰、异步通信等方面有非常多的应用场景。目前最为我们所熟知的消息队列有:ActiveMQ、Kafka、RabbitMQ、Pulsar和RocketMQ,他们都有哪些优

    2024年02月04日
    浏览(50)
  • 消息队列之六脉神剑:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar对比和如何使用

    消息队列(Message Queue)是一种异步通信机制,它将消息发送者和接收者解耦,从而提高了应用程序的性能、可扩展性和可靠性。在分布式系统中,消息队列经常被用于处理高并发、异步处理、应用解耦等场景。 本篇回答将分析比较常见的六种消息队列:RabbitMQ、Kafka、Active

    2024年02月14日
    浏览(46)
  • MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)

    需要spring-boot集成spring-integration-mqtt代码的直接跳到第5部分 1.1 MQTT是什么呢? message queue telemetry translation 是一种基于发布与订阅的轻量级消息传输协议.适用于低带宽或网络不稳定的物联网应用.开发者可以使用极少的代码来实现物联网设备之间的消息传输.mqtt协议广泛应用于物

    2024年02月12日
    浏览(47)
  • Pulsar and Apache FlinkKafka: Comparing FlinkKafka and Pulsar for Stream Processing

    在现代大数据处理领域,流处理技术已经成为了核心技术之一。流处理是一种实时数据处理技术,它可以在数据流中进行实时分析和处理,从而实现对数据的实时挖掘和应用。在流处理技术中,Apache Flink和Pulsar是两个非常重要的开源项目,它们都具有强大的流处理能力。本文

    2024年04月29日
    浏览(33)
  • 与ChatGPT浅聊Pulsar

    我 : 艾米丽,谈一谈你对Pulsar的理解? ChatGPT : 当然可以!Apache Pulsar是一款分布式消息中间件,它支持多种消息模式,包括发布/订阅模式、队列模式和流模式。在发布/订阅模式下,消息发布者将消息发布到一个主题中,订阅者可以订阅该主题,并接收到所有发布到该主题

    2023年04月15日
    浏览(39)
  • Apache Pulsar入门指南

    Apache Pulsar 是灵活的发布-订阅消息系统(Flexible Pub/Sub messaging),采用计算与存储分离的架构。雅虎在 2013 年开始开发 Pulsar ,于 2016 年首次开源,目前是 Apache 软件基金会的顶级项目。Pulsar 具有支持多租户、持久化存储、多机房跨区域数据复制、高吞吐、低延迟等特性。 P

    2024年02月06日
    浏览(46)
  • pulsar集群搭建_亲测成功

    pulsar集群搭建_亲测成功 单机运行请看: Linux MacBook单机部署Pulsar并开启认证功能 集群组成 搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身实例)。这三个集群组件如下: ZooKeeper 集群(3(或多) 个 ZooKeeper 节点组成) bookie 集群(

    2024年02月09日
    浏览(31)
  • Apache-Pulsar安装操作说明

    Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。 Pulsar 的主要特性如下: 对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。 极低的发布和端到端延迟。 无缝可扩展至超过一百万个主题。 一个简单的客户端 API,具有Java、Go、Python和C++的绑

    2024年04月14日
    浏览(48)
  • Pulsar's Integration with Apache Samza for Stateful Stream Processing

    随着数据的增长和复杂性,流处理技术变得越来越重要。流处理系统允许实时分析大规模的、高速变化的数据流。Apache Pulsar 是一个高性能的分布式消息系统,适用于流处理和批处理。Apache Samza 是一个用于有状态流处理的系统,它可以与 Pulsar 集成,以实现高效的状态流处理

    2024年04月14日
    浏览(46)
  • Kafka、RabbitMQ、Pulsar、RocketMQ基本原理和选型

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,削峰填谷等问题。实现高性能、高可用、可伸缩和最终一致性架构。 使用消息队列能够获得如下好处,能够在应用与应用之间降低依赖和实时性要求。 解耦:多个服务监听、处理同一条消息,避免

    2024年04月23日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包