RocketMQ 5.1 NameServer 启动流程

这篇具有很好参考价值的文章主要介绍了RocketMQ 5.1 NameServer 启动流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


RocketMQ是一个分布式消息中间件,它的核心组件之一是namesrv,负责管理broker的路由信息和kv配置。本文将介绍RocketMQ5.1版本中namesrv的启动过程,包括如何解析命令行参数、加载配置文件、初始化和启动namesrv控制器等。

首先,我们需要在环境变量中设置ROCKETMQ_HOME,指向RocketMQ的安装目录。然后,我们可以使用如下命令启动namesrv:

nohup sh mqnamesrv &

这条命令执行运行的是 namesrv.NamesrvStartup#main 方法。

public static void main(String[] args) {
    main0(args);
    controllerManagerMain();
}

启动过程分为两部分

  1. main0(args):启动 NamesrvController
  2. controllerManagerMain():启动 ControllerManager

本文只分析 NamesrvController 的启动

namesrv.NamesrvStartup#main0 方法,代码如下:

public static NamesrvController main0(String[] args) {
    try {
        // 解析命令行参数和配置文件
        parseCommandlineAndConfigFile(args);
        // 创建并启动 NamesrvController
        NamesrvController controller = createAndStartNamesrvController();
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

namesrv的启动过程可以分为以下几个步骤:

  1. 解析命令行参数和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置对象
  2. 创建并启动 NamesrvController 对象,该对象是namesrv的核心控制器,它持有各种配置对象、网络通信对象、路由管理对象等

1 解析命令行参数和配置文件

解析命令行参数和配置文件的方法是 namesrv.NamesrvStartup#parseCommandlineAndConfigFile,代码如下:

public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    Options options = ServerUtil.buildCommandlineOptions(new Options());
    CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());
    if (null == commandLine) {
        System.exit(-1);
        return;
    }

    // 创建配置类对象
    namesrvConfig = new NamesrvConfig();
    nettyServerConfig = new NettyServerConfig();
    nettyClientConfig = new NettyClientConfig();
    // 设置namesrv的监听端口
    nettyServerConfig.setListenPort(9876);

    // 如果命令行中有-c参数,则从配置文件中加载namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的属性
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            MixAll.properties2Object(properties, nettyClientConfig);
            if (namesrvConfig.isEnableControllerInNamesrv()) {
                controllerConfig = new ControllerConfig();
                MixAll.properties2Object(properties, controllerConfig);
            }
            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    // 如果命令行中有-p参数,则打印namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的属性
    if (commandLine.hasOption('p')) {
        MixAll.printObjectProperties(logConsole, namesrvConfig);
        MixAll.printObjectProperties(logConsole, nettyServerConfig);
        MixAll.printObjectProperties(logConsole, nettyClientConfig);
        if (namesrvConfig.isEnableControllerInNamesrv()) {
            MixAll.printObjectProperties(logConsole, controllerConfig);
        }
        System.exit(0);
    }

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

}

