伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink SQL自定义聚合函数

来源:本站原创 浏览:91次 时间:2023-05-12
基本使用

Flink Table/SQL Api中自带了一些常见的聚合函数,例如sum、min、max等,但是在实际开发中需要自定义符合业务需求的聚合函数,先从一个实际案例入手:设备随时上报状态,现在需要求出设备的当前最新状态。分析:设备上报状态会产生多条数据,现在只需要最新的状态数据即可,很明显这是多对一的聚合类型的操作,聚合逻辑是每次保留设备的最新状态与时间,下次设备上报数据时间与保留的数据时间进行比较,如果比其大则更新。实现代码如下:

  1. publicclassLatestTimeUdfextendsAggregateFunction<Integer,TimeAndStatus>{


  2. @OverridepublicTimeAndStatus createAccumulator(){

  3. returnnewTimeAndStatus();

  4. }


  5. publicvoid accumulate(TimeAndStatus acc,Integer status,Long time){

  6. if(time > acc.getTimes()){

  7.            acc.setStatus(status);

  8.            acc.setTimes(time);

  9. }

  10. }


  11. @OverridepublicInteger getValue(TimeAndStatus timeAndStatus){

  12. return timeAndStatus.getStatus();

  13. }

  14. }

在Flink Table/SQL Api中自定义聚合函数需要继承AggregateFunction<T,ACC>, 其中T表示自定义函数返回的结果类型,在这里返回的是Integer 表示状态标识,ACC表示聚合的中间结果类型,这个表示TimeAndStatus存放时间与状态数据,该函数有两个指定该类型的方法getAccumulatorType与getResultType返回的都是TypeInformation类型,如果我们的T或者ACC是复杂类型Flink不能自动抽取的则需要手动指定。其每个方法定义如下:

  • createAccumulator 表示创建一个中间结果数据,由于是以设备为维度那么对于每一个设备都会调用一次该方法;

  • accumulate 表示将流入的数据聚合到createAccumulator创建的中间结果数据中,第一个参数表示的是ACC类型的中间结果数据,其他的表示自定义函数的入参,该方法可以接受不同类型、个数的入参,也就是该方法可以被重载,Flink会自动根据类型提取找到合适的方法。在这里接受的是Integer类型的设备状态与long类型的时间戳,处理逻辑就是与中间结果数据时间进行比较,如果比其大则将流入的时间与设备状态更新到中间结果中。另外在做一点补充accumulate的调用是相同维度的调用,即acc每次都是该维度的中间结果数据,入参也是该维度的数据;

  • getValue 表示一次返回的结果,结果数据从acc中获取,这个方法在accumulate之后被调用。

对于自定义聚合函数来说至少需要createAccumulator、accumulate、getValue这三个方法,并且这三个方法是public 、not static的类型。接下来就可以直接注册然后使用:

 tabEnv.registerFunction("latestTimeUdf",newLatestTimeUdf())val dw=tabEnv.sqlQuery("""select latestTimeUdf(status,times) st,devId from tbl1 group by devId      """.stripMargin)
撤回定义

撤回机制对于Flink来说是一个很重要的特性,在Flink SQL中可撤回机制解密中详细分析了撤回的实现,其中retract是一个不可或缺的环节,其表示具体的回撤操作,对于自定义聚合函数,如果其接受到的是撤回流那么就必须实现该方法,看下其定义:

publicvoid retract(ACC accumulator,[user defined inputs])}

accumulator表示对应维度的中间结果数据,流入的数据表示需要撤回的数据,该方法表示将需要撤回的数据从中间结果中去除掉,Flink中默认实现了一些撤回的函数,例如SumWithRetractAggFunction:

def retract(acc:SumWithRetractAccumulator[T], value:Any):Unit={if(value !=null){      val v = value.asInstanceOf[T]      acc.f0 = numeric.minus(acc.f0, v)      acc.f1 -=1}}

表示求和类的撤回,将需要撤回的value从acc中减掉。
在AggregateFunction 还提供了另外两个函数merge与resetAccumulator,merge 用在session window group或者批处理中,需要做一个合并的过程,resetAccumulator 用在批处理中,表示对中间结果的重置。

在源码中的调用位置

由于是聚合类的操作,仍然以GroupAggProcessFunction 来分析,在这里会调用自定义函数,但是只能是在非窗口的聚合中,通过processElement方法看下其调用流程

  • 中间结果acc是存储在状态中的,如果得到的状态为空,那么就会调用createAccumulator 方法

var accumulators = state.value()var inputCnt = cntState.value()if(null== accumulators){if(!inputC.change){return}      firstRow =true      accumulators =function.createAccumulators()//初始化}else{      firstRow =false}

state是ValueState类型的,acc就存储在这里面,说明acc是容错并且具有一致性的。

  • 如果流入的数据是Insert类型就会调用accumulate方法,如果是Retract就调用retract方法,并且会调用getValue获取当前的结果数据

if(inputC.change){      inputCnt +=1// accumulate inputfunction.accumulate(accumulators, input)function.setAggregationResults(accumulators, newRow.row)//会调用getValue}else{      inputCnt -=1// retract inputfunction.retract(accumulators, input)function.setAggregationResults(accumulators, newRow.row)//会调用getValue}
总结

自定义聚合函数是一个增量聚合的过程,中间结果保存在状态中,能够保证其容错与一致性语义。用户自定义聚合函数继承AggregateFunction即可,至少实现createAccumulator 、accumulate 、getValue这三个方法,其他方法都是可选的。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net