Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例(dsm/toc接口备忘录)

这篇具有很好参考价值的文章主要介绍了Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例(dsm/toc接口备忘录)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

相关
《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
《Linux内存映射函数mmap与匿名内存块》
《Linux共享内存与子进程继承》

dsm/toc使用备忘

用dsm框架的流程

  1. 评估共享内存大小:多次用shm_toc_estimate_chunk、shm_toc_estimate_keys向estimate中增加数据结构,最后用shm_toc_estimate得出刚才增加的总大小。
  2. 申请共享内存:dsm_create
  3. 共享内存头部放kv管理结构toc:shm_toc_create,现在共享内存中可以使用toc接口,以kv的形式存放数据结构了。
  4. 例如需要存放FixedParallelState结构
    1. 第一步:在刚才申请的共享内存段中,申请一个起始地址:shm_toc_allocate
    2. 第二步:拿到地址后,转为FixedParallelState开始赋值。
    3. 第三步:最后调用toc接口shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps),将刚才的数据结构地址关联一个PARALLEL_KEY_FIXED,使用的时候用PARALLEL_KEY_FIXED查找即可fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false)

0 概念

数据结构含义:

  1. dsm_segment(动态共享内存段):
    • 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
    • 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
  2. shm_mq_handle(共享内存消息队列句柄)
    • shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
    • 使用dsm_segment提供共享内存段做进程通信。

在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。

1 shm_toc初始化一段共享内存,共享内存是从哪来的?

在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。

初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。

struct shm_toc
{
	uint64		toc_magic;		/* Magic number identifying this TOC */
	slock_t		toc_mutex;		/* Spinlock for mutual exclusion */
	Size		toc_total_bytes;	/* Bytes managed by this TOC */
	Size		toc_allocated_bytes;	/* Bytes allocated of those managed */
	uint32		toc_nentry;		/* Number of entries in TOC */
	shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};

初始化

shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{
	shm_toc    *toc = (shm_toc *) address;

	Assert(nbytes > offsetof(shm_toc, toc_entry));
	toc->toc_magic = magic;
	SpinLockInit(&toc->toc_mutex);

	/*
	 * The alignment code in shm_toc_allocate() assumes that the starting
	 * value is buffer-aligned.
	 */
	toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);
	toc->toc_allocated_bytes = 0;
	toc->toc_nentry = 0;

	return toc;
}

那么shm_toc_create用的内存是从哪来的?

2 动态mmap一段新的共享内存(dsm机制)

  1. Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
  2. 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用mmap 一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。

3 dsm申请共享内存应用实例:mq

在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:

src/test/modules/test_shm_mq

下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。

3.1 内存初始化:test_shm_mq_setup

void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
				  shm_mq_handle **output, shm_mq_handle **input)
{
	dsm_segment *seg;
	test_shm_mq_header *hdr;
	shm_mq	   *outq = NULL;	/* placate compiler */
	shm_mq	   *inq = NULL;		/* placate compiler */
	worker_state *wstate;

	/* Set up a dynamic shared memory segment. */
	setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
	*segp = seg;

	/* Register background workers. */
	wstate = setup_background_workers(nworkers, seg);

	/* Attach the queues. */
	*output = shm_mq_attach(outq, seg, wstate->handle[0]);
	*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);

	/* Wait for workers to become ready. */
	wait_for_workers_to_become_ready(wstate, hdr);

	/*
	 * Once we reach this point, all workers are ready.  We no longer need to
	 * kill them if we die; they'll die on their own as the message queues
	 * shut down.
	 */
	cancel_on_dsm_detach(seg, cleanup_background_workers,
						 PointerGetDatum(wstate));
	pfree(wstate);
}

