伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

StreamTask源码分析

来源:本站原创 浏览:83次 时间:2023-05-11

在前一篇StreamOperator源码简析中提到StreamOperator上层是由StreamTask调用,也就是说StreamTask会在发生不同阶段、不同动作去调用StreamOperator对应的方法,在Flink中将StreamTask称之为Invokable,这篇主要从源码角度分析一下StreamTask。

StreamTask层级结构

AbstractInvokable :是一个抽象类,代表最顶层的Invokable,在这个抽象类里面声明了最要的方法invoke,可以认为是task执行的起点,里面涉及的具体流程稍后会做分析;另外还声明了与checkpoint动作相关的方法triggerCheckpoint/triggerCheckpointOnBarrier/notifyCheckpointComplete;

StreamTask: 是AbstractInvokable的基本抽象实现类,其也是一个抽象类,实现了invoke、triggerCheckpoint等方法,另外声明了init、run等StreamTask声明周期的抽象方法,具体实现类有一下三个:

  • SourceStreamTask 代表源(StreamSource)的Invokable

  • OneInputStreamTask 代表有一个输入流的Invokable

  • TwoInputStreamTask 代表有两个输入流的Invokable

在这里面还有一个其他关于批处理(BatchTask)、迭代流(StreamIterationTail) 相关的Invokable在这里不做描述。

StreamTask分析

StreamTask表示的是一个基础的Invokable,它会去调用一系列的StreamOperator, 前提是这些operator是被chained,在invoke方法里面会调用这个operator的生命周期的方法,我在这里将其分为三步:


初始化:初始化过程分为两种,一个是StreamTask初始化,另外一个是StreamOperator初始化:

  • StreamTask初始化: 包括了与checkpoint相关的的例如Statebackend创建等,当然这些最终也会调用到StreamOperator,另外还会调用init方法,具体实现是在其实现类里面,主要完成StreamTwoInputProcessor初始化,其主要负责读取数据相关处理

  • StreamOperator初始化: 调用initializeState、openAllOperators方法,initializeState会调用到StreamOperator的initializeState方法,完成状态初始化过程,openAllOperators会调用StreamOperator的open方法,调用与用户相关的初始化过程

执行过程:执行过程主要就是调用run方法,实现也是在其实现类里面,对于SourceStreamTask就是生产数据,对于OneInputStreamTask/TwoInputStreamTask主要就是执行读取数据与之后的数据处理流程,正常情况是会一直循环执行

资源释放:任务正常结束或者是异常停止的执行动作,包括closeAllOperators、cleanup、disposeAllOperators等,在这里说明一点close与dispose,在正常流程里面是可以看到close的调用,但是在异常流程只能看到dispose的调用,但是在dispose里面是可以看到对于是否调用close的检测,如果没有调用则需要调用一次,也就是说close是一定会被调用的,因此我们在userFunction里面如果涉及到资源链接一定要在close里面执行资源的释放。

在StreamTask里面除了invoke方法实现,还有checkpoint的相关实现方法triggerCheckpoint、triggerCheckpointOnBarrier。

StreamTask另外两个比较重要的点:

  • 实现了AsyncExceptionHandler接口,这个接口里面包含了一个handleAsyncException方法,这个方法在StreamTask实现是使当前的StreamTask失败,在Flink里面窗口定时触发或者ProcessFunction的onTimer方法等相对于上面提到的run流程是一个异步的过程,也就是由其他线程来执行,但是如果这个异步执行抛出异常我们希望主线程也能够捕获到并进行相应的处理,那么AsyncExceptionHandler就是这么一个功能;

  • 包含了Object类型的lock对象,定时调用与checkpoint执行都会使用到该对象来完成同步功能,用它来保证数据的一致性。例如在定时调动onTimer里面可能会涉及到对状态的操作,但是processElement里面也会对状态操作,状态对于这两个线程都是共享资源,如果不使用锁lock,就会导致状态数据不一致。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net