RocketMQ学习笔记:生产者Producer

这篇具有很好参考价值的文章主要介绍了RocketMQ学习笔记:生产者Producer。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

DefaultMQProducer

根据上文:RocketMQ学习笔记:消息Message - 掘金 (juejin.cn),我们定位到Producer中的这一行代码:

java

复制代码

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start();

  1. 通过new DefaultMQProducer("ProducerGroupName")实例化一个生产者对象。这里的ProducerGroupName在5.x的版本已经移除了,他主要说的是生产者分组,主要是保证Producer发送同一类消息且发送逻辑一致。.
  2. 通过setNamesrvAddr("127.0.0.1:9876"),是指明设置设置NameServer地址。(记得这句话吗:Producer、Consumer、Broker在启动时,会自行将数据注册到NameServer中)。
  3. 启动这个生产者实例。

DefaultMQProducer是继承ClientConfig的,ClientConfig主要是做RocketMQ客户端公共配置的,setNamesrvAddr就是其方法,后续我们再介绍这个ClientConfig类。

RocketMQ学习笔记:生产者Producer

DefaultMQProducer的其他推荐资料:RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer_小虚竹的博客-CSDN博客,该文章介绍的很详细。

干活的是DefaultMQProducerImpl

构造它需要初始化一些基本属性,才能方便后面干活。

在构造方法中,我们继续往下定位发现,在DefaultMQProducer中其实有一个成员属性是defaultMQProducerImpl,我们记住:它是真正做消息传输的事:

 

java

复制代码

public class DefaultMQProducer extends ClientConfig implements MQProducer { protected final transient DefaultMQProducerImpl defaultMQProducerImpl; ...

DefaultMQProducer的构造方法中,实例化了DefaultMQProducerImpl

RocketMQ学习笔记:生产者Producer

同时在上面第3步中,执行的

 

java

复制代码

producer.start();

也是在执行DefaultMQProducerImplstart()

RocketMQ学习笔记:生产者Producer

了解一下它

当在DefaultMQProducer调用以下代码时,会去创建DefaultMQProducerImpl,它是 RocketMQ 生产者的实现类。该类的主要作用是提供一个异步发送线程池,用于处理生产者发送消息的异步任务。

 

ini

复制代码

defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

调用以下的代码,主要是构造DefaultMQProducerImpl并且初始化一些属性,主要是线程池和队列。