上述代码的执行流程如下:

  1. 创建一个NamesrvConfig对象,用于存储namesrv的配置信息,如rocketmqHome、kvConfigPath等。NamesrvConfig类的具体属性如下表所示

    属性名 含义 默认值
    rocketmqHome RocketMQ的根目录 环境变量ROCKETMQ_HOME
    kvConfigPath KV配置文件的路径 ${user.home}/namesrv/kvConfig.json
    configStorePath 配置存储文件的路径 ${user.home}/namesrv/namesrv.properties
    productEnvName 环境名称 center
    clusterTest 是否开启集群测试 false
    orderMessageEnable 是否支持顺序消息 false
  2. 创建一个NettyServerConfig对象,用于存储netty服务端的配置信息,如listenPort、serverWorkerThreads等。NettyServerConfig类的具体属性如下表所示

    属性名 含义 默认值
    listenPort 监听端口,用于接收客户端的连接请求 9876
    serverWorkerThreads 服务端工作线程数量,用于处理网络IO事件或执行业务任务 8
    serverCallbackExecutorThreads 服务端回调执行线程数量,用于执行异步回调任务,如果为0,则使用公共线程池 0
    serverSelectorThreads 服务端选择器线程数量,用于接收和分发网络IO事件,建议不要超过3个 3
    serverOnewaySemaphoreValue 服务端单向请求信号量值,用于限制单向请求的并发度,防止资源耗尽 256
    serverAsyncSemaphoreValue 服务端异步请求信号量值,用于限制异步请求的并发度,防止资源耗尽 64
    serverChannelMaxIdleTimeSeconds 服务端通道最大空闲时间(秒),超过该时间则关闭连接,释放资源 120
    serverSocketSndBufSize 服务端Socket发送缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1
    serverSocketRcvBufSize 服务端Socket接收缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1
    serverPooledByteBufAllocatorEnable 是否启用服务端池化ByteBuf分配器,可以提高内存利用率和性能 true
    useEpollNativeSelector 是否使用Epoll本地选择器,可以提高Linux平台下的网络IO效率,需要操作系统支持 false
  3. 创建一个NettyClientConfig对象,用于存储netty客户端的配置信息,如clientWorkerThreads、clientCallbackExecutorThreads等。NettyClientConfig类的具体属性如下表所示

    属性名 含义 默认值
    clientWorkerThreads 客户端工作线程数量,用于处理网络IO事件或执行业务任务 4
    clientCallbackExecutorThreads 客户端回调执行线程数量,用于执行异步回调任务,如果为0,则使用公共线程池 可用处理器的数量
    connectTimeoutMillis 连接超时时间(毫秒),超过该时间则放弃连接 3000
    channelNotActiveInterval 通道不活跃的间隔时间(毫秒),超过该时间则关闭通道,释放资源 1000 * 60
    clientChannelMaxIdleTimeSeconds 客户端通道最大空闲时间(秒),超过该时间则关闭连接,释放资源 120
    clientSocketSndBufSize 客户端Socket发送缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1
    clientSocketRcvBufSize 客户端Socket接收缓冲区大小(字节),如果为-1,则使用操作系统默认值 -1
    clientPooledByteBufAllocatorEnable 是否启用客户端池化ByteBuf分配器,可以提高内存利用率和性能 false
    clientCloseSocketIfTimeout 是否在超时时关闭客户端Socket,可以避免资源泄漏 false
    useTLS 是否使用TLS协议进行加密通信,需要操作系统支持 false
    clientOnewaySemaphoreValue 客户端单向请求信号量值,用于限制单向请求的并发度,防止资源耗尽 65535
    clientAsyncSemaphoreValue 客户端异步请求信号量值,用于限制异步请求的并发度,防止资源耗尽 65535
  4. 解析命令行参数,支持以下选项:

    • -c 指定配置文件路径,如果指定了配置文件,会从文件中加载namesrvConfig、nettyServerConfig和nettyClientConfig的属性。
    • -p 打印所有配置信息,并退出程序
    • 其他选项会被转换为properties对象,并覆盖namesrvConfig、nettyServerConfig和nettyClientConfig的属性

2 创建并启动 NamesrvController

创建并启动 NamesrvController 的方法是 namesrv.NamesrvStartup#createAndStartNamesrvController

public static NamesrvController createAndStartNamesrvController() throws Exception {
    // 创建 NamesrvController
    NamesrvController controller = createNamesrvController();
    // 启动 NamesrvController
    start(controller);

    // 输出启动成功的日志
    NettyServerConfig serverConfig = controller.getNettyServerConfig();
    String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());
    log.info(tip);
    System.out.printf("%s%n", tip);
    return controller;
}

代码执行流程如下:

  1. 调用createNamesrvController方法,创建NamesrvController对象
  2. 调用start方法,传入NamesrvController对象,启动 NamesrvController 对象
  3. 格式化一个提示信息,包含序列化类型、绑定地址和监听端口信息
  4. 使用log对象记录提示信息到日志文件中
  5. 把提示信息打印到控制台中
  6. 返回NamesrvController对象

如果在执行中出现异常,则抛出异常并退出程序

2.1 创建 NamesrvController 对象

namesrv.NamesrvStartup#createAndStartNamesrvController 方法中调用了 namesrv.NamesrvStartup#createNamesrvController方法,用于创建 NamesrvController 对象,代码为:

public static NamesrvController createNamesrvController() {
    // 创建 NamesrvController,传入namesrvConfig、nettyServerConfig、nettyClientConfig配置
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    return controller;
}

