本页面介绍 Cloud Data Fusion 中 JOIN
操作的性能调整。
JOIN
操作可能是流水线开销最高的部分。与流水线中的其他一切一样,操作是并行执行的。JOIN
的第一步是重排数据,以便将具有相同 JOIN
键的每条记录发送到同一执行器。数据重排后,这些数据会被联接,并且输出会通过流水线继续。
JOIN
操作中的并行处理示例
例如,假设您对名为 Purchases
和 Items
的数据集执行 JOIN
操作。每条购买记录都包含所购商品的名称和编号。每个商品记录包含商品名称和价格。对商品名称执行 JOIN
以计算每次购买交易的总价。联接数据时,数据在整个集群中重排,使具有相同 ID 的记录最终位于同一执行器上。
当 JOIN
键分布相当均匀时,JOIN
操作会表现良好,因为它们可以并行执行。
与任何 shuffle 一样,数据倾斜会对性能产生负面影响。在前面的示例中,鸡蛋的购买频率远高于鸡肉或牛奶,这意味着加入鸡蛋购买的执行程序比其他执行程序执行的工作更多。如果您发现 JOIN
有偏差,可以通过以下两种方式提升性能。
自动拆分偏差分区
通过自适应查询执行,系统会自动处理非常严重的偏差。
只要 JOIN
生成的一些分区比其他分区大得多,这些分区就会拆分为更小的分区。如需确认您已启用自适应查询执行,请参阅自动调节。
使用内存中的 JOIN
如果 JOIN
的一侧很小,可以放入内存中,则可以执行内存中 JOIN
。在这种情况下,将小型数据集加载到内存中,然后广播到每个执行器。大型数据集根本不会重排,从而移除在 JOIN
键上重排时生成的不均匀分区。
在前面的示例中,items 数据集首先加载到了 Spark 驱动程序的内存中。然后将其广播到每个执行器。执行器现在可以联接数据,而无需重排任何购买数据集。
此方法要求您为 Spark 驱动程序和执行程序提供足够的内存,以允许它们将广播数据集存储在内存中。默认情况下,Spark 会预留稍微少于 30% 的内存来存储此类数据。使用内存中 JOIN
时,将数据集的大小乘以 4,并将其设置为执行器和驱动程序内存。例如,如果 items 数据集的大小为 1 GB,我们需要将执行器和驱动程序内存设置为至少 4 GB。大于 8 GB 的数据集无法加载到内存中。
密钥分配
当 JOIN
两侧都太大而无法放入内存时,可以使用不同的方法将每个 JOIN
键拆分为多个键,以提高并行性级别。此方法可应用于 INNER JOIN
和 LEFT OUTER JOIN
运算。它不能用于 FULL OUTER JOIN
操作。
在此方法中,偏差端会使用一个随机值从 1 到 N 的新整数列进行加盐。未偏斜的一面将爆炸,每个现有行都生成 N
个新行。分解侧将添加一个新列,其中填充从 1 到 N 的每个数字。然后,执行常规 JOIN
,但新列将作为 JOIN
键的一部分添加。这样一来,过去进入单个分区的所有数据现在都可以扩展到多达 N
个不同的分区。
在前面的示例中,分布因子 N
设置为 3
。左侧显示原始数据集。中间是数据集的加盐版本和分解版本。重排后的数据显示在右侧,其中三个不同的执行程序正在加入鸡蛋购买,而不是一个。
通过增加分布情况来实现更高的并行性。但是,这会导致 JOIN
的一侧爆炸,导致集群中有更多数据重排。因此,随着分布的扩大,优势会逐渐减小。在大多数情况下,请将其设置为 20 或更小值。
后续步骤
- 详细了解 Cloud Data Fusion 中的并行处理。