伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink实战:全局TopN分析与实现

来源:本站原创 浏览:81次 时间:2023-05-12

在上一篇Flink实战: 窗口TopN分析与实现中实现了在一个窗口内的分组topN,但是在实际中也会遇到没有窗口期的topN,例如在一些实时大屏监控展示中,展示历史到现在所有的TopN数据,将这个称之为全局topN,仍然以计算区域维度销售额topN的商品为例,看一下全局TopN的实现方法。
先将需求分解为以下几步:

  • 按照区域areaId+商品gdsId分组,计算每个分组的累计销售额

  • 将得到的区域areaId+商品gdsId维度的销售额按照区域areaId分组,然后求得TopN的销售额商品,并且定时更新输出

与窗口TopN不同,全局TopN没有时间窗口的概念,也就没有时间的概念,因此使用ProcessingTime语义即可,并且也不能再使用Window算子来操作,但是在这个过程中需要完成数据累加操作与定时输出功能,选择ProcessFunction函数来完成,使用State保存中间结果数据,保证数据一致性语义,使用定时器来完成定时输出功能。

销售额统计

对数据流按照区域areaId+商品gdsId分组,不断累加销售额保存起来,然后输出到下游。

  1.    val env =StreamExecutionEnvironment.getExecutionEnvironment

  2.    env.setParallelism(1)

  3.    val kafkaConfig =newProperties();

  4.    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

  5.    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");


  6.    val consumer =newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema(), kafkaConfig)

  7.    val orderStream = env.addSource(consumer)

  8.               .map(x =>{

  9.                      val a = x.split(",")

  10.                      Order(a(0), a(1).toLong, a(2), a(3).toDouble, a(4))

  11.                    })


  12.    val salesStream=orderStream.keyBy(x =>{

  13.                  x.areaId +"_"+ x.gdsId

  14.             }).process(newKeyedProcessFunction[String,Order,GdsSales](){


  15.      var orderState:ValueState[Double]= _

  16.      var orderStateDesc:ValueStateDescriptor[Double]= _


  17.     override def open(parameters:Configuration):Unit={

  18.        orderStateDesc =newValueStateDescriptor[Double]("order-state",TypeInformation.of(classOf[Double]))

  19.        orderState = getRuntimeContext.getState(orderStateDesc)

  20. }


  21. override def processElement(value:Order, ctx:KeyedProcessFunction[String,Order,GdsSales]#Context,out:Collector[GdsSales]):Unit={


  22.        val currV = orderState.value()

  23.        if(currV ==null){

  24.             orderState.update(value.amount)

  25.       }else{

  26.          val newV = currV + value.amount

  27.          orderState.update(newV)

  28.        }

  29.       out.collect(GdsSales.of(value.areaId, value.gdsId, orderState.value(), value.orderTime))

  30. }

  31. })

使用keyBy按照areaId+gdsId来分组,然后使用KeyedProcessFunction来完成累加操作。在KeyedProcessFunction里面定义了一个ValueState来保存每个分组的销售额,processElement完成销售额累加操作,并且不断更新ValueState与collect输出。
说明:这里使用ValueState来完成累加过程显得比较繁琐,可以使用ReducingState来替代,这里只是为了表现出累加这个过程。


区域TopN计算

