RocketMQ 5.1.0 源码详解 | Producer 启动流程

这篇具有很好参考价值的文章主要介绍了RocketMQ 5.1.0 源码详解 | Producer 启动流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

初始化DefaultMQProducer实例

初始化一个 DefaultMQProducer 对象的代码如下

// 返回一个producer对象
DefaultMQProducer producer = new DefaultMQProducer();
// 设置组名
producer.setProducerGroup("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");

在初始化 DefaultMQProducer 时会初始化一个 DefaultMQProducerImpl 实例并赋值给 producer 的成员变量

public DefaultMQProducer() {
    this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
}

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    // 将defaultMQProducerImpl对象保存在成员变量中
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

同时,在初始化 DefaultMQProducerImpl 实例时也会将 producer 对象作为成员变量保存在 DefaultMQProducerImpl 实例中

构造 defaultMQProducerImpl 的代码如下

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    // 将defaultMQProducer对象保存在成员变量中
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000);
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
    if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) {
        semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10), true);
    } else {
        semaphoreAsyncSendNum = new Semaphore(10, true);
        log.info("semaphoreAsyncSendNum can not be smaller than 10.");
    }

    if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) {
        semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 * 1024), true);
    } else {
        semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
        log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
    }
}

因此 DefaultMQProducerImpl 对象能够通过其保存的 producer 对象实例获取到 producer 的所有参数

经过上面简单的设置后此时 producer 被赋值的成员变量有

producerGroup = "ProducerGroupName"
namesrvAddr = "127.0.0.1:9876"
namespaceInitialized = false
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook) // 这里rpcHook为null

启动流程

DefaultMQProducer#start

只需要执行下面代码即可启动上方初始化的 produer 对象

producer.start();

producerstart() 方法具体的内容如下

public void start() throws MQClientException {
    // 由于DefaultMQProducer继承了ClientConfig,所以可以直接使用ClientConfig的withNamespace方法
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 调用DefaultMQProducerImpl的start方法
    this.defaultMQProducerImpl.start();
    // 如果traceDispatcher不为空,则调用traceDispatcher的start方法
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            logger.warn("trace dispatcher start failed ", e);
        }
    }
}

可以看到 DefaultMQProducer 只是一个门面类,具体的实现都是由DefaultMQProducerImpl 去做的


由于 Namespace 的存在,因此在启动 producer 时首先会重新设置 producerGroup,我们需要重点关注经过 withNamespace() 方法处理后返回的生产者组名

public String withNamespace(String resource) {
    // this.getNamespace()不设置的话返回的是null
    return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}

可见 withNamespace() 方法仅仅是调用了 wrapNamespace() 方法,并将 NamespaceproducerGroup 作为参数一并传入

public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
    // 如果namespace为空或者resourceWithOutNamespace为空,则直接返回resourceWithOutNamespace
    if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
        return resourceWithOutNamespace;
    }

    // 如果resourceWithOutNamespace是SystemResource,
    // 或者resourceWithOutNamespace已经组合了Namespace,则直接返回resourceWithOutNamespace
    if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
        return resourceWithOutNamespace;
    }

    String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
    StringBuilder stringBuilder = new StringBuilder();

    if (isRetryTopic(resourceWithOutNamespace)) {
        stringBuilder.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
    }

    if (isDLQTopic(resourceWithOutNamespace)) {
        stringBuilder.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
    }

    // 返回 [RETRY_PREFIX] + [DLQ_PREFIX] + namespace + % + resourceWithoutRetryAndDLQ
    return stringBuilder.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();

}

由于我们并没有设置 producerNamespace,因此会直接返回 producerGroup。造成的效果就是在这个生产者启动过程中第一行代码没有任何效果


traceDispatcher 又是个什么东西?traceDispatcher 的作用是追踪消息的发送和消费的轨迹,它是一个 AsyncTraceDispatcher 对象,它实现了 TraceDispatcher 接口,用于异步地发送追踪消息到 Broker。它可以帮助用户查看每条消息的完整链路数据,包括发送时间、消费时间、存储时间等。我们可以通过使用下面的构造函数构造出一个含有 traceDispatcher 的 DefaultMQProducer 实例

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
    boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.defaultMQProducerImpl);
            traceDispatcher = dispatcher;
            this.defaultMQProducerImpl.registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
            this.defaultMQProducerImpl.registerEndTransactionHook(
                new EndTransactionTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

由于我们在初始化 DefaultMQProducer 实例时没有生成 traceDispatcher 实例,因此 null != traceDispatcher 返回 FALSE,不调用 traceDispatcher 的 start 方法

DefaultMQProducerImpl#start

接下来调用 defaultMQProducerImplstart() 方法,方法内容如下:

private ServiceState serviceState = ServiceState.CREATE_JUST;

public void start() throws MQClientException {
    this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
    // serviceState默认为CREATE_JUST
    switch (this.serviceState) {
        case CREATE_JUST:
            // 1. 设置serviceState为START_FAILED
            this.serviceState = ServiceState.START_FAILED;

            // 2. 检查生产者组名是否合法
            this.checkConfig();

            // 3. 如果实例名为默认值则将生产者的instanceName设置为 UtilAll.getPid() + "#" + System.nanoTime()
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // 4. 使用MQClientManager.getInstance()返回一个单例的MQClientManager对象
            // defaultMQProducer继承了ClientConfig,因此getOrCreateMQClientInstance方法的参数可以是defaultMQProducer
            // mQClientFactory是MQClientInstance的一个实例,MQClientInstance是MQClientManager的内部类
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // 5. 注册producer实例:将生产者组名作为key,defaultMQProducerImpl对象作为value保存到MQClientInstance的producerTable中
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // 如果注册失败则将serviceState重设为CREATE_JUST
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

             // 6. 将defaultMQProducer的createTopicKey作为key,TopicPublishInfo作为value,放入到defaultMQProducerImpl的topicPublishInfoTable中
             // createTopicKey的默认值为TBW102
             // topicPublishInfoTable的作用是存储topic的路由信息,包括topic的queue数目、brokerName、brokerId等
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // 7. 启动MQClientInstance实例
            if (startFactory) {
                // MQClientInstance的start方法会启动MQClientInstance的定时任务
                // 包括定时向所有broker发送心跳、定时清理过期的topic、定时清理过期的consumer、定时清理过期的producer
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 8. 如果启动成功则将serviceState设置为RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // 立即发送心跳到所有broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    // 开启一个定时任务来处理所有的 Request 状态,对异步的请求根据状态处理回调函数
    // 这个异步请求指的并不是在 send 中的异步回调机制,而是 Request-Reply 特性,用来模拟 RPC 调用
    RequestFutureHolder.getInstance().startScheduledTask(this);

}

注意上述代码中有一段注释为createTopicKey 的默认值为 TBW102,这个 Topic 在自动创建 topic 时有关键作用

最后的 RequestFutureHolder.getInstance().startScheduledTask(this) 用来扫描和处理过期的异步请求,但是需要注意的是这个异步请求指的并不是在 send 中的异步回调机制,而是 Request-Reply 特性,用来模拟 RPC 调用

RocketMQ 有两种异步请求的方式,一种是在 send 方法中传入一个回调函数,当消息发送成功或失败时,会调用这个回调函数。这种方式不需要等待服务器的响应,只需要等待服务器的确认。

另一种是在 RocketMQ 4.7.0 后加入的 Request-Reply 特性,这种方式是模拟 RPC 调用,需要等待服务器的响应,并返回一个结果。这种方式需要使用 RequestResponseFuture 对象来封装请求和响应的信息。


在使用 getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook) 返回一个 MQClientInstance 对象时,如果 factoryTable 中没有实例的话则会初始化一个新的实例,代码如下:

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // 最多情况下 clientId = ip@instanceName@unitName@RequestType.STREAM
    // 默认情况下 clientId = ip@instanceName
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

注意在初始化 MQClientInstance 实例的过程中会初始化一个新的 DefaultMQProducer 实例,与我们一开始就有的 producer 实例不是一个对象

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    // ...

    // 此处实例化内部的producer
    // 用于消费失败或超时的消息,sendMessageBack回发给broker,放到retry topic中重试消费
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    // ...
}

此时新初始化的 DefaultMQProducer 实例的 producerGroup = "CLIENT_INNER_PRODUCER"instanceName = "DEFAULT"

接着,在初始化实例后又执行了 this.defaultMQProducer.resetClientConfig(clientConfig) 这行代码

public void resetClientConfig(final ClientConfig cc) {
    this.namesrvAddr = cc.namesrvAddr;
    this.clientIP = cc.clientIP;
    this.instanceName = cc.instanceName;
    this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
    this.pollNameServerInterval = cc.pollNameServerInterval;
    this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
    this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
    this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException;
    this.unitMode = cc.unitMode;
    this.unitName = cc.unitName;
    this.vipChannelEnabled = cc.vipChannelEnabled;
    this.useTLS = cc.useTLS;
    this.socksProxyConfig = cc.socksProxyConfig;
    this.namespace = cc.namespace;
    this.language = cc.language;
    this.mqClientApiTimeout = cc.mqClientApiTimeout;
    this.decodeReadBody = cc.decodeReadBody;
    this.decodeDecompressBody = cc.decodeDecompressBody;
    this.enableStreamRequestType = cc.enableStreamRequestType;
}

可以看到,这段代码将 producer 实例的 ClinetConfig 属性完全拷贝了一份给新创建的 DefaultMQProducer 实例的 ClinetConfig 属性。因此这段代码后新初始化的 DefaultMQProducer 实例的 instanceName 不再是默认值而是和 producer 的一致

MQClientInstance#start

接下来我们来看 mQClientFactory.start() 这部分的代码

public void start() throws MQClientException {

    // 用synchronized修饰保证线程安全性与内存可见性
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // 由于传入了NameServer的地址,因此不进入分支
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                // 1. 启动用于和broker通信的netty客户端
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                // 2. 启动定时任务,包括心跳,拉取topic路由信息,更新broker信息,清理过期消息等
                this.startScheduledTask();
                // 3. Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                // 4. 启动负载均衡服务(对MQConsumer有效)
                this.rebalanceService.start();
                // Start push service
                // 5. 启动它内部的producer实例
                // this.defaultMQProducer是在DefaultMQProducerImpl中
                // 使用MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook)被初始化的
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

这段代码的主要任务是启动和broker通信的 Netty 客户端、启动定时任务、启动拉取消息服务、启动负载均衡服务、启动内部生产者实例

其中启动负载均衡服务只是针对消费者而言的,在生产者启动过程中并无作用

mQClientAPIImpl 对象是 RocketMQ 客户端与 Broker 之间通信的实现类

我们先看启动定时任务这个方法做了什么

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // 如果没有指定namesrv地址,则定时获取namesrv地址
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            } catch (Exception e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 定期从NameServer更新topic路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // 用于定期清除离线Broker,并向所有Broker发送心跳包
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 定时持久化消费者当前消费进度(对MQConsumer有效)
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 根据当前的积压调优线程池的核心线程数
    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }, 1, 1, TimeUnit.MINUTES);
}

