JOIN 操作的并行处理

本页面介绍 Cloud Data Fusion 中 JOIN 操作的性能调整。

JOIN 操作可能是流水线开销最高的部分。与流水线中的其他一切一样,操作是并行执行的。JOIN 的第一步是重排数据,以便将具有相同 JOIN 键的每条记录发送到同一执行器。数据重排后,这些数据会被联接,并且输出会通过流水线继续。

JOIN 操作中的并行处理示例

例如,假设您对名为 PurchasesItems 的数据集执行 JOIN 操作。每条购买记录都包含所购商品的名称和编号。每个商品记录包含商品名称和价格。对商品名称执行 JOIN 以计算每次购买交易的总价。联接数据时,数据在整个集群中重排,使具有相同 ID 的记录最终位于同一执行器上。

JOIN 键分布相当均匀时,JOIN 操作会表现良好,因为它们可以并行执行。

与任何 shuffle 一样,数据倾斜会对性能产生负面影响。在前面的示例中,鸡蛋的购买频率远高于鸡肉或牛奶,这意味着加入鸡蛋购买的执行程序比其他执行程序执行的工作更多。如果您发现 JOIN 有偏差,可以通过以下两种方式提升性能。

自动拆分偏差分区

通过自适应查询执行,系统会自动处理非常严重的偏差。 只要 JOIN 生成的一些分区比其他分区大得多,这些分区就会拆分为更小的分区。如需确认您已启用自适应查询执行,请参阅自动调节

使用内存中的 JOIN

如果 JOIN 的一侧很小,可以放入内存中,则可以执行内存中 JOIN。在这种情况下,将小型数据集加载到内存中,然后广播到每个执行器。大型数据集根本不会重排,从而移除在 JOIN 键上重排时生成的不均匀分区。

在前面的示例中,items 数据集首先加载到了 Spark 驱动程序的内存中。然后将其广播到每个执行器。执行器现在可以联接数据,而无需重排任何购买数据集。

此方法要求您为 Spark 驱动程序和执行程序提供足够的内存,以允许它们将广播数据集存储在内存中。默认情况下,Spark 会预留稍微少于 30% 的内存来存储此类数据。使用内存中 JOIN 时,将数据集的大小乘以 4,并将其设置为执行器和驱动程序内存。例如,如果 items 数据集的大小为 1 GB,我们需要将执行器和驱动程序内存设置为至少 4 GB。大于 8 GB 的数据集无法加载到内存中。

密钥分配

JOIN 两侧都太大而无法放入内存时,可以使用不同的方法将每个 JOIN 键拆分为多个键,以提高并行性级别。此方法可应用于 INNER JOINLEFT OUTER JOIN 运算。它不能用于 FULL OUTER JOIN 操作。

在此方法中,偏差端会使用一个随机值从 1 到 N 的新整数列进行加盐。未偏斜的一面将爆炸,每个现有行都生成 N 个新行。分解侧将添加一个新列,其中填充从 1 到 N 的每个数字。然后,执行常规 JOIN,但新列将作为 JOIN 键的一部分添加。这样一来,过去进入单个分区的所有数据现在都可以扩展到多达 N 个不同的分区。

在前面的示例中,分布因子 N 设置为 3。左侧显示原始数据集。中间是数据集的加盐版本和分解版本。重排后的数据显示在右侧,其中三个不同的执行程序正在加入鸡蛋购买,而不是一个。

通过增加分布情况来实现更高的并行性。但是,这会导致 JOIN 的一侧爆炸,导致集群中有更多数据重排。因此,随着分布的扩大,优势会逐渐减小。在大多数情况下,请将其设置为 20 或更小值。

后续步骤