flink内存管理(三):MemorySegment内存使用场景:托管内存与网络内存

这篇具有很好参考价值的文章主要介绍了flink内存管理(三):MemorySegment内存使用场景:托管内存与网络内存。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在Flink内存模型中我们已经知道,Flink会将内存按照使用方式、内存类型分为不同的内存区域,底层会借助MemorySegment对内存块进行管理和访问,MemorySegment的使用场景有很多,本文我们主要看下ManagedMemory和NetworkBuffer是如何申请和使用MemorySegment内存块的。

一.ManagedMemory(算子)内存的申请与使用

1. tm内存申请与使用大致流程

Task使用的物理计算资源主要是TaskSlot提供的,TaskSlot由TaskManager中TaskSlotTable组件创建和管理。

  • 创建MemoryManager:JobManager申请到足够的Slot计算资源后,会在TaskSlotTable中创建相应的TaskSlot,然后对TaskSlot基本环境进行初始化,包括在TaskSlot内部创建MemoryManager组件。最终使用MemoryManager管理当前TaskSlot的内存计算资源。
  • task线程使用内存:当Task线程启动时,会直接从TaskSlot中获取MemoryManager组件申请内存空间。通过MemoryManager对MemorySegment内存空间进行管理,这一步对应内存模型中的ManagedMemory,也被称为托管内存。

 

2. 创建MemoryManager实例

在TaskSlot的构造器中调用createMemoryManager()方法创建MemoryManager实例,管理当前TaskSlot(代表一个线程的资源) 中的内存空间

/**
创建具有**给定**容量和给定页面大小的内存管理器。
这是 MemoryManager 的生产版本,一旦 MemoryManager 
的所有者准备好处置,它就会检查内存泄漏 ( verifyEmpty() )。

参数:
memorySize – 该内存管理器管理的堆外内存的总大小。 
pageSize – 内存管理器分配的页面大小。
**/
private static MemoryManager createMemoryManager(  
        ResourceProfile resourceProfile, int pageSize) {  
    return MemoryManager.create(resourceProfile.getManagedMemory().getBytes(), pageSize);  
}

在TaskSlot.createMemoryManager()方法中,会根据ResourceProfile参数获取内存空间大小,默认设置为非堆ing。其中pageSize参数就是MemorySegment的大小,如下代码默认为32kb。

TaskManagerOptions.
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)  
public static final ConfigOption<MemorySize> MEMORY_SEGMENT_SIZE =  
        key("taskmanager.memory.segment-size")  
                .memoryType()  
                .defaultValue(MemorySize.parse("32kb"))  
                .withDescription(  
                        "Size of memory buffers used by the network stack and the memory manager.");
                    

 

3. 算子使用通过MemoryManager使用内存

MemoryManager创建完毕后,会通过TaskSlot将MemoryManager对象传递给Task,此时Task会通过将MemoryManager封装在Environment变量中,然后传递给算子
算子接收到MemoryManager对象后,通过MemoryManager动态申请内存空间,最终用于算子的具体计算过程。

需要注意的是:并不是所有的算子都会使用MemoryManager申请内存空间,这个步骤主要针对批计算类型的算子,例如HashJoinOperator、SortMergeJoinOperator和SortOperator等,这些算子往往需要借助非常大的内存空间进行数据的排序等操作。

 

4. ManagedMemory内存空间申请流程

申请ManagedMemory内存空间,是调用MemoryManager.allocatePages()方法执行的,见如下逻辑。

  • 1)从AllocationRequest参数中获取MemorySegment的空集合、申请Pages总数量以及资源Owner(与内存关联的所有者:slot?还是算子?)等参数,并对参数进行非空和状态检查;
  • 2)计算申请内存大小,并预留出内存空间;
  • 3)根据page数、pageCleanup、owner等,开始分配内存,将内存以MemorySegment为单位,并维护一个set集合,最终返回给算子使用。
