Apache-Pulsar安装操作说明

这篇具有很好参考价值的文章主要介绍了Apache-Pulsar安装操作说明。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

说明

Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。

Pulsar 的主要特性如下:

对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。
极低的发布和端到端延迟。
无缝可扩展至超过一百万个主题。
一个简单的客户端 API,具有Java、Go、Python和C++的绑定。
主题的多种订阅类型(独占、共享和故障转移)。
通过Apache BookKeeper提供的持久消息存储来保证消息传递。无服务器轻量级计算框架Pulsar Functions提供流原生数据处理功能。
基于 Pulsar Functions 构建的无服务器连接器框架Pulsar IO可以更轻松地将数据移入和移出 Apache Pulsar。
当数据老化时,分层存储将数据从热/温存储卸载到冷/长期存储(例如S3和GCS)。

安装包下载

本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本

csdn下载 也可以自行去官网下载

解压目录

tar -zxvf apache-pulsar-3.2.2-bin.tar.gzApache-Pulsar安装操作说明,Apache-Pulsar

目录说明

目录 描述
bin 入口pulsar点脚本和许多其他命令行工具
conf 配置文件,包括broker.conf
lib Pulsar 使用的 JAR
examples Pulsar 函数示例
instances Pulsar 函数的工件

启动Pulsar

bin/pulsar standalone

注意:需要保证jdk在17+

创建Topic

创建一个名为my-topic的topic

bin/pulsar-admin topics create persistent://public/default/my-topic

Apache-Pulsar安装操作说明,Apache-Pulsar

生产者发送消息

bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'

Apache-Pulsar安装操作说明,Apache-Pulsar

消费者消费消息

Apache-Pulsar安装操作说明,Apache-Pulsar

测试批量发送消息

bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"

重新消费

bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0

Apache-Pulsar安装操作说明,Apache-Pulsar

java生产消息

pom.xml

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>3.2.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.14.2</version>
</dependency>

代码

package com.pulsar.demo;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
 
public class PulsarProducer {
    private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class);
    private static final String SERVER_URL = "pulsar://192.168.xxx:6650";
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        // 构造生产者
        Producer<String> producer = client.newProducer(Schema.STRING)
                .producerName("my-producer")
                .topic("my-topic")
                .batchingMaxMessages(1024)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .enableBatching(true)
                .blockIfQueueFull(true)
                .maxPendingMessages(512)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
        // 同步发送消息
        MessageId messageId = producer.send("Hello World");
        log.info("message id is {}",messageId);
        System.out.println(messageId.toString());
        // 异步发送消息
        CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
        // 阻塞线程,直到返回结果
        log.info("async message id is {}",asyncMessageId.get());
 
        
        producer.close();
 
        // 关闭licent的方式有两种,同步和异步
        // client.close();
        client.closeAsync();
 
    }
}

java消费消息

package com.pulsar.demo;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;
 