第一步:申请共享内存setup_dynamic_shared_memory
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,
							dsm_segment **segp, test_shm_mq_header **hdrp,
							shm_mq **outp, shm_mq **inp)
{
	shm_toc_estimator e;
	int			i;
	Size		segsize;
	dsm_segment *seg;
	shm_toc    *toc;
	test_shm_mq_header *hdr;

用estimator来评估需要多少共享内存(可以不用自己算了:)
评估器的接口:

  • shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
  • shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
  • shm_toc_estimate_keys:需要几个key?这里记录一下
  • shm_toc_estimate:把之前记录的结构换算成大小

评估器的原理:记录key的个数和chunk的总大小即可。

	shm_toc_initialize_estimator(&e);
	shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
	for (i = 0; i <= nworkers; ++i)
		shm_toc_estimate_chunk(&e, (Size) queue_size);
	shm_toc_estimate_keys(&e, 2 + nworkers);
	segsize = shm_toc_estimate(&e);

评估完了开始用dsm_create申请新的共享内存了!

dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。

现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。

	/* Create the shared memory segment and establish a table of contents. */
	seg = dsm_create(shm_toc_estimate(&e), 0);

在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引

---------------- 共享内存起始地址
struct shm_toc
{
	uint64		toc_magic;				/* Magic number identifying this TOC */
	slock_t		toc_mutex;				/* Spinlock for mutual exclusion */
	Size		toc_total_bytes;		/* Bytes managed by this TOC */
	Size		toc_allocated_bytes;	/* Bytes allocated of those managed */
	uint32		toc_nentry;				/* Number of entries in TOC */
	shm_toc_entry toc_entry[0];
};
---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
toc_entry[0]   -------------\
----------------             |
toc_entry[1]                 |
----------------             |
toc_entry[2]  ---------\     |
----------------       |     |
data <-----------------/     |
...                          |
data                         |
...                          |
data <----------------------/
----------------
	toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
						 segsize);

申请一段用户自定义数据空间,放一个test_shm_mq_header结构。

	hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
	SpinLockInit(&hdr->mutex);
	hdr->workers_total = nworkers;
	hdr->workers_attached = 0;
	hdr->workers_ready = 0;

初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。

	shm_toc_insert(toc, 0, hdr);

	/* Set up one message queue per worker, plus one. */
	for (i = 0; i <= nworkers; ++i)
	{
		shm_mq	   *mq;

每个worker申请一个queue_size,按上面方法allocate+insert。

		mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
						   (Size) queue_size);
		shm_toc_insert(toc, i + 1, mq);

		if (i == 0)
		{

把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。

			/* We send messages to the first queue. */
			shm_mq_set_sender(mq, MyProc);
			*outp = mq;
		}
		if (i == nworkers)
		{
			/* We receive messages from the last queue. */
			shm_mq_set_receiver(mq, MyProc);
			*inp = mq;
		}
	}

	/* Return results to caller. */
	*segp = seg;
	*hdrp = hdr;
}
第二步:配置bgworker:setup_background_workers
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	worker_state *wstate;
	int			i;

	oldcontext = MemoryContextSwitchTo(CurTransactionContext);

setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:

------------------------------
typedef struct
{
	int			nworkers;
	BackgroundWorkerHandle *handle[];  //  flex数组,每个位置对应一个worker
	{
		int			slot;
		uint64		generation;
	}
} worker_state;
------------------------------
handle[0]    
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
	wstate = MemoryContextAlloc(TopTransactionContext,
								offsetof(worker_state, handle) +
								sizeof(BackgroundWorkerHandle *) * nworkers);
	wstate->nworkers = 0;
	
	on_dsm_detach(seg, cleanup_background_workers,
				  PointerGetDatum(wstate));

	/* Configure a worker. */

配通用的worker通用部分:

	memset(&worker, 0, sizeof(worker));
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "test_shm_mq");
	sprintf(worker.bgw_function_name, "test_shm_mq_main");
	snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
	/* set bgw_notify_pid, so we can detect if the worker stops */
	worker.bgw_notify_pid = MyProcPid;

把worker通用的部分传进去,拼每个worker特有的部分

	/* Register the workers. */
	for (i = 0; i < nworkers; ++i)
	{

BackgroundWorkerData->slot数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]中并返回即可。

		if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("could not register background process"),
					 errhint("You may need to increase max_worker_processes.")));
		++wstate->nworkers;
	}

	/* All done. */
	MemoryContextSwitchTo(oldcontext);
	return wstate;
}
第三步:剩余流程
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
				  shm_mq_handle **output, shm_mq_handle **input)
{
	dsm_segment *seg;
	test_shm_mq_header *hdr;
	shm_mq	   *outq = NULL;	/* placate compiler */
	shm_mq	   *inq = NULL;		/* placate compiler */
	worker_state *wstate;

	/* Set up a dynamic shared memory segment. */
	setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
	*segp = seg;

	/* Register background workers. */
	wstate = setup_background_workers(nworkers, seg);

第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle

	/* Attach the queues. */
	*output = shm_mq_attach(outq, seg, wstate->handle[0]);
	*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);

用取一个PID的方式检查worker是不是ready了。

	wait_for_workers_to_become_ready(wstate, hdr);

	cancel_on_dsm_detach(seg, cleanup_background_workers,
						 PointerGetDatum(wstate));
	pfree(wstate);
}

