导读:大家好,很荣幸跟大家分享 Apache Beam 架构原理及应用实践。讲这门课之前大家可以想想,从进入 IT 行业以来,不停的搬运数据,不管职务为前端,还是后台服务器端开发。随着这两年科技的发展,各种数据库,数据源,应运而生,大数据组件,框架也是千变万化,从 Hadoop 到现在的 Spark、Flink,数据库从先前的 oracle、MySQL 到现在的 NOSQL,不断延伸。那么有没有统一的框架,统一的数据源搬砖工具呢?
带着这样的疑问,开始我们今天的分享,首先是内容概要:
Apache Beam 是什么?
Apache Beam 的优势
Apache Beam 的架构设计
Apache Beam 的核心组件刨析
AloT PB 级实时数据,怎么构建自己的“AI 微服务”?
▌Apache Beam 是什么?
1. Apache Beam 的前世今生
大数据起源于 Google 2003年发布的三篇论文 GoogleFS、MapReduce、BigTable 史称三驾马车,可惜 Google 在发布论文后并没有公布其源码,但是 Apache 开源社区蓬勃发展,先后出现了 Hadoop,Spark,Apache Flink 等产品,而 Google 内部则使用着闭源的 BigTable、Spanner、Millwheel。这次 Google 没有发一篇论文后便销声匿迹,2016年2月 Google 宣布 Google DataFlow 贡献给 Apache 基金会孵化,成为 Apache 的一个顶级开源项目。然后就出现了 Apache Beam,这次不它不是发论文发出来的,而是谷歌开源出来的。2017年5月17日 发布了第一个稳定版本2.0。
2. Apache Beam 的定义
Apache Beam 的定义如上图,其定位是做一个统一前后端的模型。其中,管道处理和逻辑处理是自己的,数据源和执行引擎则来自第三方。那么,Apache Beam 有哪些好处呢?
▌Apache Beam 的优势
1. 统一性
① 统一数据源,现在已经接入的 java 语言的数据源有34种,正在接入的有7种。Python 的13种。这是部分的数据源 logo,还有一些未写上的,以及正在集成的数据源。基本涵盖了整个 IT 界每个时代的数据源,数据库。
② 统一编程模型,Beam 统一了流和批,抽象出统一的 API 接口。
③ 统一大数据引擎,现在支持性最好的是 flink,spark,dataflow 还有其它的大数据引擎接入进来。
2. 可移植性
Beam 的 jar 包程序可以跨平台运行,包括 Flink、Spark 等。
3. 可扩展性
很多时候,随着业务需求的不断变化,用户的需要也随之变化,原来 Apache Beam 的功能可能需要进行扩展。程序员就会根据不同的需求扩展出新的技术需求,例如我想用 spark 新特性,能不能重写一下 sparkrunner 换个版本。我想重写一下 kafkaIO 可以吗?对于数据的编码,我可以自定义吗?最后干脆我感觉 Pulsar 技术不错,我想自己写个 SDKIO,集成进去可以不?答案都是可以的。Apache Beam 是具有可扩展性的,零部件都可以重塑。
4. 支持批处理和流处理
如果在 AIoT 行业,开发过程中,我们可能经常碰到两种数据:
摄像头等传感器的实时报警信息
不同数据库的数据,进行一起处理
Beam 对这两种数据是同时支持的。
5. 支持多语言开发
此外 Beam 支持 java,Python,go,Scala 语言,大家可以利用自己擅长的语言开发自己的 Beam 程序。
6. DAG 高度抽象
DAG,中文名“有向无环图”。“有向”指的是有方向,准确的说应该是同一个方向,“无环”则指够不成闭环。如果做一些去重、统计、分组等,开发人员不用再做 Map Reduce ,Beam 已经封装提供了相应的高级操作。
▌Apache Beam 的架构设计
我们接下来看一下 Beam 架构是怎样的:
1. Apache Beam 的总体架构
Apache Beam 的总体架构是这样的,上面有各种语言,编写了不同的 SDKs,Beam 通过连接这些 SDK 的数据源进行管道的逻辑操作,最后发布到大数据引擎上去执行。需要注意的是,Local 虽然是一个 runner 但是不能用于生产上,它是用于调试/开发使用的。
2. Apache Beam 的部署流程图
让我们一起看下 Apache Beam 总体的部署流程。首先我们去构建这个 Beam jobAPI .jar 通过 job 服务器以及设置大数据执行平台,最后提交 flink 或 spark 的任务集群去执行任务。
▌Apache Beam 的核心组件刨析
1. SDks+Pipeline+Runners (前后端分离)
如上图,前端是不同语言的 SDKs,读取数据写入管道, 最后用这些大数据引擎去运行。可以发现完整的 beam 程序由 SDks+Pipeline+Runners 构成的。
2. 什么是 SDK?
什么是 SDK,就是一个编写 beam 管道构成的一部分,一个客户端或一个类库组件也可以,最后提交到大数据运行平台上。
3. Beam 版本和 Kafka-clients 依赖情况表
我们以 kafka 为例,看一下 Kafka-client 对版本的依赖情况,从图中可以看出 beam 2.6.0 版本的 api 改变基本是稳定的。当然,现在用的比较多的2.4、2.5版本。吐个槽,2.6版本之前的兼容性问题,上个版本还有这个类或方法,下一个版本就没有了,兼容性不是很好。
4. SDK beam-sdks-java-io-kafka 读取源码剖析
① 指定 KafkaIO 的模型,从源码中不难看出这个地方的 KafkaIO<K,V> 类型是 Long 和 String 类型,也可以换成其他类型。
pipeline.apply(KafkaIO.<Long, String>read() pipeline.apply(KafkaIO.<Long, String>read()
② 设置 Kafka 集群的集群地址。
.withBootstrapServers("broker_1:9092,broker_2:9092")
③ 设置 Kafka 的主题类型,源码中使用了单个主题类型,如果是多个主题类型则用 withTopics(List) 方法进行设置。设置情况基本跟 Kafka 原生是一样的。
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
④ 设置序列化类型。Apache Beam KafkaIO 在序列化的时候做了很大的简化,例如原生 Kafka 可能要通过 Properties 类去设置 ,还要加上很长一段 jar 包的名字。
Beam KafkaIO 的写法:
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
原生 Kafka 的设置:
Properties props = new Properties();
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
⑤ 设置 Kafka 的消费者属性,这个地方还可以设置其他的属性。源码中是针对消费分组进行设置。
.updateConsumerProperties(ImmutableMap.of("group.id", my_beam_app_1"))
⑥ 设置 Kafka 吞吐量的时间戳,可以是默认的,也可以自定义。
.withLogAppendTime()
⑦ 相当于 Kafka 中 "isolation.level" , "read_committed",指定 KafkaConsumer 只应读取非事务性消息,或从其输入主题中提交事务性消息。流处理应用程序通常在多个读取处理写入阶段处理其数据,每个阶段使用前一阶段的输出作为其输入。通过指定 read_committed 模式,我们可以在所有阶段完成一次处理。针对 "Exactly-once" 语义,支持 Kafka 0.11 版本。
.withReadCommitted()
⑧ 设置 Kafka 是否自动提交属性 "AUTO_COMMIT",默认为自动提交,使用 Beam 的方法来设置。
set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize)
.commitOffsetsInFinalize()
⑨ 设置是否返回 Kafka 的其他数据,例如 offset 信息和分区信息,不用可以去掉。
.withoutMetadata() // PCollection<KV<Long, String>>
⑩ 设置只返回 values 值,不用返回 key。例如 PCollection,而不是 PCollection<Long,String>。
.apply(Values.<String>create()) // PCollection<String>
在写入 Kafka 时完全一次性地提供语义,这使得应用程序能够在 Beam 管道中的一次性语义之上提供端到端的一次性保证。它确保写入接收器的记录仅在 Kafka 上提交一次,即使在管道执行期间重试某些处理也是如此。重试通常在应用程序重新启动时发生(如在故障恢复中)或者在重新分配任务时(如在自动缩放事件中)。Flink runner 通常为流水线的结果提供精确一次的语义,但不提供变换中用户代码的副作用。如果诸如 Kafka 接收器之类的转换写入外部系统,则这些写入可能会多次发生。
在此处启用 EOS 时,接收器转换将兼容的 Beam Runners 中的检查点语义与 Kafka 中的事务联系起来,以确保只写入一次记录。由于实现依赖于 runners checkpoint 语义,因此并非所有 runners 都兼容。Beam 中 FlinkRunner 针对 Kafka 0.11+ 版本才支持,然而 Dataflow runner 和 Spark runner 如果操作 kafkaIO 是完全支持的。
关于性能的注意事项:
"Exactly-once" 在接收初始消息的时候,除了将原来的数据进行格式化转换外,还经历了 2 个序列化 - 反序列化循环。根据序列化的数量和成本,CPU 可能会涨的很明显。通过写入二进制格式数据(即在写入 Kafka 接收器之前将数据序列化为二进制数据)可以降低 CPU 成本。
5. Pipeline
您输入的数据存储在哪里?
首先要确定你要构造几条数据源,在 Beam 可以构建多条,构建之前可以选择自己的 SDK 的 IO。
您的数据类型是什么样的?
Beam 提供的是键值对的数据类型,你的数据可能是日志文本,格式化设备事件,数据库的行,所以在 PCollection 就应该确定数据集的类型。
您想怎么去处理数据?
对数据进行转换,过滤处理,窗口计算,SQL 处理等。在管道中提供了通用的 ParDo 转换类,算子计算以及 BeamSQL 等操作。
您打算把数据最后输出到哪里去?
在管道末尾进行 Write 操作,把数据最后写入您自己想存放或最后流向的地方。
重要的是要理解变换不消耗 PCollections;相反,他们会考虑 a 的每个元素 PCollection 并创建一个新 PCollection 的输出。这样,您可以对不同的元素执行不同的操作 PCollection。这里是出现了两条管,例如输入 AR,AI,VAR,BT,BMP。
例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。
一种是收费的拓蓝公司出品叫 Talend Big Data Studio,有没有免费的呢?
有的,它叫 kettle-beam。例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。大家可以去 github 去看一下插件相应的安装及使用说明。从图中可以看出大部分 beam 的输入输出现在都是支持的。
https://github.com/mattcasters/kettle-beam
6. Runners
我们在看一下运行平台,这是运行平台支持度的截图。例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。
Runners 在 Beam Model 模型中有4个支持的维度:
What,如何对数据进行计算?例如,机器学习中训练学习模型可以用 Sum 或者 Join 等。在 Beam SDK 中由 Pipeline 中的操作符指定。
Where,数据在什么范围中计算?例如,基于 Process-Time 的时间窗口、基于 Event-Time 的时间窗口、滑动窗口等等。在 Beam SDK 中由 Pipeline 的窗口指定。
When,何时输出计算结果?例如,在 1 小时的 Event-Time 时间窗口中,每隔 1 分钟将当前窗口计算结果输出。在 Beam SDK 中由 Pipeline 的 Watermark 和触发器指定。
How,迟到数据如何处理?例如,将迟到数据计算增量结果输出,或是将迟到数据计算结果和窗口内数据计算结果合并成全量结果输出。在 Beam SDK 中由 Accumulation 指定。
① What
对数据如果处理,计算。分组的矩阵图,提到这里说一下,这些运行平台已经集成到 Beam,只是没有更新到官方首页而已。以及或者是官方不打算主推的,就没有写上去。
② Where