nsqd的架构及源码分析

这篇具有很好参考价值的文章主要介绍了nsqd的架构及源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

文章目录

一  nsq的整体代码结构

二  回顾nsq的整体架构图

三  nsqd进程的作用

四  nsqd启动流程的源码分析

五  本篇博客总结


在博客 nsq整体架构及各个部件作用详解_YZF_Kevin的博客-CSDN博客 中我们讲了nsq的整体框架,各个部件的大致作用。如果没看过的,建议大家去学习下,不然理解后续的内容会有难度

这篇博客开始我们来看下每个部件的详细功能,从源码入手分析其内部实现原理

一  nsq的整体代码结构

建议大家也下载nsq的代码,一边看博客一边看代码印象更深刻。nsq的官方git代码地址:GitHub - nsqio/nsq: A realtime distributed messaging platform

nsq代码结构如下,图中有注释,大家先有个整体印象,知道各个模块的代码在哪就行

nsqd的架构及源码分析,nsq,nsqd源码,nsq源码

二  回顾nsq的整体架构图

nsqd的架构及源码分析,nsq,nsqd源码,nsq源码

 图中最上面的四个节点就是nsqd进程,至少要有1个,可以多开。我们画了4个,分别是nsq1,nsq2,nsq3,nsq4

注意看nsqd的连接关系,每个nsqd节点和所有客户端都有连接(tcp+http),且每个nsqd节点和所有的nsqlookupd节点也有连接(tcp)

三  nsqd进程的作用

1. topic的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

2. channel的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

3. message的监听,中转,持久化(保存到文件,从文件加载),主动推送消息给各个客户端,超时重发,消息计数

4. 配置修改,运行状态(协程、内存)统计

5. 抽检channel的延迟队列,飞行队列,消息超时的重新入队

6. 统计和上报工作,主要统计topic,channel,消息,各种队列,客户端连接,GC等信息,通过UDP协议上报给指定地址的进程

可以说,nsqd进程是整个nsq平台的核心,消息队列架构简单的话,只有一个nsqd进程就够了。

我画了一个图来概括一个nsqd进程的工作内容,如下

nsqd的架构及源码分析,nsq,nsqd源码,nsq源码

 

四  nsqd启动流程的源码分析

nsqd的代码主要在两块

