伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink SQL中可撤回机制解密

来源:本站原创 浏览:94次 时间:2023-05-12
场景案例

先从一个实际业务场景理解Flink SQL中的撤回机制:设备状态上线/下线数量统计,上游采集设备状态发送到Kafka中,最开始是一个上线状态,此时统计到上线数量+1,过了一段时间该设备下线了,收到的下线的状态,那么此时应该是上线数量-1,下线数量+1,现在需要实现这样一个需求,看一下在Flink SQL里面如何实现

  1. val env=StreamExecutionEnvironment.getExecutionEnvironment

  2.    val tabEnv=TableEnvironment.getTableEnvironment(env)

  3.    tabEnv.registerFunction("latestTimeUdf",newLatestTimeUdf())

  4.    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  5.    val kafkaConfig=newProperties()

  6.    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

  7.    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")


  8.    val consumer=newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema,kafkaConfig)

  9.    val ds=env.addSource(consumer)

  10. .map(x=>{

  11.        val a=x.split(",")

  12. DevData(a(0),a(1).toInt,a(2).toLong)

  13. }).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[DevData](Time.milliseconds(1000)){

  14. overridedef extractTimestamp(element:DevData):Long= element.times

  15. })


  16.    tabEnv.registerDataStream("tbl1",ds,'devId,'status,'times,'rt.rowtime)

  17.    val dw=tabEnv.sqlQuery(

  18. """

  19.        select st,count(*) from (

  20.                select latestTimeUdf(status,times) st,devId from tbl1 group by devId

  21.                ) a group by st

  22.      """.stripMargin)

  23.    dw.writeToSink(newPaulRetractStreamTableSink)

  24.        env.execute()

自定义udf获取最新的设备状态

  1. publicclassLatestTimeUdfextendsAggregateFunction<Integer,TimeAndStatus>{


  2. @OverridepublicTimeAndStatus createAccumulator(){

  3. returnnewTimeAndStatus();

  4. }


  5. publicvoid accumulate(TimeAndStatus acc,Integer status,Long time){

  6. if(time > acc.getTimes()){

  7.            acc.setStatus(status);

  8.            acc.setTimes(time);

  9. }

  10. }


  11. @OverridepublicInteger getValue(TimeAndStatus timeAndStatus){

  12. return timeAndStatus.getStatus();

  13. }

  14. }

看一组测试数据:
输入数据 dev1,1,1574347472000 得到结果:

2>(true,1,1)

继续输入dev1,0,1574347474000 得到结果:

2>(false,1,1)//撤回2>(true,0,1)

第二条数据输入dev1新的状态数据,导致最后结果的变更。

源码分析

首先分析一下上述得到结果编码Ture或者False是如何确定的:
内部sql1: select latestTimeUdf(times,status) st,devId from tbl1 group by devId,这是一个聚合操作,目的是求出设备当前的状态;
对于外部sql2: select st,count(*) from (sql1) a group by st,同样是一个聚合操作,用于求出不同状态对应的设备数量
输入第一条数据dev1,1,1574347472000对于sql1 来说会产生(true,dev1,1) 的结果,sql2 接受到该结果生成(true,1,1) 就是是结果数据;接着输入第二条数据dev1,0,1574347474000 ,由于dev1的设备状态发生变更,sql1首先发送一条撤回数据(false,dev1,1),sql2收到该条数据判断是撤回数据会将之前的结果撤回产生一条(false,1,1)的数据,sql1同时还会产生一条(true,dev1,0) dev1当前的最新状态,sql2收到该条数据重新计算得到(true,0,1)

