伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink 中的一把锁

来源:本站原创 浏览:97次 时间:2023-05-07
那把锁

锁用于多线程安全场景下,在Flink中存在一把锁,被用于数据处理线程、定时器调用线程、checkpoint线程。在StreamTask中定义了一个Object对象lock,通过使用synchronized方式进行同步,在task的初始化过程中该对象传给了SystemProcessingTimeService、StreamInputProcessor、StreamTwoInputProcessor。
数据处理线程
这里所说的数据处理线程表示正常的数据处理流程,可以认为就是processElement处理过程,StreamInputProcessor/StreamTwoInputProcessor主要工作就是读取数据然后调用对应的operator处理读取到的数据也就是调用processElement方法:

StreamRecord<IN> record = recordOrMark.asRecord();            synchronized (lock) {              numRecordsIn.inc();              streamOperator.setKeyContextElement1(record);              streamOperator.processElement(record);            }
通过源码可以发现每次调用processElement之前都会使用synchronized锁住lock,然后才能进行后续的处理。
定时器调用线程Flink中有一个很重要的功能那就是定时器,窗口触发需要定时器、用户自定义注册定时器需要定时器,但是定时器又可以按照时间属性分为两种:事件时间语义下watermark推进触发的定时器、处理时间语义下定时调度的定时器。watermark也是作为一种StreamElement在管道中流动,在watermark向前推进时也是需要获得锁才能触发定时器:
public void handleWatermark(Watermark watermark) {      try {        synchronized (lock) {          watermarkGauge.setCurrentWatermark(watermark.getTimestamp());          operator.processWatermark(watermark);        }      } catch (Exception e) {        throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);      }    }
同样在处理时间触发的定时器也是需要获得锁才能执行:
    //SystemProcessingTimeServuce中    public void run() {      synchronized (lock) {        try {          if (serviceStatus.get() == STATUS_ALIVE) {            target.onProcessingTime(timestamp);          }        } catch (Throwable t) {          TimerException asyncException = new TimerException(t);          exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);        }      }    }

定时触发为什么需要锁?在processElement中可能会操作状态、在定时回调onTimer中也可能会操作状态,那么状态就是作为共享数据,为了保证数据的一致性,所以这里加了锁。


checkpoint线程checkpoint是由jobmaster协调完成的,会定时向source端发送barrier标记然后在数据流中流动,checkpoint是为了对状态某一个时间点的备份,同样与processElement存在状态数据的竞争,为了保证数据的一致性,在checkpoint过程中会存在锁竞争:
//StreamTask中performCheckpoint方法synchronized (lock) {      if (isRunning) {        operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
       // Step (2): Send the checkpoint barrier downstream        operatorChain.broadcastCheckpointBarrier(            checkpointMetaData.getCheckpointId(),            checkpointMetaData.getTimestamp(),            checkpointOptions);
       // Step (3): Take the state snapshot. This should be largely asynchronous, to not        //           impact progress of the streaming topology        checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);        return true;      }      else {      .....
     }    }


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net