1. 代码框架及main函数,目录在 nsq/apps/nsqd/*

2. 实现代码,目录在 nsq/nsqd/*

值得一提的是nsqd,nsqlookupd,nsqadmin这三个进程的框架都使用了go-svc包,这个包很简单,使用者只需实现它的三个函数即可

Init()           配置,初始化等操作

Start()        真正启动

Stop()        结束时的关闭操作

好了,我们看nsqd的入口,也就是main函数,代码在nsq/apps/nsqd/main.go,代码如下(已加注释)

type program struct {
	once 		sync.Once
	nsqd 		*nsqd.NSQD	// nsqd对象
}

// nsqd的启动入口
func main() {
	prg := &program{}
	// Run内部会调用Init(),Start(),监听到这两个系统信号时会调用Stop()
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		logFatal("%s", err)
	}
}

main()函数内部只有一个对象program,也只有一处调用svc.Run(),这个函数内部会调用program.Init()program.Start()

其中program.Init()函数,主要是创建并检测nsqd的配置,然后根据配置创建出一个nsqd实例

重点在program.Start()函数,代码如下(已加注释)

// nsqd的启动,重点在调用的Main()函数
func (p *program) Start() error {
	// 加载元数据,并创建初始化出所有的topic对象,所有的channel对象
	err := p.nsqd.LoadMetadata()
	if err != nil {
		logFatal("failed to load metadata - %s", err)
	}
	// 再持久化元数据到文件(不要觉得奇怪,因为上面的LoadMetadata()函数可能会过滤掉一些无效的topic,channel,这里再重写算是刷新了元数据)
	err = p.nsqd.PersistMetadata()
	if err != nil {
		logFatal("failed to persist metadata - %s", err)
	}
	// 启动一个新协程,专门运行nsqd的Main()循环,注意这个Main()是永不退出的(除非出错)
	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()
	return nil
}

对上面的代码解释下,program.Start()函数一共干了3件事

1. nsqd.LoadMetadata(), 这个函数根据配置加载旧nsqd元数据。这些元数据包含版本号,topic,channel,过滤掉不合法的topic和channel,合法的topic和channel都创建出对象,并且为每个topic建立处理循环

2. nsqd.PersistMetadata(), 把过滤后的topic和channel再保存到文件nsqd.dat,算是把旧数据过滤了一遍

3. 新启动一个协程,调用nsqd.Main(),这个Main()是nsqd的核心,启动了nsqd的全部服务。除非遇到错误,否则永不退出

接下来看nsqd.Main()的内部实现,代码在nsq/nsqd/nsqd.go,代码如下(已加注释)

// nsqd主协程(内部启动tcp循环,http循环,https循环, 扫描队列池,和nsqlookupd循环),永不退出,除非严重错误
func (n *NSQD) Main() error {
	exitCh := make(chan error)
	var once sync.Once

	// 退出函数(独立协程运行,一直监听,遇到错误
	exitFunc := func(err error) {
		once.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}

	// TCP服务,独立协程
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})

	// HTTP服务,独立协程
	if n.httpListener != nil {
		httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
		})
	}

	// HTTPS服务,独立协程
	if n.httpsListener != nil {
		httpsServer := newHTTPServer(n, true, true)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
		})
	}

	// 独立协程,抽检扫描各个队列
	n.waitGroup.Wrap(n.queueScanLoop)

	// 独立协程,和nsqlookupd的循环(连接和重连,心跳维持,topic,channel变化通知等)
	n.waitGroup.Wrap(n.lookupLoop)

	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	err := <-exitCh
	return err
}

对上面的代码解释下,nsqd.Main()主要干了6件事

1. 开一个新协程,启动tcp服务并一直监听,为客户端提供tcp服务。我们的客户端最常用,因为生产消息,中转消息,处理消息都是这里实现的

2. 开一个新协程,启动http服务并一直监听,为客户端提供htttp服务

3. 开一个新协程,启动https服务并一直监听,为客户端提供htttps服务

4. 开一个新协程,建立并维持扫描池,这些扫描协程会扫描所有channel的延迟队列,飞行队列,如果消息超时了就重新入队。很有意思的是,nsqd作者很大方地承认他抄袭了redis的抽检策略,内部实现也确实是类redis操作,这个我们后面再讲,todo

5. 开一个新协程,和nsqlookupd建立循环,主要是连接和重连,心跳维持,实时报告自己的topic和channel变化

6. 如果你配置了信息上报的地址,nsqd会再开一个新协程,做统计上报操作,统计topic,channel,消息,内存,GC等信息,通过UDP协议上报过去

五  本篇博客总结

1. 给大家看了nsq平台下代码整体结构,建议大家下载源码自己看下,加强印象

2. 讲了nsqd进程提供的功能实现

3. 跟踪了nsqd进程启动流程,最核心的nsqd.Main()建议大家仔细看,后面讲的nsqd内容也都是这几个协程里面干的活

下一篇博客我们开始详解分析nsqd内部各个协程的具体工作

todo文章来源地址https://www.toymoban.com/news/detail-630082.html

到了这里,关于nsqd的架构及源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Linux内核源码分析 1:Linux内核体系架构和学习路线

    好久没有动笔写文章了,这段时间经历了蛮多事情的。这段时间自己写了一两个基于不同指令集的 Linux 内核, x86 和 RISC-V 。期间也去做了一些嵌入式相关的工作,研究了一下 ARM 指令集架构。 虽然今年九月份我就要申请了,具体申请 AI 方向还是机器人、嵌入式、操作系统、

    2024年02月07日
    浏览(54)
  • 【ROS】fsd_algorithm架构学习与源码分析(致敬)

    😏 ★,° :.☆( ̄▽ ̄)/$: .°★ 😏 这篇文章主要介绍fsd_algorithm架构学习与源码分析(致敬)。 无专精则不能成,无涉猎则不能通。——梁启超 欢迎来到我的博客,一起学习,共同进步。 喜欢的朋友可以关注一下,下次更新不迷路🥞 ------------------ 叮叮叮!!! --------------

    2024年02月13日
    浏览(26)
  • 商城小程序源码开发流程、技术架构简述,商城系统前端代码分析

    小程序的兴起,为电商行业带来了全新的机遇。商城小程序源码的开发,是一项非常重要的技术工作。本文将从商城小程序的开发流程、技术架构、代码实现等方面进行讲解。 一、商城小程序的开发流程 商城小程序的开发流程包括需求分析、界面设计、后台开发、前端开发

    2024年02月10日
    浏览(45)
  • openxr runtime Monado 源码解析 源码分析:整体介绍 模块架构 模块作用 进程 线程模型 整体流程

    monado系列文章索引汇总: openxr runtime Monado 源码解析 源码分析:源码编译 准备工作说明 hello_xr解读 openxr runtime Monado 源码解析 源码分析:整体介绍 模块架构 模块作用 进程 线程模型 整体流程 openxr runtime Monado 源码解析 源码分析:CreateInstance流程(设备系统和合成器系统)C

    2024年02月11日
    浏览(83)
  • Day18:信息打点-小程序应用&解包反编译&动态调试&抓包&静态分析&源码架构

    目录 小程序获取-各大平台搜索 小程序体验-凡科建站模版测试上线 小程序抓包-ProxifierBurpSuite联动 小程序逆向-解包反编译动态调试架构 思维导图 章节知识点 Web:语言 / CMS / 中间件 / 数据库 / 系统 / WAF 等 系统:操作系统 / 端口服务 / 网络环境 / 防火墙等 应用: APP

    2024年04月09日
    浏览(42)
  • centos环境搭建nsq集群

    简言 下载 启动nsq(单节点)         1. 启动nsqd         2. 启动nsqlookupd         3. 启动nsqadmin 查看状态 1. nsq是go语言实现的分布式消息处理平台,类似我们常用的kafka,rocket mq等,目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结

    2024年02月16日
    浏览(34)
  • centos环境搭建nsq单点

    简言 下载 启动nsq(单节点)         1. 启动nsqd         2. 启动nsqlookupd         3. 启动nsqadmin 查看状态 1. nsq是go语言实现的分布式消息处理平台,类似我们常用的kafka,rocket mq等,目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结

    2024年02月16日
    浏览(23)
  • nsq中diskqueue详解 - 第一篇

     diskqueue是nsq消息持久化的核心,内容较多,故分为多篇 1. diskqueue第一篇 - 是什么,为什么需要它,整体架构图,对外接口 2. diskqueue第二篇 - 元数据文件,数据文件,启动入口,元数据文件的读写及保存 3. diskqueue第三篇 - 数据定义详解,运转核心ioloop()源码详解 4. diskqueue第

    2024年02月13日
    浏览(75)
  • Linux系统中怎么安装NSQ的Go语言客户端

    一、安装Go语言环境 在安装NSQ前,需先安装Go语言环境。在Linux系统中安装Go语言环境的步骤如下: 1.下载安装包 官方网站https://golang.org/dl/提供了各种操作系统的安装包,下载对应Linux系统的安装包。 2.解压安装包 将下载的安装包解压至Linux系统的指定目录,例如:解压至/u

    2024年02月16日
    浏览(53)
  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(67)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包