创建 NamesrvController 对象的步骤如下:

  1. 创建一个NamesrvController对象,传入namesrvConfig、nettyServerConfig和nettyClientConfig对象,这些对象存储了namesrv的业务配置和网络配置
  2. 调用NamesrvController对象的getConfiguration方法,获取一个Configuration对象,该对象负责管理namesrv的配置信息
  3. 调用Configuration对象的registerConfig方法,传入properties对象,将properties对象中的配置信息注册到Configuration对象中
  4. 返回NamesrvController对象

这个函数的返回值是NamesrvController对象

2.2 启动 NamesrvController 对象

namesrv.NamesrvStartup#createAndStartNamesrvController 方法中调用了 namesrv.NamesrvStartup#start 方法,用于启动创建好的 NamesrvController 对象,代码为:

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // 初始化controller
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // 当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。
    // 所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));

    // 启动controller
    controller.start();

    return controller;
}

启动 NamesrvController 对象的步骤如下:

第一步:初始化 controller

调用 namesrv.NamesrvController#initialize 方法,初始化controller。如果初始化失败,就调用controller的shutdown方法,关闭controller,并退出程序

public boolean initialize() {
    loadConfig();
    initiateNetworkComponents();
    initiateThreadExecutors();
    registerProcessor();
    startScheduleService();
    initiateSslContext();
    initiateRpcHooks();
    return true;
}

初始化操作包括:

  • 调用 namesrv.NamesrvController#loadConfig 方法从 NamesrvConfig 对象中保存的 kvConfigPath 指定的文件中加载kv配置,并创建一个KVConfigManager对象,用于管理和打印kv配置

    private void loadConfig() {
        this.kvConfigManager.load();
    }
    
  • 调用 namesrv.NamesrvController#initiateNetworkComponents 方法初始化网络组件,包括remotingClient和remotingServer

    remotingClient是一个NettyRemotingClient对象,它用于向其他服务发送请求或响应。remotingServer是一个NettyRemotingServer对象,它用于接收和处理来自其他服务的请求或响应。BrokerHousekeepingService对象用于处理broker的连接和断开事件。

    private void initiateNetworkComponents() {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
    }
    
  • 调用 namesrv.NamesrvController#initiateThreadExecutors 方法初始化两个线程池,一个是defaultExecutor,用于处理默认的远程请求;另一个是clientRequestExecutor,用于处理客户端的路由信息请求。这两个线程池都使用了LinkedBlockingQueue作为任务队列,并且重写了newTaskFor方法,使用FutureTaskExt包装了Runnable任务。

    private void initiateThreadExecutors() {
        this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
        this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),
                this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,
                this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
                return new FutureTaskExt<>(runnable, value);
            }
        };
    
        this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
        this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),
                this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,
                this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
                return new FutureTaskExt<>(runnable, value);
            }
        };
    }
    
  • 调用 namesrv.NamesrvController#registerProcessor 方法根据 namesrvConfig.isClusterTest() 的值,选择使用ClusterTestRequestProcessor或者DefaultRequestProcessor作为默认处理器

    • ClusterTestRequestProcessor是一个用于集群测试的处理器,它会在请求前后添加一些环境信息,比如产品环境名称、请求时间等
    • DefaultRequestProcessor是一个用于正常运行的处理器,它会根据请求的类型,调用不同的方法来处理,比如注册Broker、获取路由信息、更新配置等。
    • namesrvConfig.isClusterTest() = false 时如果收到请求的 requestCode 等于 RequestCode.GET_ROUTEINFO_BY_TOPIC 则会使用ClientRequestProcessor来处理;当收到其他请求时,会使用DefaultRequestProcessor来处理。
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
    
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);
        } else {
            // Support get route info only temporarily
            ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
    
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
        }
    }
    

    此部分为 RIP 29 Optimize RocketMQ NameServer 中 Thread pool separation 改进。在改进之前nameserver使用同一个线程池和队列来处理所有的客户端路由请求、broker注册请求等,如果其中一种类型的请求爆发,会影响所有的请求。为了解决这个问题,RIP-29 将最重要的客户端路由请求单独隔离出来,使用不同的线程池和队列,队列大小和线程数都可以配置,这样可以保证不同类型的请求之间不会相互影响,如下图所示

    RocketMQ 5.1 NameServer 启动流程
  • 调用 namesrv.NamesrvController#startScheduleService 方法启动定时服务,执行以下三个任务:

    • 每隔一段时间扫描不活跃的broker,并清理路由信息
    • 每隔 10 分钟打印所有的KV配置信息
    • 每隔 1 秒打印线程池的水位日志,即客户端请求线程池和默认线程池的队列大小和头部任务的慢时间(从创建到执行的时间)
    private void startScheduleService() {
        // 周期性扫描不活跃的Broker,并从路由信息中移除
        this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
                5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
    
        // 每隔 10 分钟打印KVConfig 信息
        this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
                1, 10, TimeUnit.MINUTES);
    
        // 每隔 1 秒打印线程池的水位日志,
        // 即客户端请求线程池和默认线程池的队列大小和头部任务的慢时间(从创建到执行的时间)
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                NamesrvController.this.printWaterMark();
            } catch (Throwable e) {
                LOGGER.error("printWaterMark error.", e);
            }
        }, 10, 1, TimeUnit.SECONDS);
    }
    
  • 调用 namesrv.NamesrvController#initiateSslContext 方法初始化SSL上下文,即配置remotingServer使用TLS协议进行安全通信

  • 调用 namesrv.NamesrvController#initiateRpcHooks 方法注册RPC钩子,即在remotingServer处理请求之前或之后执行一些自定义的逻辑

    private void initiateRpcHooks() {
        this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
    }
    

    namesrv.route.ZoneRouteRPCHook 类中重写了 doAfterResponse 方法,它会在处理GET_ROUTEINFO_BY_TOPIC请求(即请求的 requestCode = RequestCode.GET_ROUTEINFO_BY_TOPIC)时,根据请求中的zoneName参数,过滤掉不属于该区域的broker和queue数据,从而实现区域隔离的功能。

    具体来说,doAfterResponse会设置 response 的 body 为 namesrv.route.ZoneRouteRPCHook#filterByZoneName 方法返回值的字节数组格式,filterByZoneName 方法作用是返回过滤掉不属于该区域的broker和queue数据后的TopicRouteData数据对象

    @Override
    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
        if (RequestCode.GET_ROUTEINFO_BY_TOPIC != request.getCode()) {
            return;
        }
        // 省略部分代码
        TopicRouteData topicRouteData = RemotingSerializable.decode(response.getBody(), TopicRouteData.class);
    
        response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
    }
    

