Flink基础5篇-2-Flink架构
系统架构
flink是一个分布式系统,可以和很多集群管理器(Apache Mesos、YARN 及Kubernetes )集成。也可以作为独立的集群运行。
flink组件
Flink 的搭建需要四个不同组件,它们相互协作,共同执行流式应用。这些组 件是:J obManager、ResourceManager、TaskManager 和Dispatcher。
JobManager
jobManager控制着单个应用程序的执行。换句话说,每个应用程序都有一个不同的JobManager掌控。
JobManager接收如下内容:
JobGraph,就是表示数据流转的Dataflow
打包了全部类库的jar包
JobManager做如下操作:把JobGraph转换为名为ExecutionGraph的物理DataFlow图。该图包含了那些可以并行执行的任务。
从ResourceManager申请资源(TaskManager的处理槽),一旦收到了足够的槽,就将executionGraph分发给TaskManager执行。
执行过程中负责集中协调,比如创建检查点,保存点,状态恢复等。
ResourceManager
根据集群资源提供者的不同,如Apache Mesos、YARN 及Kubernetes,flink提供不同的ResoruceManager。
- ResourceManger接收来自JobManger的申请,指示有空闲处理槽的TaskManager把处理槽提供给JobManager。
- 如果处理槽不足,则ResourceManager和集群资源提供者通信,让它提供额外容器来启动更多TaskManager进程。
- ResourceManager还负责终止空闲的TaskManager释放资源。
TaskManager
TaskManager是flink的工作进程,通常在flink搭建过程中要启动多个TaskManager。每个TaskManager提供一定数量的处理槽slot。slot数量限制了一个TaskManager可以执行的任务数量。
- TaskManager启动后向ResourceManager注册它的处理槽。
- 收到ResourceManager的指示之后,TaskManager向JobManager提供slots。JobManager向slot分配任务。
- 运行同一个应用的不同任务的TaskManager之间会进行数据交换。
Dispatcher
Dispatcher跨多个作业运行,提供了一套REST接口让我们提交需要执行的应用。
一个TaskManager允许执行多个任务,这些任务可以属于同一个算子(数据并行),也可以来自不同算子(任务并行),也可以来自不同应用(作业并行)。
每个slot可以执行应用的一部分,即算子的一个并行任务。(但不同算子的任务可以放在一个slot里面)
下图展示Taskmanager,slot,任务,算子之间的关系
- 算子的并行度决定了这个算子有几个任务,同时这个值就决定了这个算子要占用几个slot
- 根据算子间的关系,就可以确定任务间的关系,和这些任务所在的TM的通信关系
根据上述关系可得,一个应用所需要的slot个数,等于其算子的最大并行度。
再进一步,确定了需要的slot数量,再根据每个TM拥有的slot数量,就知道了需要的TM数量。
- 上图中左边就是JobGraph,右边就是executionGraph
- 这里可以看到,在分配具体任务给slot的时候,遵循的原则是尽量把完整的DAG放在了同一个slot里面,只有并行度小于4的算子任务没有出现在所有slot里面。这样的好处是TM中的多个任务可以在同一进程内高效执行数据交换,无需访问网络。
- TM会在同一个JVM进程中以多线程的方式执行任务
高可用性设置
发生故障时也不能停止,而是需要恢复。系统首先重启故障进程,随后重启应用并恢复其状态。
Taskmanager故障
当TaskManger发生故障时,将导致可用的slot不足。这时JobManager会从ResourceManager再次申请。若无法完成,则无法重启应用。根据应用重启策略的配置,Jobmanager会以一定的频率和间隔尝试重启应用。
JobManager故障
JobManager进程消失会导致应用无法继续处理数据。JM是flink应用中的单点失效组件。Flink提供了高可用模式,支持在JM消失的情况下将作业的管理职责和元数据迁移到另一个JM。
Flink用以下两种方式实现高可用
- 依赖zookeeper完成。
- JM在高可用模式下工作时,会将jobGraph和全部所需元数据比如jar包写入远程持久化存储中。此外,还会将存储的位置写入zk中。
- 执行过程中,JM会将状态句柄写入远程存储,将路径写入zk。
当JM故障时,其下应用的所有任务都会自动取消,新接手的JM执行以下步骤
- 向zk请求存储位置,获取jobGraph和jar包,最新的checkpoint
- 向ResourceManager请求slot来继续执行应用
- 重启应用并利用最新的检查点重制任务状态
TaskManager负责把数据从发送任务传输到接收任务。它的网络模块在记录传输之前先将其收集到缓冲区中。换言之,记录并非逐条发送,而是在缓冲区中以批次形式发送。
每一个TM都有一个用于收发数据的网络缓冲池。当发送和接收任务在不同的TM进程中时,就需要进行通信。因此,每一对TM之间都需要维护一个永久的TCP连接。对每一个接收任务,TM要提供一个专用的网络缓冲区。
- 接收端并行度为4,所以发送端需要4个缓冲区来向任意接收任务发送数据(此时是shuffle模式)。
- 接收端也需要4个缓冲区来接收来自4个不同任务的数据
- TCP连接是共享的,这里两个TM,只有两个单向的TCP连接
- 由此可得,缓冲区的数量是相关算子任务数量的平方级别
基于信用值的流量控制
缓冲的问题是会增加延时,为了平衡延时和单条发送资源浪费,flink采用了基于信用值的流量控制方法。
原理如下:
接收任务会给发送任务授予一定的信用值,其实就是保留一些用来接收它数据的网络缓冲。一旦发送端收到信用通知,就会在信用值限定的范围内尽可能多的传输缓冲数据,并会附带上积压量的大小。接收端使用保留的缓冲来处理收到的数据,同时根据各发送端的积压量信息来计算所有相连的发送端下一轮的信用优先级。
简单理解就是
接收端会通知每个发送端,这一次你大概能发多少数据,我会给你预留这么大的缓冲空间。
发送端根据这个值,尽可能多发送数据,同时告知接收端,自己这里还剩多少(积压值)。
接收端收到各个发送端的数据,写入缓存。同时根据各个发送端的积压,分配下一轮每个发送端能发多少。(积压多的多分点)
任务链接
任务链接是一种优化技术,用于降低本地通信开销
使用条件是多个算子必须拥有相同的并行度并且通过本地转发通道相连。
如下图就满足条件
在上图的情况下,任务链接会把多个算子函数融合到同一个任务中,在同一个线程中执行。(类似多个算子合并成一个算子)
如果不需要,可以通过代码控制是否进行任务链接。
事件时间处理
如果选择使用事件时间而非处理时间,则要面临对迟到事件,事件乱序的处理问题。在flink中的事件时间处理机制如下:
时间戳
所有处理记录必须包含时间戳。Flink内部采用8字节Long值对时间戳进行编码,并将它们以元数据的形式附加在记录上。内置算子会将这个Long值解析为毫秒精度的Unix时间戳。自定义算子可以有自己的时间戳解析机制。
水位线
基于事件时间必须提供水位线。在Flink中,水位线是利用一些包含Long值时间戳的特殊记录来实现的。它们像带有额外时间戳的常规记录一样在数据中流动。
水位线满足两个基本属性
- 必须单调增
- 和记录的时间戳存在联系。一个时间戳为T的水位线表示,接下来的所有记录的时间戳一定大于T
通过第二个性质来处理基于事件时间的乱序数据。当收到一个违反水位线的记录时,该记录本应参加的计算已经完成了。我们称此类记录为迟到记录。Flink提供了不同的机制来处理迟到记录,具体参看第4篇笔记。
水位线传播和事件时间
水位线和正常记录一样,可以在任务间进行接收和发送。任务内部的时间服务会维护一些计时器,它们依靠接收水位线来激活。这些计时器是由任务在时间服务内注册,并在将来某个时间点执行计算。
比如:窗口算子会为每一个活动窗口注册一个计时器,它们会在事件时间超过窗口的结束时间时清理窗口状态。
当任务接收到一个水位线,执行下面操作:
- 基于水位线时间戳更新内部事件时间时钟。
- 任务的时间服务会找出所有触发时间小于更新后事件时间的计时器。对于每个到期的计时器,调用回调函数,执行计算或发出记录。
- 任务根据更新后的事件时间将水位线发出。
下面详细介绍一个任务如何将水位线发送给多个输出任务,以及从多个输入任务获取水位线后如何处理。
一个任务会为它的每个输入分区维护一个分区水位线,当收到某个分区传来的水位线后,任务会以接收值和当前值中较大的那个去更新对应分区水位线的值。随后,任务会把事件时间时钟调整为所有分区水位线中最小的那个值。如果事件时间时钟向前推进,任务会先处理因此而触发的所有计时器,之后才会把对应的水位线发往所有连接的输出分区,以事件时间到全部下游的广播。
- 上图描述了一个有4个上游输入分区,3个下游输出分区的任务
- 对于输入,每个分区保留最大水位线时间戳
- 对于输出,只有全局最小水位线更新的时候才触发相应操作和向下游广播水位线
如果一个算子有两个输入流时,其事件时钟会受制于相对较慢的流。
由于整个处理流程都依赖最慢的分区的水位线,这就表明即便当前分区没有数据产生了,也需要按照固定间隔发送水位线。
时间戳分配和水位线生成
Flink有三种方式完成时间戳的分配和水位线的生成
- 在数据源完成时,利用SourceFunction在应用读入数据流的时候分配时间戳和生成水位线。源函数发出的每一条记录都附带时间戳,水位线作为特殊记录在任何时间点发出。如果源函数不再发出数据,可以声明自己为空闲,避免因为没有水位线而停止下游所有流程。
- 周期分配器,DataStream API提供一个AssignerWithPeriodicWatermarks的自定义函数。可以从每条记录提取时间戳,并周期性相应获取当前水位线的查询请求。提取到的时间戳附加在每条记录里,定期查询到的水位线注入流中。
- 定点分配器,另一个自定义函数AssignerWithPunctuatedWatermarks,满足根据特殊输入记录生成水位线的情况。
状态管理
状态的定义:函数里所有需要任务去维护并用来计算结果的数据都输入任务的状态
数据和状态的交互如上图
Flink中,状态都是和特定算子关联的,为了让Flink知道算子有哪些状态,算子需要对其进行注册。根据作用域的不同,状态分为两类:
算子状态的作用域是某个算子的一个任务。任务级别的隔离(该算子的每个任务都维护一个状态,相互不能访问)
算子状态有三类原语
键值分区状态的作用域是一个算子。按照算子输入记录定义的键值来进行维护和访问。处理一条记录时,只能访问这条记录对应键值对的状态。因此,所有键值相同的记录访问到的状态是相同的。
对于键值分区中值的部分,flink提供下面几种原语:
为了保证flink的低延时,每个并行任务都会把状态维护在本地。
具体的状态存储,访问和维护,是由一个名为状态后端的可插拔组件决定的。
状态后端负责两件事:
本地状态管理
将状态以检查点的形式写入远程存储
对于本地状态管理,flink状态后端可以按以下两种形式进行维护以内存数据结构形式存入JVM堆内存
序列化状态对象后存入RocksDB,写入本地硬盘
有状态算子的扩缩容
有状态算子扩容时需要把状态重新分组,分配到与之前数量不等的并行任务上。Flink提供了4中扩缩容模式。
带有键值分区状态的算子在扩缩容时会根据新的任务数量对键值重新分区。把所有键值分成不同组,每个任务分配一个键值组,即每个任务持有部分键值,所有任务加起来是完整键值。
带有算子列表状态的算子扩容时会对列表中的条目进行重新分配。列表条目统一收集,然后均匀分配给每个任务
带有算子联合列表状态的算子扩缩容时会把状态列表的所有条目广播到所有任务上,之后任务决定保留哪些状态。
带有算子广播状态的算子在扩缩容时会把状态拷贝到全部新任务上(每个任务持有完全一样的状态),缩容时直接停止就好,不需要转移状态
检查点、保存点和状态恢复
一致性检查点
如果所有算子都将它们全部都状态写入检查点并从中恢复,所有输入流的消费位置都能重置到检查点生成的那一刻,那么该检查点和恢复机制就能提供精确一次的一致性保障。
Flink一致性算法
Flink检查点基于Chandy-Lamport分布式快照算法实现,不会暂停整个应用,而是将检查点生成和数据处理过程分离。具体原理如下
- 和水位线类似,Flink在常规数据中注入一类特殊数据——检查点分隔符,每个分隔符有独特的编号
- 数据被一个检查点分隔符分成了两个部分,所有先于分隔符的状态变更被包含在此次检查点中,晚于的状态则被纳入之后的检查点
考虑下图任务:
- 数据源任务:
首先JM向每个数据源任务发送一个新的检查点编号,启动生成流程
数据源收到后(上图中的三角2),暂停发出数据,利用状态后端触发生成本地状态的检查点,并把该检查点分隔符连同检查点编号广播至所有传出的数据流分区。状态后端会在状态存为检查点完成后通知任务,随后任务会给JM发送消息。在将所有分隔符发出后,数据源恢复工作。
通俗来讲就是数据源知道了要生成检查点,自己完成自己检查点的生成,通知JM和下游。
- 中间的处理算子任务:
说完了数据源,再看下游的任务如何处理。
类似的,当任务收到一个新的检查点分隔符时,会继续等待所有其他输入分区也发来这个检查点的分隔符。在等待过程中,会继续处理还未收到分隔符分区发来的数据。已收到的分区记录会缓存但不处理。这个等待过程叫分隔符对齐
集齐所有分区的分隔符后,通知自己的状态后端生成检查点,同时广播下游。
发出所有分隔符后,任务继续正常处理,包括之前缓存的数据。
- 数据汇任务:
最终,检查点分隔符到达了数据汇算子,数据汇算子也执行分隔符对齐。之后将自身状态写入检查点(通过状态后端)。然后向JM通知已确认检查点2。
JM在收到所有数据汇发送的确认通知后,会认为此次检查点已完成,后续恢复可以使用。
总结一下:
状态恢复的步骤很简单
- 重启整个应用
- 利用最新检查点重置任务状态
- 恢复所有任务的处理
检查点对性能的影响
对性能的影响主要考虑两个点
- 生成和存储检查点的耗时,通过选择RocksDB状态后端来启用异步检查点生成机制和增量检查点机制,减少延迟
- 分隔符对齐的耗时,可以改为不缓存已收到分隔符分区的数据,而是继续处理。这样会存在重复处理的数据,但是延迟会降低
保存点
保存点的生成算法和检查点完全一致,但保存点是需要用户显示触发的
保存点可以用于
- 启动一个不同但兼容的应用,比如任务迭代或bug修复,不希望从头开始消费,通过生成保存点,后续从保存点启动
- 用不同并行度启动应用,需要创建保存点,停止任务,用新并行度启动
- 改换集群运行应用
- 应用暂停
- 应用归档
从保存点启动应用的过程如下: