Pulsar的消费模式

这篇具有很好参考价值的文章主要介绍了Pulsar的消费模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Pulsar 提供了三种消费模式:独立消费者模式、共享订阅模式和发布订阅模式。

1: 独立消费者模式:每个消费者实例都会独立地消费消息,并且每个消息只会被一个消费者消费。这种模式适合于需要完全独立处理消息的场景,例如数据采集和日志处理。

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarConsumerDemo {

    private static final Logger log = LoggerFactory.getLogger(PulsarConsumerDemo.class);

    public static void main(String[] args) throws PulsarClientException {
        String pulsarServiceUrl = "pulsar://localhost:6650";
        String topicName = "persistent://public/default/test-topic";
        String subscriptionName = "test-subscription";

        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsarServiceUrl)
                .build();

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            log.info("Received message: {}", msg.getValue());
            consumer.acknowledge(msg);
        }

        // 如果不再需要消费消息,可以关闭消费者和 Pulsar 客户端
        // consumer.close();
        // pulsarClient.close();
    }
}

在这个示例中,我们创建了一个独立的消费者实例,它会订阅名为 "test-topic" 的主题,并以独占模式订阅该主题。该消费者实例将接收来自 Pulsar 服务的所有消息,并将每条消息打印到日志中。注意,在处理完每条消息后,我们调用 consumer.acknowledge(msg) 来告诉 Pulsar 服务该消息已被处理,以便 Pulsar 服务可以将该消息从订阅的队列中删除。 

2: 共享订阅模式:多个消费者实例可以订阅同一个主题,每个消息会被分发给其中一个消费者实例进行处理。这种模式适合于需要多个消费者处理同一个主题的场景,例如任务分发和负载均衡。

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

public class SharedSubscriptionDemo {

    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "my-topic";
    private static final String SUBSCRIPTION_NAME = "my-subscription";

    public static void main(String[] args) throws PulsarClientException {
        // 创建 Pulsar 客户端
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        // 创建共享订阅模式的消费者
        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);

        // 创建多个消费者实例
        int numConsumers = 3;
        Consumer<byte[]>[] consumers = new Consumer[numConsumers];
        for (int i = 0; i < numConsumers; i++) {
            consumers[i] = consumerBuilder.clone()
                    .consumerName("consumer-" + i)
                    .subscribe();
        }

        // 消费消息
        while (true) {
            for (Consumer<byte[]> consumer : consumers) {
                // 每个消费者实例都会收到一部分消息
                byte[] message = consumer.receive().getData();
                System.out.printf("Consumer %s received message: %s%n", consumer.getConsumerName(), new String(message));
                consumer.acknowledge(message);
            }
        }
    }
}

在这个示例中,我们创建了一个名为 "my-topic" 的主题,使用共享订阅模式创建了三个消费者实例。当有消息发布到主题时,每个消费者实例都会收到其中一部分消息进行处理。在这里,我们简单地打印每个消费者实例收到的消息内容。

3: 发布订阅模式:每个消费者实例都会订阅一个或多个主题,并接收所有发布到这些主题的消息。这种模式适合于需要将消息广播给多个消费者实例的场景,例如实时推送和广告投放。

from pulsar import Client, Message

# 创建 Pulsar 客户端
client = Client('pulsar://localhost:6650')

# 创建生产者并发送消息到主题
producer = client.create_producer('my-topic')
producer.send(Message('Hello, Pulsar!'.encode('utf-8')))

# 创建两个消费者并订阅主题
consumer1 = client.subscribe('my-topic', 'my-subscription-1')
consumer2 = client.subscribe('my-topic', 'my-subscription-2')

# 从消费者中接收消息
while True:
    msg1 = consumer1.receive()
    msg2 = consumer2.receive()
    print('Consumer 1 received: {}'.format(msg1.data().decode('utf-8')))
    print('Consumer 2 received: {}'.format(msg2.data().decode('utf-8')))
    consumer1.acknowledge(msg1)
    consumer2.acknowledge(msg2)

这个例子创建了一个生产者,将消息发送到主题“my-topic”。然后,创建两个消费者并订阅相同的主题“my-topic”,并从每个消费者中接收消息。最后,确认消费者已成功处理消息并将其从队列中删除。文章来源地址https://www.toymoban.com/news/detail-675876.html