第二步:注册 JVM 钩子

通过addShutdownHook方法注册一个ShutdownHookThread对象,即 JVM 钩子,用来在程序终止时调用controller的shutdown方法,释放资源

public void shutdown() {
    this.remotingClient.shutdown();
    this.remotingServer.shutdown();
    this.defaultExecutor.shutdown();
    this.clientRequestExecutor.shutdown();
    this.scheduledExecutorService.shutdown();
    this.scanExecutorService.shutdown();
    this.routeInfoManager.shutdown();

    if (this.fileWatchService != null) {
        this.fileWatchService.shutdown();
    }
}

第二步:启动 controller

调用 namesrv.NamesrvController#start 方法,启动 controller

public void start() throws Exception {
    this.remotingServer.start();

    // In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
    if (0 == nettyServerConfig.getListenPort()) {
        nettyServerConfig.setListenPort(this.remotingServer.localListenPort());
    }

    this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()
        + ":" + nettyServerConfig.getListenPort()));
    this.remotingClient.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    this.routeInfoManager.start();
}

启动操作包括:

  • 调用remotingServer对象的start方法,启动一个NettyRemotingServer,用于接收和处理客户端的请求。
  • 如果nettyServerConfig对象的listenPort属性为0,说明是由操作系统自动分配一个可用端口,那么将remotingServer对象的localListenPort属性赋值给nettyServerConfig对象的listenPort属性,保持一致。
  • 调用remotingClient对象的updateNameServerAddressList方法,更新本地地址列表,只包含当前机器的IP地址和端口号。
  • 调用remotingClient对象的start方法,启动一个NettyRemotingClient,用于向其他服务发送请求。
  • 如果fileWatchService对象不为空,调用它的start方法,启动一个文件监视服务,用于动态加载证书文件。
  • 调用routeInfoManager对象的start方法,启动一个路由信息管理器,用于维护Broker和Topic的路由关系。

