消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

这篇具有很好参考价值的文章主要介绍了消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pulsar源码,docker,容器,运维

前言

对于pulsar的特性以及优异,这里不多讲解,直接上干货,主要讲一下Pulsar的docker部署,生产者/消费者几种 
不同模式,以及Topic的使用规则
复制代码

Docker部署pulsar

docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 -d apachepulsar/pulsar-standalone
复制代码

部署问题

因为我用的是腾讯云最基础的服务器,在执行docker命令后,发现Pulsar会启动失败或启动不久便停止,查看日志发现是内存顶不住
复制代码

pulsar源码,docker,容器,运维

查看官网Pulsar默认启动是2g,因此把启动配置修改成机器支持的即可;
docker exec -it pulsar-test sh
cd /pulsar/conf/
vim conf/pulsar_env.sh;       之后重启pulsar即可
复制代码

pulsar源码,docker,容器,运维

pulsar源码,docker,容器,运维

连接Pulsar

/**
 * pulsar 连接bean
 */
@Bean
public PulsarClient getPulsarClient() throws PulsarClientException {
    return PulsarClient.builder()
            .serviceUrl("pulsar://Ip地址:6650")
            .build();
}
复制代码

基础概念了解

Produce 消息的源头,也是消息的发布者,负责将消息发送到 topic。

Consumer 消息的消费者,负责从 topic 订阅并消费消息。

Topic 消息数据的载体,在 Pulsar 中 Topic 可以指定分为多个 partition,如果不设置默认只有一个 partition
(这个指定多个partition,我会在文中后面示例演示,可以留意下)

Brkber 一个无状态组件,主要负责接收 Producer 发送过来的消息,并交付给 Consumer,可以理解成送快递的小哥
复制代码

pulsar源码,docker,容器,运维

Produce详解

创建方式

简单方法创建

    Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
    stringProducer.send("My message")
复制代码

loadConf自定义配置创建

config里面可以填一些自定义配置,如sendTimeoutMs 消息发送超时(毫秒)。如果在sendTimeout过期之前服务器未确认消息,则会发生错误,其他有趣的可以看下官网
复制代码

Pulsar官网

/**
* 使用loadConf创建Produce
*/
@Test
public void testProducer() throws Exception {
   Map<String, Object> config1 = new HashMap<>();
   config1.put("producerName", "produce-demo1");
   config1.put("topicName", "topic1");
   Producer producer1 = client
           .newProducer()
           .loadConf(config1)
           .create();
   producer1.send(("test1 --- " + new Date()).getBytes());
}
复制代码

发送模式

同步发送

 同步发送消息是Producer发送消息以后要等到broker的确认以后才会认为消息发送成功,如果没有收到确认就认为消息发送失败 
复制代码
/**
 * 测试同步发送
 */
@Test
public void testProducer22() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("my-topic")
            .producerName("produce-demo1")
            .create();
    MessageId messageId = stringProducer.send("My message" + "发送消息时间" + new Date());
    System.out.println("消息同步发送---");
    System.in.read();
}
复制代码

异步发送

 异步发送消息是 Producer 发送消息,将消息放到阻塞队列中并立即返回。不需要等待 broker 的确认
复制代码
/**
 * 测试异步发送
 */
@Test
public void testProducer222() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("my-topic")
            .producerName("produce-demo1")
            .create();
    CompletableFuture<MessageId> messageIdCompletableFuture = stringProducer.sendAsync(
            "异步发送的消息");
    System.in.read();
}
复制代码

访问方式/发送方式

Share模式(默认情况)

 默认情况下多个生产者可以发布消息到同一个Topic,指定发送模式.accessMode(ProducerAccessMode.Shared)方法
复制代码
/**
 * shard模式 默认情况下多个生产者可以发布消息到同一个 Topic
 */
@Test
public void testProducer222() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .accessMode(ProducerAccessMode.Shared)
            .topic("访问模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());

    Producer<String> stringProducer2 = client
            .newProducer(Schema.STRING)
            .accessMode(ProducerAccessMode.Shared)
            .topic("访问模式-shared")
            // Producer with name 'produce-demo1' is already connected to topic
            //注意生产者名称不能重复
            .producerName("produce-demo2")
            .create();
    stringProducer2.send("My message 2 " + "发送消息时间" + new Date());

    System.in.read();
}
复制代码

