对 JOIN 运算进行并行处理

本页介绍了 Cloud Data Fusion 中 JOIN 操作的性能调优。

JOIN 操作可能是流水线中开销最大的部分。与流水线中的所有其他内容一样,操作是并行执行的。JOIN 的第一步是重排数据,以便将具有相同 JOIN 键的每条记录都发送到同一执行程序。所有数据都重排之后,会被联接,然后输出继续通过流水线。

JOIN 操作中的并行处理示例

例如,假设您对名为 PurchasesItems 的数据集执行 JOIN 操作。每条购买记录都包含商品名称和购买数量。每个商品记录都包含商品名称和商品价格。系统会对商品名称执行 JOIN 以计算每次购买的总价格。联接数据时,系统会在集群中对数据进行洗牌,以便具有相同 ID 的记录最终位于同一执行器上。

JOIN 键相当均匀地分布时,JOIN 操作会执行得很好,因为它们可以并行执行。

与任何重排一样,数据倾斜都会对性能产生负面影响。在上例中,鸡蛋的购买频率远高于鸡肉或牛奶,这意味着联接鸡蛋购买交易的执行程序会比其他执行程序执行更多工作。如果您发现 JOIN 存在偏差,可以通过两种方式提高性能。

自动拆分偏斜分区

借助自适应查询执行,系统会自动处理严重的偏差。 一旦 JOIN 生成的分区比其他分区大得多,这些分区就会拆分为较小的分区。如需确认您是否已启用自适应查询执行,请参阅自动调整

使用内存中的 JOIN

如果 JOIN 的一侧足够小,可以放入内存中,则可以执行内存中 JOIN。在这种情况下,系统会将小型数据集加载到内存中,然后广播到每个执行器。大型数据集不会进行任何洗牌,从而移除了按 JOIN 键洗牌时生成的不均匀分区。

在前面的示例中,items 数据集会先加载到 Spark 驱动程序的内存中。然后,系统会将其广播到每个执行器。执行器现在可以联接数据,而无需对任何购买交易数据集进行洗牌。

这种方法要求您向 Spark 驱动程序和执行程序分配足够的内存,以便它们将广播数据集存储在内存中。默认情况下,Spark 会预留略低于 30% 的内存来存储此类数据。使用内存 JOIN 时,请将数据集的大小乘以 4,并将其设置为执行器和驱动程序内存。例如,如果 items 数据集的大小为 1 GB,则需要将执行器和驱动程序内存设置为至少 4 GB。大于 8 GB 的数据集无法加载到内存中。

密钥分配

JOIN 的两侧太大而无法放入内存时,可以使用其他方法将每个 JOIN 键拆分为多个键,以提高并行处理级别。此方法可应用于 INNER JOINLEFT OUTER JOIN 操作。它不能用于 FULL OUTER JOIN 操作。

在此方法中,系统会使用一个包含 1 到 N 之间的随机数字的新整数列对偏斜的一侧进行盐处理。未倾斜的一侧会展开,每个现有行都会生成 N 个新行。系统会在展开侧添加一个新列,并填充 1 到 N 之间的每个数字。然后,系统会执行正常的 JOIN,但会将新列添加为 JOIN 键的一部分。这样一来,以前存储在单个分区中的所有数据现在最多会分散到 N 个不同的分区中。

在上面的示例中,分布系数 N 设置为 3。原始数据集显示在左侧。中间显示了经过盐处理和展开处理的数据集版本。右侧显示了经过洗牌的数据,其中有三个不同的执行器加入了鸡蛋购买交易,而不是一个。

通过增加分布来实现更高的并行性。不过,这会导致 JOIN 的一侧发生爆炸,从而导致更多数据在集群中进行重排。因此,随着分发范围的扩大,这种好处会逐渐减少。在大多数情况下,将其设置为 20 或更小值。

后续步骤