DPDK系列之二十六缓冲Cache的管理

这篇具有很好参考价值的文章主要介绍了DPDK系列之二十六缓冲Cache的管理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Cache的用处

其实一直不想分析这个问题,主要是这个问题太多了。即使不学DPDK,计算机的原理和操作系统,内存型框架等等中都回避不了这个问题,包括多线程的伪共享也提到了这个问题。可以说这个问题是绕不开的,老生常谈谈得都糊了。
所以这里重点不谈Cache这个原理,书和网上都多得看不过来了。这里重点分析一下在DPDK是怎么样使用Cache的,也就是说DPDK的Cache有什么用处?
1、减少对内存锁的并发的冲突,目的也是为提高读写速度
2、提高读写速度

二、DPDK中的Cache处理

1、对Cache的支持
大页内存:命中率提高
DIDO:直接和硬件缓冲打交道,略过内存
TLB:TLB配合大页,仍然是提高Cache命中率的方法

2、预取指令
一般到Cache这个级别,都是硬件,最多OS操作处理一下,对上层一般是不开放的。但随着技术的发展,软件开发者也可以操作预取指令,同时底层也开放了这些软件预取的指令。DPDK就可以利用这个技术来处理Cache的数据加载,提高执行效率。但需要注意的是,如果自己也软件也要这样做,就得考虑好策略,别到时候儿画虎不成反类其犬。
预取指令一般是汇编命令,但有些程序也提供了封装的上层API库。
在DPDK中,为了和数据处理保持时钟周期的匹配,即达到最大效率,就必须保证数据都可以在Cache中,否则性能会严重下降。而这个预取指令就是为了配合这个命中采用的一种手段。当然,在DPDK中还有其它的手段同样也可以达到这个目的,结果只有一个,让处理和数据读取保持一致。

3、DPDK中预取一致性处理
现代计算机基本都是多核或者多CPU的,DPDK如何处理当不同的核去访问同一Cache的冲突呢。也就是说,如何保证数据的一致性呢?解决的方法很简单粗暴,直接每个核给单独一个Cache。这样读写只操作自己的数据队列,就不会有冲突的问题。但是,这也带来了数据的最终一致性问题,这个就需要设计来搞定了。尽量避免冲突,如果非要有,那么只好加锁或者用一些协议来解决。
另外为了保持冲突最小化Cache Line(Cache最小单元)直接在分配时对齐。这算是另外一个手段。换句话说,尽量保证能在一个Cache Line的不让他们分成两个。
那么一致性协议有几种呢:
目录协议( Directory-based protocol ) 和总线窥探协议 ( Bus snoopingprotocol),这里不展开,有兴趣可以去查查资料。

三、源码分析

下面看一下Cache的相关源码:

//Ring
struct rte_ring {
	/*
	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
	 * next time the ABI changes
	 */
	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
	int flags;               /**< Flags supplied at creation. */
	const struct rte_memzone *memzone;
			/**< Memzone, if any, containing the rte_ring */
	uint32_t size;           /**< Size of ring. */
	uint32_t mask;           /**< Mask (size-1) of ring. */
	uint32_t capacity;       /**< Usable size of ring */

	char pad0 __rte_cache_aligned; /**< empty cache line */

	/** Ring producer status. */
	struct rte_ring_headtail prod __rte_cache_aligned;
	char pad1 __rte_cache_aligned; /**< empty cache line */

	/** Ring consumer status. */
	struct rte_ring_headtail cons __rte_cache_aligned;
	char pad2 __rte_cache_aligned; /**< empty cache line */
};

// librte_eal/common/include/rte_common.h
/** Force alignment to cache line. */
#define __rte_cache_aligned __rte_aligned(RTE_CACHE_LINE_SIZE)
#define __rte_aligned(a) __attribute__((__aligned__(a)))  

在基础的数据结构中经常可以看到__rte_cache_aligned这个宏,它其实就是对Cache Line的一种处理对齐方式。
再看一下为每个核的配置数据结构定义:

struct lcore_conf {
	uint16_t nb_rx_queue;
	struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
	uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
	struct buffer tx_mbufs[RTE_MAX_ETHPORTS];
	struct ipsec_ctx inbound;
	struct ipsec_ctx outbound;
	struct rt_ctx *rt4_ctx;
	struct rt_ctx *rt6_ctx;
	struct {
		struct rte_ip_frag_tbl *tbl;
		struct rte_mempool *pool_dir;
		struct rte_mempool *pool_indir;
		struct rte_ip_frag_death_row dr;
	} frag;
} __rte_cache_aligned;//总是行对齐,防止跨Cache Line
static struct lcore_conf lcore_conf[RTE_MAX_LCORE];

