伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink 1.10 UDF 的一个小问题

来源:本站原创 浏览:54次 时间:2023-04-26

在使用 Flink 1.10 的 SQL 的时候,遇到个小问题:一个返回当前时间的函数返回的结果是启动的时间,并且保持不变。

比如下面这个UDF,获取当前时间的 时分秒(HH:mm:ss 格式)

import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.table.functions.ScalarFunction;import java.text.SimpleDateFormat;/** * Sysdate 返回当前时间的 HH:mm:ss 格式的字符串 */public class Systime extends ScalarFunction {    private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");    public Systime() {    }    public String eval() throws Exception {        // 当前时间的毫秒值,转为 HH:mm:ss        return sdf.format(System.currentTimeMillis());    }    public TypeInformation<?> getResultType(Class<?>[] signature) {        return Types.STRING;    }}

最开始用的时候,直接写了这么个UDF,还自我感觉很溜。。。

在测试的时候,也没有留意到返回的值都是相同的,在线上跑了几天才发现。。。

如sql :

INSERT INTO user_log_sinkSELECT user_id, systime(), item_idFROM user_log a;返回值如下:

全是一样的

在代码里面添加 日志信息,只有在启动的时候输出了一次日志信息(在客户端),在 taskmanager.log 里没有对应日志,说明确实没有执行

对比其他的UDF 除了没有参数,其他好像没有什么不一样,都是继承的 ScalarFunction 、也都在客户端注册。。

在其他UDF 中也添加日志信息,测试发现:

都是在注册的时候,调用了对应的构造方法,但是无参的 UDF,在任务初始化阶段,还调用了一次,而需要参数的UDF 并没有;数据进入的时候,无参的UDF 并没有调用 eval 方法,有参的UDF 就正常调用了。
直接debug 无参的UDF,从日志中发现除了在注册的时候 执行了 构造方法,还调用了有一次 eval 方法,所以在 eval 中添加断点:


于是就找到这个类:

package org.apache.flink.table.planner.codegen/**  * Evaluates constant expressions with code generator.   计算常量表达式  *  * @param allowChangeNullability If the reduced expr's nullability can be changed, e.g. a null  *                               literal is definitely nullable and the other literals are  *                               not null.  */class ExpressionReducer(    config: TableConfig,    allowChangeNullability: Boolean = false)  extends RexExecutor

从注释就可以看出,Flink 在计算常量表达式,而我的 systime() 就被认为是常量表达式了,在客户端执行一次,得到结果,之后的函数,直接使用对应常量,而不再进函数计算返回了。

随后的获取到的SQL输出结果:


看到这里,问题就很清楚了,一些常量的表达式,Flink 在客户端初始化的时候,直接执行一次,缓存了结果,之后就直接返回这个结果,而不是去执行表达式

解决也很简单,直接给 UDF 添加一个参数(注:必须是SQL的字段,如果是常量也会被Flink 优化)

public String eval(String input) throws Exception {        // 当前时间的毫秒值,转为 HH:mm:ss        return sdf.format(System.currentTimeMillis());    }

使用的SQL:

INSERT INTO user_log_sinkSELECT user_id, systime(), systime_param(user_id)FROM user_log a;

systime_param 并不会再初始化的时候执行

返回结果如下:

搞定

注:最近忙着Flink SQL 上线,等忙完了这段,应该会有不少素材了

  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net