public class PulsarConsumer {
    private static final String SERVER_URL = "pulsar://192.168.xxx:6650";
    private static final String topic = "persistent://public/default/my-topic"; // 要订阅的topic
 
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        Consumer consumer = client.newConsumer()
                .consumerName("my-consumer")
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .maxTotalReceiverQueueSizeAcrossPartitions(10)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        while (true) {
            Message msg = consumer.receive();
 
            try {
                System.out.printf("Message received: %s\n", new String(msg.getData()));
 
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}

停止Pulsar

完成后,您可以关闭 Pulsar 集群。在启动集群的终端窗口中按Ctrl-C 。文章来源地址https://www.toymoban.com/news/detail-850497.html

到了这里,关于Apache-Pulsar安装操作说明的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ 管理界面操作说明

    当我们安装好RabbitMQ,并开启了 rabbitmq_management 插件并重启RabbitMQ服务后,我们就可以访问管控台了。 rabbitmq_management 开启后,重启RabbitMQ服务,然后浏览器访问 http://localhost:15672/ 账号:guest 密码:guest 登录进去后: 点击队列名,可以进行消息数量的查看: 消息内容查看: A

    2023年04月17日
    浏览(50)
  • Git —— submodule 操作说明

    这里存在一个较大的坑,默认检出的子模块并不属于任何分支,而是一个 “detached head” ,虽然可以提交更改,但是并没有本地分支跟踪提交的更改,这意味着 下次更新子模块会丢失这些更改 。 因此在对子模块进行开发修改前,请先切换其所属分支和对应的 commit id。 由于

    2024年02月05日
    浏览(62)
  • QGIS 3D功能操作说明

    QGIS 可以轻松快速地创建 3D 地图和可视化。 可用于3d数据效果浏览及与2D数据的对比及数据的打印输出。具体功能如下。 1.在 QGIS 中,您可以通过几个简单的步骤创建 3D 模型。      (1)在QGIS中添加3D 数据的数据层,例如DEM,以供3D功能使用。               (2) 打开

    2024年02月17日
    浏览(54)
  • C#关于byte的操作说明

    获取byte中每一位的值 获取int16中其中某几位的数值 bit 内容   11-15 预留   10 值7   6-9 值6   5 值5   4 值4   3 值3   2 值2   0-1 值1   public UInt16 ToByte() { UInt16 ret = 0x00; ret = (UInt16)(ret | ((UInt16)Waypoint 9)); ret = (UInt16)(ret | ((UInt16)Side 8)); ret = (UInt16)(ret | ((UInt16)Head 7)); ret = (UInt16)(ret

    2024年02月09日
    浏览(48)
  • ORACLE表空间说明及操作

    数据存储:表空间是数据库中存储数据的逻辑结构。它提供了用于存储表、索引、视图、存储过程等数据库对象的空间。通过划分数据和索引等对象的存储,可以更好地管理和组织数据库的物理存储结构。 性能管理和优化:通过将不同类型的数据存储在不同的表空间中,可以

    2024年02月08日
    浏览(51)
  • 电脑入门:路由器 基本设置操作说明

    路由器  基本设置操作说明 首先我们我设置路由器,就需要先登录路由器, 那么怎样登路由器啊? 登录路由器的方法是   在ie的地址栏输入:http://192.168.1.1  输入完成以后直接回车  那么如果你输入正确 这个时候就应该听到有用户名的提示  呵呵 这是怎么回事啊?  不要召

    2024年02月11日
    浏览(47)
  • windows10 固定电脑IP地址操作说明

       本文主要介绍, windows10 操作系统下,不同的网络类型,对应的电脑IP地址设置方法。 在桌面右下角,点击网络图标,然后点击“网络和Internet设置”。 找到当前连接无线网的详细信息:选择“网络和共享中心”,点击“WLAN”进入WLAN状态界面,点击“详细信息”,保存

    2024年02月03日
    浏览(55)
  • Windows下MATLAB调用Python函数操作说明

    具体可参看MATLAB与Python版本的兼容 操作说明请参看下面两个链接: 操作指南 简单说明: 我安装的是MATLAB2022a和Python3.8.6(安装时请勾选所有可以勾选的,包括 路径 )。对应版本安装完成后,在MATLAB命令行中敲入执行路劲’D:SoftwareAppsPython3_7python.exe’(因人而异) 完了以

    2024年02月11日
    浏览(50)
  • SQL SEVER CDC 启动和关闭 操作说明

    变更数据捕获使用 SQL Server 代理记录表中发生的插入、更新及删除。 因此,它使得可以通过关系格式轻松使用这些数据更改。 将为修改的行捕获将这些更改数据应用到目标环境所需的列数据和基本元数据,并将其存储在镜像所跟踪源表的列结构的更改表中。 此外,表值函数

    2024年02月05日
    浏览(43)
  • MySQL笔记——MySQL数据库介绍以及在Linux里面安装MySQL数据库,对MySQL数据库的简单操作,MySQL的外接应用程序使用说明

    MySQL笔记——MySQL数据库介绍以及在Linux里面安装MySQL数据库,对MySQL数据库的简单操作,MySQL的外接应用程序使用说明 MySQL笔记——表的分组查询、表的分页查询、表的约束、数据库设计 MySQL案例——多表查询以及嵌套查询 MySQL笔记——数据库当中的事务以及Java实现对数据库进

    2024年01月16日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包