上一步得到的salesStream是一个按照区域areaId+商品gdsId维度的销售额,并且是不断更新输出到下游的,接下来就需要完成TopN的计算,在Flink实战: 窗口TopN分析与实现中分析到TopN的计算不需要保存所有的结果数据,使用红黑树来模拟类似优先级队列功能即可,但是与其不同在于:窗口TopN每次计算TopN是一个全量的窗口结果,而全局TopN其销售额是会不断变动的,因此需要做以下逻辑判断:

  1. 如果TreeSet[GdsSales]包含该商品的销售额数据,则需要更新该商品销售额,这个过程包含判断商品gdsId是否存在与移除该GdsSales对象功能,但是TreeSet不具备直接判断gdsId是否存在功能,那么可以使用一种额外的数据结构Map, key为商品gdsId, value为商品销售额数据GdsSales,该value对应TreeSet[GdsSales]中数据

  2. 如果TreeSet[GdsSales]包含该商品的销售额数据,当TreeSet里面的数据到达N, 就获取第一个节点数据(最小值)与当前需要插入的数据进行比较,如果比其大,则直接舍弃,如果比其小,那么就将TreeSet中第一个节点数据删除,插入新的数据
    实现代码如下:

  1. salesStream.keyBy(_.getAreaId)

  2. .process(newKeyedProcessFunction[String,GdsSales,Void]{

  3. var topState:ValueState[java.util.TreeSet[GdsSales]]= _

  4. var topStateDesc:ValueStateDescriptor[java.util.TreeSet[GdsSales]]= _


  5. var mappingState:MapState[String,GdsSales]= _

  6. var mappingStateDesc:MapStateDescriptor[String,GdsSales]= _

  7.        val interval:Long=60000

  8.        val N:Int=3

  9. override def open(parameters:Configuration):Unit={

  10.          topStateDesc =newValueStateDescriptor[java.util.TreeSet[GdsSales]]("top-state",TypeInformation.of(classOf[java.util.TreeSet[GdsSales]]))

  11.          topState = getRuntimeContext.getState(topStateDesc)


  12.          mappingStateDesc =newMapStateDescriptor[String,GdsSales]("mapping-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[GdsSales]))

  13.          mappingState = getRuntimeContext.getMapState(mappingStateDesc)

  14. }

  15. override def processElement(value:GdsSales, ctx:KeyedProcessFunction[String,GdsSales,Void]#Context,out:Collector[Void]):Unit={


  16.          val top = topState.value()

  17.          if(top ==null){

  18.            val topMap: util.TreeSet[GdsSales]=new util.TreeSet[GdsSales](newComparator[GdsSales]{

  19.          override def compare(o1:GdsSales, o2:GdsSales):Int=(o1.getAmount - o2.getAmount).toInt

  20. })

  21.            topMap.add(value)

  22.            topState.update(topMap)

  23.            mappingState.put(value.getGdsId, value)

  24.     }else{

  25.            mappingState.contains(value.getGdsId) match {

  26.          case true=>{//已经存在该商品的销售数据

  27.                val oldV = mappingState.get(value.getGdsId)

  28.                mappingState.put(value.getGdsId, value)

  29.                val values = topState.value()

  30.                values.remove(oldV)

  31.                values.add(value)//更新旧的商品销售数据

  32.                topState.update(values)

  33.           }

  34.      case false=>{//不存在该商品销售数据

  35.                if(top.size()>= N){//已经达到N 则判断更新

  36.                  val min = top.first()

  37.                    if(value.getAmount > min.getAmount){

  38.                    top.pollFirst()

  39.                    top.add(value)

  40.                    mappingState.put(value.getGdsId, value)

  41.                    topState.update(top)

  42.                       }

  43.              }else{//还未到达N则直接插入

  44.                     top.add(value)

  45.                     mappingState.put(value.getGdsId, value)

  46.                     topState.update(top)

  47. }

  48. }}}}

  49. })

在open中定义个两个state:ValueState与MapState, ValueState保存该区域下的TopN商品销售数据GdsSales,MapState保存了商品gdsId与商品销售数据GdsSale的对应关系。
在processElement中,首先会判断ValueState是否为空,如果为空则定义按照销售额比较升序排序的Comparator 的TreeSet,则走更新逻辑判断。


定时输出

到这里我们已经计算出了每个时刻的TopN数据,存储在ValueState[java.util.TreeSet[GdsSales]] 中,现在希望每隔1min将TopN的数据输出,可以使用在时间系统系列里面提供较为底层的直接获取到InternalTimeService来完成,由于ProcessFunction本身提供了定时调用功能,我们就按照在窗口实用触发器:ContinuousEventTimeTrigger中讲到的持续触发器的原理来实现,

var fireState:ValueState[Long]= _var fireStateDesc:ValueStateDescriptor[Long]= _//放在open方法中fireStateDesc =newValueStateDescriptor[Long]("fire-time",TypeInformation.of(classOf[Long]))fireState = getRuntimeContext.getState(fireStateDesc)

定义了一个ValueState,保存每一次触发的时间,不使用ReducingState是因为没有Window里面在使用SessionWindow的合并机制。

//放在processElement里面val currTime = ctx.timerService().currentProcessingTime()//1min输出一次if(fireState.value()==null){            val start = currTime -(currTime % interval)            val nextFireTimestamp = start + interval            ctx.timerService().registerProcessingTimeTimer(nextFireTimestamp)            fireState.update(nextFireTimestamp)}

对于每一个区域areaId(key)在processElement只需要注册一次即可。

override def onTimer(timestamp:Long, ctx:KeyedProcessFunction[String,GdsSales,Void]#OnTimerContext,out:Collector[Void]):Unit={          println(timestamp +"===")          topState.value().foreach(x =>{            println(x)          })          val fireTimestamp = fireState.value()          if(fireTimestamp !=null&&(fireTimestamp == timestamp)){                  fireState.clear()                  fireState.update(timestamp + interval)                 ctx.timerService().registerProcessingTimeTimer(timestamp + interval)         }}

onTimer定时输出,并且注册下一个触发的时间点。

测试

准备数据

//2019-11-16 21:25:10orderId01,1573874530000,gdsId03,300,beijingorderId02,1573874540000,gdsId01,100,beijingorderId02,1573874540000,gdsId04,200,beijingorderId02,1573874540000,gdsId02,500,beijingorderId01,1573874530000,gdsId01,300,beijing

等到2019-11-16 21:26:00得到结果

1573910760000===GdsSales{areaId='beijing', gdsId='gdsId03', amount=300.0}GdsSales{areaId='beijing', gdsId='gdsId01', amount=400.0}GdsSales{areaId='beijing', gdsId='gdsId02', amount=500.0}

接着在生产一条数据

orderId02,1573874540000,gdsId04,500,beijing

等到2019-11-16 21:27:00得到结果

1573910820000===GdsSales{areaId='beijing', gdsId='gdsId01', amount=400.0}GdsSales{areaId='beijing', gdsId='gdsId02', amount=500.0}GdsSales{areaId='beijing', gdsId='gdsId04', amount=700.0}

至此完成全局topN的全部实现。

总结

全局TopN要求状态保存所有的聚合数据,对于key比较多的情况,不管是销售额数据还是定时器数据都会占用比较多的内存,可以选择RocksDb作为StateBackend。



  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net