伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

自定义StreamOperator

来源:本站原创 浏览:90次 时间:2023-05-11

treamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup、open、initializeState,checkpoint相关方法prepareSnapshotPreBarrier、snapshotState,但是我们没有必要去自己一一实现这些方法,可以继承其抽象类AbstractStreamOperator,覆盖一些我们需要重写的方法。在上一篇分析中提到对于source端不需要接受上游数据,也就不需要实现OneInputStreamOperator或者TwoInputStreamOperator接口,如果我们需要接收上游数据就必须实现这两个接口中的一个,主要看一个输入还是两个输入来选择。
案例:假设我们现在需要实现一个通用的定时、定量的输出的StreamOperator。
实现步骤:

  1. 继承AbstractStreamOperator抽象类,实现OneInputStreamOperator接口

  2. 重写open方法,调用flink 提供的定时接口,并且注册定时器

  3. 重写initializeState/snapshotState方法,由于批量写需要做缓存,那么需要保证数据的一致性,将缓存数据存在状态中

  4. 重写processElement方法,将数据存在缓存中,达到一定大小然后输出

  5. 由于需要做定时调用,那么需要有一个定时调用的回调方法,那么定义的类需要实现ProcessingTimeCallback接口,并且实现其onProcessingTime方法(关于flink定时可以参考定时系列文章)

代码:

  1. publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>

  2. implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{


  3. privateList<T> list;


  4. privateListState<T> listState;


  5. privateint batchSize;


  6. privatelong interval;


  7. privateProcessingTimeService processingTimeService;


  8. publicCommonSinkOperator(){

  9. }


  10. publicCommonSinkOperator(int batchSize,long interval){

  11. this.chainingStrategy =ChainingStrategy.ALWAYS;

  12. this.batchSize = batchSize;

  13. this.interval = interval;

  14. }


  15. @Overridepublicvoid open()throwsException{

  16. super.open();

  17. if(interval >0&& batchSize >1){

  18. //获取AbstractStreamOperator里面的ProcessingTimeService, 该对象用来做定时调用

  19. //注册定时器将当前对象作为回调对象,需要实现ProcessingTimeCallback接口

  20.            processingTimeService = getProcessingTimeService();

  21. long now = processingTimeService.getCurrentProcessingTime();

  22.            processingTimeService.registerTimer(now + interval,this);

  23. }

  24. }

  25. //状态恢复

  26. @Overridepublicvoid initializeState(StateInitializationContext context)throwsException{

  27. super.initializeState(context);

  28. this.list =newArrayList<T>();

  29.        listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");

  30. if(context.isRestored()){

  31.            listState.get().forEach(x ->{

  32.                list.add(x);

  33. });

  34. }


  35. }


  36. @Overridepublicvoid processElement(StreamRecord<T> element)throwsException{

  37.        list.add(element.getValue());

  38. if(list.size()>= batchSize){

  39.            saveRecords(list);

  40. }


  41. }

  42. //checkpoint

  43. @Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{

  44. super.snapshotState(context);

  45. if(list.size()>0){

  46.            listState.clear();

  47.            listState.addAll(list);

  48. }

  49. }

  50. //定时回调

  51. @Overridepublicvoid onProcessingTime(long timestamp)throwsException{

  52. if(list.size()>0){

  53.            saveRecords(list);

  54.            list.clear();

  55. }

  56. long now = processingTimeService.getCurrentProcessingTime();

  57.        processingTimeService.registerTimer(now + interval,this);//再次注册

  58. }


  59. publicabstractvoid saveRecords(List<T> datas);

  60. }

如何调用?直接使用dataStream.transform方式即可。

整体来说这个demo相对来说是比较简单的,但是这里面涉及的定时、状态管理也是值得研究,比喻说在这里定时我们直接选择ProcessingTimeService,而没有选择InternalTimerService来完成定时注册,主要是由于InternalTimerService会做定时调用状态保存,在窗口操作中需要任务失败重启仍然可以触发定时,但是在我们案例中不需要,直接下次启动重新注册即可,因此选择了ProcessingTimeService。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net