到了这里,关于Pulsar的消费模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq topic模式设置#通配符情况下 消费者队列未接收消息问题排查解决

    生产者配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.# 消费者代码配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.user 其实以上代码看着没有问题,意思是代码生成一个队列,并把【topic.shcool.user】队列和生产者的【topic_exchange_shcool】exchange绑定,但是生产者发送消息是

    2024年02月11日
    浏览(44)
  • Pulsar and Apache FlinkKafka: Comparing FlinkKafka and Pulsar for Stream Processing

    在现代大数据处理领域,流处理技术已经成为了核心技术之一。流处理是一种实时数据处理技术,它可以在数据流中进行实时分析和处理,从而实现对数据的实时挖掘和应用。在流处理技术中,Apache Flink和Pulsar是两个非常重要的开源项目,它们都具有强大的流处理能力。本文

    2024年04月29日
    浏览(32)
  • Pulsar-架构与设计

    随着云原生的兴起,对消息中间件的伸缩性和多租户隔离有了更高的要求。现有的消息中间件不支持多租户的隔离,但是有一定伸缩性,需要一定的迁移工具支持和手工操作。 Pulsar是下一代云原生分布式消息平台,采用存储和计算分离架构设计,支持弹性伸缩,支持多租户、

    2024年02月22日
    浏览(33)
  • 与ChatGPT浅聊Pulsar

    我 : 艾米丽,谈一谈你对Pulsar的理解? ChatGPT : 当然可以!Apache Pulsar是一款分布式消息中间件,它支持多种消息模式,包括发布/订阅模式、队列模式和流模式。在发布/订阅模式下,消息发布者将消息发布到一个主题中,订阅者可以订阅该主题,并接收到所有发布到该主题

    2023年04月15日
    浏览(39)
  • Apache Pulsar入门指南

    Apache Pulsar 是灵活的发布-订阅消息系统(Flexible Pub/Sub messaging),采用计算与存储分离的架构。雅虎在 2013 年开始开发 Pulsar ,于 2016 年首次开源,目前是 Apache 软件基金会的顶级项目。Pulsar 具有支持多租户、持久化存储、多机房跨区域数据复制、高吞吐、低延迟等特性。 P

    2024年02月06日
    浏览(43)
  • pulsar集群搭建_亲测成功

    pulsar集群搭建_亲测成功 单机运行请看: Linux MacBook单机部署Pulsar并开启认证功能 集群组成 搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身实例)。这三个集群组件如下: ZooKeeper 集群(3(或多) 个 ZooKeeper 节点组成) bookie 集群(

    2024年02月09日
    浏览(26)
  • Apache-Pulsar安装操作说明

    Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。 Pulsar 的主要特性如下: 对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。 极低的发布和端到端延迟。 无缝可扩展至超过一百万个主题。 一个简单的客户端 API,具有Java、Go、Python和C++的绑

    2024年04月14日
    浏览(46)
  • Pulsar's Integration with Apache Samza for Stateful Stream Processing

    随着数据的增长和复杂性,流处理技术变得越来越重要。流处理系统允许实时分析大规模的、高速变化的数据流。Apache Pulsar 是一个高性能的分布式消息系统,适用于流处理和批处理。Apache Samza 是一个用于有状态流处理的系统,它可以与 Pulsar 集成,以实现高效的状态流处理

    2024年04月14日
    浏览(45)
  • Kafka、RabbitMQ、Pulsar、RocketMQ基本原理和选型

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,削峰填谷等问题。实现高性能、高可用、可伸缩和最终一致性架构。 使用消息队列能够获得如下好处,能够在应用与应用之间降低依赖和实时性要求。 解耦:多个服务监听、处理同一条消息,避免

    2024年04月23日
    浏览(45)
  • 14_基于Flink将pulsar数据写入到HBase

    3.7.1.编写Flink完成数据写入到Hbase操作, 完成数据备份, 便于后续进行即席查询和离线分析 3.7.1.1.HBase基本介绍 hbase是基于Google发布bigTable论文产生一款软件, 是一款noSQL型数据, 不支持SQL. 不支持join的操作, 没有表关系, 不支持事务(多行事务),hbase是基于 HDFS的采用java 语言编写 查

    2024年02月13日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包