过往记忆大数据 过往记忆大数据
本文来自 SPARK + AI SUMMIT 2020 北美会议,分享者来自字节跳动的郭俊。Bucket 在 Hive 和 Spark SQL 中普遍使用,用于消除 Join 或者 group-by-aggregate 场景下的 Shuffle 操作。本文主要介绍字节跳动在 Bucket 方面的优化。
本文主要从以下四个方面介绍:
- Spark SQL 在字节跳动的应用
- 什么是分桶
- Spark 分桶的限制
- 字节跳动在分桶方面的优化
下面是 Spark SQL 在字节跳动的应用。
- 2016年主要是小规模的测试阶段
- 2017年用于处理 Ad-hoc 工作负载
- 2018年在生产环境下处理少量的 ETL 管道工作;
- 2019年在生产环境下全面部署;
- 2020年成为 DW 领域的主要计算引擎。
上面例子展示了创建分桶表的方法。主要关键字是 clustered by (xxx) sorted by (xxx) into N buckets
如果我们往分桶表里面插入数据,可以如下使用'
INSERT INTO order SELECT order_id, user_id, product, amount FROM order_staging
可见,这个和正常表的使用并没有什么区别。
如果我们进行一个 ShuffleHashJoin 的时候,首先需要将表的数据按照 on 的条件进行分区,然后才是进行 Join 操作。
但是如果参与 Join 的表已经实现分桶了,那么在执行 ShuffleHashJoin 的时候省去 Shuffle 的操作。比如上面的例子如果我们对 order 和 user 表按照 user_id 字段进行分桶,那么在 ShuffleHashJoin 的时候就不需要进行 Exchange 操作了。
对于 SortMergeJoin ,需要对 on 里面的条件字段进行 Exchange 操作,然后再进行 Sort 操作,最后才是执行 SortMergeJoin(更多关于 Join 的策略可以参见过往记忆大数据的《每个 Spark 工程师都应该知道的五种 Join 策略》文章)。
如果参与 Join 的表已经分桶了,那么不需要就行 Exchange 和 Sort 操作了。
Spark 分桶的限制小文件问题
执行上面的 SQL,每个 task 最多可能产生 1024 个文件,其中 1024 是分桶的数量。所以如果我们有 M 个 task,那么最多产生的文件个数为 M * 1024。比如上面的 attempt_20200519145628_0014_m_000014_0 目录下产生了 988 个文件。
解决小文件的问题可以加上 DISTRIBUTE BY ,如下:
INSERT INTO order SELECT order_id, user_id, product, amountFROM order_stagingDISTRIBUTE BY user_id
如果 1024 是 M 的倍数,那么最多会产生 1024 个文件,其中 M = spark.sql.shuffle.partitions;
如果 M 是 1024 的倍数,那么最多会产生 M 个文件,其中 M = spark.sql.shuffle.partitions。
Spark 分桶和其他 SQL 引擎不兼容- Spark 的分桶和 Hive 的分桶是不兼容的,同时和 Presto 也是不兼容的;但是 Presto 与 Hive 的分桶是兼容的。
- Spark 的分桶和 Hive 不兼容主要原因是以下原因导致的:
- Hive 在生成分桶的时候会额外进行一个 Reduce 操作,以保证相同分桶的数据都存储在一个文件中。而 Spark SQL 在写分桶文件时不需要 Shuffle 操作,这样就会导致每个分桶最多产生 M 个文件,这就导致上面说的小文件问题;
- Spark 分桶和 Hive 分桶采用不同的 hash 算法。Hive 用的是 HiveHash;而 Spark 用的是 Murmur3,所以数据的分布是不一样的。
因为 Spark 和 Hive 分桶不兼容,所以当 Spark 的分桶表和 Hive 的分桶表进行 SortMergeJoin 的时候是需要进行 Sort 和 Exchange 操作的。
因为 Spark SQL 表中的每个分桶里面最少包含一个文件,所以在进行 Join 之前需要进行额外的排序操作。
如果参与 Join 的表分桶数不一致,那么其中一张表需要进行额外的 Exchange 操作。
当参与 Join 的 key 和分桶的列不一样时,需要额外的 Exchange 操作。
上面的例子尽管参与 Join 的表都是对 user_id 字段进行分桶,并且分桶数一样,但是还是需要额外的 Exchange 操作。
前面介绍了 Spark 和 Hive 分桶不兼容,对于这方面,字节跳动将 Hive 分桶表和 Spark 分桶表进行了对齐,主要包括:
Spark SQL 写 Hive 分桶表的逻辑和 Hive 一致。 重写了 InsertIntoHiveTable#requiredOrdering 和 InsertIntoHiveTable#requiredDistribution,并且也使用了 HiveHash 算法。
对于读方面,重写了 HiveTableScanExec#outputPartitioning 和 HiveTableScanExec#outputOrdering,使用了 HiveHash 算法,并且使用了 Hive 的分桶元数据。
上面是 Spark 读取 Hive 分桶表改进前和改进后的区别。可以看到,改进后,outputPartitioning 为 HashPartitioning,并且 outputOrdering 为 SortOrder,满足了 requireChildDistribution 为 HashClusteredDistribution的要求以及requireChildOrdering 为 SortOrder,从而在进行 SortMergeJoin 的时候省去了 Exchange 和 Sort 操作。
另一个改进是 One to Merge Bucket Join,比如下面例子 A 表有三个分桶,B 表有六个分桶。
如果我们在 Spark 对上面两张表进行 Join 操作,B 表需要额外的 Sort 操作,因为上面两张表的分桶数不一样。但是在字节公司,由于对性能的要求,需要避免 Sort 操作。
一种方法是将 A 表的分桶 0 和 B 表的分桶 0 、分桶 3 进行关联;将 A 表的分桶 1 和 B 表的分桶 1 、分桶 4 进行关联;将 A 表的分桶 2 和 B 表的分桶 2 、分桶 5 进行关联。我们只需要将 A 表复制一份,这样 A 表也满足 6 个分桶。将 A 表和 A 表进行 Union 可以产生 到 6 个分桶的新表,但是 Spark 自带的 Union 操作之后 outputPartitioning 和 outputOrdering 将被删除,所以字节自己开发出 bucket union,使得 outputPartitioning 和 outputOrdering 被保留,这样就可以省去 Sort 和 Exchange 操作。
不过上面的方面在 B left join A 、B left semi join A、B anti join A、B inner join A 可以正常工作,但是在 B right join A、B full outer join A、B cross join A 的时候结果有重复,因为 A 表的数据被扫描了两次。
为了解决这个问题,在 TableScan 后面加上了 hash(10) % buckets = bucket id 的过滤条件,比如 bucket 0 将会把 3、9、15 过滤掉,通过这种办法将会消除重复数据。
字节的另外一个优化是如果 Join 的 Key 不仅仅是分桶的 Key,原生的 Spark 会产生额外的 Exchange 和 Sort 操作。
通过优化后,Exchange 将消除。