至此,NamesrvController 的启动流程结束文章来源地址https://www.toymoban.com/news/detail-404429.html

到了这里,关于RocketMQ 5.1 NameServer 启动流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式事务,zookeeper,dubbo,rocketmq

    CAP理论是分布式领域中非常重要的一个指导理论,C(Consistency)表示强一致性,A(Availability)表示可用性,P(Partition Tolerance)表示分区容错性,CAP理论指出在目前的硬件条件下,一个分布式系统是必须要保证分区容错性的,而在这个前提下,分布式系统要么保证CP,要么保

    2024年04月12日
    浏览(45)
  • RocketMQ分布式事务 -> 最终一致性实现

    · 分布式事务的问题常在业务与面试中被提及, 近日摸鱼看到这篇文章, 阐述的非常通俗易懂, 固持久化下来我博客中, 也以便于我二刷 转载源 : 基于RocketMQ分布式事务 - 完整示例 本文代码不只是简单的demo,考虑到一些异常情况、幂等性消费和死信队列等情况,尽量向可靠业务

    2024年02月15日
    浏览(54)
  • 分布式消息中间件RocketMQ的应用

    所有代码同步至GitCode:https://gitcode.net/ruozhuliufeng/test-rocketmq.git 普通消息 消息发送分类 ​ Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。 同步发送消息 ​ 同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。

    2024年02月05日
    浏览(82)
  • SpringCloudAlibaba集成RocketMQ实现分布式事务事例(一)

    业务需求 用户请求订单微服务 order-service 接口删除订单(退货),删除订单时需要调用 account-service的方法给账户增加余额,一个典型的分布式事务问题。 代码实现 事务消息有三种状态: TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息 TransactionStatus.Roll

    2024年02月13日
    浏览(104)
  • 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

    痛点背景 业务场景 假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做? 之前方案 最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)30分钟,则关闭订单。 方案评估 优点:是实

    2024年02月13日
    浏览(51)
  • 解析RocketMQ:高性能分布式消息队列的原理与应用

    什么是消息队列 消息队列是一种消息传递机制,用于在应用程序和系统之间传递消息,实现解耦和异步通信。它通过将消息发送到一个中间代理(消息队列),然后由消费者从该队列中获取消息并处理。 RocketMQ简介 RocketMQ是阿里巴巴开源的一款高性能分布式消息队列系统。它

    2024年02月14日
    浏览(50)
  • 【送书福利-第七期】《分布式中间件核心原理与RocketMQ最佳实践》

    大家好,我是洲洲,欢迎关注,一个爱听周杰伦的程序员。关注公众号【程序员洲洲】即可获得10G学习资料、面试笔记、大厂独家学习体系路线等…还可以加入技术交流群欢迎大家在CSDN后台私信我! 分布式中间件核心原理与RocketMQ实战技术一本通:实战案例+操作步骤+执行效

    2024年02月08日
    浏览(96)
  • [RocketMQ] Broker启动流程源码解析 (二)

    1.Brocker介绍 Broker主要负责消息的存储、投递和查询以及服务高可用保证, Broker包含了以下几个重要子模块。 Remoting Module: 整个Broker的实体, 负责处理来自Client端的请求 Client Manager: 负责管理客户端 Producer和Consumer, 维护Consumer的Topic订阅信息 Store Service: 提供方便简单的API接口处理

    2024年02月09日
    浏览(51)
  • RocketMQ 5.1.0 源码详解 | Producer 启动流程

    初始化一个 DefaultMQProducer 对象的代码如下 在初始化 DefaultMQProducer 时会初始化一个 DefaultMQProducerImpl 实例并赋值给 producer 的成员变量 同时,在初始化 DefaultMQProducerImpl 实例时也会将 producer 对象作为成员变量保存在 DefaultMQProducerImpl 实例中 构造 defaultMQProducerImpl 的代码如下 因此

    2024年02月15日
    浏览(34)
  • [RocketMQ] Consumer消费者启动主要流程源码 (六)

    客户端常用的消费者类是DefaultMQPushConsumer, DefaultMQPushConsumer的构造器以及start方法的源码。 1.创建DefaultMQPushConsumer实例 最终都是调用下面四个参数的构造函数: 指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器, 创建了一个DefaultMQPushConsumerImpl实例

    2024年02月16日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包