伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink基于zookeeper的高可用实现源码分析

来源:本站原创 浏览:86次 时间:2023-05-09

Flink中JobMaster、ResourceManager、Dispatcher、WebMonitorEndpoint提供了基于zookeeper高可用,涉及到leader选举与监听, leader选举基于zookeeper开源客户端CuratorFramework 的LeaderLatch方式实现,监听则通过NodeCache实现。基于此Flink提供了zookeeper高可用ZooKeeperHaServices, 通过该工具类可以创建LeaderElectionService与LeaderRetrievalService,包含了对应与zookeeper的znode节点分别是

/resource_manager_lock/dispatcher_lock/job_manager_lock/rest_server_lock
LeaderElectionService用于leader选举服务,基于zk的实现类是ZooKeeperLeaderElectionService,LeaderRetrievalService用于监听leader恢复服务,基于zk的实现类是ZooKeeperLeaderRetrievalService。

涉及到两个重要的角色:
LeaderContender接口,在leader选举中使用,代表了参与leader竞争的角色,其实现类有JobManagerRunner、ResourceManager、Dispatcher、WebMonitorEndpoint,该接口中包含了两个重要的方法:      1.  grantLeadership,表示leader竞选成功的回调方法      2.  revokeLeadership,表示由leader变为非leader的回调方法LeaderRetrievalListener接口,在监听leader恢复中使用,代表的是一个监听者,其实现类有ResourceManagerLeaderListener、JobManagerLeaderListener等,该接口包含一个重要的方法:      1. notifyLeaderAddress,leader发生变化时回调方法
以JobMaster与TaskExecutor之间的交互为例,JobMaster需要通过LeaderElectionService完成leader的选举,而TaskExecutor则需要监听jobMaster leader的变化,通过LeaderRetrievalService完成。

JobMaster选举JobManagerRunner负责JobMaster的选举与启动,内部包含了LeaderElectionService对象属性leaderElectionService,也就是ZooKeeperLeaderElectionService的对象,在调用JobManagerRunner的start方法时会调用leaderElectionService的start方法,传入参数this也就是当前的JobManagerRunner对象,
public void start() throws Exception {    try {      leaderElectionService.start(this);    } catch (Exception e) {      log.error("Could not start the JobManager because the leader election service did not start.", e);      throw new Exception("Could not start the leader election service.", e);    }  }
在ZooKeeperLeaderElectionService 的start方法里面
public void start(LeaderContender contender) throws Exception {    Preconditions.checkNotNull(contender, "Contender must not be null.");    Preconditions.checkState(leaderContender == null, "Contender was already set.");
   LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
   synchronized (lock) {
     client.getUnhandledErrorListenable().addListener(this);
     leaderContender = contender;
     leaderLatch.addListener(this);      leaderLatch.start();
     cache.getListenable().addListener(this);      cache.start();
     client.getConnectionStateListenable().addListener(listener);
     running = true;    }  }

leaderLatch 表示的是CuratorFramework里面参与leader选举的LeaderLatch对象,添加了一个this的listener, 也就是LeaderLatchListener,选举成功会调用isLeader方法,由leader变为非leader调用notLeader方法;并且还要通过NodeCache方式添加了监控当前节点变化的listener,也就是NodeCacheListener,当监听的节点发生变化则调用nodeChanged方法。若参与者竞选leader成功就会调用isLeader方法
public void isLeader() {    synchronized (lock) {      if (running) {        issuedLeaderSessionID = UUID.randomUUID();        confirmedLeaderSessionID = null;
       if (LOG.isDebugEnabled()) {          LOG.debug(            "Grant leadership to contender {} with session ID {}.",            leaderContender.getAddress(),            issuedLeaderSessionID);        }
       leaderContender.grantLeadership(issuedLeaderSessionID);      } else {        LOG.debug("Ignoring the grant leadership notification since the service has " +          "already been stopped.");      }    }  }

在里面会调用leaderContender的grantLeadership方法,也就是JobManagerRunner的grantLeadership,就会执行JobMaster的启动并且执行任务的调度。
在检测到选择leader所用的path发生变化就会调用nodeChanged方法,在nodeChanged方法里面判断当前角色是leader就将leader信息写入到zookeeper路径里面去。

TaskExecutor监听在TaskExecutor中包含了一个JobLeaderService的对象属性jobLeaderService,在jobLeaderService中维护了所有的jobmaster leader,并且监听它,监听正是通过ZooKeeperLeaderRetrievalService完成。在申请slot时会调用TaskExecutor的requestSlot方法,在该方法里面调用jobLeaderService.add方法,
public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception {    Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
   LOG.info("Add job {} for job leader monitoring.", jobId);
   final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(      jobId,      defaultTargetAddress);
   JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
   final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> oldEntry = jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));
   if (oldEntry != null) {      oldEntry.f0.stop();      oldEntry.f1.stop();    }
   leaderRetrievalService.start(jobManagerLeaderListener);  }
在该方法里面开始对jobmaster leader的监听,创建一个LeaderRetrievalService对象,并且调用其start方法,传入参数是JobManagerLeaderListener对象,也就是LeaderRetrievalListener对象,在ZooKeeperLeaderRetrievalService的start方法里面,通过NodeCache方式添加了监控当前节点变化的listener,也就是NodeCacheListener,当监听的节点发生变化则调用nodeChanged方法,而在该方法里面会调用LeaderRetrievalListener的notifyLeaderAddress方法,也就是JobManagerLeaderListener的notifyLeaderAddress方法,会重新与新的leader建立连接。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net