其中的定时持久化消费者当前消费进度任务同样只是针对消费者而言的,在生产者启动过程中并无作用

至此,producer 的启动流程结束

启动流程总结

执行 producer.start()启动一个 producer

  1. 重新设置生产者组名
  2. 调用 defaultMQProducerImplstart() 方法,是进行启动实现的入口
    1. 检查当前状态,如果是 CREATE_JUST 则进入启动流程
    2. 检查生产者组名称是否合法
    3. 更改 producerinstanceName
    4. 创建一个 MQClientInstance 类型的 mQClinetFactory 实例
      1. 创建一个新的 defaultMQProducer 实例作为 mQClinetFactory 实例的成员变量
      2. 新的 defaultMQProducer 实例内部又会创建一个新的 defaultMQProducerImpl 实例
      3. 将新的 defaultMQProducer 实例的 ClinetConfig 属性复制粘贴为 producerClinetConfig 属性
    5. producer 实例放入 mQClinetFactoryproducerTable 中,key为 producer 的生产者组名
    6. defaultMQProducercreateTopicKey 作为key,TopicPublishInfo 作为value,放入到 defaultMQProducerImpltopicPublishInfoTable
    7. 启动 mQClinetFactory
      1. 如果没有 NameServer 地址则尝试获取
      2. 启动用于和 broker 通信的 netty 客户端
      3. 启动定时任务
        1. 如果没有指定 NameServer 地址,则定时获取 NameServer 地址
        2. 定期从NameServer更新topic路由信息
        3. 定期清除离线Broker,并向所有Broker发送心跳包
        4. 定时持久化消费者当前消费进度(对MQConsumer有效)
        5. 定时根据当前的积压调优线程池的核心线程数,但是实现是空的
      4. 启动 pullMessageService 从 broker 拉取消息
      5. 启动消费者客户端的负载均衡服务
      6. 启动 mQClinetFactory 内部的 defaultMQProducerImpl 实例
        1. 检查当前状态,如果是 CREATE_JUST 则进入启动流程
        2. 检查生产者组名称是否合法
        3. 由于其 instanceName 等于 MixAll.CLIENT_INNER_PRODUCER_GROUP,因此不更改
        4. 创建一个 MQClientInstance 类型的 mQClinetFactory 实例
        5. producer 实例放入 mQClinetFactoryproducerTable 中,key为 producer 的生产者组名
        6. defaultMQProducercreateTopicKey 作为key,TopicPublishInfo 作为value,放入到 defaultMQProducerImpltopicPublishInfoTable
        7. 将当前状态设置为 RUNNING
        8. 如果上述都成功,则立即发送心跳到所有的 broker
        9. 启动定时任务扫描和处理过期的异步请求
    8. 将当前状态设置为 RUNNING
    9. 如果上述都成功,则立即发送心跳到所有的 broker
    10. 启动定时任务扫描和处理过期的异步请求
  3. 如果 traceDispatch 不为空则启动 traceDispatcher

