章子怡遭粉丝倒戈,westone,庞蒂克solstice
运营日志 ETL 服务有着一个对照久的汗青。也许在 2013 年,网易游戏就创建了基于 Hadoop Streaming + Python 预处理/后处理的初版离线 ETL 框架。这套框架是安稳运行了多年。
在 2017 年的时候,跟着 Spark Streaming 的崭露头角,我们开发了基于 Spark Streaming 的第二个版本,相当于一个 POC,但因为微批调优难题且小文件多等问题没有上线应用。
时间来到 2018 年,其时 Flink 已经对照成熟,我们也决意将买卖迁移到 Flink 上,所以我们很天然地开发了基于 Flink DataStream 的第三版运营日志 ETL 服务。这里面对照特殊的一点便是,因为长久以来我们买卖方积累了很多 Python 的 ETL 剧本,然后新版最紧张的一点便是要支持这些 Python UDF 的无缝迁移。
运营日志 ETL 架构
接下来看下两个版本的架构比拟。
在早期 Hadoop Streaming 的版本里面,内容首先会被 dump 到 HDFS 上,然后 Hadoop Streaming 启动 Mapper 来读取内容并通过尺度输入的方式通报给 Python 剧本。Python 剧本里面会分为三个模块:首先预处理 UDF,这里平日会进行基于字符串的替代,一般用作规范化内容,好比有些外洋合作厂商的时间格式可能跟我们分歧,那么就能够在这里进行同一。预处理完的内容会进入通用的解析/转换模块,这里我们会根据运营日志的格式来解析内容,并进行通用转换,好比滤掉测试服内容。通用模块之后,最后还有一个后处理模块进行针对字段的转换,好比常见的汇率转换。之后内容会通过尺度输出返回给 Mapper,然后 Mapper 再将内容批量写到 Hive 目录中。
我们用 Flink 重构后,内容源就由 HDFS 改为直接对接 Kafka,而 IO 模块则用 Flink 的 Source/Sink Operator 来取代原本的 Mapper,然后中间通用模块能够直接重写为 Java,剩余的预处理和后处理则是我们必要支持 Python UDF 的处所。
Python UDF 实现
在具体实现上,我们在 Flink ProcessFunction 之上加入了 Runner 层,Runner 层负责跨语言的执行。手艺选型上是选了 Jython,而没有选择 Py4j,主要因为 Jython 能够直接在 JVM 里面去完成较量,不必要额外启动 Python 历程,这样开发和运维管理本钱都对照低。而 Jython 带来的限定,好比不支持 pandas 等基于 c 的库,这些对于我们的 Python UDF 来说都是可接管的。
整个调用链是,ProcessFunction 在 TaskManager 被调用时会在 open 函数耽误初始化 Runner,这是因为 Jython 是弗成序列化的。Runner 初始化时会负责资源准备,包含将依赖的模块加入 PYTHONPATH,然后根据配置反射调用 UDF 函数。
调用时,对于预处理 UDF Runner 会把字符串转化为 Jython 的 PyUnicode 类型,而对于后处理 UDF 则会把解析后的 Map 对象转为 Jython 的 PyDcitionary,分别作为两者的输入。UDF 能够调用其他模块进行较量,最终返回 PyObject,然后 Runner 再将其转换成 Java String 或者 Map,返回给 ProcessFunction 输出。
本文地址:http://www.wbwb.net/bianchengyuyan/214110.html 转载请注明出处!