JOIN 操作的并行处理

本页面介绍 Cloud Data Fusion 中 JOIN 操作的性能调整。

JOIN 操作可能是流水线开销最高的部分。喜欢所有内容 其他流水线中,操作会并行执行。JOIN 的第一步是重排数据,以便将具有相同 JOIN 键的每条记录都发送到同一执行程序。在所有数据被重排之后,这些数据会被联接,并且 继续通过流水线输出。

JOIN 操作中的并行处理示例

例如,假设您对名为JOIN PurchasesItems。每条购买记录都包含商品名称和编号 。每个商品记录都包含商品名称和商品价格。系统会对商品名称执行 JOIN 以计算每次购买的总价格。联接数据时,系统会在集群中对数据进行洗牌,以便具有相同 ID 的记录最终位于同一执行器上。

JOIN 键相当均匀地分布时,JOIN 操作会执行得很好,因为它们可以并行执行。

与任何 shuffle 一样,数据倾斜会对性能产生负面影响。在上例中,鸡蛋的购买频率远高于鸡肉或牛奶,这意味着联接鸡蛋购买交易的执行程序会比其他执行程序执行更多工作。如果您发现 JOIN 歪斜,可以通过以下两种方式进行改善 性能

自动拆分偏差分区

通过自适应查询执行,系统会自动处理非常严重的偏差。 一旦 JOIN 生成的一些分区比其他分区大得多,这些分区就会立即发生 拆分成多个较小的文件如需确认您是否已启用自适应查询执行,请参阅自动调整

使用内存中的 JOIN

如果 JOIN 的一侧足够小,则可以执行内存中 JOIN 放入内存中。在这种情况下,将小数据集加载到内存中, 然后广播给每个执行器大型数据集在 消除了在磁盘上重排时生成的不均匀分区 JOIN 键。

在前面的示例中,项目数据集首先加载到 Spark 驱动程序。然后将其广播到每个执行器。执行器现在无需对任何购买交易数据集进行洗牌,即可联接数据。

此方法要求您为 Spark 驱动程序和 执行器,以允许它们将广播数据集存储在内存中。默认情况下,Spark 会预留略低于 30% 的内存来存储此类数据。使用内存中的 JOIN 时,将数据集的大小乘以 4: 并将其设为执行器和驱动程序内存。例如,如果 items 数据集 为 1 GB,则需要将执行器和驱动程序内存设置为 至少 4 GB。大于 8 GB 的数据集无法加载到内存中。

密钥分配

JOIN 的两侧太大而无法放入内存时,可以使用其他方法将每个 JOIN 键拆分为多个键,以提高并行处理级别。此方法可应用于 INNER JOINLEFT OUTER JOIN 操作。它不能用于 FULL OUTER JOIN 操作。

在此方法中,偏斜边通过一个包含一个新的整数列进行加盐处理, 从 1 到 N 的随机数。未倾斜的一侧会展开,每个现有行都会生成 N 个新行。分解侧面会添加一个新列,填充的值为 每个数字从 1 到 N。然后,系统会执行常规 JOIN,但新的 列会作为 JOIN 键的一部分添加。这样一来,以前存储在单个分区中的所有数据现在最多会分散到 N 个不同的分区中。

在前面的示例中,分布因子 N 设置为 3。通过 原始数据集如左侧所示。加盐版和爆炸版 数据集。重排后的数据显示在右侧,其中 三个不同的执行器加入鸡蛋购买交易,而不是一个。

通过增加分布来实现更高的并行性。然而,由于 代价是分解 JOIN 的一侧,导致更多数据重排 所有资源因此,随着分发范围的扩大,这种好处会逐渐减少。在大多数情况下,请将其设置为 20 或更小值。

后续步骤