Flink流批一体计算(7):Flink优化

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(7):Flink优化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

配置内存

设置并行度

操作场景

具体设置

补充

配置进程参数

操作场景

具体配置

配置netty网络通信

操作场景

具体配置

配置内存

Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GCGarbage Collection),评估内存使用及剩余情况来判断内存是否变成性能瓶颈,并根据情况优化。

监控节点进程的YARNContainer GC日志,如果频繁出现Full GC,需要优化GC

conf/flink-conf.yaml

env.java.opts: -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:+AlwaysPreTouch -server
-XX:+HeapDumpOnOutOfMemoryError

调整老年代和新生代的比值。env.java.opts配置项中添加参数:-XX:NewRatio。

如“-XX:NewRatio=2,则表示老年代与新生代的比值为2:1,新生代占整个堆空间的1/3,老年代占2/3。

设置并行度

操作场景

并行度控制任务的数量,影响操作后数据被切分成的块数。调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优。

查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力。

具体设置

任务的并行度可以通过以下四种层次(按优先级从高到低排列)指定,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。

  • 算子层次

一个算子、数据源和sink的并行度可以通过调用setParallelism()方法来指定,例如

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
  • 执行环境层次

Flink程序运行在执行环境中。执行环境为所有执行的算子、数据源、data sink定义了一个默认的并行度。

执行环境的默认并行度可以通过调用setParallelism()方法指定。例如:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
  • 客户端层次

并行度可以在客户端将job提交到Flink时设定。对于CLI客户端,可以通过“-p”参数指定并行度。例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar
  • 系统层次

在系统级可以通过修改Flink客户端conf目录下的“flink-conf.yaml”文件中的“parallelism.default”配置选项来指定所有执行环境的默认并行度。

补充

开发Flink应用程序时,优化DataStream的数据分区或分组操作。

  • 当分区导致数据倾斜时,需要考虑优化分区。
  • 避免非并行度操作,有些对DataStream的操作会导致无法并行,例如WindowAll。
  • keyBy尽量不要使用String。

配置进程参数

操作场景

Flink on YARN模式下,有JobManagerTaskManager两种进程。在任务调度和运行的过程中,JobManagerTaskManager承担了很大的责任。

因而JobManagerTaskManager的参数配置对Flink应用的执行有着很大的影响意义。用户可通过如下操作对Flink集群性能做优化。

具体配置
  • 配置JobManager内存

JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务并行度增大时,JobManager内存都需要相应增大。

您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。

        在使用yarn-session命令时,添加“-jm MEM”参数设置内存。
        在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。

  • 配置TaskManager个数

每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。

  • 配置TaskManager Slot数

每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。

        在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。
        在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。

  • 配置TaskManager内存

TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。

        将在使用yarn-session命令时,添加“-tm MEM”参数设置内存。
        将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。

配置netty网络通信

操作场景

Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。

具体配置

以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。

  • taskmanager.network.netty.num-arenas:

默认是taskmanager.numberOfTaskSlots,表示netty的域的数量。

  • taskmanager.network.netty.server.numThreads和taskmanager.network.netty.client.numThreads:

默认是taskmanager.numberOfTaskSlots,表示netty的客户端和服务端的线程数目设置。

  • taskmanager.network.netty.client.connectTimeoutsec:

默认是120s,表示taskmanager的客户端连接超时的时间。

  • taskmanager.network.netty.sendReceiveBufferSize:

默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。

  • taskmanager.network.netty.transport:

默认为nio方式,表示netty的传输方式,有nio和epoll两种方式。文章来源地址https://www.toymoban.com/news/detail-517264.html

到了这里,关于Flink流批一体计算(7):Flink优化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(4):Flink功能模块

    目录 Flink功能架构 Flink输入输出 Flink功能架构 Flink是分层架构的分布式计算引擎,每层的实现依赖下层提供的服务,同时提供抽象的接口和服务供上层使用。 Flink 架构可以分为4层,包括Deploy部署层、Core核心层、API层和Library层 部署层:主要涉及Flink的部署模式。Flink支持多种

    2024年02月10日
    浏览(50)
  • Flink流批一体计算(5):部署运行模式

    目录 集群运行模式 1.local模式 2.standalone模式 3.Flink on YARN模式 本地模式 Standalone 模式 Flink on Yarn 模式 集群运行模式 类似于 Spark , Flink 也有各种运行模式,其中主要支持三种: local 模式、 standalone 模式以及 Flink on YARN 模式。 每种模式都有特定的使用场景,接下来一起了解一

    2024年02月10日
    浏览(41)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(43)
  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(43)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(40)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(39)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(44)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包