伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink SQL 中TableFunction使用分析

来源:本站原创 浏览:101次 时间:2023-05-11
基本使用

表函数TableFunction相对标量函数ScalarFunction一对一,它是一个一对多的情况,通常使用TableFunction来完成列转行的一个操作。先通过一个实际案例了解其用法:终端设备上报数据,数据类型包含温度、耗电量等,上报方式是以多条方式上报,例如:

现在希望得到如下数据格式:

这是一个典型的列转行或者一行转多行的场景,需要将data列进行拆分成为多行多列,先看下代码实现:

  1. public class MyUDTF extends TableFunction<Row>{


  2. public void eval(String s){

  3. JSONArray jsonArray =JSONArray.parseArray(s);

  4. for(int i =0; i < jsonArray.size(); i++){

  5. JSONObject jsonObject = jsonArray.getJSONObject(i);

  6. String type = jsonObject.getString("type");

  7. String value = jsonObject.getString("value");

  8.            collector.collect(Row.of(type, value));

  9. }

  10. }


  11. @Overridepublic TypeInformation<Row> getResultType(){

  12. returnTypes.ROW(Types.STRING(),Types.STRING());

  13. }

  14. }

在MyUDTF中继承了TableFunction<T>, 所有的自定义表函数都必须继承该抽象类,其中T表示返回的数据类型,通常如果是原子类型则直接指定例如String, 如果是复合类型通常会选择Row, FlinkSQL 通过类型提取可以自动识别返回的类型,如果识别不了需要重载其getResultType方法,指定其返回的TypeInformation,重点看下eval 方法定义:

  • eval 方法, 处理数据的方法,必须声明为public/not static,并且该方法可以重载,会自动根据不同的输入参数选择对应的eval, 在eval方法里面可以使用collector对象将数据发送出去,该对象是从TableFunction继承过来的。

调用如下:

  1. def main(args:Array[String]):Unit={

  2.    val env =StreamExecutionEnvironment.getExecutionEnvironment

  3.    val tabEnv =TableEnvironment.getTableEnvironment(env)

  4.    tabEnv.registerFunction("udtf",newMyUDTF)

  5.    val kafkaConfig =newProperties();

  6.    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

  7.    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

  8.    val consumer =newFlinkKafkaConsumer[String]("topic1",newSimpleStringSchema(), kafkaConfig);


  9.    val ds:DataStream[(String, java.lang.Long,String)]= env.addSource(consumer)

  10. .map(x =>{

  11.        val obj = JSON.parseObject(x, classOf[RawData])

  12. Tuple3.apply(obj.devId, obj.time, obj.data)

  13. })


  14.    tabEnv.registerDataStream("tbl1", ds,'devId, 'time,'data)

  15.    val rsTab = tabEnv.sqlQuery("select devId,`time`,`type`,`value` from tbl1 , LATERAL TABLE(udtf(data)) as t(`type`,`value`) ")

  16.      .writeToSink(new PaulRetractStreamTableSink)

  17.    env.execute()

  18.  }

测试数据:

{"devid":"dev01","time":1574944573000,"data":[{"type":"temperature","value":"10"},{"type":"battery","value":"1"}]}

得到结果:

3>(true,dev01,1574944573000,temperature,10)3>(true,dev01,1574944573000,battery,1)

至此拿到了符合要求的数据。在Flink SQL中使用TableFunction需要搭配LATERAL TABLE一起使用,将其认为是一张虚拟的表,整个过程就是一个Join with Table Function过程,左表(tbl1) 会join 右表(t1) 的每一条记录。但是也存在另外一种情况右表(t1)没有输出但是也需要左表输出那么可以使用LEFT JOIN LATERAL TABLE,用法如下:

SELECT users, tagFROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

对于右表没有输出会自动补上null。

源码分析

在介绍源码分析之前先安利一个小技巧,很多时候比较难找到Flink SQL解析之后的任务具体执行过程,这个时候可以通过先打印其执行计划,使用方式:

println(tabEnv.explain(rsTab))

就可以得到其抽象语法树、逻辑执行计划、物理执行计划:

  1. ==AbstractSyntaxTree==

  2. LogicalProject(devId=[$0], time=[$1], type=[$3], value=[$4])

  3. LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])

  4. LogicalTableScan(table=[[tbl1]])

  5. LogicalTableFunctionScan(invocation=[udtf($cor0.data)], rowType=[RecordType(VARCHAR(65536) f0, VARCHAR(65536) f1)], elementType=[class[Ljava.lang.Object;])


  6. ==OptimizedLogicalPlan==

  7. DataStreamCalc(select=[devId, time, f0 AS type, f1 AS value])

  8. DataStreamCorrelate(invocation=[udtf($cor0.data)], correlate=[table(udtf($cor0.data))],select=[devId, time, data, f0, f1], rowType=[RecordType(VARCHAR(65536) devId, BIGINT time, VARCHAR(65536) data, VARCHAR(65536) f0, VARCHAR(65536) f1)], joinType=[INNER])

  9. DataStreamScan(table=[[tbl1]])


  10. ==PhysicalExecutionPlan==

  11. Stage1:DataSource

  12.    content : collect elements withCollectionInputFormat


  13. Stage2:Operator

  14.        content :Map

  15.        ship_strategy : FORWARD


  16. Stage3:Operator

  17.            content :from:(devId, time, data)

  18.            ship_strategy : FORWARD

  19. ...........

可以从逻辑执行计划入手,Table Function Join 对应DataStreamCorrelate,重点在于其translateToPlan方法:

  • generateFunction 调用,生成一个ProcessFunction函数,内部封装用户自定义的TableFunction, 在该ProcessFunction里面会调用TableFunction的eval方法,由于该Function是动态生成的,可以通过debug方法查看,这里感受一下在processElement里面调用eval的代码:

function_udf$MyUDTF$086f769e79e46e52752c8500480e4b32.eval(isNull$21 ?null:(java.lang.String) result$20);
  • generateCollector调用,生成的是一个TableFunctionCollector 类型的collector,这部分也是动态生成的

  • CRowCorrelateProce***unner 也是一个ProcessFunction, 内部包含了generateFunction生成的function 与generateCollector生成的collector, 在其初始化open的时候会将该collector赋给function

接下来从CRowCorrelateProce***unner的processElement方法看整个调用流程:

    cRowWrapper.out=out    cRowWrapper.setChange(in.change)    collector.setCollector(cRowWrapper)    collector.setInput(in.row)//重点input 信息设置到动态生成的collector    collector.reset()
function.processElement(in.row,      ctx.asInstanceOf[ProcessFunction[Row,Row]#Context],      cRowWrapper)

这步调用动态生成的function, 在其processElement里面调用eval方法,eval 会调用动态生成的collector,这个步骤就可以理解为是一个join过程, 最终输出组合数据。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net