请注意:

  这里我特意标注了生产者名称不能重复,否则对于Pulsar来说,发送消息会报错,如下图,已经有一个produce- 
  demo1的生产者了,再来一个就会报错Producer with name 'produce-demo1' is already connected to topic
  因此如果我们是集群部署的话,尤其注意每一个节点生产者的命名
  当然对于消费者也是同样的规则,不允许名称重复(在下文我也会演示到)
复制代码
/**
 * 演示生产者名称重复,发送报错
 */
@Test
public void testProducer1() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());
    System.in.read();
}

/**
 * 演示生产者名称重复,发送报错
 */
@Test
public void testProducer11() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-shared")
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());
    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

Exclusive

要求生产者以独占模式访问 Topic,在此模式下如果 Topic已经有了生产者,那么其他生产者在连接就会失败报错。 
复制代码
/**
 * Exclusive 要求生产者以独占模式访问 Topic,在此模式下 如果 Topic 已经有了生产者,那么其他生产者在连接就会失败报错。
 * <p>
 * "Topic has an existing exclusive producer: standalone-0-12
 */
@Test
public void testProducer6() throws IOException {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-Exclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.Exclusive)
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());

    Producer<String> stringProducer2 = client
            .newProducer(Schema.STRING)
            .topic("访问模式-Exclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.Exclusive)
            // Producer with name 'produce-demo1' is already connected to topic
            //注意生产者名称不能重复
            .producerName("produce-demo2")
            .create();
    stringProducer2.send("My message 2 " + "发送消息时间" + new Date());

    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

WaitForExclusive

如果主题已经连接了生产者,则将当前生产者挂起,直到生产者获得了Exclusive 访问权限。
该怎么来理解这句话,打个不恰当比喻,类似于Java中的独占锁Sycronized一样,你没有获取到锁,没有获取到权限,就不能发消息,
对比Exclusive报错来说,WaitForExclusive是不会报错的,只会是挂起,
来看下面的demo感受下

1 我们先开启一个线程A向 访问模式-WaitForExclusive topic发送一条消息,My message 1 ***
复制代码

/**
 * WaitForExclusive
 * <p>
 * 如果主题已经连接了生产者,则将当前生产者挂起,直到生产者获得了 Exclusive 访问权限。
 * <p>
 * 也就是存在相同的生产者,不会报错,当然也不会发送消息,     获取到独占后,会将未获取到独占时的消息进行发送!!!
 */
@Test
public void testProducer2() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-WaitForExclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.WaitForExclusive)
            .producerName("produce-demo1")
            .create();
    stringProducer.send("My message 1 " + "发送消息时间" + new Date());
    System.in.read();
}
复制代码
2 然后再开启另一个线程B向 访问模式-WaitForExclusive topic发送10条消息,My message 2 ***
复制代码
/**
 * WaitForExclusive
 */
@Test
public void testProducer22() throws Exception {
    Producer<String> stringProducer = client
            .newProducer(Schema.STRING)
            .topic("访问模式-WaitForExclusive")
            //设置访问模式 默认shared
            .accessMode(ProducerAccessMode.WaitForExclusive)
            .producerName("produce-demo1")
            .create();
    //假设有10条消息在未获取 独占前,均未被发送,模拟来看一下,获取独占后, 这10条消息会进行发送吗 ? 会
    for (int i = 0; i < 10; i++) {
        stringProducer.send("My message 2 " + "发送消息时间" + new Date());
    }
    System.in.read();
}
复制代码
3 然后写个简单的消费者看一下消费情况
复制代码
@Test
public void testConsumer2() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("访问模式-WaitForExclusive")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe()
    System.in.read();
}
复制代码
4 会看到消费者只消费到了 线程A发送的消息,线程B的消息未被消费,因为此时topic的独占权还在线程池A上
复制代码

pulsar源码,docker,容器,运维

5 手动杀死线程A,然后看消费者情况,会看到开始消费出My message 2 *** 也就是线程B的消息,
因为此时线程A被杀死,线程B得到了独占权,线程B将消息发送出去 
复制代码

pulsar源码,docker,容器,运维

Consumer详解

创建方式

简单方法创建

可以看到写了一个while true去获取消息,对于线城是阻塞不友好的,因此我一般用第二种,监听器方法
复制代码

/**
 * 创建消费者
 */
