并行处理

流水线可在机器集群中执行。它们可以通过 拆分需要完成的工作,然后在 并行处理。一般来说, 分块(也称为分区)的数量越多, 可以运行流水线中的并行级别由 流水线中的来源和 shuffle 阶段。

来源

在每个流水线运行开始时,流水线中的每个来源都会计算 以及如何将数据拆分为多个块。对于 假设有一个从 Cloud Storage 读取数据的基本流水线, 执行一些 Wrangler 转换,然后将内容写回 Cloud Storage

显示 Cloud Storage 来源、Wrangler 转换和 Cloud Storage 接收器的基本流水线

流水线启动时,Cloud Storage 来源会检查输入 文件,并根据文件大小将其拆分为多个文件。例如, 单个 GB 的文件可以拆分为 100 个拆分文件,每个文件 10 MB 。每个执行器都会读取该分块的数据,并运行 Wrangler 转换,然后将输出写入 part 文件。

将 Cloud Storage 中的分区数据并行转换为 Wrangler 文件并转换为部分文件

如果流水线运行缓慢,首先要检查的事项之一是, 您的源会创建足够的分块来充分利用并行性。 例如,某些类型的压缩会导致明文文件不可拆分。如果您 正在读取经过 gzip 压缩的文件 相较于读取未压缩文件或 使用 BZIP(可拆分)压缩。同样,如果您使用 并且已将其配置为仅使用单个分块 会比将它配置为使用更多分块的速度慢一些

随机播放

某些类型的插件会导致数据在集群中进行重排。这个 当一个执行器正在处理的记录需要发送到另一个执行程序时,会发生以下情况: Executor 来执行计算。Shuffle 的操作成本很高,因为 它们涉及很多 I/O 操作导致数据重排的插件都显示在 在 Pipeline Studio 的 Analytics 部分中操作。其中包括 分组依据、去重、唯一和连接符。例如,假设一个 Group By 阶段添加到上述示例的流水线中。

另外,假设正在读取的数据表示在杂货店进行的购买。 每条记录都包含一个 item 字段和一个 num_purchased 字段。在小组中 在阶段,我们配置流水线以对item字段上的记录进行分组,并 计算 num_purchased 字段的总和。

流水线运行时,系统会如前所述拆分输入文件。更新后 也就是说,系统会在集群中重排每条记录, 相同的项属于同一个执行器。

如前面的示例所示,苹果购买的记录为 最初分布在多个执行器中。要执行聚合操作 需要跨集群发送到同一个执行器。

大多数需要 shuffle 的插件都允许您指定分区数量 数据重排时使用。这控制着用于执行 会处理重排的数据。

在前面的示例中,如果分区数设置为 2,则每个执行器都会计算两个(而不是一个)项的聚合。

请注意,在这之后,您可以降低流水线的并行处理量 阶段。例如,请考虑以下流水线的逻辑视图:

如果来源将数据划分到 500 个分区,但“分组依据”使用 200 个分区, 500 至 200。你无需将 500 个不同的部分文件 那么您只有 200 个 Cloud Storage 存储分区。

选择分区

如果分区数量太少,您将无法使用 并行处理尽可能多的工作。同时设置分区 就会增加不必要的开销一般来说, 使用的分区要多于少的分区。您需要担心额外的开销 如果您的流水线需要几分钟的运行时间,而您想要缩减 几分钟。如果您的流水线需要数小时的运行时间,那么开销通常不会 一些需要担心的问题

一种有用但过于简单的方法,用于确定要 方法是将其设置为 max(cluster CPUs, input records / 500,000)。在其他 将输入记录数除以 500,000。如果该号码是 大于集群 CPU 的数量,则将其用作分区数量。 否则,请使用集群 CPU 数量。例如,如果您的集群 100 个 CPU 和 Shuffle 阶段预计有 1 亿个输入 记录,使用 200 个分区。

更完整的答案是,当 每个分区的 shuffle 数据都可以完全放入执行程序的内存中, 这样就无需将任何内容洒到磁盘上Spark 预留出来的 用于保存 shuffle 数据的执行程序内存。确切数字为 (总内存 - 300 MB) * 30%。如果我们假设每个执行器都设置为使用 2 GB 的内存, 这意味着每个分区的大小不应超过 (2 GB - 300 MB) * 30% = 大约 500 MB 记录。如果我们假设每条记录向下压缩, 大小为 1 KB,则意味着 (500 MB / 分区) / (1 KB / record) = 每个分区 500,000 条记录。如果您的执行器使用 或者您的记录较小,则可以相应地调整此数值。

数据倾斜

请注意,在前面的示例中,各种商品的购买量平均进行了 分发。也就是说,他们分别购买了三次,分别是苹果、香蕉和 胡萝卜和鸡蛋对均匀分布键进行随机化处理的效果最佳 是 shuffle 类型的,但许多数据集没有此属性。继续学习 那么前面的示例中提到的杂货店购买,那么您应该会获得很多 鸡蛋的购买量要高于婚礼卡片的购买量当存在几次重排时 比其他键更常见,则您需要处理偏差 数据。与非偏差数据相比,偏差数据的效果会显著下降, 为数不多的工作量 执行器。它会导致一小部分分区比所有分区都大得多 其他。

在本例中,鸡蛋的购买次数是卡片购买次数的 5 倍, 也就是说,计算蛋蛋聚合所需的时间大约是前者的 5 倍。它 在处理只有 10 条(而非两条)记录时,意义不大, 在处理 50 亿条记录(而不是一条)时意义重大 亿。当出现数据倾斜时,shuffle 中使用的分区数 对流水线性能没有太大影响。

您可以通过检查随时间变化的输出记录图表来识别数据偏差。 如果阶段在开始时以快得多的速度输出记录 然后突然变慢,这可能意味着数据有偏差。

您还可以通过检查一段时间内的集群内存使用情况来识别数据倾斜。如果 您的集群已用尽容量,但突然出现内存用量偏低的情况 这也表示您遇到了数据偏差。

联接时,偏差数据对性能的影响最大 错误。有几个技巧可用于提高性能 用于偏差联接的计算方法。如需了解详情,请参阅 JOIN 操作的并行处理

针对执行进行自适应调整

如需自适应调整执行,请指定要使用的分区范围,而不是 确切的分区号。确切的分区号(即使在流水线中设置了分区编号) 配置,在启用自适应执行时会被忽略。

如果您使用的是临时 Dataproc 集群, Cloud Data Fusion 会自动设置适当的配置,但对于静态数据, Dataproc 或 Hadoop 集群,接下来的两个配置 参数:

  • spark.default.parallelism:将其设置为可用 vCore 的总数 资源。这样可以确保您的集群不会欠载,并能够定义 分区数量下限。
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum:设置为 32 倍 集群中可用的 vCore 数量的百分比。这定义了 分区数量的上限。
  • Spark.sql.adaptive.enabled:若要启用优化,请将此值设置为 true.Dataproc 会自动设置 您必须确保已启用 Hadoop 集群。

您可以在特定 API 的引擎配置中设置这些参数 流水线或静态 Dataproc 的集群属性中 集群。

后续步骤