您可以使用 Dataflow 数据分析来优化作业性能。本主题演示了如何使用 gcloud
或 REST API 与 Dataflow 数据分析进行交互。您还可以在 Dataflow 控制台中查看数据分析。如需详细了解如何在控制台中查看数据分析,请参阅建议。
概览
Dataflow 数据分析提供的数据分析可用于提高作业性能、降低费用和排查错误。Dataflow 数据分析是 Recommender 服务的一部分,可以通过 google.dataflow.diagnostics.Insight
类型提供。
使用 Dataflow 数据分析时,请注意某些建议可能与您使用场景无关。
准备工作
在开始使用 Dataflow 数据分析之前,您必须完成以下步骤。
- 启用 Recommender API。
设置身份验证。
Select the tab for how you plan to use the samples on this page:
gcloud
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
REST
如需在本地开发环境中使用本页面上的 REST API 示例,请使用您提供给 gcloud CLI 的凭据。
After installing the Google Cloud CLI, initialize it by running the following command:
gcloud init
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
如需了解详情,请参阅 Google Cloud 身份验证文档中的使用 REST 时进行身份验证。
请确保您的账号具有以下权限:
recommender.dataflowDiagnosticsInsights.get
recommender.dataflowDiagnosticsInsights.list
recommender.dataflowDiagnosticsInsights.update
您可以单独授予这些权限,也可以授予以下角色之一:
roles/recommender.dataflowDiagnosticsViewer
roles/recommender.dataflowDiagnosticsAdmin
roles/dataflow.viewer
roles/dataflow.developer
roles/dataflow.admin
请求 Dataflow 数据分析
您可以列出 Dataflow 数据分析,如下所示。对于其他类型的数据分析互动,请参阅 Recommender API 的数据分析指南。
列出 Dataflow 数据分析
如需列出给定区域中项目的所有 Dataflow 数据分析,请使用以下方法之一:
gcloud
您可以使用
gcloud recommender insights list
命令来查看项目在指定区域中的所有 Dataflow 数据分析。在运行命令之前,请先替换以下值:
- PROJECT_ID:您要为其列出数据分析的项目的 ID。
- REGION:您的 Dataflow 作业在其中运行的区域。例如:
us-west1
。
gcloud recommender insights list --insight-type=google.dataflow.diagnostics.Insight \ --project=PROJECT_ID \ --location=REGION
输出会列出项目在指定区域中的所有 Dataflow 数据分析。
REST
您可以使用 Recommender API 的 insights.list 方法列出项目在指定区域中的所有 Dataflow 数据分析。
在使用任何请求数据之前,请先进行以下替换:
- PROJECT_ID:您要为其列出数据分析的项目的 ID。
- REGION:您的 Dataflow 作业在其中运行的区域。例如:
us-west1
。
HTTP 方法和网址:
GET https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights
如需使用 curl(Linux、macOS 或 Cloud Shell)发送请求,请运行以下命令:
curl -X GET \ -H "Authorization: Bearer "$(gcloud auth print-access-token) \ "https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights"
获取单一 Dataflow 数据分析
如需获取有关单一数据分析的更多信息(包括数据分析的说明、状态及其关联的任何建议),请使用以下方法之一:
gcloud
将
gcloud recommender insights describe
命令与您的数据分析 ID 结合使用,以查看单一数据分析的相关信息。在运行命令之前,请先替换以下值:- INSIGHT_ID:您要查看的数据分析的 ID。
- PROJECT_ID:您要为其列出数据分析的项目的 ID。
- REGION:您的 Dataflow 作业在其中运行的区域。例如:
us-west1
。
gcloud recommender insights describe INSIGHT_ID \ --insight-type=google.dataflow.diagnostics.Insight \ --project=PROJECT_ID \ --location=REGION
输出会显示详细数据分析。
REST
Recommender API 的 insights.get 方法可获取单一数据分析。在使用任何请求数据之前,请先进行以下替换:
- PROJECT_ID:您要为其列出数据分析的项目的 ID。
- REGION:您的 Dataflow 作业在其中运行的区域。例如:
us-west1
。 - INSIGHT_ID:您要查看的数据分析的 ID。
HTTP 方法和网址:
GET https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights/INSIGHT_ID
如需使用 curl(Linux、macOS 或 Cloud Shell)发送请求,请运行以下命令:
curl -X GET \ -H "Authorization: Bearer "$(gcloud auth print-access-token) \ "https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights/INSIGHT_ID"
解读 Dataflow 数据分析
获得单个数据分析后,您可以查看其内容,以了解其突出显示的资源使用的模式。除了标准数据分析属性之外,Dataflow 数据分析还提供以下子类型:
AUTOSCALING_NOT_ENABLED
:可以启用自动扩缩功能。作业的 CPU 利用率较高,并且使用的工作器数量已达到上限。启用自动扩缩功能可以提高性能。HIGH_FAN_OUT
:可以在一个或多个转换后插入融合中断,以增加并行性。MAX_NUM_WORKERS
:自动扩缩:工作器数量上限可以增加。该作业正在使用自动扩缩功能,CPU 利用率高,使用的工作器数量已达到上限。增加工作器数量上限可以提高性能。WORKER_OUT_OF_MEMORY
:作业的某些工作器因内存不足而失败,这可能会减慢作业速度或导致作业失败。PREBUILD_NOT_UTILIZED
:使用工作器映像预构建工作流,以改善工作器启动时间和自动扩缩可靠性。ACTIVE_KEYS
(预览版):有效键的总数少于核心总数,并且扩容无法解决此问题。LONG_WORK_ITEM
:融合阶段中的工作的处理时间太长,表示操作运行缓慢或卡住。
如需详细了解如何缓解 Dataflow 数据分析发现的问题,请参阅数据分析。
Dataflow 数据分析还提供一个特殊的
content
字段,该字段包含子字段,子字段包含有关数据分析的其他信息和元数据。根据您的使用场景,以下content
子字段可能会有用:jobName
:Dataflow 作业名称。description
:英文版数据分析的说明。title
:英文版数据分析的名称。
数据分析
检测到高扇出
当 Dataflow 检测到作业具有一个或多个高扇出转换时,系统会显示以下消息:
High fan-out detected
当具有高输出到输入元素计数比率的 ParDo 与后续 ParDo 融合时,就会显示此消息。在这种情况下,第二个 ParDo 会在第一个 ParDo 后依序运行,这会将给定输入的所有输出元素强制放到同一工作器上,从而降低并行性并降低性能。
要解决此问题,请执行以下操作:
- 插入一个
GroupByKey
并在第一个 ParDo 之后取消分组。Dataflow 服务永远不会融合汇总过程中的 ParDo 操作。如需了解详情,请参阅融合优化 - 将中间 PCollection 作为辅助输入传递到另一个 ParDo。Dataflow 服务始终会将辅助输入具体化。
- 插入“重排”步骤。重排可防止融合,为数据添加检查点并重新配置数据选取策略,以避免丢弃数据。虽然在 Apache Beam 文档中重排被标记为已弃用,Dataflow 仍然支持重排(请注意,重排数据可能会增加运行流水线的费用)。
自动扩缩:工作器数量上限可以增加
当 Dataflow 检测到作业使用的工作器数量达到上限
maxNumWorkers
(或max_num_workers
)时,如果提高此上限,作业可能会使用更多工作器,此时会显示以下消息:maximum number of workers could be increased
例如,对于
maxNumWorkers
设置为 50 的批量作业或流处理作业,当所有 50 个工作器的平均工作器 CPU 利用率超过 80% 时,就会提供此建议。此外,对于maxNumWorkers
设置为 50 的流处理作业,当所有 50 个工作器的平均工作器 CPU 利用率超过 50% 且作业的预计处理时间超过 2 分钟时,也会提供此建议。通常,增加
maxNumWorkers
会增加流水线吞吐量。批量流水线可以在更短的时间内完成,而流式流水线可以处理更大的数据峰值以及每秒处理更多元素。不过,这可能会增加费用。如需了解详情,请参阅工作器资源价格。如需详细了解自动扩缩算法的工作原理及其配置方式,请参阅自动扩缩指南。要解决此问题,请执行以下操作:
- 增加或移除
maxNumWorkers
流水线选项。如果没有该选项,Dataflow 将使用自动扩缩指南中列出的默认值。 - 如果流水线性能足够,您可以不执行任何操作。
- 对于批处理流水线,请确保总运行时间符合您的要求。
- 对于流式流水线,请查看作业页面的作业指标标签页上的数据新鲜度图表。验证图中的值是否未不断增加,且是否在可接受的范围内。
自动扩缩:设置初始工作器数量可以提高作业性能
当 Dataflow 检测到作业使用的若干工作器超过运行时间的 50% 时,将初始工作器数量设置为建议值可以缩短批量作业的运行时间或在更新流处理作业时防止积压量增加,从而提高作业性能。
工作器失败并显示 OutOfMemory 错误
当 Dataflow 检测到作业的工作器因内存不足错误而失败时,会显示以下消息:
Some workers are out of memory
作业的某些工作器因内存不足而失败。虽然作业可能会完成,但这些错误也可能会阻止作业成功完成或降低性能。
请尝试以下建议:
- 手动增加可供工作器使用的内存量。
- 通过分析内存用量来减少所需的内存量。如需了解详情,请参阅排查 Dataflow 内存不足错误。
未使用预构建工作流
当 Dataflow 检测到未使用工作器映像预构建工作流的流水线时,会显示以下消息:
pre-build workflow not utilized
如果未使用工作器映像预构建工作流,则流水线具有在运行时重复安装的依赖项。此配置会使工作器启动变慢,从而降低作业的吞吐量,并导致不可靠的自动扩缩行为。
要解决此问题,请在启动流水线时使用工作器映像预构建工作流。如需了解详情,请参阅预构建的 Python 依赖项。
如果自定义的预构建容器已被使用,为避免不必要的安装,请添加“--sdk_location=container”选项,并移除以下选项:
- '--setup_file'
- '--requirements_file'
- '--extra_package(s)'
有效键的数量较少
如果 Dataflow 检测到某个作业落后,原因是有效键数量少于核心总数,并且扩容无法解决此问题,则会显示以下消息:
Active keys can be increased
为了在作业中运行用户代码,Dataflow 使用工作器。每个线程都映射到一个键,每个键负责一组要处理的数据,并且出于正确性原因,一次只能在一个核心上运行。
在某些情况下,一些核心工作量过大,而其他核心则处于空闲状态。如需解决此问题,请增加键的数量,这可以增加活动线程数。
增加键数量的可能解决方案:您可以通过使用更具体的键类型来增加键数量。例如,如果键类型为
IP address
,则可用的键较少。但是,如果您将键类型更改为IP + [user identifier]
,则会有更多可用的键,这可以提高并行性。对于写入 BigQuery 且接收器可能成为瓶颈的流水线,请参阅这篇文章。对于其他来源/接收器,请检查它是否具有numShards
参数并提高它的值。通常,一个分片映射到一个键。如需查看我们的执行模型的常规指导信息,请参阅这篇文章。可以使用扇出来获取单个输入键并向其添加哈希以生成多个输出键。参考文档阶段的作业处理用时过长
当 Dataflow 检测到工作处理时间频繁过长时,会显示以下消息:
Stage spending too long on work
Dataflow 将工作以元素组成的软件包形式发送到融合阶段以进行处理;在针对该阶段处理完所有元素及其输出后,每个软件包会被视为已完成。流式传输流水线针对不到一分钟即可完成处理的工作包进行了优化,因此处理时间过长可能会导致流水线中出现进一步的性能问题。
此问题可能是由卡住或缓慢的用户转换所导致的。这些转换可以通过 Cloud Logging 及其“诊断”标签页中发出的警告,以及关键短语“操作正在进行”或“处理卡住”来识别。如需诊断此问题是否由用户转换所导致,请使用 Cloud Profiler 检查用户转换的性能。然后,跟踪哪些代码导致速度缓慢以及出现缓慢情况的频率。如需了解详情,请参阅排查常见 Dataflow 错误。
如果调查表明处理时间过长不是由用户转换引起的,则我们建议您与 Cloud 支持团队联系,并说明调查步骤。
作业卡在工作项上
当 Dataflow 检测到某个键因单个工作项反复失败并重试后卡住时,系统会显示以下消息:
Job is stuck due to failed and retried work item
在 Dataflow 中,流水线中的所有消息都在特定键下进行处理。如果处理消息时出现错误,则该消息会重试。消息重试两到三次是可以接受的。不过,如果错误反复出现(例如连续出现 10 次),通常表示流水线代码存在根本性问题。当键上的特定消息重试时,同一键下的其他消息无法继续。如果消息失败 10 次或更多次,则问题可能不会自行解决。此消息失败可能会导致流水线问题,例如:
- 延迟水印
- 累积积压
- 阻止排水操作完成
如需调试此问题,请调查报告建议的阶段,并查看日志以确定有问题的代码。然后,使用新的流水线代码更新作业,让作业顺利进行。
未启用 Streaming Engine
当 Dataflow 检测到流处理作业未启用 Streaming Engine 时,会显示以下消息:
This job isn't using Streaming Engine. It might benefit from having Streaming Engine enabled.
使用 Streaming Engine 具有各种潜在优势,包括更好的横向自动扩缩、更高的可支持性以及降低工作器虚拟机上的 CPU、内存和 Persistent Disk 存储资源用量。Streaming Engine 还支持按资源结算。
如未另行说明,那么本页面中的内容已根据知识共享署名 4.0 许可获得了许可,并且代码示例已根据 Apache 2.0 许可获得了许可。有关详情,请参阅 Google 开发者网站政策。Java 是 Oracle 和/或其关联公司的注册商标。
最后更新时间 (UTC):2025-06-18。