那么关于这一整套逻辑在Flink中是如何实现的?代码入口是:DataStreamGroupAggregate 聚合操作的物理执行计划,另外说明在table/SQL api里面数据流动的格式是CRow,包含两个字段:一个是Boolean类型,表示是否是撤回,另外一个是Row类型,真正的数据。

  1. 具体的执行逻辑是通过其translateToPlan来生成,通过AggregateUtil.createGroupAggregateFunction方法动态生成具体的Function,在生成Function 会判断上游消费的数据是否是可撤回来决定是否生成retract方法,比喻说sql1上游是消费kafka 非撤回流,所以在定义LatestTimeUdf 并没有定义retract,sql2 消费sql1的输出,sql1会产生可撤回消息,那么在其内部会生成retract方法,这部分是代码自动生成的

  2. 生成的Function被GroupAggProcessFunction包装,最主要的就是这里面processElement方法的逻辑

  • registerProcessingCleanupTimer注册状态的过期时间,过期配置通过StreamQueryConfig获取,后面定时触发会调用onTimer方法

val currentTime = ctx.timerService().currentProcessingTime()// register state-cleanup timerregisterProcessingCleanupTimer(ctx, currentTime)
  • state 存储中间结果状态、cntState存储流入对应key数量,获取当前中间结果accumulators,如果为空则,通过createAccumulators创建,获取当前对应key数量inputCnt,如果为空,则初始化为0

val input = inputC.row// get accumulators and input countervar accumulators = state.value()var inputCnt = cntState.value()if(null== accumulators){      firstRow =true      accumulators =function.createAccumulators()}else{      firstRow =false}if(null== inputCnt){      inputCnt =0L}
  • newRow/prevRow 分别对应新产生结果(撤回标识True)与之前的结果(撤回标识False),setForwardedFields 设置输出的key, setAggregationResults将之前的结果设置到prevRow中

// Set group keys value to the final outputfunction.setForwardedFields(input, newRow.row)function.setForwardedFields(input, prevRow.row)// Set previous aggregate result to the prevRowfunction.setAggregationResults(accumulators, prevRow.row)
  • 如果输入的是insert即True, 则inputCnt+1, 调用accumulate 将当前流入数据添加到中间结果accumulators中得到新的结果,调用setAggregationResults设置新的结果到newRow结果中, 如果输入的是retract即False, 则inputCnt-1,调用retract从accumulators撤回当前的输入得到新的结果,调用setAggregationResults设置新的结果到newRow结果中

// update aggregate result and set to the newRowif(inputC.change){      inputCnt +=1// accumulate inputfunction.accumulate(accumulators, input)function.setAggregationResults(accumulators, newRow.row)}else{      inputCnt -=1// retract inputfunction.retract(accumulators, input)function.setAggregationResults(accumulators, newRow.row)}
  • 如果当前的inputCnt!=0, 表明当前中间状态还有数据,那么就更新当前state/cntState, 接下来判断是否发送撤回数据,如果当前没有中间状态,那么就表示需要撤回之前的数据,然后清空状态

  1. if(inputCnt !=0){

  2. // we aggregated at least one record for this key


  3. // update the state

  4.      state.update(accumulators)//更新状态操作

  5.      cntState.update(inputCnt)


  6. // if this was not the first row

  7. if(!firstRow){

  8. if(prevRow.row.equals(newRow.row)&&!stateCleaningEnabled){

  9. //如果处理前后的结果是一致的并且也没有TTL那么就没有发送一条数据到下游,

  10. //这里前后一致不发送很好理解,假如说有ttl, 那也是需要针对下游需要做状态过期时间的更新

  11. return

  12. }else{

  13. // retract previous result

  14. if(generateRetraction){//是否生成撤回数据

  15. out.collect(prevRow)

  16. }

  17. }

  18. }

  19. // emit the new result

  20. out.collect(newRow)//发出新的结果


  21. }else{

  22. // we retracted the last record for this key

  23. // sent out a delete message

  24. out.collect(prevRow)

  25. // and clear all state

  26.      state.clear()//清空状态

  27.      cntState.clear()

  28. }

总结

总的来说撤回机制是需要状态、撤回操作的支持,状态是为了保存当前的数据,下次如果需要发生撤回,就将该数据发出去,撤回操作可以理解为function里面的retract方法,能够支持这个数据撤回的计算操作。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net