伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

spark改七行源码实现高效处理kafka数据积压

来源:本站原创 浏览:91次 时间:2022-07-21
spark改七行源码实现高效处理kafka数据积压

浪尖 浪尖聊大数据

  1. 劳力士

spark streaming消费kafka,大家都知道有两种方式,也是面试考基本功常问的:

a.基于receiver的机制。这个是spark streaming最基本的方式,spark streaming的receiver会定时生成block,默认是200ms,然后每个批次生成blockrdd,分区数就是block数。架构如下:

b.direct API。这种api就是spark streaming会每个批次生成一个kafkardd,然后kafkardd的分区数,由spark streaming消费的kafkatopic分区数决定。过程如下:

kafkardd与消费的kafka分区数的关系如下:

2.常见积压问题


kafka的producer生产数据到kafka,正常情况下,企业中应该是轮询或者随机,以保证kafka分区之间数据是均衡的。

在这个前提之下,一般情况下,假如针对你的数据量,kafka分区数设计合理。实时任务,如spark streaming或者flink,有没有长时间的停掉,那么一般不会有有积压。

消息积压的场景:

a.任务挂掉。比如,周五任务挂了,有没有写自动拉起脚本,周一早上才处理。那么spark streaming消费的数据相当于滞后两天。这个确实新手会遇到。

周末不加班,估计会被骂。

b.kafka分区数设少了。其实,kafka单分区生产消息的速度qps还是很高的,但是消费者由于业务逻辑复杂度的不同,会有不同的时间消耗,就会出现消费滞后的情况。

c.kafka消息的key不均匀,导致分区间数据不均衡。kafka生产消息支持指定key,用key携带写信息,但是key要均匀,否则会出现kafka的分区间数据不均衡。

上面三种积压情况,企业中很常见,那么如何处理数据积压呢?

一般解决办法,针对性的有以下几种:

a.任务挂掉导致的消费滞后。
任务启动从最新的消费,历史数据采用离线修补。
最重要的是故障拉起脚本要有,还要就是实时框架异常处理能力要强,避免数据不规范导致的不能拉起。

b.任务挂掉导致的消费滞后。
任务启动从上次提交处消费处理,但是要增加任务的处理能力,比如增加资源,让任务能尽可能的赶上消费最新数据。

c.kafka分区少了。
假设数据量大,直接增加kafka分区是根本,但是也可以对kafkardd进行repartition,增加一次shuffle。

d.个别分区不均衡。
可以生产者处可以给key加随机后缀,使其均衡。也可以对kafkardd进行repartition。

3.浪尖的骚操作


其实,以上都不是大家想要的,因为spark streaming生产的kafkardd的分区数,完全可以是大于kakfa分区数的。

其实,经常阅读源码或者星球的看过浪尖的源码视频的朋友应该了解,rdd的分区数,是由rdd的getPartitions函数决定。比如kafkardd的getPartitions方法实现如下:

  override def getPartitions: Array[Partition] = {    offsetRanges.zipWithIndex.map { case (o, i) =>        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)    }.toArray  }

offsetRanges其实就是一个数组:

    val offsetRanges: Array[OffsetRange],

OffsetRange存储一个kafka分区元数据及其offset范围,然后进行map操作,转化为KafkaRDDPartition。实际上,我们可以在这里下手,将map改为flatmap,然后对offsetrange的范围进行拆分,但是这个会引发一个问题,浪尖在这里就不赘述了,你可以测测。

其实,我们可以在offsetRange生成的时候做下转换。位置是DirectKafkaInputDstream的compute方法。具体实现:
首先,浪尖实现中增加了三个配置,分别是:

是否开启自动重分区分区sparkConf.set("enable.auto.repartition","true")避免不必要的重分区操作,增加个阈值,只有该批次要消费的kafka的分区内数据大于该阈值才进行拆分sparkConf.set("per.partition.offsetrange.threshold","300")拆分后,每个kafkardd 的分区数据量。sparkConf.set("per.partition.after.partition.size","100")

然后,在DirectKafkaInputDstream里获取着三个配置,方法如下:

