伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink去重第一弹:MapState去重

来源:本站原创 浏览:82次 时间:2023-05-10

去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量、长期计算过程,我们在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。此篇介绍如何通过编码方式实现精确去重,以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID(idfa/imei/cookie)、点击时间。

实现步骤分析:
  1. 为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分

  2. 数据分组使用广告位ID+点击事件所属的小时

  3. 选择processFunction来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量

  4. 计算完成之后的数据清理,按照时间进度注册定时器清理

实现

广告数据

case class AdData(id:Int,devId:String,time:Long)

分组数据

case class AdKey(id:Int,time:Long)

主流程

  1. val env=StreamExecutionEnvironment.getExecutionEnvironment

  2.  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  3.    val kafkaConfig=new Properties()

  4.    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

  5.    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")

  6.    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)

  7.    val ds=env.addSource(consumer)

  8.      .map(x=>{

  9.        val s=x.split(",")

  10.        AdData(s(0).toInt,s(1),s(2).toLong)

  11.      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) {

  12.      override def extractTimestamp(element: AdData): Long = element.time

  13.    })

  14.      .keyBy(x=>{

  15.        val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0,

  16.          Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds

  17.        AdKey(x.id,endTime)

  18.      })

指定时间时间属性,这里设置允许1min的延时,可根据实际情况调整;
时间的转换选择TimeWindow.getWindowStartWithOffset Flink在处理window中自带的方法,使用起来很方便,第一个参数 表示数据时间,第二个参数offset偏移量,默认为0,正常窗口划分都是整点方式,例如从0开始划分,这个offset就是相对于0的偏移量,第三个参数表示窗口大小,得到的结果是数据时间所属窗口的开始时间,这里加上了窗口大小,使用结束时间与广告位ID作为分组的Key。

去重逻辑
自定义Distinct1ProcessFunction 继承了KeyedProcessFunction, 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后在输出;
定义两个状态:MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,mapstate中value作为rocksdb中的value, rocksdb中value 大小是有上限的,这种方式可以减少rocksdb value的大小;另外一个ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。

  1. class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {

  2.  var devIdState: MapState[String, Int] = _

  3.  var devIdStateDesc: MapStateDescriptor[String, Int] = _


  4.  var countState: ValueState[Long] = _

  5.  var countStateDesc: ValueStateDescriptor[Long] = _


  6.  override def open(parameters: Configuration): Unit = {


  7.    devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))

  8.    devIdState = getRuntimeContext.getMapState(devIdStateDesc)


  9.    countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))

  10.    countState = getRuntimeContext.getState(countStateDesc)

  11.  }


  12.  override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {


  13.    val currW=ctx.timerService().currentWatermark()

  14.    if(ctx.getCurrentKey.time+1<=currW) {

  15.        println("late data:" + value)

  16.        return

  17.      }


  18.    val devId = value.devId

  19.    devIdState.get(devId) match {

  20.      case 1 => {

  21.        //表示已经存在

  22.      }

  23.      case _ => {

  24.        //表示不存在

  25.        devIdState.put(devId, 1)

  26.        val c = countState.value()

  27.        countState.update(c + 1)

  28.        //还需要注册一个定时器

  29.        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)

  30.      }

  31.    }

  32.    println(countState.value())

  33.  }


  34.  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {

  35.    println(timestamp + " exec clean~~~")

  36.    println(countState.value())

  37.    devIdState.clear()

  38.    countState.clear()

  39.  }

  40. }

数据清理通过注册定时器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示当watermark大于该小时结束时间+1就会执行清理动作,调用onTimer方法。

在处理逻辑里面加了

val currW=ctx.timerService().currentWatermark()if(ctx.getCurrentKey.time+1<=currW){        println("late data:" + value)        return  }

主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果,做了一个类似window机制里面的一个延时判断,将延时的数据过滤掉,也可以使用OutputTag 单独处理。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net