  • asyncSenderThreadPoolQueue:容量为 50000。该队列用于存储异步发送任务,当队列已满时,新的任务将被阻塞,直到队列中有空间可用。
  • defaultAsyncSenderExecutor:线程池中的线程数量与 CPU 核心数量相同;线程存活时间为 60 秒,即如果一个线程在 60 秒内没有执行任务,则会被回收。
 

java

复制代码

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { // 保存传入的 DefaultMQProducer 对象和 RPCHook 对象 this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; // 创建一个有界阻塞队列,容量为 50000,用于存储异步发送任务 this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); // 创建一个异步发送线程池 this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( // 设置核心线程数为当前可用的处理器数量 Runtime.getRuntime().availableProcessors(), // 设置最大线程数为当前可用的处理器数量 Runtime.getRuntime().availableProcessors(), // 设置线程存活时间为 60 秒 1000 * 60, TimeUnit.MILLISECONDS, // 使用上面创建的有界阻塞队列作为任务队列 this.asyncSenderThreadPoolQueue, // 创建一个 ThreadFactory,用于创建新的线程 new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { // 创建一个新的线程,名称为 AsyncSenderExecutor_1、AsyncSenderExecutor_2、... return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } }); }

启动生产者实例

 

java

复制代码

producer.start();

通过producer.start();其实是调用this.defaultMQProducerImpl.start();这也就证明了干活的是DefaultMQProducerImpl

RocketMQ学习笔记:生产者Producer

最后看一下发送消息

回到Producer类中,看最后Producer做了什么事情。

 

java

复制代码

SendResult sendResult = producer.send(msg);

根据代码一步步深入,主要是以下这样的逻辑,我们直接看到最后的地方:

RocketMQ学习笔记:生产者Producer

send最后执行的是sendDefaltImpl这个方法。 它主要做这些事情:

  1. 校验消息
 

java

复制代码

Validators.checkMessage(msg, this.defaultMQProducer);

根据checkMessage源码,他主要校验:

  • 消息对象是否为空
  • topic是否合法
    • 是否为空的校验
    • 是不是系统主题的校验
  • 检查消息体(实际放的内容)是否为空
  • 检查消息体的长度是否为0
  • 检查消息的大小有没有超过默认值,默认值是4M
  1. 拿 Topic 名称去 NameServer 换取详情(挖坑)
  2. 消息重投,同步传输默认重传3次(包含传输的1次),否则1次。

最后,消息传递的方式有:同步、异步(回调函数)、还有一个是发就得了。文章来源地址https://www.toymoban.com/news/detail-440969.html

到了这里,关于RocketMQ学习笔记:生产者Producer的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python脚本,用于查询RocketMQ的JMX接口以获取生产者和消费者的连接数

    以下是一个简单的Python脚本,用于查询RocketMQ的JMX接口以获取生产者和消费者的连接数。这个脚本依赖于 requests 库来发送HTTP请求。 首先,你需要在Broker的配置文件中开启JMX: 然后,你可以使用以下脚本来获取连接数: 然后,你可以在Zabbix中创建一个新的监控项,用于执行这

    2024年02月12日
    浏览(58)
  • RocketMQ生产者和消费者都开启Message Trace后,Consume Message Trace没有消费轨迹

    1、生产者和消费者所属同一个程序 2、生产者开启消息轨迹 3、消费者开启消息轨迹 4、生产者和消费者一起开启后,在RocketMQ可视化界面,无法查看到消息的消费轨迹 注:如果只开启生产者或消费者其中之一的消息轨迹,则消息的消费轨迹是正常的 无法展示消费轨迹 具体原

    2024年02月14日
    浏览(45)
  • kafka学习-生产者

    目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 在Kafka中保存的数据都是字节数组。 消息发送前,需要将消息序列化为字节数组进行发送。

    2024年02月09日
    浏览(39)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(54)
  • Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验

    1.1 生产者消息发送流程 1.1.1 发送原理 在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。 batch.size:只有数据积累到bat

    2024年02月09日
    浏览(46)
  • C#多线程学习(三) 生产者和消费者

    线程学习第一篇: C#多线程学习(一) 多线程的相关概念 线程学习第二篇: C#多线程学习(二) 如何操纵一个线程 前面说过,每个线程都有自己的资源,但是代码区是共享的,即每个线程都可以执行相同的函数。这可能带来的问题就是几个线程同时执行一个函数,导致数据的混

    2023年04月21日
    浏览(42)
  • SparkStreaming学习——读取socket的数据和kafka生产者的消息

    目录 一、Spark Streaming概述 二、添加依赖 三、配置log4j 1.依赖下载好后打开IDEA最左侧的外部库 2.找到spark-core 3.找到apache.spark目录 4.找到log4j-defaults.properties文件 5.将该文件放在资源目录下,并修改文件名 6.修改log4j.properties第19行的内容 四、Spark Streaming读取Socket数据流 1.代码编

    2023年04月27日
    浏览(40)
  • 【Java系列】多线程案例学习——基于阻塞队列实现生产者消费者模型

    个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得,欢迎大家在评论区交流讨论💌 什么是阻塞式队列(有两点): 第一点:当队列满的时候

    2024年02月04日
    浏览(57)
  • 【Linux学习】多线程——同步 | 条件变量 | 基于阻塞队列的生产者消费者模型

    🐱作者:一只大喵咪1201 🐱专栏:《Linux学习》 🔥格言: 你只管努力,剩下的交给时间! 以生活中消费者生产者为例: 生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。所以我们就是消费者,供应商

    2024年02月05日
    浏览(51)
  • 【Linux学习】多线程——信号量 | 基于环形队列的生产者消费者模型 | 自旋锁 | 读写锁

    🐱作者:一只大喵咪1201 🐱专栏:《Linux学习》 🔥格言: 你只管努力,剩下的交给时间! 之前在学习进程间通信的时候,本喵简单的介绍过一下信号量,今天在这里进行详细的介绍。 这是之前写的基于阻塞队列的生产者消费者模型中向阻塞队列中push任务的代码。 上面代码

    2024年02月07日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包