实例内容

以生产者组名称为 ProducerGroupName,topic名称为 TopicTest 为例,生产者启动成功后的实例内容如下文章来源地址https://www.toymoban.com/news/detail-616695.html

DefaultMQProducer producer:1 # 启动入口
	String namesrvAddr = "127.0.0.1:9876" # NameServer地址
	String clientIp = "192.168.142.1" # producer的ip地址
	String producerGroup = "ProducerGroupName" # 生产者组名称
	String createTopicKey = "TBW102" # 用来自动创建topic的topic名称
	String instanceName = "14896#9822706678400" # 生产者实例的名称
	DefaultProducerImpl defaultProducerImpl:1 # 实际启动类
		DefaultMQProducer defaultMQProducer:1 # 就是最开头的 producer
        ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable:1 = {
            "TBW102" : TopicPublishInfo
        } # 存储topic的路由信息
		MQClientInstance mQClientFactory # 负责管理与 NameServer 和 Broker 的网络连接
			String clientId = "192.168.142.1@14896#9822706678400" # 为 ip + instanceName
			MQClientAPIImpl mQClientAPIImpl # 负责实现与 Broker 之间的通信
			ConcurrentMap<String, MQProducerInner> producerTable = {
                "ProducerGroupName" : defaultProducerImpl:1
			} # MQProducerInner是DefaultProducerImpl的接口
            ConcurrentMap<String, MQConsumerInner> consumerTable # MQConsumerInner是DefaultConsumerImpl的接口
            ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable # broker的地址
            DefaultMQProducer defaultMQProducer:2 # mQClientFactory内部的producer实例
            	String producerGroup = "CLIENT_INNER_PRODUCER"
            	Stirng instanceName = "14896#9822706678400" # 和 producer 的 clientConfig 属性完全一致
                DefaultProducerImpl defaultProducerImpl:2
                    DefaultMQProducer defaultMQProducer:2
                    ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable:2 = {
                        "TBW102" : TopicPublishInfo
                    } # 存储topic的路由信息

