并行处理

流水线可在机器集群中执行。它们通过拆分需要完成的工作,然后在分布在集群中的多个执行器上并行运行这些工作来实现高吞吐量。一般而言,拆分(也称为分区)的数量越大,流水线的运行速度就越快。流水线中的并行级别由流水线中的来源和重排阶段决定。

来源

在每次流水线运行开始时,流水线中的每个来源都会计算需要读取哪些数据,以及如何将这些数据划分为分块。例如,假设有一个基本流水线,用于从 Cloud Storage 读取数据、执行一些 Wrangler 转换,然后写回 Cloud Storage。

显示 Cloud Storage 源、Wrangler 转换和 Cloud Storage 接收器的基本流水线

流水线启动后,Cloud Storage 源会检查输入文件,并根据文件大小将其拆分为多个分块。例如,一个 1 GB 的文件可以拆分为 100 个分块,每个分块的大小为 10 MB。每个执行器都会读取该分块的数据,运行 Wrangler 转换,然后将输出写入 part 文件。

将 Cloud Storage 中的数据分区为并行 Wrangler 转换为分块文件

如果流水线运行缓慢,首先要检查的是您的来源是否创建了足够的分块,以充分利用并行处理。例如,某些压缩类型会导致纯文本文件无法拆分。如果您要读取经过 gzip 压缩的文件,可能会发现与读取未压缩的文件或使用 BZIP(可分割)压缩的文件相比,流水线的运行速度要慢得多。同样,如果您使用的是数据库源,并将其配置为仅使用一个分块,那么与将其配置为使用更多分块相比,其运行速度会慢得多。

随机播放

某些类型的插件会导致数据在集群中进行洗牌。如果由一个执行器处理的记录需要发送到另一个执行器以执行计算,就会发生这种情况。由于涉及大量 I/O,Shuffle 是一项开销较高的操作。导致数据乱序的插件都会显示在 Pipeline Studio 的分析部分。这些插件包括“Group By”“Deduplicate”“Distinct”和“Joiner”等。例如,假设在前面的示例中向流水线添加了分组阶段。

此外,假设要读取的数据代表在杂货店中进行的购买交易。每个记录包含一个 item 字段和一个 num_purchased 字段。在阶段,我们将流水线配置为按 item 字段对记录进行分组,并计算 num_purchased 字段的总和。

流水线运行时,系统会按前面所述的方式拆分输入文件。之后,系统会在集群中对每条记录进行重排,以便具有相同项的每条记录都属于同一执行程序。

如前面的示例所示,Apple 购买交易的记录最初分散在多个执行器中。为了执行汇总,所有这些记录都需要跨集群发送到同一执行器。

大多数需要进行随机排序的插件都允许您指定在随机排序数据时要使用的分区数量。这用于控制用于处理已洗牌数据的执行器数量。

在前面的示例中,如果将分区数量设置为 2,则每个执行器都会计算两个项(而非一个项)的汇总。

请注意,您可以在该阶段之后降低流水线的并行度。例如,请考虑流水线的逻辑视图:

如果来源将数据划分到 500 个分区,但“Group By”使用 200 个分区进行重排,则“Group By”之后的最大并行处理级别会从 500 降至 200。您只会写入 200 个不同的分块文件到 Cloud Storage,而不是 500 个。

选择分区

如果分区数量过低,您将无法使用集群的全部容量来尽可能并行处理更多工作。如果将分区设置得过高,会增加不必要的开销。一般而言,使用过多的分区总比过少的分区要好。如果您的流水线需要几分钟才能运行,而您又想缩短几分钟的时间,则需要考虑额外的开销。如果您的流水线需要数小时才能运行完毕,您通常无需担心开销。

一种用于确定要使用的分区数量的实用但过于简单的方法是将其设置为 max(cluster CPUs, input records / 500,000)。换句话说,就是将输入记录数除以 500,000。如果该数字大于集群 CPU 数量,请将其用作分区数量。否则,请使用集群 CPU 数量。例如,如果您的集群有 100 个 CPU,并且预计分屏阶段将有 1 亿个输入记录,请使用 200 个分区。

更完整的答案是,当每个分区的中间 Shuffle 数据完全可以放入执行器的内存中,而无需将任何数据溢出到磁盘时,Shuffle 的性能最好。Spark 仅会预留不到 30% 的执行器内存来存储 Shuffle 数据。确切数值为(总内存 - 300 MB)* 30%。如果我们假设每个执行器都设置为使用 2 GB 内存,则意味着每个分区应最多存储 (2 GB - 300 MB) * 30% = 大约 500 MB 的记录。如果我们假设每条记录压缩后大小为 1 KB,则意味着 (500 MB / 分区) / (1 KB / 记录) = 每个分区 50 万条记录。如果您的执行器使用了更多内存,或者记录较小,您可以相应地调整此数字。

数据倾斜

请注意,在上面的示例中,各种商品的购买次数均匀分布。也就是说,苹果、香蕉、胡萝卜和鸡蛋各有 3 次购买交易。按均匀分布键进行随机排序是性能最高的随机排序类型,但许多数据集不具备此属性。继续以前面的例子中的杂货店购买交易为例,您预计鸡蛋的购买次数会比婚礼贺卡多得多。如果有几个随机播放键比其他键更常见,则表示您处理的数据存在偏差。与非偏斜数据相比,偏斜数据的性能可能会明显下降,因为只有少数几个执行器执行了大量工作。这会导致一小部分分区比所有其他分区大得多。

在本例中,鸡蛋购买交易的数量是卡片购买交易的五倍,这意味着计算鸡蛋汇总数据所需的时间大约是卡片汇总数据的五倍。如果只处理 10 条记录(而不是 2 条),这点影响不大,但如果处理 50 亿条记录(而不是 10 亿条),则会产生很大差异。当存在数据倾斜时,Shuffle 中使用的分区数量对流水线性能没有太大影响。

您可以通过检查一段时间内输出记录的图表来识别数据倾斜。如果相应阶段在流水线运行开始时以更快的速度输出记录,然后突然减速,则可能表示数据存在偏差。

您还可以通过检查一段时间内的集群内存用量来识别数据倾斜。如果您的集群在一段时间内处于满载状态,但突然在一段时间内内存用量较低,这也表明您正在处理数据倾斜问题。

在执行联接时,偏斜数据对性能的影响最为显著。您可以使用一些技巧来提高 skewed 联接的性能。如需了解详情,请参阅JOIN 操作进行并行处理

针对执行进行自适应调优

如需自适应调整执行方式,请指定要使用的分区范围,而不是确切的分区编号。启用自适应执行后,系统会忽略确切的分区编号,即使是在流水线配置中设置的分区编号也是如此。

如果您使用的是临时 Dataproc 集群,Cloud Data Fusion 会自动设置适当的配置;但对于静态 Dataproc 或 Hadoop 集群,您可以设置以下两个配置参数:

  • spark.default.parallelism:将其设置为集群中可用的 vCore 总数。这可确保集群不会过载,并为分区数量定义下限。
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum:将其设置为集群中可用 vCore 数量的 32 倍。这定义了分区数量的上限。
  • Spark.sql.adaptive.enabled:如需启用优化,请将此值设置为 true。Dataproc 会自动进行设置,但如果您使用的是通用 Hadoop 集群,则必须确保已启用该功能。

您可以在特定流水线的引擎配置中或静态 Dataproc 集群的集群属性中设置这些参数。

后续步骤