/**
从此内存管理器分配一组内存段。
分配的总内存不会超过构造函数中声明的大小限制。
参数:
owner – 与内存段关联的所有者,用于后备释放。 
target – 将分配的内存页放入其中的列表。 numberOfPages – 要分配的页数。
**/
public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)  
        throws MemoryAllocationException {  
    // sanity check  
    Preconditions.checkNotNull(owner, "The memory owner must not be null.");  
    Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");  
    Preconditions.checkArgument(  
            numberOfPages <= totalNumberOfPages,  
            "Cannot allocate more segments %s than the max number %s",  
            numberOfPages,  
            totalNumberOfPages);  
  
    // reserve array space, if applicable  
    if (target instanceof ArrayList) {  
        ((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);  
    }  
    //计算申请内存大小,并预留空间(以免申请过程中被用掉)
    long memoryToReserve = numberOfPages * pageSize;  
    try {  
        memoryBudget.reserveMemory(memoryToReserve);  
    } catch (MemoryReservationException e) {  
        throw new MemoryAllocationException(  
                String.format("Could not allocate %d pages", numberOfPages), e);  
    }  
    //创建pageCleanup方法用于清理unsafe内存
    Runnable pageCleanup = this::releasePage;  
    allocatedSegments.compute(  
            owner,  
            (o, currentSegmentsForOwner) -> {  
                Set<MemorySegment> segmentsForOwner =  
                        currentSegmentsForOwner == null  
                                ? new HashSet<>(numberOfPages)  
                                : currentSegmentsForOwner;  
                for (long i = numberOfPages; i > 0; i--) {  
                 //分配内存
                    MemorySegment segment =  
                            allocateOffHeapUnsafeMemory(getPageSize(), owner, pageCleanup);  
                    target.add(segment);  
                    segmentsForOwner.add(segment);  
                }  
                return segmentsForOwner;  
            });  
  
    Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");  
}					

如下如下算子会申请内存使用:
flink内存管理(三):MemorySegment内存使用场景:托管内存与网络内存,# flink源码,# flink 实战,flink,python,java

 

二.NetworkBuffer内存申请与使用

在Flink内存模型中,另外一个非常重要的堆外内存使用区域就是Network内存。Network内存主要用于网络传输中Buffer数据的缓冲区。

1. NetworkBuffer构造器

在NetworkBufferPool的构造器中可以看出,创建NetworkBufferPool时会根据用户配置的NetworkBuffer数量,调用MemorySegmentFactory创建相应的MemorySegment内存空间,再通过LocalBufferPool应用到ResultSubPartition或InputChannel组件中。

public NetworkBufferPool(
            int numberOfSegmentsToAllocate, int segmentSize, Duration requestSegmentsTimeout) {
        this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
        this.memorySegmentSize = segmentSize;

        Preconditions.checkNotNull(requestSegmentsTimeout);
        checkArgument(
                requestSegmentsTimeout.toMillis() > 0,
                "The timeout for requesting exclusive buffers should be positive.");
        this.requestSegmentsTimeout = requestSegmentsTimeout;

        final long sizeInLong = (long) segmentSize;

        try {
            this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate);
        } catch (OutOfMemoryError err) {
            throw new OutOfMemoryError(
                    "Could not allocate buffer queue of length "
                            + numberOfSegmentsToAllocate
                            + " - "
                            + err.getMessage());
        }

        try {
            //申请segment内存,并放到availableMemorySegments中。
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                availableMemorySegments.add(
                        MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
            }
        } catch (OutOfMemoryError err) {
        //如果申请过程中失败,则释放已申请的内存,算出缺少多少内存
            int allocated = availableMemorySegments.size();

            // free some memory
            availableMemorySegments.clear();

            long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
            long allocatedMb = (sizeInLong * allocated) >> 20;
            long missingMb = requiredMb - allocatedMb;

            throw new OutOfMemoryError(
                    "Could not allocate enough memory segments for NetworkBufferPool "
                            + "(required (MB): "
                            + requiredMb
                            + ", allocated (MB): "
                            + allocatedMb
                            + ", missing (MB): "
                            + missingMb
                            + "). Cause: "
                            + err.getMessage());
        }

        availabilityHelper.resetAvailable();
        //计算共申请了多少mb:20:为2的20次方
        long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

        LOG.info(
                "Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
                allocatedMb,
                availableMemorySegments.size(),
                segmentSize);
    }

 