@Test
public void testConsumer22() throws Exception{
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscribe();
    while (true) {
        // Wait for a message
        Message msg = consumer.receive();
        try {
            // Do something with the message
            System.out.println("Message received: " + new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }
}
复制代码

监听器方法创建

/**
 * 接收消息:异步 不阻塞主线程
 */
@Test
public void testConsumer2() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe();
    System.out.println("监听器方式,不阻塞线程");
    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

loadConf自定义配置创建

更多自定义的配置可以看下官网文件
复制代码
/**
 * loadConf创建消费者
 */
@Test
public void testConsumer222() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Map<String, Object> config1 = new HashMap<>();
    config1.put("subscriptionName", "consumer-demo1");
    config1.put("topicNames", Arrays.asList(new String[]{"my-topic"}));

    Consumer consumer = client
            .newConsumer()
            .loadConf(config1)
            .messageListener(myMessageListener)
            .subscribe();
    System.out.println("loadConf方式");
    System.in.read();
}
复制代码

多主题订阅

 多主题订阅主要是指一个消费者,可以订阅多个topic,这里我只演示其中两个
复制代码

传入List数组的多主题订阅

/**
 * Multi-topic subscriptions
 * 多主题订阅
 * 多topic 订阅list设置的topic1 topic2
 */
@Test
public void testConsumer3() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };

    ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("consumer-3");
    List<String> topics = Arrays.asList(
            "topic1",
            "topic2"
    );
    Consumer multiTopicConsumer = consumerBuilder
            .topics(topics)
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

正则表达式多主题订阅

简单点就是正则表达式匹配,根据业务需要自行设置表达式,这里不多演示
复制代码
/**
 * Multi-topic subscriptions
 * 多主题订阅
 * 正则表达式,订阅所有以1结束的topic
 *
 */
@Test
public void testConsumer222() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    ConsumerBuilder consumerBuilder = client.newConsumer()
            .subscriptionName("consumer-1");
    Pattern allTopicsInNamespace = Pattern.compile("public/default/.*1");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .messageListener(myMessageListener)
            .subscribe();
    System.in.read();
}
复制代码

消费模式

Exclusive(默认)

 这里需要注意的是同一topic主题上只能有一个具有相同订阅名称的使用者 默认,也就是说 如果后端是集群部署的话,请注意默认情况下subscriptionName的命名情况,否则会报错
复制代码

/**
 * Exclusive 模式  也是默认的
 * 同一主题上只能有一个具有相同订阅名称的使用者 默认
 * 否则会启动报错
 */
@Test
public void testConsumerExclusive() throws IOException {
    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener)
            .subscribe();

    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription2")
            .subscriptionType(SubscriptionType.Exclusive)
            .messageListener(myMessageListener)
            .subscribe();

    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

Failover

 这个主要是失败转移,对比Exclusive模式,同一主题上可以有具有相同订阅名称的使用者,也就是subscriptionName可以重复,一个节点挂掉了 剩余消息转移到另一个节点继续消费;
 这块的业务场景挺不错的,假设我们后台有两台集群部署机器A,B,并且subscriptionName相同,
 正常情况下,其他模块往队列仍了一条消息,但是只希望被其中一台机器消费, 一条消息被消费一次,而不是A,B两机器都消费对吧,正常的幂等性操作
 现在开始模拟,假设其他模块发送了10条消息,然后只被其中一台消费
复制代码
    @Test
    public void testProduce2() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(false)
                .create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
        // 这里的key可以类似于 投递到 不同broker的一个标识
        producer.newMessage().key("key-1").value("message-1-1").send();
        producer.newMessage().key("key-1").value("message-1-2").send();
        producer.newMessage().key("key-1").value("message-1-3").send();
        producer.newMessage().key("key-2").value("message-2-1").send();
        producer.newMessage().key("key-2").value("message-2-2").send();
        producer.newMessage().key("key-2").value("message-2-3").send();
        producer.newMessage().key("key-3").value("message-3-1").send();
        producer.newMessage().key("key-3").value("message-3-2").send();
        producer.newMessage().key("key-4").value("message-4-1").send();
        producer.newMessage().key("key-4").value("message-4-2").send();
    }
    
@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
           // a++;
          //  if (a > 4) {
               // System.out.println("模拟节点1故障");
                //关闭节点1
              //  consumer.close();
              //  throw new RuntimeException("模拟某时刻节点1故障,转移至节点2消费");
           // }
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

再来看看失败转移,假设其中一台机器宕机,然后我希望剩下机器B,继续消费未消费完的消息,可以看到一台机器模拟宕机后,另一台机器继续消费,也就是失败转移
复制代码
/**
 * Failover故障转移 .subscriptionName("my-subscription") 可重复
 * 一个节点挂掉了 剩余消息转移到另一个节点继续消费
 * 注意这些消费模式 都是和subscriptionName("my-subscription") 订阅者名称相关
 */
