
Source Code Analysis of the Checkpoint Alignment Mechanism

Source: original to this site | Date: 2023-05-07
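Checkpointing is the key mechanism behind fault-tolerant state in Flink. Checkpoints can deliver different data semantics, namely exactly-once and at-least-once. Which one you get depends on the checkpoint mechanism, which here means barrier alignment: with alignment, checkpoints provide exactly-once semantics; without alignment, at-least-once. (This non-aligned mode should not be confused with the unaligned checkpoints introduced in Flink 1.11, which still provide exactly-once.)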

The official Flink documentation describes the same mechanism.

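Alignment typically happens in operators that consume several upstream input streams, for example downstream of a keyBy or in a join. The rest of this article looks at how alignment is implemented in the source code.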

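Checkpoint barriers are handled in StreamInputProcessor / StreamTwoInputProcessor. These classes read data from the network (the remote upstream tasks) and hand it to the StreamOperator. The actual reading is delegated to a CheckpointBarrierHandler, which also takes care of alignment through its getNextNonBlocked method. The CheckpointBarrierHandler interface has two implementations, BarrierBuffer and BarrierTracker: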

// Called from StreamInputProcessor / StreamTwoInputProcessor to create the CheckpointBarrierHandler
public static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamTask<?, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        IOManager ioManager,
        InputGate inputGate,
        Configuration taskManagerConfig) throws IOException {

    CheckpointBarrierHandler barrierHandler;
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
        long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
        if (!(maxAlign == -1 || maxAlign > 0)) {
            throw new IllegalConfigurationException(
                TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                + " must be positive or -1 (infinite)");
        }

        if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
            barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
        } else {
            barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
        }
    } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
        barrierHandler = new BarrierTracker(inputGate);
    } else {
        throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }

    if (checkpointedTask != null) {
        barrierHandler.registerCheckpointEventHandler(checkpointedTask);
    }
    return barrierHandler;
}
So BarrierBuffer implements alignment (EXACTLY_ONCE mode) and BarrierTracker implements the non-aligned mode (AT_LEAST_ONCE).
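The selection logic above boils down to a small decision table. The sketch below is illustrative only and reproduces that table in plain Java. Mode and HandlerChooser are hypothetical stand-ins, not Flink classes, and the handler names are returned as strings instead of constructed.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's CheckpointingMode (not the real class).
enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

// Hypothetical helper mirroring the decision tree of createCheckpointBarrierHandler.
final class HandlerChooser {
    static String choose(Mode mode, long maxAlign, boolean creditBased) {
        Objects.requireNonNull(mode, "mode");
        switch (mode) {
            case EXACTLY_ONCE:
                // -1 means "no limit"; any other value must be positive
                if (!(maxAlign == -1 || maxAlign > 0)) {
                    throw new IllegalArgumentException("alignment limit must be positive or -1 (infinite)");
                }
                // credit-based network: cache in memory; otherwise spill to disk
                return creditBased ? "BarrierBuffer+CachedBufferBlocker" : "BarrierBuffer+BufferSpiller";
            case AT_LEAST_ONCE:
                // no alignment, so the limit is not validated here
                return "BarrierTracker";
            default:
                throw new IllegalArgumentException("Unrecognized mode: " + mode);
        }
    }
}
```

As in the original method, the alignment size limit is only validated on the exactly-once path, since the tracker never buffers anything.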

Alignment - BarrierBuffer
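BarrierBuffer holds the member variables that alignment relies on. The first is bufferBlocker, of type BufferBlocker. The second is blockedChannels, a boolean array. bufferBlocker caches the data that arrives on blocked channels during alignment: the CachedBufferBlocker implementation keeps it in an in-memory ArrayDeque, while BufferSpiller spills it to disk. blockedChannels records which channels are currently blocked for alignment. The alignment flow lives in getNextNonBlocked: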
@Override
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        //.....
        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // the channel this data came from is being aligned:
            // cache the data in the BufferBlocker instead of returning it
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            // a regular buffer is returned directly
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // handle a CheckpointBarrier
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        //.......
    }
}
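The blocking bookkeeping in getNextNonBlocked can be modeled with two plain data structures: a boolean array of blocked channels and a FIFO queue of held-back records. The sketch below is a simplified stand-in, not Flink code. AlignmentBlocker, block, add and releaseAll are hypothetical names, and records are strings with an explicit byte size. The "-1 means unlimited" rule mirrors the configuration check shown earlier. In the Flink versions that use BarrierBuffer, checkSizeLimit aborts the current checkpoint once the limit is exceeded; this sketch only reports that condition through the return value of add.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the blocking state kept by BarrierBuffer.
final class AlignmentBlocker {
    private final boolean[] blockedChannels;                      // true = barrier already seen on this channel
    private final ArrayDeque<String> cached = new ArrayDeque<>(); // records held back during alignment
    private final long maxAlignBytes;                             // -1 = unlimited
    private long bytesBlocked;

    AlignmentBlocker(int numChannels, long maxAlignBytes) {
        this.blockedChannels = new boolean[numChannels];
        this.maxAlignBytes = maxAlignBytes;
    }

    // Called when a barrier arrives on a channel: later data from it must wait.
    void block(int channel) {
        blockedChannels[channel] = true;
    }

    boolean isBlocked(int channel) {
        return blockedChannels[channel];
    }

    // Caches a record from a blocked channel. Returns false once the cached size
    // exceeds the limit, i.e. where a checkSizeLimit-style check would give up.
    boolean add(String record, int sizeBytes) {
        cached.addLast(record);
        bytesBlocked += sizeBytes;
        return maxAlignBytes == -1 || bytesBlocked <= maxAlignBytes;
    }

    // Unblocks all channels and returns the cached records in arrival order.
    List<String> releaseAll() {
        Arrays.fill(blockedChannels, false);
        List<String> released = new ArrayList<>(cached);
        cached.clear();
        bytesBlocked = 0;
        return released;
    }
}
```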
The processBarrier method:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    // barrierId is the ID of the checkpoint this barrier belongs to
    final long barrierId = receivedBarrier.getId();

    // single input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }

    // multiple input channels: numBarriersReceived counts the channels that have
    // already delivered the barrier for the current checkpoint;
    // numBarriersReceived > 0 means an alignment is in progress
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled

        if (barrierId == currentCheckpointId) {
            // regular case
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // the incoming barrier belongs to a newer checkpoint than the one being
            // aligned (for example, because the current one timed out): abort the
            // current checkpoint, reset blockedChannels and set numBarriersReceived
            // back to 0, then start aligning the new checkpoint (barrierId)
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                inputGate.getOwningTaskName(),
                barrierId,
                currentCheckpointId);

            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
            releaseBlocksAndResetBarriers();
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // numBarriersReceived == 0: start a new checkpoint and mark
        // the corresponding entry in blockedChannels as blocked (true)
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                inputGate.getOwningTaskName(),
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        // alignment complete: make the cached data (in the BufferBlocker) available
        // for consumption again, then trigger the checkpoint
        releaseBlocksAndResetBarriers();
        notifyCheckpoint(receivedBarrier);
    }
}
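The checkpoint-ID bookkeeping in processBarrier can be exercised without Flink. The sketch below is a hypothetical re-implementation of the multi-channel path only. It records "trigger" and "abort" strings instead of calling notifyCheckpoint/notifyAbort, and it leaves out numClosedChannels and the data cache. It also simply ignores a repeated barrier on an already blocked channel, which a real implementation should treat as an error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of BarrierBuffer.processBarrier's state handling (not Flink code).
final class AlignmentStateMachine {
    private final int totalChannels;
    private final boolean[] blocked;
    private long currentCheckpointId = -1;
    private int numBarriersReceived;
    final List<String> events = new ArrayList<>();

    AlignmentStateMachine(int totalChannels) {
        this.totalChannels = totalChannels;
        this.blocked = new boolean[totalChannels];
    }

    void processBarrier(long barrierId, int channel) {
        if (numBarriersReceived > 0) {                 // alignment in progress
            if (barrierId == currentCheckpointId) {
                onBarrier(channel);
            } else if (barrierId > currentCheckpointId) {
                // a newer checkpoint overtakes the current one: abort and restart
                events.add("abort " + currentCheckpointId);
                reset();
                beginNewAlignment(barrierId, channel);
            } else {
                return;                                 // trailing barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);
        } else {
            return;                                     // canceled or subsumed checkpoint
        }
        if (numBarriersReceived == totalChannels) {
            reset();                                    // here the cached data would be released
            events.add("trigger " + barrierId);
        }
    }

    private void beginNewAlignment(long id, int channel) {
        currentCheckpointId = id;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        if (!blocked[channel]) {                        // repeated barriers are ignored in this sketch
            blocked[channel] = true;
            numBarriersReceived++;
        }
    }

    private void reset() {
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }
}
```

Consider a two-channel instance fed barriers 1@ch0, 2@ch1, 1@ch0, 2@ch0. It records "abort 1" and then "trigger 2": checkpoint 2 subsumes checkpoint 1, and the late barrier for checkpoint 1 is ignored.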
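Overall aligned flow: the task receives several upstream inputs. When a CheckpointBarrier arrives on one input channel, that channel is temporarily blocked, and any data subsequently read from it is held in the cache. Once barriers have arrived on all channels, the cached data is made available for consumption again and the checkpoint is triggered. Records that arrive after a barrier are held back until the snapshot is taken. The snapshot therefore reflects exactly the records that preceded the barrier on every channel, which is what makes the result exactly-once.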
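The whole aligned flow can be simulated in a few lines. In this sketch, Input and AlignedConsumer are hypothetical names, not Flink API. A single list stands in for the input gate, only one checkpoint is aligned at a time, and checkpoint-ID handling is left out. Cached inputs are pushed back in front of the remaining input. This mirrors how BarrierBuffer consumes its released buffers before reading new data from the input gate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical input element: a data record or a barrier, tagged with its channel.
final class Input {
    final int channel;
    final String record;   // null means this element is a barrier
    final long barrierId;  // only meaningful for barriers

    private Input(int channel, String record, long barrierId) {
        this.channel = channel;
        this.record = record;
        this.barrierId = barrierId;
    }

    static Input data(int channel, String record) { return new Input(channel, record, -1); }
    static Input barrier(int channel, long id) { return new Input(channel, null, id); }
}

final class AlignedConsumer {
    // Processes inputs in arrival order and returns what the operator observes.
    static List<String> run(int numChannels, List<Input> inputs) {
        boolean[] blocked = new boolean[numChannels];
        int barriersSeen = 0;
        ArrayDeque<Input> pending = new ArrayDeque<>(inputs); // stand-in for the input gate
        ArrayDeque<Input> cached = new ArrayDeque<>();        // stand-in for the BufferBlocker
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            Input in = pending.pollFirst();
            if (blocked[in.channel]) {          // channel is aligning: hold the element back
                cached.addLast(in);
                continue;
            }
            if (in.record != null) {            // ordinary data goes straight through
                out.add(in.record);
                continue;
            }
            blocked[in.channel] = true;         // first barrier on this channel
            barriersSeen++;
            if (barriersSeen == numChannels) {  // all barriers in: snapshot, then release
                out.add("snapshot-" + in.barrierId);
                Arrays.fill(blocked, false);
                barriersSeen = 0;
                // put cached elements back in front, preserving their arrival order
                while (!cached.isEmpty()) {
                    pending.addFirst(cached.pollLast());
                }
            }
        }
        return out;
    }
}
```

Consider the input a1, barrier@ch0, a2 (all on channel 0), then b1, barrier@ch1, b2 (on channel 1). The operator sees a1, b1, snapshot-1, a2, b2. Record a2 is processed after the snapshot even though it arrived before b1, because channel 0 was blocked once its barrier had been seen.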

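Non-alignment - BarrierTracker

The non-aligned mechanism is comparatively simple. No data is cached; the checkpoint is triggered as soon as the barriers from all channels have arrived. Records that follow a barrier on a fast channel are processed before the snapshot is taken, so after a restore they are replayed a second time. That is why this mode only guarantees at-least-once. Its getNextNonBlocked method: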
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
        if (!next.isPresent()) {
            // buffer or input exhausted
            return null;
        }

        BufferOrEvent bufferOrEvent = next.get();
        if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else {
            // some other event
            return bufferOrEvent;
        }
    }
}
The processBarrier method:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // only one input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
        return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount cbc = null;
    int pos = 0;

    // look for the entry belonging to the same checkpoint
    for (CheckpointBarrierCount next : pendingCheckpoints) {
        if (next.checkpointId == barrierId) {
            cbc = next;
            break;
        }
        pos++;
    }

    if (cbc != null) {
        // add one to the count to that barrier and check for completion
        int numBarriersNew = cbc.incrementBarrierCount();
        if (numBarriersNew == totalNumberOfInputChannels) {
            // every channel's barrier is in (all the Dragon Balls are collected), so the
            // checkpoint can be triggered; this entry and all older ones before it are removed
            for (int i = 0; i <= pos; i++) {
                pendingCheckpoints.pollFirst();
            }

            // notify the listener
            if (!cbc.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }

                notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
            }
        }
    }
    else {
        // first barrier of a new checkpoint
        if (barrierId > latestPendingCheckpointID) {
            latestPendingCheckpointID = barrierId;
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // make sure we do not track too many checkpoints
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
}
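Overall non-aligned flow: the task receives several upstream inputs, but no data is cached for any checkpoint. Records are handed straight to downstream processing. Checkpoint progress is tracked in a queue of CheckpointBarrierCount entries, each recording a checkpoint ID and how many channels have delivered its barrier. Once that count equals the number of channels, the checkpoint is triggered.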
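BarrierTracker's bookkeeping amounts to a counter per checkpoint ID. The sketch below is a hypothetical simplification: BarrierCounter, Count and triggered are made-up names, and it is not Flink code. It keeps the pending queue, the latestPendingCheckpointID check and a MAX_CHECKPOINTS_TO_TRACK cap; the value 50 is an assumption for the sketch. It leaves out abort handling. Data records are not modeled at all, because the tracker never holds them back.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of BarrierTracker's barrier counting.
final class BarrierCounter {
    private static final int MAX_CHECKPOINTS_TO_TRACK = 50; // assumed cap for this sketch

    // One pending checkpoint and the number of channels that delivered its barrier.
    private static final class Count {
        final long checkpointId;
        int barriers = 1; // created when the first barrier arrives

        Count(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int totalChannels;
    private final ArrayDeque<Count> pending = new ArrayDeque<>();
    private long latestPendingId = -1;
    final List<Long> triggered = new ArrayList<>();

    BarrierCounter(int totalChannels) {
        this.totalChannels = totalChannels;
    }

    void processBarrier(long barrierId) {
        if (totalChannels == 1) {          // single channel: trigger immediately
            triggered.add(barrierId);
            return;
        }
        Count match = null;
        int pos = 0;
        for (Count c : pending) {          // find the entry for this checkpoint
            if (c.checkpointId == barrierId) {
                match = c;
                break;
            }
            pos++;
        }
        if (match != null) {
            if (++match.barriers == totalChannels) {
                // drop this checkpoint and every older one still pending
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                triggered.add(barrierId);
            }
        } else if (barrierId > latestPendingId) {
            latestPendingId = barrierId;   // first barrier of a newer checkpoint
            pending.addLast(new Count(barrierId));
            if (pending.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pending.pollFirst();
            }
        }
        // otherwise: a late barrier of an already completed or dropped checkpoint, ignored
    }
}
```

With two channels, the barrier sequence 1, 2, 2, 1, 3, 3 triggers only checkpoints 2 and 3. Completing checkpoint 2 also drops the older pending checkpoint 1, so the late barrier for 1 is ignored.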

