伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

StreamOperator源码简析

来源:本站原创 浏览:97次 时间:2023-05-11

treamOperator是任务执行过程中实际处理类,上层由StreamTask调用,下层调用UserFunction,列举一些常见的StreamOperator

  • env.addSource对应StreamSource

  • dataStream.map 对应StreamMap

  • dataStrem.window对应WindowOperator

  • dataStream.addSink对应StreamSink

  • dataStream.keyBy(..).process对应KeyedProcessOperator

StreamOperator涉及数据处理、checkpoint、状态存储、定时调用等,本篇幅将从源码角度分析StreamOperator所涉及的核心调用流程。

StreamOperator层级结构

最顶层是一个StreamOperator的接口,定义了其生命周期一些方法,继承接口如下:

  • CheckpointListener接口,notifyCheckpointComplete表示checkpoint完成后的回调方法

  • KeyContext接口,用于当前key的切换,使用在KeyedStream中state的key设置

  • Disposable接口,dispose方法定义了资源释放

  • Serializable序列化接口

AbstractStreamOperator是StreamOperator的基础抽象实现类,所有的operator都必须继承该抽象类;
AbstractUdfStreamOperator 是继承AbstractStreamOperator的抽象实现类,其内部包含了userFunction, 在Task的生命周期都会调用userFunction中对应的方法;
OneInputStreamOperator、TwoInputStreamOperator都是继承StreamOperator的接口,分别表示处理一个输入、两个输入的Operator,包含了processElement/processWatermark/processLatencyMarker方法;

  • OneInputStreamOperator实现类StreamMap、WindowOperator、KeyedProcessOperator等单流入处理operator

  • TwoInputStreamOperator实现类CoStreamMap、KeyedCoProcessOperator、IntervalJoinOperator等多流处理operator

StreamSource表示的source端的operator,其既没有实现OneInputStreamOperator接口也没有实现TwoInputStreamOperator接口,由于其为流处理的源头,不需要接受输入

AbstractStreamOperator/AbstractUdfStreamOperator分析

AbstractStreamOperator是所有operator的基础抽象类,而AbstractUdfStreamOperator则是面向userFunction调用,接下来分析一下这两个类,其大部分方法都是由StreamTask触发调用,用于初始化或者资源释放等操作,以StreamTask.invoke方法为入口来分析里面的方法:

  • initializeState状态初始化,会调用到StreamOperator的initializeState方法,初始化operatorStateBackend/keyedStateBackend状态后端,定时器恢复初始化,对于KeyedState来说会自动初始化恢复,但是operatorState则需要手动初始化恢复,所以在其继承的AbstractUdfStreamOperator会调用userFunction的initializeState方法,前提是该userFunction需要实现CheckpointedFunction接口;

  • open初始化方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写open方法完成一些用户初始化工作,例如创建资源链接

  • run方法,在任务正常情况下一直执行的方法,根据收到的不同数据类型调用AbstractStreamOperator不同的方法

  1. 如果是watermark,会调用其processWatermark方法,在该方法里面做一些定时触发的判断与调用

  2. 如果是LatencyMarker,其表示的是一个延时标记,同于统计数据从source到下游operator耗时,会调用 processLatencyMarker方法,在该方法里面会上报Histogram类型的metric, 在默认情况下该功能是关闭的

  3. 如果是StreamRecord,也就是处理的业务数据,首先会调用setKeyContextElement方法,用于切换 KeyedStream类型的的statebackend的当前key, 然后调用processElement具体的数据处理流程

  4. 如果是CheckpointBarrier,表示的是需要checkpoint,首先会调用prepareSnapshotPreBarrier方法,在AbstractStreamOperator中是一个空实现doNothing,然后调用snapshotState方法,在AbstractUdfStreamOperator会调用userFunction的snapshotState方法,前提是该userFunction需要实现CheckpointedFunction接口;

  • close方法,任务正常结束调用方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写close方法完成一些资源释放;

  • dispose方法,任务正常结束或者异常结束调用的方法,如果是异常结束那么就会调用到close方法,正常结束不会重复调用,在dispose里面完成一些状态最终资源的释放;

其他方法:

  • setup方法,初始化做一些参数配置

  • notifyCheckpointComplete方法,在checkpoint完成时调用的方法,面向用户实现的userFunction需要实现CheckpointListener接口


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net