int a = 0;

@Test
public void testConsumerFailover() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            a++;
            if (a > 4) {
                System.out.println("模拟节点1故障");
                //关闭节点1
                consumer.close();
                throw new RuntimeException("模拟某时刻节点1故障,转移至节点2消费");
            }
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            //consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

Shared

多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证
复制代码
/**
 * Shared模式
 * 多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证。
 * 也就是消费者 1 消费者2 总共消费10条
 * 注意都是从 .subscriptionName("my-subscription") 视角
 */
@Test
public void testShared() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维

Key_Shared模式

这个简单来理解,发送消息的时候,给这批消息指定一个key,那么消息被消费的时候,相同key的这批消息,只能被同一个节点消费
如下示例我发送消息时,指定下key,然后写消费者看下消费情况,会看到key相同的消息被同一节点消费
复制代码
    @Test
    public void testProduce2() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(false)
                .create();
        producer.newMessage().key("key-1").value("message-1-1").send();
        producer.newMessage().key("key-1").value("message-1-2").send();
        producer.newMessage().key("key-1").value("message-1-3").send();
        producer.newMessage().key("key-2").value("message-2-1").send();
        producer.newMessage().key("key-2").value("message-2-2").send();
        producer.newMessage().key("key-2").value("message-2-3").send();
        producer.newMessage().key("key-3").value("message-3-1").send();
        producer.newMessage().key("key-3").value("message-3-2").send();
        producer.newMessage().key("key-4").value("message-4-1").send();
        producer.newMessage().key("key-4").value("message-4-2").send();
    }
    
/**
 * Key_Shared模式
 * 多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证。
 * 也就是消费者 1 消费者2 总共消费10条
 * 注意都是从 .subscriptionName("my-subscription") 视角
 * <p>
 * 具有相同密钥的消息仅按顺序传递给一个消费者。消息在不同消费者之间的可能分布(默认情况下,我们事先不知道哪些密钥将被分配给消费者,但一个密钥只会同时被分配给消费者
 * ("key-1", "message-1-1")
 * ("key-1", "message-1-2")
 * ("key-1", "message-1-3")
 * ("key-3", "message-3-1")
 * ("key-3", "message-3-2")
 * <p>
 * <p>
 * ("key-2", "message-2-1")
 * ("key-2", "message-2-2")
 * ("key-2", "message-2-3")
 * ("key-4", "message-4-1")
 * ("key-4", "message-4-2")
 */
@Test
public void testKeyShared() throws IOException {
    MessageListener myMessageListener1 = (consumer, msg) -> {
        try {
            System.out.println("Message1 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener1)
            .subscribe();

    MessageListener myMessageListener2 = (consumer2, msg) -> {
        try {
            System.out.println("Message2 received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };
    Consumer consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .messageListener(myMessageListener2)
            .subscribe();

    System.in.read();
}
    
复制代码

pulsar源码,docker,容器,运维

模式对比

 Exclusive只支持同一topic只能有一个同名订阅者,对于目前大多集群架构,需要每个节点命名subscriptionName不同操作下,
 集群中的每个节点都能收到topic消息,对于特殊场景如 前端websocket连接后台集群这类场景,还是蛮实用
 Failover:可以保证在集群中消息只被消费一次,幂等性嘛简单点,正常情况下只被其中一台机器消费,也就是固定一台机器,这种就很纱布了
 Shared: 可以保证在集群中消息只被消费一次,也是保证了幂等性,而且消息被集群平均消费了,压力down down
 Key_Shared 我再想想
 
复制代码

Topic

Pulsar对topic的命名有如下规则,
{persistent|non-persistent}://tenant/namespace/topic
复制代码
  • persistent / non-persistent 表示主题的类型,主题分为持久化和非持久化主题,默认是持久化的类型。持久化的主题会将消息保存到磁盘上,而非持久化的主题就不会将消息保存到磁盘。

  • tenant Pulsar 中主题的租户,租户对于 Pulsar中的多租户至关重要,并且分布在集群中。

  • namespace 将相关联的 Topic 作为一个组来管理,是管理 Topic 的基本单元。每个租户可以有一个或多个命名空间。

在上面的示例,我们都没有去关注persistent,tenant,namespace的玩法,因为你不去特殊设置的话,pulsar都有默认的
复制代码
我们可以尝试往persistent://sample/namespace_test4/topic-haha1直接发一条消息,你会发现发送报错Policies not found for sample/namespace_test4 namespace
复制代码
/**
 * 报错
 * 向租户sample 命名空间 namespace_test4  topic topic-haha1 发送消息
 * 注意namespace需手动先创建好,否则会报错 olicies not found for sample/namespace_test4 namespace
 */
@Test
public void testProduce322() throws Exception {
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("persistent://sample/namespace_test4/topic-haha1")
            .enableBatching(false)
            .create();
    producer.send("向租户sample 命名空间 namespace_test2  topic topic-haha1 发送消息");
    System.in.read();
}
复制代码

pulsar源码,docker,容器,运维文章来源地址https://www.toymoban.com/news/detail-720169.html

这里则表示我们需要先创建namespace之后,搞好对应的namespace tenant这些之后才行

那么如何动态去创建namespace,管理tenanat,以及包括我们刚才搞了那么多的生产者消费者测试出来,
能不能有一个UI界面让我一目了然,一手掌握Pulsar呢?

这里我即将介绍Pulsar的一款UI工具Pulsar admin

敬请期待,持续更新

到了这里,关于消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 基于 BlockQueue(阻塞队列) 的 生产者消费者模型

    基于 BlockQueue(阻塞队列) 的 生产者消费者模型

    阻塞队列(Blocking Queue) 是一种特殊类型的队列,它具有阻塞操作的特性。在并发编程中,阻塞队列可以用于实现线程间的安全通信和数据共享。 阻塞队列的 主要特点 是: 当 队列为空时 ,消费者线程尝试从队列中获取(出队)元素时会被阻塞,直到有新的元素被添加到队

    2024年02月12日
    浏览(23)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(47)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(24)
  • 多线程(初阶七:阻塞队列和生产者消费者模型)

    多线程(初阶七:阻塞队列和生产者消费者模型)

    目录 一、阻塞队列的简单介绍 二、生产者消费者模型 1、举个栗子: 2、引入生产者消费者模型的意义: (1)解耦合 (2)削峰填谷 三、模拟实现阻塞队列 1、阻塞队列的简单介绍 2、实现阻塞队列 (1)实现普通队列 (2)加上线程安全 (3)加上阻塞功能 3、运用阻塞队列

    2024年02月05日
    浏览(10)
  • 【小黑嵌入式系统第十五课】μC/OS-III程序设计基础(四)——消息队列(工作方式&数据通信&生产者消费者模型)、动态内存管理、定时器管理

    【小黑嵌入式系统第十五课】μC/OS-III程序设计基础(四)——消息队列(工作方式&数据通信&生产者消费者模型)、动态内存管理、定时器管理

    上一课: 【小黑嵌入式系统第十四课】μC/OS-III程序设计基础(三)——信号量(任务同步资源同步)、事件标记组(与或多个任务) 下一课: 【小黑嵌入式系统第十六课】PSoC 5LP第三个实验——μC/OS-III 综合实验 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣

    2024年01月17日
    浏览(13)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(14)
  • 优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

    优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

    前言 SpringBoot 集成 RabbitMQ 公司老大觉得使用注解太繁琐了,而且不能动态生成队列所以让我研究是否可以动态绑定,所以就有了这个事情。打工人就是命苦没办法,硬着头皮直接就上了,接下来进入主题吧。 需求思路分析 根据老大的需求,大致分为使用配置文件进行配置,

    2024年02月16日
    浏览(9)
  • Linux之信号量 | 消费者生产者模型的循环队列

    Linux之信号量 | 消费者生产者模型的循环队列

    目录 一、信号量 1、概念 2、信号量操作函数 二、基于环形队列的生产者消费者模型 1、模型分析 2、代码实现 1、单生产单消费的生产者消费者模型 2、多生产多消费的生产者消费者模型 引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临

    2024年04月17日
    浏览(9)
  • 多线程学习之生产者和消费者与阻塞队列的关系

    多线程学习之生产者和消费者与阻塞队列的关系

    生产者消费者问题,实际上主要是包含了两类线程: 生产者线程用于生产数据 消费者线程用于消费数据 生产者和消费者之间通常会采用一个共享的数据区域,这样就可以将生产者和消费者进行解耦, 两者都不需要互相关注对方的 Object类的等待和唤醒方法 方法名 说明 void

    2024年02月11日
    浏览(9)
  • 【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

    【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

    死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能 导致数据不一致问题 ,为了避免此问题的发生

    2024年01月24日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包