参考:《Flink设计与实现:核心原理与源码解析》- 张利兵文章来源地址https://www.toymoban.com/news/detail-814055.html

到了这里,关于flink内存管理(三):MemorySegment内存使用场景:托管内存与网络内存的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》 学习笔记如下: Flink 中每条消息都会被放到网络缓冲(network buffer) 中,并以此为最小单位发送到下一个 subtask。 Flink 在传输过程的输入端和输出端使用了网络缓冲队列,即每个 subtask 都有一个输入队列来接收

    2024年01月21日
    浏览(54)
  • Flink 使用场景

    Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、K8s 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用

    2024年02月05日
    浏览(36)
  • Flink 内容分享(二十):这三种场景,建议使用Flink

    目录 01 事件驱动型应用 02 数据分析型应用 03 数据管道型应用 Flink的应用场景十分广泛,下面介绍3种常见的应用。 在许多场景中,需要处理的数据往往来自事件。小到一些交互式的用户行为,大到一些复杂的业务操作,它们都会被转化成一条条数据,进而形成数据流(事件

    2024年01月16日
    浏览(47)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(42)
  • 聊一聊 Valgrind 监视非托管内存泄露和崩溃

    只要是程序总会出现各种莫名其妙的问题,比如:非托管内存泄露,程序崩溃,在 Windows 平台上一般用微软自家的官方工具 App Verifier 就可以洞察,那问题出在 Linux 上怎么办呢?由于 Linux 崇尚自由,需要在各种牛鬼蛇神写的非官方开源软件中寻找一个比较靠谱的,比如本篇所

    2024年02月02日
    浏览(84)
  • 动态内存malloc,calloc,realloc如何使用,使用场景及使用free释放内存时崩溃的原因

    目录 1.内存区域 2.void与void* 3.应用场景 4.malloc 5.calloc 6.realloc 7.free崩溃的原因 7.1引入 7.2具体原因 7.2.1越界 7.2.2指针移动 7.2.3重复释放同一段内存 局部变量 : 定义在函数内部的变量 , 包括形参 , 在栈 (stack) 中 , 作用域在函数内部有效 , 生存周期 : 进入函数创建, 退出函数销毁。

    2024年02月05日
    浏览(56)
  • PerfView 洞察C#托管堆内存 "黑洞现象"

    首先声明的是这个 黑洞 是我定义的术语,它是用来表示 内存吞噬 的一种现象,何为 内存吞噬 ,我们来看一张图。 从上面的 卦象图 来看,GCHeap 的 Allocated=852M 和 Committed=16.6G ,它们的差值就是 分配缓冲区=16G ,缓冲区的好处就是用空间换时间,弊端就是会实实在在的侵占内

    2024年02月16日
    浏览(50)
  • PerfView专题 (第十六篇): 如何洞察C#托管堆内存的 "黑洞现象"

    首先声明的是这个 黑洞 是我定义的术语,它是用来表示 内存吞噬 的一种现象,何为 内存吞噬 ,我们来看一张图。 从上面的 卦象图 来看,GCHeap 的 Allocated=852M 和 Committed=16.6G ,它们的差值就是 分配缓冲区=16G ,缓冲区的好处就是用空间换时间,弊端就是会实实在在的侵占内

    2024年02月16日
    浏览(53)
  • Docker网络(网络通信),资源控制(CPU优化,内存优化,磁盘优化),数据管理(数据卷,端口映射,容器互联)

    目录 docker网络 网络实现原理 网络实现实例 网络模式 查看Docker中的网络列表: 指定容器网络模式 模式详解 Host模式(主机模式): Container模式(容器模式): None模式(无网络模式): Bridge模式(桥接模式): 自定义网络: cpu优化概述 1. 资源限制: 2. CPU 实时调度策略:

    2024年01月16日
    浏览(80)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包