本文档介绍了如何最大限度地减少作业失败对大型批量流水线的影响。大型工作负载失败会产生特别大的影响,因为需要花费大量时间和金钱来恢复和解决这些失败问题。在这些流水线失败时从头重试,无论是时间还是金钱都代价高昂。
为了减少代价高昂的批量流水线的失败次数,请遵循本页面中的指南。由于您无法始终完全避免失败的元素和流水线失败问题,因此提供的技术侧重于提高弹性、降低失败成本,并在发生失败时更轻松地进行调试和了解失败。
如需了解常规流水线最佳实践,请参阅 Dataflow 流水线最佳实践。
针对大型作业运行小型实验
在运行大型批量作业之前,请先对数据集的子集运行一个或多个较小的作业。此方法既可以提供成本估算,又有助于发现潜在的失败点。
费用估算
运行实验可以提供运行作业的总费用的估算底价。通常,作业费用的计算方法为 cost of test job*size(full dataset)/size(test dataset)
。成本可能会超线性扩展,也可能亚线性扩展,具体取决于流水线。不过,此步骤通常可以很好地估算出作业费用。您还可以尝试输入不同规模的值,以便更好地估算费用的增长情况。您可以根据此信息来决定是继续使用现有流水线,还是重新构建流水线以降低费用。
查找失败点
运行实验可以发现 bug、潜在的失败点或潜在的配置和效率问题。您还可以检查其他流水线指标,例如以下指标:
- 如果您的流水线几乎使用了所有可用内存,则在负载较高或记录异常大时,可能会遇到内存不足 (OOM) 异常。您可能需要为最终作业预配更多内存,以避免这些 OOM 错误。
- 如果您的流水线的吞吐量下降,请检查流水线日志以确定原因。您可能会发现某个卡住的元素或数据集的某一部分性能特别差。您可以单独处理这些数据点,也可以在处理元素时强制执行超时机制。如需了解详情,请参阅本文档中的为昂贵的记录设置超时部分。
- 如果您的流水线在 Dataflow 上执行某个任务的性能比在本地执行时差得多,请检查流水线逻辑以找出原因。例如,如果您在 Dataflow 上使用 8 个核心获得的吞吐量与在本地使用 1 个核心获得的吞吐量相同,则该作业可能会因争用资源而出现瓶颈。如果您发现效果不如预期,请考虑以下一个或多个选项:
- 使用不同的机器或软件配置运行更多实验。
- 在本地同时使用多个核心进行测试。
- 检查代码,找出大规模部署时可能存在的瓶颈。
如果您的流水线有任何 Dataflow 建议,请遵循这些建议以提高性能。
使用死信队列处理意外的错误数据
流水线通常在大多数输入元素上成功,但在输入的一小部分子集上失败。运行小规模实验时,您可能无法发现此问题,因为这些实验只会测试输入的一部分。默认情况下,Dataflow 会在批处理模式下重试这些失败任务四次,在流处理模式下重试次数则不受限制。在批处理模式下,达到重试次数上限后,整个作业都会失败。在流处理模式下,它可能会无限期地陷入停顿。
在许多作业中,您可以从流水线中排除这些失败的元素,并使用死信队列(未处理的消息队列)完成作业的其余部分。死信队列会将失败的记录传递到单独的输出 PCollection
,您可以将该输出与主输出分开管理。借助此配置,您可以为这些记录设计政策。例如,您可以手动将其写入 Pub/Sub,检查和清理它们,然后重新处理记录。
许多 Apache Beam 转换都内置了对死信队列的支持。在 Java 中,您可以使用 ErrorHandler
对象访问它们。在 Python 中,您可以使用 with_exception_handling
方法访问它们。某些转换具有自定义的死信队列定义方式,您可以在转换的文档中了解相关信息。如需了解详情,请参阅使用死信队列处理错误。
如需确定您的作业是否符合死信队列的条件,请参阅本文档中的限制部分。
死信队列限制
在以下情况下,死信队列可能无济于事:
- 完整工作器或
DoFn
生命周期失败。如果整个工作器或软件包的处理失败,死信队列无法捕获失败。例如,如果您的流水线遇到内存不足 (OOM) 异常,则虚拟机上的所有活跃任务都会失败并重试,而不会向死信队列发送任何内容。 - 组合或其他汇总。如果您的流水线执行的计算需要所有输入元素都存在并作为结果的一部分进行处理,请谨慎使用此步骤之前的死信队列。使用死信队列会从结果中排除部分输入数据。添加死信队列可能会以正确性为代价来提高容错能力。
- 死信队列路径上的失败。如果某个元素在发送到死信队列接收器时失败,整个流水线都可能会失败。为避免此失败,请尽可能简化死信队列逻辑。您可以添加等待步骤(请参阅
wait class
),以确保在写入死信队列元素之前完成主输入。此配置可能会降低性能并延迟流水线中的错误信号。 - 部分转换的元素。如果您在流水线中途插入死信队列,死信队列可能会输出部分转换的元素,而无法访问原始元素。因此,您无法清理该元素并针对该元素重新运行流水线。相反,您可能需要应用不同的逻辑来将死信队列中的输出与原始元素相关联,或者您可能需要解释和处理部分转换的元素。这也可能会导致结果不一致。例如,如果元素被发送到流水线的两个分支,并且每个分支都将导致异常的元素发送到死信队列,则单个输入元素可能会发送到一个分支、另一个分支、两个分支或两个分支都没有。
为昂贵的记录设置超时
流水线在处理一小部分较为耗时的元素或遇到导致无法响应的限制(例如死锁)时,可能会停止响应。为了缓解此问题,某些转换可让您设置超时,并在遇到此问题的任何用户代码 DoFn
中将超时元素失败。例如,您可以使用 Python 的 with_exception_handling
方法。当您将超时与死信队列结合使用时,您的流水线可以继续处理正常的元素并取得进展,而您可以单独重新处理耗时较长的元素。此配置可能会导致性能损失。
如需确定哪些 DoFn
操作可能需要超时,请在启动完整流水线之前运行小型实验。
启用纵向自动扩缩
如果您不确定作业需要多少内存,或者认为作业有内存不足的风险,请启用垂直自动扩缩。此功能有助于在流水线大规模运行或遇到异常大的元素时避免 OOM 失败。
由于纵向自动扩缩可能会增加作业的费用,并且不会防止所有内存不足失败,因此您仍需要解决内存消耗过度的问题。纵向自动扩缩还需要 Dataflow Prime,后者具有其他限制和不同的结算模式。
针对容易失败的流水线的解决方法
有些流水线特别容易出错。虽然最好是解决这些错误的来源,但为了降低失败的成本,请考虑以下选项。
物化中间结果
流水线中可能有一个或多个特别耗时的转换,它们会占据大部分的流水线执行时间。此转换后的流水线失败可能会造成特别严重的后果,因为已完成的所有工作都会丢失。为了避免这种情况,请考虑将耗时步骤生成的中间 PCollections
写入 Cloud Storage 等接收器。此配置可降低失败的成本。您需要权衡这种优势与执行额外写入操作的成本。您可以通过以下任一方式使用此物化结果:
- 将原始流水线拆分为两个流水线:一个用于写入中间结果,另一个用于读取中间结果。
- 仅在流水线失败时,才会读取原始来源和已物化的中间集合中的结果并将其展平。
为了确保在进一步处理之前写入这些物化结果,请在任何后续处理步骤之前添加等待步骤(请参阅 wait class
)。