val repartitionStep = _ssc.conf.getInt("per.partition.offsetrange.size",1000)val repartitionThreshold = _ssc.conf.getLong("per.partition.offsetrange.threshold",1000)val enableRepartition = _ssc.conf.getBoolean("enable.auto.repartition",false)对offsetRanges生成的过程进行改造,只需要增加7行源码即可。val offsetRanges = untilOffsets.flatMap{ case (tp, uo) =>  val fo = currentOffsets(tp)  val delta = uo -fo  if(enableRepartition&&(repartitionThreshold < delta)){    val offsets = fo to uo by repartitionStep    offsets.map(each =>{      val tmpOffset = each + repartitionStep      OffsetRange(tp.topic, tp.partition, each, Math.min(tmpOffset,uo))    }).toList  }else{    Array(OffsetRange(tp.topic, tp.partition, fo, uo))  }}

测试的主函数如下:

import bigdata.spark.config.Configimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.{SparkConf, TaskContext}import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}/*1. 直接消费新数据,数据离线修补。2. repartition(10---->100),给足够多的资源,以便任务逐渐消除滞后的数据。3. directDstream api 生成的是kafkardd,该rdd与kafka分区一一对应。 */object kafka010Repartition {   def main(args: Array[String]) {      //    创建一个批处理时间是2s的context 要增加环境变量      val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")     sparkConf.set("enable.auto.repartition","true")     sparkConf.set("per.partition.offsetrange.threshold","300")     sparkConf.set("per.partition.offsetrange.step","100")     val ssc = new StreamingContext(sparkConf, Seconds(5))      //    使用broker和topic创建DirectStream      val topicsSet = "test1".split(",").toSet      val kafkaParams = Map[String, Object]("bootstrap.servers" -> Config.kafkaHost,        "key.deserializer"->classOf[StringDeserializer],        "value.deserializer"-> classOf[StringDeserializer],        "group.id"->"test1",        "auto.offset.reset" -> "earliest",        "enable.auto.commit"->(false: java.lang.Boolean))      val messages = KafkaUtils.createDirectStream[String, String](        ssc,        LocationStrategies.PreferConsistent,        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))      messages.transform(rdd=>{        println("partition.size : "+rdd.getNumPartitions)        rdd      }).foreachRDD(rdd=>{//        rdd.foreachPartition(each=>println(111))        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges        offsetRanges.foreach(o=>{          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")        })      })     ssc.start()     ssc.awaitTermination()    }}

结果如下:

partition.size : 67test1 0 447 547test1 0 547 647test1 0 647 747test1 0 747 847test1 0 847 947test1 0 947 1047test1 0 1047 1147test1 0 1147 1247test1 0 1247 1347test1 0 1347 1447test1 0 1447 1547test1 0 1547 1647test1 0 1647 1747test1 0 1747 1847test1 0 1847 1947test1 0 1947 2047test1 0 2047 2147test1 0 2147 2247test1 0 2247 2347test1 0 2347 2447test1 0 2447 2547test1 0 2547 2647test1 0 2647 2747test1 0 2747 2847test1 0 2847 2947test1 0 2947 3047test1 0 3047 3147test1 0 3147 3247test1 0 3247 3347test1 0 3347 3447test1 0 3447 3547test1 0 3547 3647test1 0 3647 3747test1 0 3747 3847test1 0 3847 3947test1 0 3947 4047test1 0 4047 4147test1 0 4147 4247test1 0 4247 4347test1 0 4347 4447test1 0 4447 4547test1 0 4547 4647test1 0 4647 4747test1 0 4747 4847test1 0 4847 4947test1 0 4947 5047test1 0 5047 5147test1 0 5147 5247test1 0 5247 5347test1 0 5347 5447test1 0 5447 5547test1 0 5547 5647test1 0 5647 5747test1 0 5747 5847test1 0 5847 5947test1 0 5947 6047test1 0 6047 6147test1 0 6147 6247test1 0 6247 6347test1 0 6347 6447test1 0 6447 6547test1 0 6547 6647test1 0 6647 6747test1 0 6747 6847test1 0 6847 6947test1 0 6947 7047test1 0 7047 7124

【完】

  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net