RTE_MAX_LCORE是当前的最大核心数量,通过编号来控制对核心的访问,避免出现多个核心访问同一个数据结构的问题。同样,现代的网卡一般都支持多队列网卡,DPDK为其准备多个读写队列来应对这些问题,如果从前面的安装过来的读者可能会想起在安装时的配置。
看一下收的预取:

uint16_t
ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
		uint16_t nb_pkts)
{
	struct ixgbe_rx_queue *rxq;
	volatile union ixgbe_adv_rx_desc *rx_ring;
	volatile union ixgbe_adv_rx_desc *rxdp;
	struct ixgbe_rx_entry *sw_ring;
	struct ixgbe_rx_entry *rxe;
	struct rte_mbuf *rxm;
	struct rte_mbuf *nmb;
	union ixgbe_adv_rx_desc rxd;
	uint64_t dma_addr;
	uint32_t staterr;
	uint32_t pkt_info;
	uint16_t pkt_len;
	uint16_t rx_id;
	uint16_t nb_rx;
	uint16_t nb_hold;
	uint64_t pkt_flags;
	uint64_t vlan_flags;

	nb_rx = 0;
	nb_hold = 0;
	rxq = rx_queue;
	rx_id = rxq->rx_tail;
	rx_ring = rxq->rx_ring;
	sw_ring = rxq->sw_ring;
	vlan_flags = rxq->vlan_flags;
	while (nb_rx < nb_pkts) {
		/*
		 * The order of operations here is important as the DD status
		 * bit must not be read after any other descriptor fields.
		 * rx_ring and rxdp are pointing to volatile data so the order
		 * of accesses cannot be reordered by the compiler. If they were
		 * not volatile, they could be reordered which could lead to
		 * using invalid descriptor fields when read from rxd.
		 */
		rxdp = &rx_ring[rx_id];
		staterr = rxdp->wb.upper.status_error;
		if (!(staterr & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
			break;
		rxd = *rxdp;

		/*
		 * End of packet.
		 *
		 * If the IXGBE_RXDADV_STAT_EOP flag is not set, the RX packet
		 * is likely to be invalid and to be dropped by the various
		 * validation checks performed by the network stack.
		 *
		 * Allocate a new mbuf to replenish the RX ring descriptor.
		 * If the allocation fails:
		 *    - arrange for that RX descriptor to be the first one
		 *      being parsed the next time the receive function is
		 *      invoked [on the same queue].
		 *
		 *    - Stop parsing the RX ring and return immediately.
		 *
		 * This policy do not drop the packet received in the RX
		 * descriptor for which the allocation of a new mbuf failed.
		 * Thus, it allows that packet to be later retrieved if
		 * mbuf have been freed in the mean time.
		 * As a side effect, holding RX descriptors instead of
		 * systematically giving them back to the NIC may lead to
		 * RX ring exhaustion situations.
		 * However, the NIC can gracefully prevent such situations
		 * to happen by sending specific "back-pressure" flow control
		 * frames to its peer(s).
		 */
		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
			   "ext_err_stat=0x%08x pkt_len=%u",
			   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
			   (unsigned) rx_id, (unsigned) staterr,
			   (unsigned) rte_le_to_cpu_16(rxd.wb.upper.length));

		nmb = rte_mbuf_raw_alloc(rxq->mb_pool);
		if (nmb == NULL) {
			PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
				   "queue_id=%u", (unsigned) rxq->port_id,
				   (unsigned) rxq->queue_id);
			rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
			break;
		}

		nb_hold++;
		rxe = &sw_ring[rx_id];
		rx_id++;
		if (rx_id == rxq->nb_rx_desc)
			rx_id = 0;

		/* Prefetch next mbuf while processing current one. */
		rte_ixgbe_prefetch(sw_ring[rx_id].mbuf);

		/*
		 * When next RX descriptor is on a cache-line boundary,
		 * prefetch the next 4 RX descriptors and the next 8 pointers
		 * to mbufs.
		 */
		if ((rx_id & 0x3) == 0) {
			rte_ixgbe_prefetch(&rx_ring[rx_id]);
			rte_ixgbe_prefetch(&sw_ring[rx_id]);
		}

		rxm = rxe->mbuf;
		rxe->mbuf = nmb;
		dma_addr =
			rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
		rxdp->read.hdr_addr = 0;
		rxdp->read.pkt_addr = dma_addr;

		/*
		 * Initialize the returned mbuf.
		 * 1) setup generic mbuf fields:
		 *    - number of segments,
		 *    - next segment,
		 *    - packet length,
		 *    - RX port identifier.
		 * 2) integrate hardware offload data, if any:
		 *    - RSS flag & hash,
		 *    - IP checksum flag,
		 *    - VLAN TCI, if any,
		 *    - error flags.
		 */
		pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.wb.upper.length) -
				      rxq->crc_len);
		rxm->data_off = RTE_PKTMBUF_HEADROOM;
		rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);
		rxm->nb_segs = 1;
		rxm->next = NULL;
		rxm->pkt_len = pkt_len;
		rxm->data_len = pkt_len;
		rxm->port = rxq->port_id;

		pkt_info = rte_le_to_cpu_32(rxd.wb.lower.lo_dword.data);
		/* Only valid if PKT_RX_VLAN set in pkt_flags */
		rxm->vlan_tci = rte_le_to_cpu_16(rxd.wb.upper.vlan);

		pkt_flags = rx_desc_status_to_pkt_flags(staterr, vlan_flags);
		pkt_flags = pkt_flags |
			rx_desc_error_to_pkt_flags(staterr, (uint16_t)pkt_info,
						   rxq->rx_udp_csum_zero_err);
		pkt_flags = pkt_flags |
			ixgbe_rxd_pkt_info_to_pkt_flags((uint16_t)pkt_info);
		rxm->ol_flags = pkt_flags;
		rxm->packet_type =
			ixgbe_rxd_pkt_info_to_pkt_type(pkt_info,
						       rxq->pkt_type_mask);

		if (likely(pkt_flags & PKT_RX_RSS_HASH))
			rxm->hash.rss = rte_le_to_cpu_32(
						rxd.wb.lower.hi_dword.rss);
		else if (pkt_flags & PKT_RX_FDIR) {
			rxm->hash.fdir.hash = rte_le_to_cpu_16(
					rxd.wb.lower.hi_dword.csum_ip.csum) &
					IXGBE_ATR_HASH_MASK;
			rxm->hash.fdir.id = rte_le_to_cpu_16(
					rxd.wb.lower.hi_dword.csum_ip.ip_id);
		}
		/*
		 * Store the mbuf address into the next entry of the array
		 * of returned packets.
		 */
		rx_pkts[nb_rx++] = rxm;
	}
	rxq->rx_tail = rx_id;

	/*
	 * If the number of free RX descriptors is greater than the RX free
	 * threshold of the queue, advance the Receive Descriptor Tail (RDT)
	 * register.
	 * Update the RDT with the value of the last processed RX descriptor
	 * minus 1, to guarantee that the RDT register is never equal to the
	 * RDH register, which creates a "full" ring situtation from the
	 * hardware point of view...
	 */
	nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
	if (nb_hold > rxq->rx_free_thresh) {
		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
			   "nb_hold=%u nb_rx=%u",
			   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
			   (unsigned) rx_id, (unsigned) nb_hold,
			   (unsigned) nb_rx);
		rx_id = (uint16_t) ((rx_id == 0) ?
				     (rxq->nb_rx_desc - 1) : (rx_id - 1));
		IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
		nb_hold = 0;
	}
	rxq->nb_rx_hold = nb_hold;
	return nb_rx;
}