到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。文章来源地址https://www.toymoban.com/news/detail-633685.html

到了这里,关于Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例(dsm/toc接口备忘录)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C++——内存分配与动态内存管理

    🌸作者简介: 花想云 ,在读本科生一枚,致力于 C/C++、Linux 学习。 🌸 本文收录于 C++系列 ,本专栏主要内容为 C++ 初阶、C++ 进阶、STL 详解等,专为大学生打造全套 C++ 学习教程,持续更新! 🌸 相关专栏推荐: C语言初阶系列 、 C语言进阶系列 、 数据结构与算法 本章我们

    2023年04月17日
    浏览(45)
  • 内存动态分区分配算法

    所谓动态分区分配,就是指 内存在初始时不会划分区域,而是会在进程装入时,根据所要装入的进程大小动态地对内存空间进行划分,以提高内存空间利用率,降低碎片的大小 动态分区分配算法有以下四种: 1. 首次适应算法(First Fit) 空闲分区以 地址递增的次序链接 。分

    2024年02月11日
    浏览(38)
  • 动态分配内存与释放

    1.malloc malloc()可以找到一个大小合适的块。 内存是匿名的,也就是说,malloc()分配了内存,但没有为它指定名字。 格式如下: double*ptd; ptd=(double*)malloc(30*sizeof(double)); ps:ptd可以看成是一个数组。 malloc()可能分配不到所需的内存。在这种情况下,该函数返回空指针。

    2024年01月17日
    浏览(49)
  • 用指针实现内存动态分配

    导引 :已知:变量在使用前必须被定义且安排好存储空间。且变量有这么一些分类:全局变量、静态局部变量【它们的储存一般是在编译时确定,在程序开始执行前完成。】自动变量【在执行进入变量定义所在的复合语句时为它们分配存储,变量的大小也是静态确定的。临时

    2023年04月09日
    浏览(68)
  • C++类和动态内存分配

    C++能够在程序运行时决定内存的分配,而不是只在编译阶段,因此,就可以根据程序的需要,而不是根据一系列严格的存储类型规则来使用内存,C++使用new和delete运算符来动态控制内存,但是,在类中使用这些运算符会导致许多新的问题,在这种情况下,析构函数就是必不可

    2024年04月16日
    浏览(33)
  • 【进阶C语言】动态内存分配

    本章大致内容介绍: 1.malloc函数和free函数 2.calloc函数 3.realloc函数 4.常见错误案例 5.笔试题详解 6.柔性数组 1.malloc函数 (1)函数原型 函数参数: 根据用户的需求需要开辟多大的字节空间,为无符号的字节。 返回值: malloc函数成功开辟内存后,会返回该内存的起始地址,可

    2024年02月07日
    浏览(37)
  • C++ 指针进阶:动态分配内存

    malloc 是 stdlib.h 库中的函数,原型为 void *__cdecl malloc(size_t _Size); : 作用 : malloc 函数沿空闲链表(位于内存 堆空间 中)申请一块满足需求的内存块,将所需大小的内存块分配给用户剩下的返回到链表上; 并返回指向该内存区的首地址的指针,意该指针的类型为 void * ,因此

    2024年02月05日
    浏览(31)
  • 超详细——动态内存分配+柔性数组

    ☃️个人主页:fighting小泽 🌸作者简介:目前正在学习C语言和数据结构 🌼博客专栏:C语言学习 🏵️欢迎关注:评论👊🏻点赞👍🏻留言💪🏻 我们已经学会的内存开辟方式有:创建一个变量,创建一个数组 我们创建一个整形变量就会申请4个字节,创建个数组就会申请

    2023年04月15日
    浏览(33)
  • 数据结构:动态内存分配+内存分区+宏+结构体

    1.定义一个学生结构体,包含结构体成员:身高,姓名,成绩;定义一个结构体数组有7个成员,要求终端输入结构体成员的值,根据学生成绩,进行冒泡排序。 1.申请一个10个int类型的堆区空间,并实现选择排序(需要导入头文件 #include stdlib.h) 2.用##拼接带参宏的参数 3.宏函

    2024年02月20日
    浏览(34)
  • 深入挖掘C语言 ----动态内存分配

    开篇备忘录: \\\"自给自足的光, 永远都不会暗\\\" 正文开始 C语言提供了一个动态开辟内存的函数; 这个函数向内存申请一块连续可用的空间, 并返回指向这块空间的指针. 如果内存开辟成功, 则返回一个指向开辟好空间的指针 如果开辟失败, 则返回一个NULL指针, 因此malloc的返回值一

    2024年04月28日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包