概述
Flink是一个对有界和无界数据流进行有状态计算的分布式处理引擎和框架,既可以处理有界的批量数据集,也可以处理无界的实时流数据,为批处理和流处理提供了统一编程模型,其代码主要由 Java 实现,部分代码由 Scala实现。
Flink以REST资源的形式和外部进行交互,所以可以集成在所有常见的集群资源管理环境中运行,同时提供计算状态的容错及持久化机制,基于Event(事件)驱动并行化在集群中运行,理论上可以利用无限数量的CPU,内存,磁盘和网络IO,实现任意规模的计算任务。
Flink提供一系列状态功能易于维护非常大的应用程序状态,通过异步和增量检查点算法可确保对处理延迟的影响降至最低且同时保证精确语义的状态一致性。
Flink提供了诸多抽象层的API以便用户编写分布式任务:DataSet API、DataStream API、Table API等,可支持多种编程语言,例如:Java、Python、Go、Scala等。
流处理应用主要关注的维度是流数据、数据流转状态、数据延迟到达、时间、容错处理等核心内容,所以流处理框架必须要对上面几个维度进行合理支持。
Flink通过分布式存储、状态后台存储、时间语义细分、窗口机制、检查点、保存点、Stateful Stream Processing、Watermark等机制来完成流处理应用的全方位支持。
Flink核心是一个流式的数据流执行引擎,能够基于同一个Flink运行时,同时对多个Flink客户端应用提供支持流处理和批处理两种类型应用。
Flink集群遵循一主(Master)多从(Worker)的常规架构模式, 主要包括四个组成部分:Client、Master(宿主JobManager)、Worker(宿主TaskManger)、资源管理器(ResourceManager)<外部依赖>。
下面从如下几个方面介绍下其相关理论:
目录
概述
一、架构
二、核心知识点
1、核心组件
1.1、分布式存储
1.2、运行时(Core)
1.3、抽象API
1.4、集成高阶应用
1.5、组件间通讯
1.6、客户端应用
2、基本概念
2.1、JobManager (后续迭代更名为:JobMaster)
2.2、TaskManager
2.3、分发器(Dispatcher)
2.4、资源管理器(ResourceManager)
2.5、Client
2.6、状态(State)
2.9、时间语义
2.10、窗口机制
2.11、水位线(Watermark)机制
2.12、容错机制
2.13、数据倾斜
3、编程模型
3.1、Fink内存模型
3.2、数据流
3.3、数据结构
4、计算过程
5、整体运行过程
三、部署方式
四、优缺点分析
缺点
优点
五、常见应用场景
六、调优经验
七、API应用:
java平台:(1.16以上)
DoNet平台:
一、架构
二、核心知识点
1、核心组件
Flink主要有下面一些核心组件构成,包括:分布式存储、运行时(Core)、抽象API、集成应用、 组件间通讯、 客户端应用
1.1、分布式存储
支持内存和磁盘操作,提供实时计算状态后端存储、检查点及保存点后端存储等数据管理功能,支持本地文件、Kafka、 Hadoop HDFS、RocksDB等存储方式
1.2、运行时(Core)
运行时(Runtime)为Flink各类计算提供了实现,是Flink最底层也是最核心的组件。
1.3、抽象API
抽象API包括SQL & Table API、DataStream、DataSet、Stateful Stream Processing等。
SQL & Table API:支持以SQL相同语义的方式进行流处理(Table API)和批处理(SQL)且产生相同的结果。除了基本查询外, 它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的复杂查询需求。
DataStream:是Flink数据处理的核心API,支持处理有界和无界数据流。
DataSet:是Flink数据处理的核心API,支持处理有界数据流。
Stateful Stream Processing:是Flink最低级别的抽象通过一些Process Function对外服务,基于此可以封装一些功能即服务(FaaS)函数(是一种在无状态容器中运行的事件驱动型计算执行模式,不需要独立宿主后台,类似于函数直接调用),
具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。
1.4、集成高阶应用
复杂事件处理(CEP):
CEP(Complex Event Processing) 是Flink提供的复杂事件处理库,支持流式处理, 是一种基于动态环境中事件流的分析技术,它可以让你在无界流或有界流中检测出特定的数据,提取数据中重要的那部分。
事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。
机器学习计算库(ML):
ML(Machine Learning)是Flink提供的机器学习库,支持批处理。
图计算库(Gelly):
Gelly(Graph Processing)是Flink提供的图计算库,支持批处理。
1.5、组件间通讯
Flink 的所有组件都基于 Actor System 来进行通讯。Actor system是多种角色的 actor 的容器,它提供调度,配置,日志记录等多种服务,并包含一个可以启动所有 actor 的线程池,如果 actor 是本地的,则消息通过共享内存进行共享,
但如果 actor 是远程的,则通过 RPC 来传递共享消息。
ActorSystem 是 基于 akka 实现的一个通信模块,负责节点之间的通信,如 Client 和 JobManager 之间,JobManager 和 TaskManager 之间的通信;
1.6、客户端应用
是Flink的具体应用实现部分,在此进行作业定义、连接数据源、依据业务设计转换、聚合、输入、输出等操作,然后交由Flink计算,最后得到目标结果,支持本地文件、Kafka、 Hadoop HDFS、云端存储等多种形式。
2、基本概念
2.1、JobManager (后续迭代更名为:JobMaster)
JobManager 是管理节点,负责协调管理Flink作业,一个作业由一个 JobManager 来负责(一对一)。主要有下面一些职责:
a、接受Flink客户端提交的应用(Application),包括JAR、数据流图 StreamGraph(DAG),作业图JobGraph(优化过的DAG)等
b、将 JobGraph 拆封成 Task 并转化为 执行图(ExecutionGraph),包含了所有可以并发执行的任务
c、向集群资源管理器申请计算资源,调度分发计算任务(ExecutionGraph)到计算节点(TaskManager)
d、管理多个TaskManager,包括收集作业的状态信息、协调生成检查点、保存点,必要时进行故障恢复(Failover)等
e、基于ActorSystem时刻和 TaskManager 保持心跳
2.2、TaskManager
TaskManager 包括下面一些核心概念:TaskManager、Task和Sub Task、并行度、算子链、Task Slot、数据交换管理模块(Network Manager)等。
TaskManager:是执行计算任务的节点,每个 TaskManager 提供一定数量的槽(Slot)。一个Flink作业一般会分布到多个 TaskManager 上执行,计算任务是在Slot上执行的,槽是最小粒度的计算资源单位。主要有下面一些职责:
a、启动时将TaskManager所辖的Slot资源向ResourceManager注册
b、和 JobManager 交互心跳
c、和其他TaskManager进行数据交换,依赖Network Manager管理
d、管理Slot并启动线程池执行JobManager分发的计算任务
e、向JobManager报给任务执行状态
f、协同 JobManager 执行检查点、保存点、状态存储、故障恢复、任务重试等管理任务
任务(Task):是一个实际执行的物理图,一个Task是Flink运行时工作的最基本的单元, Task封装了一个并行的Operator算子或者一个Operator Chain算子链。
Flink 中的执行图可以分成四层:StreamGraph ->JobGraph -> ExecutionGraph -> 物理执行图。
逻辑流图(StreamGraph): 这是根据用户通过 DataStream API 编写的代码生成的最初的 DAG 图,用来表示程序的拓扑结构。这一步一般在客户端完成。
作业图(JobGraph): StreamGraph 经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。
主要的优化为: 将多个符合条件的节点链接在一起 合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph 一般也是在客 户端生成的,在作业提交时传递给 JobManager。
执行图(ExecutionGraph):JobManager 收到 JobGraph 后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分, 并明确了任务间数据传输的方式。
物理图(Physical Graph): JobManager 生成执行图后, 会将它分发给 TaskManager;各个 TaskManager 会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。
这只是具体执行层面的图,并不是一个具体的数据结构。 物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager 就可以对传递来的数据进行处理计算了。 一共有 5 个并行子任务,最终需要 5 个线程来执行。
子任务(SubTask):是负责处理一部分数据流任务,Sub-Task强调的是并行执行相同的算子Operator或者算子链Operator Chain,经过算子操作把Task分拆成多个Sub Task。
并行度(Parallelism):表示一个特定算子的子任务(subtask)的个数。把一个算子操作“复制”多份到多个节点, 数据来了之后就可以到其中任意一个上执行。一个算子任务就被拆分成了多个并行的 “子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。 一般情况下,一个流应用的并行度,可以认为就是其所有算子中最大的并行度,一个程序中, 不同的算子可能具有不同的并行度。
设置并行度有下面四种方式:
1)、操作算子层面(Operator Level)
在代码中的Operator中调用setParallelism(int) 方法设置
2)、执行环境层面(Execution Environment Level)
在代码中执行环境env中调用setParallelism 方法
3)、客户端层面(Client Level)
在通过flink提交任务加入 -p 参数指定并行度 或 WEB-UI中显示指定的并行度
4)、系统层面(System Level)
在flink配置文件中指定的默认并行度parallelism.default: 1
优先级:算子层面>环境层面>客户端层面>系统层面,建议采用在代码中特定的Operator显式调用setParallelism方法。
算子链(Operator Chain):将并行度(Parallelism)相同的一对一(one to one)算子操作直接链接在一起形成一个任务然后去执行。算子操作间除了一对一(one to one)的关系,还有由keyBy和并行度改变引起的重分区(Redistributing)关系。
合并Operator成为一个Operator Chain 有两个必要的条件:
1)、Operator 并行度必须一致,不能出现Redistribution
2)、one-to-one 操作的Operator数据流维护数据的顺序与分区,上游算子的操作可以直接流入下游算子
任务槽(Task Slots):
Slot 是计算资源调度的最小单位,Slot 的数量限制了 TaskManager 能够并行处理的任务数量,一个TaskManager启动时就静态的划定了Slot的数量,可以通过参数taskmanager.numberOfTaskSlots控制大小。
每一个Slot 表示在 TaskManager 上一个固定大小的计算资源,用来处理独立的子任务,如: TaskManager 的Slot为3, TaskManager 会将计算资源平均分配成3份,每个Slot独占一份,不需要跟其他的作业进行竞争内存资源。
Slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 Slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争,这也是开发环境默认并行度设为机器 CPU 核心数的原因。
任务槽默认开启子任务共享,Flink 是允许子任务共享 Slot 的,只要属于同一个作业,那么对于不同任务节点的并行子任务,就可以放到同一个 Slot 上执行。
如上图:所以对于第一个任务节点 source→map,它的 6 个并行子任务必须分到不同的 slot 上(如果在 同一 Slot 就没法数据并行了),而第二个任务节点 keyBy/window/apply 的并行子任务却可以和第一个任务节点共享 Slot。
Slot共享有下面两个好处:
1)、不同的任务可能耗时不一样,如果每一个任务独占一个Slot,可能某一个任务只是简单的接收数据,而另一个任务是对数据的加工处理,此时单纯接收数据的任务会浪费 Slot 中的资源,
也会导致下游任务的积压,忙的忙死,闲的闲死,此时利用了Slot共享的机制可以避免资源的浪费。
2)、允许我们保存完整的作业管道,这样一来,即使某个 TaskManager 出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行。
数据交换管理模块(Network Manager):
在一个运行的Flink应用Application中,它的Tasks间需要持续的交换数据,数据传递过程由 TaskManager 负责,实际就是数据交换管理模块(Network Manager)。
Network Manager组件首先从缓冲buffer中收集records,然后再以batch的形式发送,高效使用网络资源并提高吞吐量。
每个TaskManager有一组网络缓冲池(默认每个buffer是32KB),用于发送与接受数据,如发送端和接收端位于不同的TaskManager进程中,则它们需要通过操作系统的网络栈进行交流。
流应用需要以管道的模式进行数据交换,每对TaskManager会维持一个永久的TCP连接用于做数据交换。
大数据计算在计算过程中都需要对中间计算结果不停的进行“Shuffle”处理,直至计算出最终目标结果。
在shuffle连接模式下(多个sender与多个receiver),每个sender task需要向每个receiver task传递数据,此时TaskManager需要为每个receiver task都分配一个缓冲区。如下图:
在上图中,有四个sender 任务,对于每个sender,都需要有至少四个network buffer用于向每个receiver发送数据,每个receiver都需要有至少四个buffer用于接收数据。
若sender与receiver任务都运行在同一个TaskManager进程,则sender任务会将发送的条目做序列化,并存入一个字节缓冲,然后将缓冲放入一个队列,直到队列被填满;Receiver任务从队列中获取缓冲,并反序列化输入的条目,
所以,在同一个TaskManager内,任务之间的数据传输并不经过网络交互。
TaskManager之间的buffer以I/O多路复用的方式使用同一网络连接,为了提供平滑的数据管道型的数据交换,一个TaskManager必须能提供足够的缓冲,以服务所有并行的出入数据处理。
I/O多路复用是一种共享网络连接的通讯控制机制,需要操作系统层面的支持,一般三种类型:select(Windows)、poll(Linux)、epoll(Linux),其他支持的平台也都类属于这几种模式。
2.3、分发器(Dispatcher)
Dispatcher 主要负责提供一个表述性状态转移(Representational State Transfer)REST式的接口,以超文本传输协议(Hyper Text Transfer Protocal)HTTP的形式来对外提供服务,所以Flink框架可以很容易的和其他资源管理框架集成。
Dispatcher可以接收多个作业,每接收一个作业,Dispatcher都会为这个作业分配一个JobManager,同时启动一个 Web UI用来方便地展示和监控作业执行的信息。
Dispatcher 在架构中并不是必需的, 在有些部署模式下可能不会应用到。
2.4、资源管理器(ResourceManager)
ResourceManager 主要负责计算资源的分配和管理,在 Flink 集群中只有一个,Flink 通过 ResourceManager 模块来统一处理资源分配,所以,可以很容易的部署在其他集群资源管理框架中,例如:YARN、K8S、Mesos等。
在 Flink 中,计算资源的基本单位是 TaskManager 上的任务槽位(Task Slot,简称槽位Slot)。ResourceManager 的职责主要是从YARN等资源提供方获取计算资源,
当 JobManager 有计算需求时,将空闲的 Slot 分配给 JobManager,当计算任务结束时,ResourceManager 还会重新收回这些 Slot。
在 Flink 实际执行任务时,每一个任务(Task)都需要分配到一个 Slot 上执行。
2.5、Client
Client是指用于提交Flink应用的客户端,Flink 作业在哪台机器上面提交,那么当前机器称之为Client,Flink集群提供的客户端工具是 Flink 主目录下 bin/flink,使用方式 bin/flink run 详见具体命令操作规范。
Flink应用会通过Flink提供的一系列算子操作来表达具体的技术逻辑,构建出整个计算任务的数据流图 DataFlow Graph并打包成JAR,然后通过Client提交给Flink集群去安排执行,Client需要从用户提交的Flink程序配置中获取Flink集群的JobManager的地址。
Client在接收到提交的Flink应用(一些JAR包)后,会找到JAR包中的 Main 方法,创建Flink运行环境,把数据流图 DataFlow Graph 转化为 JobGraph (DAG有向无环图表示的作业),连通JAR包一起提交给Flink集群的 JobManager,
如果成功提交会返回一个 JobClient,用来和 JobManager 通信获得任务执行的状态。
这里说的 JobGraph 其实就是在 Flink UI 界面上看到的有向无环图DAG,如下图:
另外,JobGraph 也是对集群组件的一个解耦过程,不管什么程序最终都生成 JobGraph ,JobGraph 作为客户端和 JobManager 提交的数据规范。
2.6、状态(State)
概念:在流计算中,状态(State)是一个比较宽泛的概念,泛指计算的中间结果,根据是否需要保存中间结果,分为无状态(Stateless)计算和有状态(Stateful)计算,
在流计算实际应用场景中,事件是持续不断地产生,如果每次计算都是相互独立的,不依赖于上下游的事件,则是无状态计算;如果计算需要依赖于之前或者后续的事件,则是有状态计算。
无状态(Stateless)计算:每一个进入的记录独立于其他记录。不同记录之间没有任何关系,它们可以被独立处理和持久化。例如:map、fliter、静态数据 join 等操作。
有状态(Stateful)计算:处理进入的记录依赖于之前记录处理的结果,需要维护不同数据处理之间的中间结果,每一个进入的记录都可以读取和更新该中间结果。例如:独立键的聚合计数、去重等等。
在Flink中,状态具有如下一些分类:
托管状态、算子状态、列表状态、联合列表状态、广播状态、键控状态、值状态、映射状态、归约状态、聚合状态、原始状态等。
算子状态:作用范围是算子,算子的多个并行实例各自维护一个状态。
键控状态:每个分组维护一个状态。
State管理:
应用状态是 Flink 中的重中之重,Flink 提供了许多状态管理相关的特性支持,具体包括多种状态基础类型、StateBackend、状态数据的备份和恢复、状态数据的划分和动态扩容、状态数据的清理、状态重分布、状态功能等。
1)、多种状态基础类型:
Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)、映射(map)等,开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
2)、状态数据的存储和访问:
State的存储在Flink中叫作StateBackend,负责管理应用程序状态,并在需要的时候进行 checkpoint、savepoint等。
StateBackend的特点:
a、持久化到外部存储
b、在计算过程中可被访问
c、支持精确一次语义
d、基于插件形式设计,支持自定义扩展
e、弹性伸缩的状态应用,支持超大数据量状态存储
f、状态数据支持容错
StateBackend的存储方式(支持分布式存储):
1)、纯内存:MemoryStateBackend,适用于验证、测试,不推荐生产环境。
2)、内存+文件:FsStateBackend,适用于长周期大规模的数据。
3)、RocksDB:RocksDBStateBackend,适用于长周期大规模的数据,RocksDB 是一种高效的嵌入式、持久化键值存储引擎。
在Flink内部,状态存储迭代发展过程:
1.13前 本地状态存哪里 Checkpoint存哪里
内存 TaskManager的内存 JobManager内存
文件 TaskManager的内存 HDFS
RocksDB RocksDB HDFS
1.13后 本地状态 Checkpoint
哈希表 TaskManager的内存 JobManager内存/HDFS
RocksDB RocksDB HDFS
3)、状态数据的备份和恢复:通过对状态数据的缓存,支持建立 checkpoint、savepoint,做到对状态数据的容错管理。
4)、状态数据的划分和动态扩容:支持多种基础状态数据类型用于对应用数据进行划分,基于插件形式实现的StateBackend,支持外部存储及自定义存储,很容易进行动态扩容。
5)、状态数据的清理:
状态数据过大影响性能,同时可能会引发内存溢出(OOM)异常,有些中间状态在用过后可能不再需要,为了减少存储的压力及对存储性能的影响,需要清理状态数据。
如果定时清理State,则存在可能因为State被清理而导致计算结果不完全准确的风险,为了保障Flink应用的健壮性,需要精确的把控存储控制,并提供State清理策略,涉及清理策略时,可参考下面几个关注点:
a、过期时间:超过多长时间未访问,视为State过期,类似于缓存。
b、过期时间更新策略:创建和写时更新、读取和写时更新,参见缓存和DB同步的“延迟双删机制”。
c、State的可见性控制:未清理可用,超期则不可用。
6)、状态重分布:
在Flink中,中间状态包括很多类型,但大部分是算子状态(Operator State)和基于Key分组状态(Keyed State),在从checkpoint、savepoint、算子转换、key分组后,由于改变了并行度(Parallelism)会导致状态重分布(Redistribute)。
7)、状态功能:
状态功能是Flink提供的一种API,表示存在于代表实体的多个实例中的一小段逻辑或代码,通过消息调用函数进行应用,状态功能用于简化构建分布式状态应用的管理过程,支持持久化且保证精确一次语义,可分为2种:有状态功能、无状态功能。
有状态功能特点:
a、有状态的:函数具有嵌入式的容错状态,可以像变量一样在本地访问。
b、虚拟的:就像FaaS(稍后介绍)一样,函数不会占用资源,不活动的函数不会消耗CPU、内存。
无状态功能特点:
a、无状态的:函数具有嵌入式的容错状态,可以像变量一样在本地访问。
b、虚拟的:就像FaaS(稍后介绍)一样,函数不会占用资源,不管是否处于活动状态。
为了进行高效的管理状态功能,可以将一些通用的、特定应用的状态功能抽取出来形成服务,供状态功能API应用,抽取出的这些服务就是“功能即服务(FaaS)”。
功能即服务(FaaS):是一种在无状态容器中(即无服务器)运行的事件驱动型计算执行模式,这些功能将利用服务来管理服务器端逻辑和状态,具体应用可以直接调用该服务来实现预期目标,不再需要专门提供支持该功能服务的基础架构。
由于FaaS是基于事件驱动而非资源,所以很容易进行扩展、能够快速启动和运行、无限复用,从而优化管控资源利用。
在现实中,随着软硬件及技术的不断迭代更新,现代应用基本有3种解放方案混合而成:功能、微服务、长期运行的管理服务,其中功能、微服务大部分可由FaaS来实现,极大的降低了应用研发的时间和成本。
FaaS非常适合大数据量的交易,经常发生的工作负载,例如报表生成,图像处理、任何计划任务。常见的FaaS用例包括数据处理、loT服务、移动、Web应用等。可使用FaaS构建完全无服务器化的应用,也可以打造部分无服务器,部分传统微服务组件的应用。
下面简单介绍下Flink无服务器架构构建的运行时,见下图:
Flink基于FaaS理念设计的运行时,具有以下设计原则:
a、逻辑计算/状态并置:消息、状态访问/更新和功能调用被紧密地管理在一起。这确保了开箱即用的高度一致性。
b、物理计算/状态分离:功能可以远程执行,消息和状态访问作为调用请求的一部分提供。这样,可以像无状态流程一样管理功能,并支持快速扩展,滚动升级和其他常见的操作模式。
c、语言独立性:函数调用使用简单的基于HTTP/gRPC的协议,因此可以轻松地此各种语言实现函数。
这使得可以在 Kubernetes部署,FaaS平台或微服务执行功能,同时在功能之间提供一致的状态和轻量级消息的传递。
FaaS理念具有下面一些优势:
a、提高开发人员的生产率并缩短开发时间
b、不负责服务器管理
c、易于扩展,且横向扩展由平台管理
d、仅在需要时消耗资源或支付付费
e、几乎可以用任何编程语言来编写功能
f、应用程序由具有多种功能的模块组成,可以与一些模块任何交互:
g、精确一次语义:状态和消息传递齐头并进,提供了一次精确的消息/状态语义。
h、逻辑录址:函数通过逻辑地址相互传递消息。无需发现服务。
i、动态和循环消息:消息传递模式无需预先定义为数据流(动态),也不限于DAG(循环)。
使用FaaS理念是通过无服务器架构来构建应用的方法之一,但随着无服务器模式的日渐普及,开发人员正在寻找支持构建无服务器微服务、无服务功能、无状态容器的解决方案。以下是FaaS的一些常见事列:
AWS Lambda
Google云功能
Microsoft Azure 功能(开源)
OpenFass(开源)
2.9、时间语义
时间是流处理应用的另一个重要组成部分,在流应用中,事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义,而且一些操作算子都基于时间语义。例如窗口聚合、会话计算、模式检测、基于时间的 join等。
Flink从三个方面来对时间进行处理和衡量,一个是事件时间模式;一个是迟到事件处理模式;一个是处理时间模式。
事件时间模式:根据事件本身自带的时间戳进行结果的计算。因此,无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。
迟到事件处理模式:Flink 引入了 Watermark(稍后介绍) 的概念,用以衡量事件时间的进展,处理迟到的数据。Watermark 也是一种平衡处理延时和完整性的灵活机制。
当以带有 Watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达,这样的事件被称为迟到事件。Flink针对迟到数据进行单独处理,提供了如下三种策略:
1)、重新激活已关闭的窗口并进行重新计算修正结果
2)、将迟到事件收集起来单读处理
3)、丢弃迟到事件
Flink默认处理方式为【丢弃迟到事件】
将迟到事件收集起来单读处理:可以将未收入窗口的迟到数据,放入侧输出流”(side output)进行另外的处理。
所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些错过了该上的车、本该被丢弃的数据,在进行最终计算时,可拿出来参与最终的计算。
处理时间模式:根据处理引擎的机器时钟触发计算,一般适用于有着严格的低延迟需求,并且能够容忍近似结果的流处理应用。针对处理时间模式,Flink提供了三时间语义:
事件时间(Event Time):数据本身携带的时间,这个时间是指事件产生时的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
摄取时间(Ingestion Time):数据进入 Flink 的时间,
处理时间(Processing Time):执行具体操作算子的本地机器时间,与机器相关,默认的时间属性就是Processing Time。
2.10、窗口机制
概念:Flink 是一种流式计算引擎,主要是来处理无界数据流的,在处理数据前需要将无界数据流切割成有界的“数据块”进行处理,这种切割机制就是所谓切分“窗口”(Window),在 Flink 中,窗口是用来处理无界流的处理的核心。
窗口分类:
a、按驱动类型来分:时间窗口、计数窗口。
1)、时间窗口:以时间点到来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据,到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。
窗口大小 = 结束时间 - 开始时间,Flink中有一个专门的TimeWindow类来表示时间窗口,这个类只有两个私有属性,表示窗口的开始和结束的时间戳,单位为毫秒。
2)、计数窗口:基于数据的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。计数窗口理解简单,只需指定窗口大小,就可以把数据分配到对应的窗口中,Flink内部对应的类来表示计数窗口,底层通过全局窗口(Global Window)实现。
b、按窗口分配数据的规则来分:滚动窗口、滑动窗口、会话窗口、全局窗口。
1)、滚动窗口(Tumbling Windows):滚动窗口有固定的大小,对数据进行“均匀切分”,首尾相接,每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。
计算规则:
start=按照数据的事件时间向下取窗口长度的整数倍(timestamp-(timestamp-offset+windowSize)%windowSize)
end=start+size
比如开了一个10s的滚动窗口,第一条数据是857s,那么它属于[850s,860s)
2)、滑动窗口(Sliding Windows):滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。滑动窗口可以基于时间定义、也可以基于数据个数定义,定义滑动窗口的参数有两个:窗口大小,滑动步长。
滑动步长是固定的,且代表了两个窗口开始/结束的时间间隔,数据分配到多个窗口的个数 = 窗口大小/滑动步长。
3)、会话窗口(Session Windows):基于“会话”(session)来对数据进行分组,会话窗口只能基于时间来定义,“会话”终止的标志就是隔一段时间(size)没有数据来。
size:两个会话窗口之间的最小距离。我们可以设置静态固定的size,也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔gap的值。
在Flink底层,对会话窗口有比较特殊的处理:每来一个新的数据,都会创建一个新的会话窗口,然后判断已有窗口之间的距离,如果小于给定的size,就对它们进行合并操作。在Winodw算子中,对会话窗口有单独的处理逻辑。
会话窗口的长度不固定、起始和结束时间不确定,各个分区窗口之间没有任何关联。会话窗口之间一定是不会重叠的,且会留有至少为size的间隔。
4)、全局窗口(Global Windows):把相同 key 的所有数据都分配到同一个窗口中,这种窗口全局有效,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。
如果希望它能对数据进行计算处理, 还需要自定义触发器(Trigger)(稍后介绍)。
窗口计算过程:
在对数据进行窗口划分前,要先确定窗口是按键分区窗口(Keyed)还是非按键分区(Non-Keyed)。
按键分区窗口(Keyed Windows):基于按键分区(Keyed)的数据流KeyedStream来开窗,在调用窗口算子之前有keyBy操作,数据会被聚合。
非按键分区(Non-Keyed Windows):直接在没有按键分区的DataStream上开窗,在调用窗口算子之前没有keyBy操作,数据不会被聚合,还是原始明细数据。
窗口的操作主要包括:窗口分配器、窗口函数、触发器、移除器、允许延迟、将迟到的数据放入侧输出流。
1)、窗口分配器(Window Assigners):窗口分配器是构建窗口算子的第一步,定义数据如何分配给窗口。具体细分如下(除去自定义外的全局窗口外,其它常用的类型Flink都给出了内置的分配器实现)
时间窗口
a、滚动处理时间窗口应用样例
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
b、滑动处理时间窗口应用样例
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
c、处理时间会话窗应用样例
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
d、滚动事件时间窗口应用样例
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
e、滑动事件时间窗口应用样例
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
f、事件时间会话窗口应用样例
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
计数窗口:底层是全局窗口,Flink为我们提供了非常方便地接口:直接调用countWindow()方法,根据分配规则的不同,又可以分为滚动计数、滑动计数窗口。
a、滚动计数窗口应用样例
stream.keyBy(...)
.countWindow(10)
b、滑动计数窗口应用样例
stream.keyBy(...)
.countWindow(10,3)
全局窗口:使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。应用样例如下:
stream.keyBy(...)
.window(GlobalWindows.create());
2)、窗口函数(Window Functions):定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数、全窗口函数
增量聚合函数:每来一条数据立即进行增量计算,保留中间计算结果状态,待窗口结束时输出聚合结果。 典型的增量聚合函数有两个:ReduceFunction、AggregateFunction。
a、归约函数(ReduceFunction):将窗口收集到的数据两两进行归约,实现增量式的聚合,要求输入、聚合、输出的数据类型一致。
b、聚合函数(AggregateFunction):将窗口收集到的数据进行累加,实现增量式的聚合,要求输入、聚合、输出的数据类型可以不一致。
全窗口函数(Full Window Functions):先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。全窗口函数也有两种:WindowFunction和ProcessWindowFunction
a、窗口函数(WindowFunction)应用样例:当窗口到达结束时间需要触发计算时,就会调用的apply方法。
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
b、处理窗口函数(ProcessWindowFunction):是 Window API 中最底层的通用窗口函数接口,他可以获取到一个“上下文对象”(Context)。这个上下文对象不仅能够获取窗口信息,还可以访问当前的时间和状态信息,这里的时间就包括了处理时间和事件时间水位线。
c、增量聚合和全窗口结合:
增量聚合的优点:高效,输出更加实时
全窗口的优点:提供更多的信息,支持自定义的窗口操作。
在实际应用中,我们往往希望兼具这两者的优点,,结合使用。
3)、触发器(Trigger):控制在什么条件下触发进行窗口数据计算,用来定义具体的触发计算逻辑,调用trigger()方法,就可以传入一个自定义的窗口触发器。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器,不同的窗口类型有不同的实现,对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。
4)、移除器(Evictor):控制从窗口中移除数据,用来定义移除某些数据的逻辑,实现evictor()方法,就可以传入一个自定义的移除器(Evictor),Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。
5)、允许延迟(Allowed Lateness):在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线 (watermark)并不一定能保证时间戳更早的所有数据不会再来。
Flink一个了一个特殊的接口,可以为窗口算子设置一个“运行的最大延迟”,也就是说我们可以设定允许延迟一段时间。
水位线 = 窗口结束时间 + 延迟时间,应用样例如下:
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))
6)、将迟到的数据放入侧输出流:Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入侧输出流”(side output)进行另外的处理。
所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些错过了该上的车、本该被丢弃的数据,在进行最终计算时,可拿出来参与最终的计算。
窗口生命周期:
1)、窗口的创建:窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好, 而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。
2)、 窗口计算的触发:通过窗口分配器约定窗口后,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。 窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是 指定调用窗口函数的条件。
3)、 窗口的销毁:一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。 这时窗口的销毁可以认为和触发计算是同一时刻。
这里需要注意,Flink 中只对时间窗口 (TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw) 实现的,而全局窗口不会清除状态,所以就不会被销毁。
在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点 是窗口的结束时间加上用户指定的允许延迟时间。
2.11、水位线(Watermark)机制
在流处理应用中,数据流在流转过程中会因为某些原因(数据源分区、网络延迟、反压(亦称背压)、failover)造成数据乱序到达,所以基于事件时间去处理数据,会有延迟到达的数据产生。
为了保证能够正确处理全部窗口数据,需要引入一种机制确保全部的窗口数据被正确处理,这种机制俗称:水位线(WaterMark)。
WaterMark是一个很好的机制,能够衡量 Event Time 的进展,可以设定延迟触发保证即使部分事件数据延迟到达,也不会影响窗口计算的正确性。
Watermark主要属性就是时间戳,可以理解一个特殊的数据,插入到流里面,然后依据一定的规则(周期性(默认200ms)、间歇性)进行更新Watermark, 直到水位线上涨到它所在的记录然后触发窗口计算。
Watermark = 当前最大事件时间戳 - 最大乱序时间(偏移量即延迟等待时间)。
虽然设置有Watermark来保证窗口数据能够被全部正确处理,但实际应用当中,由于种种原因很难把控准确设置Watermark延迟偏移量,所以实际通过Watermark很难准确把控全部窗口数据被处理。
为此Flink还引入另外一种机制(将迟到的数据放入侧输出流)来确保窗口的数据被全部正确处理,该情况在上面的窗口计算过程处已经介绍过。
2.12、容错机制
Flink通过提供检查点、保存点、状态持久化存储(内部或外部分布式)等手段实现容错处理。一般在应用处理时,都需要保证处理状态的一致性,包括分布式存储、消息队列、批处理计算、流处理计算、事务处理等。
状态一致性:强调的是结果一致性且正确,有三种语义级别:最多一次、至少一次、精确一次。
最多一次(AT-MOST-ONCE):数据最多被处理一次, 当任务发生故障时,数据就会丢掉。
至少一次(AT-LEAST-ONCE):数据至少被处理一次,当任务发生故障时,可以重复处理,数据不会丢掉。
精确一次(EXACTLY-ONCE): 数据只能被处理一次,最严格的一致性保证,意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。
在完整的流处理应用中,需要保证端到端的状态一致性,一般来讲流处理应用包括三个阶段:数据源提供、流处理过程、计算结果sink到外部系统,所以,要做到精确一次(EXACTLY-ONCE)语义,需要这三个阶段都能做到。
数据源提供:需要数据源提供方能够支持精确一次(EXACTLY-ONCE)语义,例如:Kafka。
流处理过程:Flink内部通过检查点机制已经实现了精确一次(EXACTLY-ONCE)语义支持
计算结果sink到外部系统:该过程需要sink处理器及分布式存储系统都支持精确一次(EXACTLY-ONCE)语义。
sink处理器一般通过“幂等写入机制”实现精确一次(EXACTLY-ONCE)语义,具体一些幂等方法有:数据库唯一性、Zookeeper唯一性、Redis分布式锁等。
分布式存储系统一般通过“事务写入机制”实现精确一次(EXACTLY-ONCE)语义,具体一些事务方法有:预写日志(WAL)和两阶段提交(2PC)方式、3阶段提交(改进型2PC)、Paxos算法、ZAB协议、Raft算法等。
下面重点介绍下Flink的容错实现原理:
1)、检查点(CheckPoint):检查点是Flink实现容错的核心保障,它通过周期性的对算子状态快照的持久化存储,在程序发生错误时,有选择的把程序恢复到某一个(比如最近的)检查点,从该检查点可以开始重新执行计算,进而实现数据的容错。
a、检查点算法:
分界线(亦称屏障)(Barrier):在数据流中插入一个特殊的数据结构, 数据流通过Barrier被切分成一个一个的数据段,每个数据段就是Checkpoint持久化存储的实际内容,专门用来表示触发检查点保存的时间点。
在不同的状态一致性语义下,有不同的检查点处理算法:
精确一次(EXACTLY-ONCE):采用“异步分界线快照”(Asynchronous Barrier Snapshotting)算法,要求Barrier对齐(需要等到所有并行分区 的Barrier都到齐,才可以开始状态的保存)。
至少一次(AT-LEAST-ONCE):不需要Barrier对齐,遇到Barrier直接开始保存。
但在实际应用中,Barrier在等待对齐的时候,可能需要缓存下一个检查点要保存的内容,在作业出现反压(背压)时可能会引起一些异常。所以,在Flink 1.11 之后提供了exactly-once模式下不对齐的检查点保存方式。
b、检查点启动策略:
自动检查点恢复: 固定间隔定期恢复(Fixed delay)、 失败率(Failure rate)、直接失败(No restart)
手动检查点恢复:有应用端根据应用逻辑自动控制
c、检查点主要配置:
间隔:1min~10min,3min;
模式:exactly-once(默认),at-least-once(因为一些异常原因可能导致某些barrier无法向下游传递,造成job失败,对于一些时效性要求高、精准性要求不是特别严格的指标,可以设置为至少一次);
超时 : 参考间隔, 0.5~2倍之间, 建议0.5倍;
最小等待间隔:上一次ck结束 到 下一次ck开始 之间的时间间隔,设置间隔的0.5倍;
最大并发检查点数量:用于指定运行中的检查点最多可以有多少个;
不对齐检查点:不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时 间。这个设置要求检查点模式(CheckpointingMode)必须为 exctly-once,并且并发的检查点 个数为 1;
开启外部持久化存储:DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。 RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点;
2)、保存点(SavePoint):原理和算法与检查点完全相同,只是多了一些额外的元数据,保存点就是通过检查点的机制来创建的流作业执行状态的一致性镜像(Consistent Image)。
保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对,从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务。
3)、检查点和保存点的差异:
从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异,具体有下面一些异同:
Checkpoint 是自动触发的,Checkpoint需要命令行触发或者web控制台触发,由人工或特殊应用(独立于当前应用)控制;
Checkpoint 的主要目的是为意外失败的作业提供恢复机制;
Checkpoint 的生命周期由 Flink 管理(也支持人工控制);
Checkpoint 特点:轻量级创建、尽可能快地恢复;
Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。(例如:升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等);
Checkpoint 在应用运行过程中存在,应用停止后不再存在(可配置为保留),Savepoint 在应用停止后继续存在;
Checkpoint 和 Savepoint 的当前实现基本上使用相同的算法并生成相同的格式。
Checkpoint 主要用于应用执行过程中的容错处理,Savepoint 主要用于作业停止后的保障处理,以免重新启动后还得全量重新计算,浪费时间和资源;
2.13、数据倾斜
概念:计算任务收到的数据分布不均匀,大量数据聚集到单个任务,这种情况称谓:数据倾斜。
产生情况:
1)、数据源 source发生倾斜
应对方案:让 Flink 任务强制进行 shuffle或调整并发度,使用 shuffle、rebalance 或 rescale 算子即可将数据均匀分配。
2)、keyBy后的(无窗口)数据倾斜
应对方案:通过不同的机制,调整key的分布,打散key,具体有如下方式:
a、在原来分区 key/uid 的基础上,加上随机的前缀或者后缀;
b、使用数据到达的顺序seq,作为分区的key;
3)、keyBy后的(无窗口)聚合操作数据倾斜
应对方案:在聚合前,先进行预聚合,本质还是调整key分布,打散key,具提有如下方式:
a、 预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀
b、两阶段聚合:在预聚合后再加盐局部聚合,然后再去盐全局聚合
产生的影响:
1)、引起反压(背压)异常
2)、数据单点问题
3)、导致JVM频繁进行 GC
4)、吞吐下降、延迟增大
5)、严重时可导致应用崩溃,甚至操作系统崩溃
3、编程模型
3.1、Fink内存模型
JobManager 内存模型
TaskManager 内存模型
3.2、数据流
在自然环境中,数据的产生原本就是基于时间不停的产生(流式),但分析数据时,需要以一定模型来组织数据,目前是按有界流(bounded)或 无界流(unbounded)两种模型来组织,选择不同的模型,程序的执行和处理方式也会不同。
有界流(bounded):具有定义的开始和结束,可以通过在执行任何计算之前提取所有数据来处理有界流,有界流数据始终可以排序,对应的流处理称为批处理。
无界流(unbounded):有一个起点,但没有定义的终点。它们不会终止并在生成数据时提供数据,无界流必须被连续处理,即事件在被摄取后必须被及时处理,无法等待所有输入数据到达,
处理无边界数据通常需要以特定顺序(例如事件发生的顺序)来摄取事件,一遍能够推断出结果的完整性。
3.3、数据结构
在Flink内部划定了一些关键数据结构,用于支撑整个计算过程的开展,具体如下:
1)、逻辑流图(StreamGraph): 这是根据用户通过 DataStream API 编写的代码生成的最初的有向无环图( DAG),用来表示程序的拓扑结构。StreamGraph一般在客户端完成。
2)、作业图(JobGraph): StreamGraph 经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。JobGraph一般也是在客 户端生成的,在作业提交时传递给 JobManager。
3)、执行图(ExecutionGraph):JobManager 收到 JobGraph 后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分, 并明确了任务间数据传输的方式。
4)、物理图(Physical Graph): JobManager 生成执行图后, 会将它分发给 TaskManager;各个 TaskManager 会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。
物理图只是具体执行层面的图,并不是一个具体的数据结构,主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式,有了物理图,TaskManager 就可以对传递来的数据进行处理计算了。
4、计算过程
大数据计算基本原则:遵循”移动数据不如移动计算“理念,MapReduce、Spark、Flink都遵循这一基本原则,在中间计算过程中都需对数据进行“Shuffle”处理,然后才能得到最终的计算结果。
Shuffle过程:(无论是MapReduce、Spark还是Flink,都要实现Shuffle)
a、Shuffle就是处理数据从Map Tasks的输出到Reduce Tasks输入的这段过程。
b、Map Tasks的output向着Reduce Tasks的输入input映射的时候,并非节点一一对应的,在节点A上做map任务的输出结果,可能要分散跑到reduce节点A、B、C、D ,就好像shuffle的字面意思“洗牌”一样,
这些map的输出数据要打散然后根据新的路由算法(比如对key进行某种hash算法),发送到不同的reduce节点上去。
c、将最后阶段的reduce结果汇集在一起得到计算任务最终结果。
MapReduce计算过程:
根据MapReduce应用代码生成任务执行计划,然后持续迭代Map和Reduce两个过程的结果,直至全部任务计算完成,Shuffle过程需要多次IO磁盘,计算速度比较慢。下图描述了下MapReduce的Shuffle过程:
MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先partition、key对中间结果进行排序合并,因为其输入数据可以通过外部得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)
Spark计算过程:大数据系列——Spark理论
Flink计算过程:
1)、Flink应用会通过Flink提供的一系列算子操作来表达具体的技术逻辑,构建出整个计算任务的数据流图 DataFlow Graph并打包成JAR,然后通过Client提交给Flink集群去安排执行
2)、Client在接收到提交的Flink应用(一些JAR包)后,会找到JAR包中的 Main 方法,创建Flink运行环境,把数据流图 DataFlow Graph 转化为 JobGraph (DAG有向无环图表示的作业),连通JAR包一起提交给Flink集群的 JobManager
3)、JobManager 收到 JobGraph 后,按照JobGraph 描述的任务并行度对并行子任务进行拆分且明确了任务间的数据传输方式,最后生成执行图(ExecutionGraph)
4)、JobManager 生成执行图后, 分发给 TaskManager,各个 TaskManager 会根据执行图部署任务,并形成最终的物理执行过程,确定数据存放的位置和收发的具体方式,最后生成物理图(Physical Graph)
5)、TaskManager 对传递来的数据依据物理图(Physical Graph),选定分配的Slot进行计算处理
6)、在计算过程中会依据Flink应用设计不停的对中间计算结果进行“Shuffle”处理,数据会在本Flink任务所辖的各个TaskManager之间交互,
数据交换通过Flink提供的专有数据交换管理模块(Network Manager)进行(采用I/O多路复用技术及批量提交提高吞吐量),直至计算出最终目标结果
截至目前常见的流处理引擎有Spark Streaming 和 Flink,下面从四个维度简要描述下二者的主要区别:
1)架构模型:Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor;Flink 在运行时主要包含:Jobmanager、Taskmanager、Slot。
2)任务调度:Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler;
Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,
JobManager 根据 ExecutionGraph 对 Job 进行调度。
3)时间机制:Spark Streaming 支持的时间机制有限,只支持处理时间; Flink 支持了流处理程序在时间上的三个语义:事件时间、注入时间、处理时间,同时也支持 Watermark 机制来处理滞后数据。
4)容错机制:Spark Streaming 通过checkpoint仅能做到至少一次(AT-LEAST-ONCE)状态语义,保证不丢失,但可能重复处理;Flink 通过checkpoint及事务写入(两阶段提交协议)能做到精确一次(EXACTLY-ONCE)状态语义,保证不丢不重复。
备注:
Storm是第一代流处理框架,数据吞吐量和延迟上的表现不尽如人意,并且只支持(AT-LEAST-ONCE)和 "(AT-MOST-ONCE)"状态语义,即数据流里的事件投递只能保证至少一次或至多一次,不能保证只有一次,在数据准确性方面也存在不足。
Spark Streaming是第二代流处理框架(本质是微批处理),仅支持(AT-LEAST-ONCE)状态语义,保证不丢失,但可能重复处理;
Flink是第三代流引擎框架,支持在有界和无界数据流上做有状态计算,以事件为驱动单位,支持(EXACTLY-ONCE)状态语义,保证不丢不重复。
相比Storm,Flink吞吐量更高,延迟更低;
相比Spark Streaming,Flink是真正意义上的实时计算,且所需计算资源相对更少。
5、整体运行过程
下面简要描述下Flink计算的整体运行过程,不同的资源管理平台执行流程略有差异,常见的有Yarn、K8S、Mesos等。
5.1、Client通过分发器Dispatcher或客户端工具提交Flink作业(JAR)到JobManager
5.2、JobManager解析作业图JobGraph成执行图ExecutionGraph,解析出所需的资源数量和Slot,向集群的资源管理器ResourceManger发起申请
5.3、ResourceManager查看集群是否有足够资源,尝试启动新的TaskManager
5.4、TaskManager启动之后会向集群中的ResourceManager注册Slot相关的资源信息
5.5、ResourceManager向TaskManager通知提供Slot资源命令
5.6、TaskManager向JobManager提供Slot资源,JobManager向TaskManager分发Task任务,JobManager与TaskManager通过心跳保持通讯,TaskManager实时向JobManager汇报任务执行状态
5.7、TaskManager与TaskManager之间在进行任务的执行中,若需要会发生数据交互
三、部署方式
Flink部署模式非常灵活,支持本地模式(Local)、集群模式(Standalone、 依赖三方资源管理等Cluster)、云端模式(Cloud)。
1、本地模式(Local):有两种不同的方式,一种是单节点(SingleNode),一种是单虚拟机(SingleJVM)。
Local-SingleJVM:大多是开发和测试时使用的部署方式,该模式下JobManager和TaskManager都在同一个JVM里。
Local-SingleNode:JobManager和TaskManager等所有角色都运行在一台机器上,虽然是按照分布式集群架构进行部署,但是集群的节点只有1个。该模式大多是在测试或者IoT设备上进行部署时使用。
本地模式即在部署服务器直接解压flink二进制包就可以使用,不用修改任何参数,用于一些简单测试场景。
进入flink目录执行启动命令:
[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
[root@vm1 flink-1.11.1]# jps
3577 Jps
3242 StandaloneSessionClusterEntrypoint
3549 TaskManagerRunner
2、集群模式(Cluster):有独立集群模式(Standalone)、Yarn集群模式、K8S集群模式、Mesos集群模式等,下面主要介绍独立集群模式(Standalone)和Yarn集群模式。
2.1、独立集群模式(Standalone):是Flink的独立集群部署模式,不依赖任何其它第三方软件或库。搭建一个标准的Flink集群,需要准备3台服务器。借助Zookeeper可搭建独立集群模式高可用。
在Flink集群中,Master节点上会运行JobManager(StandaloneSessionClusterEntrypoint)进程,Slave节点上会运行TaskManager(TaskManagerRunner)进程。
在 Standalone 部署时(没有 Per-Job 模式),因为 TaskManager 是单独启动的,所以 JobManager 只能分发可用 TaskManager 的任务槽,不能单独启动新 TaskManager。
当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理,所以独立模式一般只用在开发测试或作业非常少的场景下。
进入flink目录/flink-1.11.1/bin执行start-cluster.sh
[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.
查看vm1、vm2和vm3这3个节点上的进程信息
[root@vm1 flink-1.11.1]# jps
StandaloneSessionClusterEntrypoint
Jps
[root@vm2 ~]# jps
TaskManagerRunner
Jps
[root@vm3 ~]# jps
4101 Jps
4059 TaskManagerRunner
2.2、Yarn集群模式:资源管理器依赖Yarn,Flink在Yarn上启动, 由Yarn来进行任务调度管理。能够充分利用集群资源,提高服务器的利用率,这种模式的前提需要一个Hadoop集群(支持HA模式)。
yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用。
在该模式下,Flink集群启动时只启动了 JobManager,而 TaskManager 可以根据需要动态地启动,在 JobManager 内 部,由于还没有提交作业,所以只有 ResourceManager 和 Dispatcher 在运行。
由于启动时只启动一个Jobmanager, 当这个Jobmanager挂了之后, Yarn会再次启动一个, 利用Yarn的重试次数来实现的高可用,底层仍然是依赖Zookeeper实现。
该模式依据Flink集群的不同启动方式又分三种子模式:会话模式(Session)、单作业模式(Per-Job)、应用模式(Application)。
1)、会话模式(Session):Flink会在Yarn中启动一个Session集群,这个集群主要用来申请资源的,不管是否有Flink任务执行,都占用Yar中固定的资源,后续提交的其他作业都会直接提交到这个Session集群中,
除非手动停止,若Yarn中分配给该应用的资源耗尽,只能等待资源空闲才能执行下一个Flink作业。该方式不需要频繁创建Flink集群,这样效率会变高,但是,作业之间相互不隔离。
适合需要频繁提交的多个小job,并且执行时间都不长。
该模式作业提交过程简要描述:(在客户端侧执行应用的main方法)
a、客户端通过 REST 接口,将作业提交给分发器。
b、分发器启动 JobManager,并将作业(包含 JobGraph)提交给 JobManager。
c、JobManager 向资源管理器请求资源(slots)。
d、资源管理器向 Yarn 的资源管理器请求 container 资源。
e、Yarn 启动新的 TaskManager 容器。
f、TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
g、资源管理器通知 TaskManager 为新的作业提供 slots。
h、TaskManager 连接到对应的 JobManager,提供 slots。
i、JobManager 将需要执行的任务分发给 TaskManager,执行任务。
2)、单作业模式(Per-Job)【已弃用】:每次提交job都会创建一个新的Flink集群,任务之间互相独立,互不影响,任务执行完成之后创建的集群也会消失。该模式确保资源利用最大化,适合大规模且长时间运行的作业。
该模式作业提交过程简要描述:(在客户端侧执行应用的main方法)
a、脚本启动执行,解析参数,创建对应的客户端。客户端执行用户代码,生成StreamGraph;调用YarnJobClusterExecutor生成JobGraph,其中YarnClusterDescriptor上传jar包、配置、数据流图和作业图到HDFS,封装提交参数和命令通过YarnClient提交任务信息给ResourceManager
b、ResourceManager选择一个NodeManger创建Container启动ApplicationMaster
c、ApplicationMaster启动Dispatcher和ResourceManger,Dispatcher启动JobManager,JobManager生成ExecutionGraph
d、JobManager中的SlotPool向ResourceManger中的SlotManger注册、请求slot
e、ResourceManager向Yarn的ResourceManager申请资源
f、找到合适的NodeManager创建Container,启动TaskManager
g、YarnTaskExcutorRunner调用runTaskManager启动TaskExecutor
h、TaskExecutor向ResourceManager的SlotManager注册slot
i、ResourceManager分配slot给TaskExecutor
j、TaskExecutor提供slot给JobManager中的SlotPool
k、JobManager 将需要执行的任务分发给 TaskManager,执行任务。
3)、应用模式(Application)【推荐生产使用】:依据应用在Yarn中初始化一个Flink集群,每个Application提交之后就会启动一个JobManager,也就是创建一个集群,这个JobManager只为执行这一个Flink Application而存在,Application中的多个job都会共用该集群,
Application执行结束之后JobManager也就关闭了。该模式下一个Application会动态创建自己的专属集群(JobManager),所有任务共享该集群,不同Application之间是完全隔离的,在生产环境中建议使用Application模式提交任务。
该模式作业提交过程简要描述:(在集群侧执行应用的main方法)
a、脚本启动执行,解析参数,只提交flink应用JAR包
b、ResourceManager选择一个NodeManger创建Container启动ApplicationMaster,然后创建Flink运行环境并执行客户端用户代码,生成StreamGraph;调用YarnJobClusterExecutor生成JobGraph,
其中YarnClusterDescriptor上传jar包、配置、数据流图和作业图到HDFS,封装提交参数和命令通过YarnClient提交任务信息给ResourceManager
c、ApplicationMaster启动Dispatcher和ResourceManger,Dispatcher启动JobManager,JobManager生成ExecutionGraph
d、JobManager中的SlotPool向ResourceManger中的SlotManger注册、请求slot
e、ResourceManager向Yarn的ResourceManager申请资源
f、找到合适的NodeManager创建Container,启动TaskManager
g、YarnTaskExcutorRunner调用runTaskManager启动TaskExecutor
h、TaskExecutor向ResourceManager的SlotManager注册slot
i、ResourceManager分配slot给TaskExecutor
j、TaskExecutor提供slot给JobManager中的SlotPool
k、JobManager 将需要执行的任务分发给 TaskManager,执行任务。
3、云端模式(Cloud):Flink也可以部署在各大云平台上,包括Amazon、Google、阿里云等。
四、优缺点分析
缺点
1、因为基于内存计算,所以对内存的容量及稳定性要求较高,同时应用要控制内存的应用情况,不然会发生OOM,导致应用崩溃
2、很难合理评估应用的当前及未来数据规模和可能的演变方式,容易发生数据倾斜
3、自带集群资源管理器不支持单作业模式,推荐生产环境应用Yarn来接管资源分配
4、容错机制高度依赖检查点和保存点,所以合理管控检查点和保存点不太容易做到
5、不支持复杂的SQL统计计算
6、不适合计算实时/准实时指标,因为聚合中间计算状态需要大量的IO和存储,代价较大
7、无法集中处理历史数据,大数量在集中短时间内计算困难
8、无法正确处理超过最大无序边界的数据,只能处理有限的延迟数据
优点
1、基于事件驱动 (Event-driven) ,一个运行时能够同时支持流处理和批处理
2、基于内存计算,计算速度快,对于大容量计算优化,可将中间计算结果写入内存或磁盘(数据量大时),在 JVM 内部实现了自己的内存管理:支持迭代计算、支持程序自动优化
3、任务执行机制采用有向无环图DAG,能够关联数据血缘关系,可以共享中间计算结果,避免重新计算,耗费时间和资源
4、支持精确一次 (Exactly-once) 语义,保证计算状态的一致性和正确性
支持有状态计算且提供很多状态管理特性功能
集成大量的相关计算算法及高阶应用库
支持多种数据集操作类型
5、可以基于不同的数据源进行计算,支持Hive、Hbase、RMDBS、Kafka、JSON、自定义等数据源
6、一个运行时同时支持多个不同的Flink作业一同运行,支持高度灵活的窗口 (Window) 操作,支持扩展外部分布式存储
7、高容错性
提供分布式存储,可持久化中间计算状态,通过检查点、保存点等机制,很容易进行故障恢复,作业重启后就近断点处继续处理
8、高扩展性
计算能力理论可无限扩展, 支持scale-out和scale-up两个方向扩展
集群管理器可集成不同三方资源管理器(例如Yarn、Mesos、k8s等)
支持任意数量规模的任务计算
9、高性能
待计算数据实时读入内存,计算速度快,且可共享中间计算结果,存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化
计算节点间数据交换采用I/O多路复用技术,提供传输吞吐量
10、高可靠性
自动感知节点状态
自带集群资源管理器,通过ZK可支持高可用
也可依赖其他资源管理器(支持高可用)
11、可以使用SQL & Table API进行类SQL查询
12、支持Java、Python和Scala的API,可多种形式编写Flink应用
13、活跃度极高的社区和完善的生态圈的支持
五、常见应用场景
1、数据处理、ETL(ELT)、迭代计算
2、实时计算基础事实数据落入数据仓库,支持进行交互式查询
3、依据业务模型进行机器学习
4、一些流式应用场景,例如:
电商和市场营销:商品评价、数据报表、广告投放、业务流程处理,营销大屏,销量的升降,营销策略的结果进行环比、同比的比较,PV、UV 的统计等。
互联网:实时报警、热门事件新闻、上网行为采集、网络爬虫数据分析等
银行和金融行业:实时数据结算、实时监测异常数据、欺诈监测等
IOT:设备数据采集、预处理、分析等
社交场景:用户点赞、关注、粉丝互动、内容精准推荐、社区发现等
......
六、调优经验
PS:后续逐渐把实践调优过程补充上来
七、API应用:
各个平台一般都有相应的操作组件,下面介绍下java和DoNet平台下的访问组件。
java平台:(1.16以上)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>版本</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>版本</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>版本</version>
</dependency>
其他一些配套连接器依赖,例如:kafka
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>版本</version>
</dependency>
还有Yarn集群配套的Hadoop和Zookeeper依赖,参见前面的Hadoop理论和Zookeeper理论API部分描述。文章来源:https://www.toymoban.com/news/detail-437609.html
DoNet平台:
截至目前,Flink还不支持DoNet平台,若想用可参考:CSharpFlink文章来源地址https://www.toymoban.com/news/detail-437609.html
到了这里,关于大数据系列——Flink理论的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!