在前一篇StreamOperator源码简析中提到StreamOperator上层是由StreamTask调用,也就是说StreamTask会在发生不同阶段、不同动作去调用StreamOperator对应的方法,在Flink中将StreamTask称之为Invokable,这篇主要从源码角度分析一下StreamTask。
StreamTask层级结构AbstractInvokable :是一个抽象类,代表最顶层的Invokable,在这个抽象类里面声明了最要的方法invoke,可以认为是task执行的起点,里面涉及的具体流程稍后会做分析;另外还声明了与checkpoint动作相关的方法triggerCheckpoint/triggerCheckpointOnBarrier/notifyCheckpointComplete;
StreamTask: 是AbstractInvokable的基本抽象实现类,其也是一个抽象类,实现了invoke、triggerCheckpoint等方法,另外声明了init、run等StreamTask声明周期的抽象方法,具体实现类有一下三个:
SourceStreamTask 代表源(StreamSource)的Invokable
OneInputStreamTask 代表有一个输入流的Invokable
TwoInputStreamTask 代表有两个输入流的Invokable
在这里面还有一个其他关于批处理(BatchTask)、迭代流(StreamIterationTail) 相关的Invokable在这里不做描述。
StreamTask分析StreamTask表示的是一个基础的Invokable,它会去调用一系列的StreamOperator, 前提是这些operator是被chained,在invoke方法里面会调用这个operator的生命周期的方法,我在这里将其分为三步:
初始化:初始化过程分为两种,一个是StreamTask初始化,另外一个是StreamOperator初始化:
StreamTask初始化: 包括了与checkpoint相关的的例如Statebackend创建等,当然这些最终也会调用到StreamOperator,另外还会调用init方法,具体实现是在其实现类里面,主要完成StreamTwoInputProcessor初始化,其主要负责读取数据相关处理
StreamOperator初始化: 调用initializeState、openAllOperators方法,initializeState会调用到StreamOperator的initializeState方法,完成状态初始化过程,openAllOperators会调用StreamOperator的open方法,调用与用户相关的初始化过程
执行过程:执行过程主要就是调用run方法,实现也是在其实现类里面,对于SourceStreamTask就是生产数据,对于OneInputStreamTask/TwoInputStreamTask主要就是执行读取数据与之后的数据处理流程,正常情况是会一直循环执行
资源释放:任务正常结束或者是异常停止的执行动作,包括closeAllOperators、cleanup、disposeAllOperators等,在这里说明一点close与dispose,在正常流程里面是可以看到close的调用,但是在异常流程只能看到dispose的调用,但是在dispose里面是可以看到对于是否调用close的检测,如果没有调用则需要调用一次,也就是说close是一定会被调用的,因此我们在userFunction里面如果涉及到资源链接一定要在close里面执行资源的释放。
在StreamTask里面除了invoke方法实现,还有checkpoint的相关实现方法triggerCheckpoint、triggerCheckpointOnBarrier。
StreamTask另外两个比较重要的点:
实现了AsyncExceptionHandler接口,这个接口里面包含了一个handleAsyncException方法,这个方法在StreamTask实现是使当前的StreamTask失败,在Flink里面窗口定时触发或者ProcessFunction的onTimer方法等相对于上面提到的run流程是一个异步的过程,也就是由其他线程来执行,但是如果这个异步执行抛出异常我们希望主线程也能够捕获到并进行相应的处理,那么AsyncExceptionHandler就是这么一个功能;
包含了Object类型的lock对象,定时调用与checkpoint执行都会使用到该对象来完成同步功能,用它来保证数据的一致性。例如在定时调动onTimer里面可能会涉及到对状态的操作,但是processElement里面也会对状态操作,状态对于这两个线程都是共享资源,如果不使用锁lock,就会导致状态数据不一致。