到了这里,关于RocketMQ 5.1.0 源码详解 | Producer 启动流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 5.1 NameServer 启动流程

    RocketMQ是一个分布式消息中间件,它的核心组件之一是namesrv,负责管理broker的路由信息和kv配置。本文将介绍RocketMQ5.1版本中namesrv的启动过程,包括如何解析命令行参数、加载配置文件、初始化和启动namesrv控制器等。 首先,我们需要在环境变量中设置ROCKETMQ_HOME,指向RocketM

    2023年04月08日
    浏览(34)
  • RocketMQ学习笔记:生产者Producer

    根据上文:RocketMQ学习笔记:消息Message - 掘金 (juejin.cn),我们定位到 Producer 中的这一行代码: java 复制代码 DefaultMQProducer producer = new DefaultMQProducer(\\\"ProducerGroupName\\\"); producer.setNamesrvAddr(\\\"127.0.0.1:9876\\\"); producer.start(); 通过 new DefaultMQProducer(\\\"ProducerGroupName\\\") 实例化一个生产者对象。这

    2024年02月04日
    浏览(43)
  • RocketMQ 5.0 本地源码启动Cluster模式指南

    这里是weihubeats,觉得文章不错可以关注公众号 小奏技术 ,文章首发。拒绝营销号,拒绝标题党 这里选用最新的稳定版本5.0 目前5.0的架构发生了重大调整,添加了一层 rocketmq-proxy ,可以通过 grpc 的方式接入 目前部署方式分为两种 Local 模式:由于 Local 模式下 Proxy 和 Broker 是同进

    2024年02月13日
    浏览(44)
  • 一文详解RocketMQ-Spring的源码解析与实战

    摘要: 这篇文章主要介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。 本文分享自华为云社区《RocketMQ-Spring : 实战与源码解析一网打尽》,作者:勇哥java实战分享。 RocketMQ 是大家耳熟能详的消息队列,开源项

    2023年04月24日
    浏览(36)
  • Rocketmq安装与使用:启动报错:Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC ···

    下载rocketmq压缩包之后解压到目录下(不能是中文目录)之后,cmd打开启动 拦路虎1:运行后chua报错了o(╥﹏╥)o,暴风哭泣中 错误内容全部如下: Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Unrecognized VM option

    2024年02月04日
    浏览(48)
  • RocketMQ架构和工作流程

    目录 一.MQ概述 1.简介 2.用途 限流削峰 异步解耦  数据收集  3.MQ对比 二. RocketMQ概述 1.基本概念 消息(Message) 主题(Topic) 标签(Tag) 队列(Queue) 消息标识(MessageId/Key)  2.系统架构 Producer Consumer NameServer Broker 工作流程 三.RocketMQ的启动 1.安装JDK 2.配置RocketMQ ①修改Nam

    2024年02月09日
    浏览(45)
  • RocketMQ(三) broker启动

    RocketMQ源码版本V5.0.0,可兼容之前的版本,因为整理资料的时候,之前的版本,和V5版本有所出入,核心流程基本还是大同小异的。 此前已经总结了NameServer的启动流程源码:现在来了解Broker的启动流程。在RocketMQ启动的时候,首先要启动NameServer,然后再启动Broker。 Broker模块主

    2024年02月08日
    浏览(46)
  • RocketMQ broker启动失败

    版本:4.9.3 现象:NameServer启动没问题,Broker无法启动。 查看日志,没有broker方面的报错,应该是整个服务都没起来。 于是开始网上搜索解决方案: 方案1: 删除store文件夹。 删除之后问题依旧 方案2: 更改broker.conf,加上IP等配置。 发现这些配置已经有了,于是更改无效。

    2024年02月09日
    浏览(42)
  • Windows下RocketMQ的启动

    下载地址:下载 | RocketMQ 解压后 修改 bin目录下的runbroker.cmd set \\\"JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g\\\" set \\\"JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g\\\" set \\\"JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%\\\" 分别改为  set \\\"JAVA_OPT=%JAVA_OPT% -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m\\\"  set \\\"JAVA

    2024年02月14日
    浏览(36)
  • rocketMQ5.0启动broker报错:module java.base does not export sun.nio.ch to unnamed module

    最近在搭建RocketMQ5.0,然后按照官方流程:https://rocketmq.apache.org/zh/docs/quickStart/02quickstart 先启动namesrv完,再启动broker的时候遇到以下错误。 之前大家如果搭建过老版本的RocketMQ基本都遇到过,因为使用jdk版本高的原因,jdk1.8以上涉及到导包的问题,所以RocketMQ启动命令不兼容

    2024年02月12日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包