流水线可在机器集群中执行。它们通过 拆分需要完成的工作,然后在 并行处理。一般而言,拆分(也称为分区)的数量越大,流水线的运行速度就越快。流水线中的并行级别由 流水线中的来源和 shuffle 阶段。
来源
在每个流水线运行开始时,流水线中的每个来源都会计算 以及如何将数据拆分为多个块。例如,假设有一个基本流水线,用于从 Cloud Storage 读取数据、执行一些 Wrangler 转换,然后写回 Cloud Storage。
流水线启动后,Cloud Storage 源会检查输入文件,并根据文件大小将其拆分为多个分块。例如, 单个 GB 的文件可以拆分为 100 个拆分文件,每个文件 10 MB 。每个执行器都会读取相应分块的数据,运行 Wrangler 转换,然后将输出写入 part 文件。
如果流水线运行缓慢,首先要检查的是您的来源是否创建了足够的分块,以充分利用并行处理。例如,某些类型的压缩会导致明文文件不可拆分。如果您 正在读取经过 gzip 压缩的文件 相较于读取未压缩文件或 使用 BZIP(可拆分)压缩。同样,如果您使用的是数据库源,并将其配置为仅使用一个分块,那么与将其配置为使用更多分块相比,其运行速度会慢得多。
随机播放
某些类型的插件会导致数据在集群中进行重排。这个 需要将一个执行器处理的记录发送到另一个执行程序时,会发生以下情况: Executor 来执行计算。Shuffle 的操作成本高昂, 它们涉及很多 I/O 操作导致数据重排的插件都显示在 在 Pipeline Studio 的 Analytics 部分中操作。其中包括 分组依据、去重、唯一和连接符。例如,假设一个 Group By 阶段添加到上述示例的流水线中。
另外,假设正在读取的数据表示在杂货店进行的购买。
每个记录包含一个 item
字段和一个 num_purchased
字段。在小组中
在阶段,我们配置流水线以对item
字段上的记录进行分组,并
计算 num_purchased
字段的总和。
流水线运行时,系统会如前所述拆分输入文件。更新后 也就是说,系统会在集群中重排每条记录, 相同的项属于同一个执行器。
如前面的示例所示,苹果购买的记录为 最初分布在多个执行器中。为了执行汇总,所有这些记录都需要跨集群发送到同一执行器。
大多数需要进行随机排序的插件都允许您指定在对数据进行随机排序时要使用的分区数量。这用于控制用于处理已洗牌数据的执行器数量。
在前面的示例中,如果分区数设置为 2
,则每个执行器都会计算两个(而不是一个)项的聚合。
请注意,您可以在该阶段之后降低流水线的并行度。例如,请考虑流水线的逻辑视图:
如果来源将数据划分到 500 个分区,但“分组依据”使用 200 个分区, 500 到 200。你无需将 500 个不同的部分文件 那么您只有 200 个 Cloud Storage 存储分区。
选择分区
如果分区数量太少, 并行处理尽可能多的工作。同时设置分区 就会增加不必要的开销一般而言,使用过多的分区总比过少的分区要好。您需要担心额外的开销 假设您流水线的运行时间需要几分钟 几分钟。如果您的流水线需要数小时的运行时间,那么开销通常不会 一些需要担心的问题
一种有用但过于简单的方法,用于确定要
方法是将其设置为 max(cluster CPUs, input records / 500,000)
。在其他
将输入记录数除以 500,000。如果该数字大于集群 CPU 数量,请将其用作分区数量。否则,请使用集群 CPU 数量。例如,如果您的集群
100 个 CPU 和 Shuffle 阶段预计有 1 亿个输入
记录,使用 200 个分区。
更完整的答案是,当 每个分区的 shuffle 数据都可以完全放入执行程序的内存中, 这样就无需将任何内容洒到磁盘上Spark 预留出来的 用于保存 shuffle 数据的执行程序内存。确切数值为(总内存 - 300 MB)* 30%。如果我们假设每个执行器都设置为使用 2 GB 的内存, 这意味着每个分区的大小不应超过 (2 GB - 300 MB) * 30% = 大约 500 MB 记录。如果我们假设每条记录向下压缩, 大小为 1 KB,则意味着 (500 MB / 分区) / (1 KB / record)= 每个分区 500,000 条记录。如果您的执行器使用 或者您的记录较小,则可以相应地调整此数值。
数据倾斜
请注意,在上面的示例中,各种商品的购买次数均匀分布。也就是说,他们分别购买了三次,分别是苹果、香蕉和 胡萝卜和鸡蛋按均匀分布键进行随机排序是性能最高的随机排序类型,但许多数据集不具备此属性。继续以前面的例子中的杂货店购买交易为例,您预计鸡蛋的购买次数会比婚礼贺卡多得多。如果有几个随机播放键比其他键更常见,则表示您处理的数据存在偏差。与非偏斜数据相比,偏斜数据的性能可能会明显下降,因为只有少数几个执行器执行了大量工作。这会导致一小部分分区比所有其他分区大得多。
在本例中,鸡蛋的购买次数是卡片购买次数的 5 倍, 也就是说,计算蛋蛋聚合所需的时间大约多出五倍。它 处理 10 条(而非两条)记录时意义不大, 在处理 50 亿条记录(而不是 1 条)时意义重大 亿。当出现数据倾斜时,Shuffle 中使用的分区数量 对流水线性能没有太大影响。
您可以通过检查一段时间内输出记录的图表来识别数据倾斜。如果阶段在开始时以快得多的速度输出记录, 然后突然变慢,这可能意味着数据有偏差。
您还可以通过检查一段时间内的集群内存用量来识别数据倾斜。如果您的集群在一段时间内处于满载状态,但突然在一段时间内内存用量较低,这也表明您正在处理数据倾斜问题。
在执行联接时,偏斜数据对性能的影响最为显著。您可以使用一些技巧来提高 skewed 联接的性能。如需了解详情,请参阅对 JOIN
操作进行并行处理。
针对执行进行自适应调优
如需自适应调整执行,请指定要使用的分区范围,而不是 确切的分区号。确切的分区号(即使在流水线中设置了分区编号) 配置,在启用自适应执行时会被忽略。
如果您使用的是临时 Dataproc 集群,Cloud Data Fusion 会自动设置适当的配置;但对于静态 Dataproc 或 Hadoop 集群,您可以设置以下两个配置参数:
spark.default.parallelism
:将其设置为可用 vCore 的总数 资源。这可确保集群不会过载,并为分区数量定义下限。spark.sql.adaptive.coalescePartitions.initialPartitionNum
:将其设置为集群中可用 vCore 数量的 32 倍。这定义了分区数量的上限。Spark.sql.adaptive.enabled
:如需启用优化,请将此值设置为true
。Dataproc 会自动设置 您必须确保已启用 Hadoop 集群。
您可以在特定流水线的引擎配置中或静态 Dataproc 集群的集群属性中设置这些参数。
后续步骤
- 了解
JOIN
操作的并行处理。