上面的预取写得很清楚,看一下定义,源码中有三类平台的,这里只看X86的:

static inline void rte_prefetch0(const volatile void *p)
{
	asm volatile ("prefetcht0 %[p]" : : [p] "m" (*(const volatile char *)p));
}

static inline void rte_prefetch1(const volatile void *p)
{
	asm volatile ("prefetcht1 %[p]" : : [p] "m" (*(const volatile char *)p));
}

static inline void rte_prefetch2(const volatile void *p)
{
	asm volatile ("prefetcht2 %[p]" : : [p] "m" (*(const volatile char *)p));
}

static inline void rte_prefetch_non_temporal(const volatile void *p)
{
	asm volatile ("prefetchnta %[p]" : : [p] "m" (*(const volatile char *)p));
}

其它预取的可以搜索rte_packet_prefetch,都OK了。这里不再赘述。

四、总结

其实从上面分析来看,不管采用何种手段,目的只有一个,流水线作业要尽量保证流水不停不乱。这样,就可以高效率的生产和处理数据。这种数据处理型的软件框架,最重视的就是这些,只要数据保持了流水按照意图前进,就达到了设计目的。
毕竟,只要有了外在的干预,这个干预时间对CPU来说就是一个超长的时间周期,那么效率已经就不再乎了。而在没有干预的情况下,就必须保证数据最大的流水。比如下载网络数据,在线观看视频,在线视频会议等等。
其实这也说明了一点,软件设计重点看应用场景,包括一些框架,比如Redis这种,它也清晰的定位了自己在应用中的场景,所以才如此之火。那么,我们从中可以学习到什么呢?不言而喻吧。文章来源地址https://www.toymoban.com/news/detail-617436.html

到了这里,关于DPDK系列之二十六缓冲Cache的管理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 软件设计模式系列之二十二——状态模式

    状态模式是一种行为型设计模式,它允许对象在内部状态发生改变时改变其行为,使得对象的行为看起来像是改变了其类。状态模式将对象的状态抽象成一个独立的类,让对象在不同状态下具有不同的行为,而且可以在运行时切换状态。这种方式使得状态的管理更加清晰,避

    2024年02月08日
    浏览(31)
  • 软件设计模式系列之二十三——策略模式

    策略模式(Strategy Pattern)是一种行为型设计模式,它允许在运行时动态选择算法的行为。这意味着你可以定义一系列算法,将它们封装成独立的策略对象,然后根据需要在不修改客户端代码的情况下切换这些算法。策略模式有助于解决问题领域中不同行为的变化和扩展,同时

    2024年02月08日
    浏览(27)
  • 软件设计模式系列之二十——备忘录模式

    备忘录模式是一种行为型设计模式,它允许我们在不暴露对象内部细节的情况下捕获和恢复对象的内部状态。这个模式非常有用,因为它可以帮助我们实现撤销、恢复和历史记录等功能。在本文中,我们将深入探讨备忘录模式的各个方面,包括定义、示例、结构、实现步骤、

    2024年02月08日
    浏览(29)
  • 软件设计模式系列之二十四——模板方法模式

    在软件设计领域,设计模式是一组被反复使用、多次实践验证的经典问题解决方案。其中,模板方法模式是一种行为型设计模式,用于定义一个算法的骨架,将算法中的某些步骤延迟到子类中实现,从而使子类可以重新定义算法的某些特定步骤,同时保持算法的整体结构不变

    2024年02月08日
    浏览(35)
  • 软件设计模式系列之二十一——观察者模式

    观察者模式(Observer Pattern)是一种行为型设计模式,它允许对象之间建立一对多的依赖关系,当一个对象的状态发生变化时,所有依赖于它的对象都会得到通知并自动更新。这个模式也被称为发布-订阅模式,因为它模拟了一个主题(发布者)与多个观察者(订阅者)之间的

    2024年02月08日
    浏览(42)
  • 软件设计模式系列之二十五——访问者模式

    访问者模式(Visitor Pattern)是一种强大的行为型设计模式,它允许你在不改变被访问对象的类的前提下,定义新的操作和行为。本文将详细介绍访问者模式,包括其定义、举例说明、结构、实现步骤、Java代码实现、典型应用场景、优缺点、类似模式以及最后的小结。 访问者

    2024年02月08日
    浏览(74)
  • Iceberg从入门到精通系列之二十二:Spark DDL

    要在 Spark 中使用 Iceberg,请首先配置 Spark 目录。 Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。 Spark 3 可以使用 USINGiceberg 子句在任何 Iceberg 目录中创建表: Iceberg会将Spark中的列类型转换为对应的Iceberg类型。详细信息请查看创建表的类型兼容性部分。 PARTITIONE

    2024年02月19日
    浏览(31)
  • WPF入门教程系列二十六——DataGrid使用示例(3)

    WPF入门教程系列目录 WPF入门教程系列二——Application介绍 WPF入门教程系列三——Application介绍(续) WPF入门教程系列四——Dispatcher介绍 WPF入门教程系列五——Window 介绍 WPF入门教程系列十一——依赖属性(一) WPF入门教程系列十五——WPF中的数据绑定(一)   五、DataGrid的DataG

    2024年02月06日
    浏览(30)
  • 【Android从零单排系列二十六】《Android视图控件——ScrollView》

    目录 前言 一 ScrollView基本介绍 二 ScrollView使用方法 三 ScrollView常见属性及方法 四 ScrollView简单案例 五 总结 小伙伴们,在上文中我们介绍了Android视图组件RecyclerView,本文我们继续盘点,介绍一下视图控件的ScrollView。 ScrollView是Android平台上的一个可滚动视图容器,它用于在一

    2024年02月12日
    浏览(26)
  • Oracle数据库从入门到精通系列之二十一:Oracle 19c数据库增加重做日志大小

    在此最佳实践中,我们增加了 Oracle 重做日志的大小以提高数据库性能。适当调整数据库重做日志的大小可以减少数据库中的等待事件,从而优化数据库系统。 数据库重做日志至少包含两个文件,负责存储对数据库所做的任何更新。重做日志对于数据库至关重要,因为它们可

    2024年04月26日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包