伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

重要|Spark driver端得到executor返回值的方法

来源:本站原创 浏览:97次 时间:2022-07-13
重要|Spark driver端得到executor返回值的方法

浪院长 浪尖聊大数据


有人说spark的代码不优雅,这个浪尖就忍不了了。实际上,说spark代码不优雅的主要是对scala不熟悉,spark代码我觉得还是很赞的,最值得阅读的大数据框架之一。
今天这篇文章不是为了争辩Spark 代码优雅与否,主要是讲一下理解了spark源码之后我们能使用的一些小技巧吧。
spark 使用的时候,总有些需求比较另类吧,比如有球友问过这样一个需求:

浪尖,我想要在driver端获取executor执行task返回的结果,比如task是个规则引擎,我想知道每条规则命中了几条数据,请问这个怎么做呢?

这个是不是很骚气,也很常见,按理说你输出之后,在mysql里跑条sql就行了,但是这个往往显的比较麻烦。而且有时候,在 driver可能还要用到这些数据呢?具体该怎么做呢?

大部分的想法估计是collect方法,那么用collect如何实现呢?大家自己可以考虑一下,我只能告诉你不简单,不如输出到数据库里,然后driver端写sql分析一下。

还有一种考虑就是使用自定义累加器。这样就可以在executor端将结果累加然后在driver端使用,不过具体实现也是很麻烦。大家也可以自己琢磨一下下~

那么,浪尖就给大家介绍一个比较常用也比较骚的操作吧。

其实,这种操作我们最先想到的应该是count函数,因为他就是将task的返回值返回到driver端,然后进行聚合的。我们可以从idea count函数点击进去,可以看到...

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

也即是sparkcontext的runJob方法。
Utils.getIteratorSize _这个方法主要是计算每个iterator的元素个数,也即是每个分区的元素个数,返回值就是元素个数:

/**   * Counts the number of elements of an iterator using a while loop rather than calling   * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower   * in the current version of Scala.   */  def getIteratorSize[T](iterator: Iterator[T]): Long = {    var count = 0L    while (iterator.hasNext) {      count += 1L      iterator.next()    }    count  }

然后就是runJob返回的是一个数组,每个数组的元素就是我们task执行函数的返回值,然后调用sum就得到我们的统计值了。

那么我们完全可以借助这个思路实现我们开头的目标。浪尖在这里直接上案例了:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}import org.elasticsearch.hadoop.cfg.ConfigurationOptionsobject es2sparkRunJob {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")    conf.set(ConfigurationOptions.ES_PORT, "9200")    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")    conf.set("es.write.rest.error.handlers", "ignoreConflict")    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")    val sc = new SparkContext(conf)    import org.elasticsearch.spark._    val rdd = sc.esJsonRDD("posts").repartition(10)    rdd.count()    val func = (itr : Iterator[(String,String)]) => {      var count = 0      itr.foreach(each=>{        count += 1      })      (TaskContext.getPartitionId(),count)    }    val res = sc.runJob(rdd,func)    res.foreach(println)    sc.stop()  }}

例子中driver端获取的就是每个task处理的数据量。
效率高,而且操作灵活高效~
是不是很骚气~~

  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net