如需停止 Dataflow 作业,请使用 Google Cloud 控制台、Cloud Shell(使用 Google Cloud CLI 安装的本地终端)或 Dataflow REST API。
您可以通过以下三种方式之一停止 Dataflow 作业:
取消作业。此方法对流处理流水线和批处理流水线都适用。取消作业会阻止 Dataflow 服务处理任何数据,包括已缓冲的数据。如需了解详情,请参阅取消作业。
排空作业。此方法仅适用于流处理流水线。排空作业使 Dataflow 服务可完成缓冲数据的处理,同时停止提取新数据。如需了解详情,请参阅排空作业。
强制取消作业。此方法对流处理流水线和批处理流水线都适用。强制取消作业会立即阻止 Dataflow 服务处理任何数据,包括已缓冲的数据。在强制取消之前,您必须先尝试常规取消。强制取消仅适用于卡在常规取消过程中的作业。如需了解详情,请参阅强制取消作业。
取消作业时无法重启它。如果您未使用 Flex 模板,则可以克隆已取消的流水线,并通过克隆的流水线启动新作业。
在停止流处理流水线之前,请考虑创建作业的快照。Dataflow 快照保存了流处理流水线的状态,因此您可以在不丢失状态的情况下启动 Dataflow 作业的新版本。如需了解详情,请参阅使用 Dataflow 快照。
如果您有一个复杂的流水线,请考虑创建一个模板并通过该模板运行作业。
您无法删除 Dataflow 作业,但可以归档已完成的作业。所有已完成的作业(包括归档作业列表中的作业)在保留 30 天后都会被删除。
取消 Dataflow 作业
取消作业时,Dataflow 服务会立即停止作业。
取消作业时会进行以下操作:
Dataflow 服务停止所有数据提取和数据处理操作。
Dataflow 服务开始清理关联到您作业的 Google Cloud 资源。
这可能包括关停 Compute Engine 工作器实例,并断开与 I/O 源或接收器的活跃连接。
有关取消作业的重要信息
取消作业会立即停止流水线的处理。
取消作业可能会丢失运行中数据。运行中数据是指已读取但仍在由流水线处理的数据。
在取消作业之前已从流水线写入输出接收器的数据在输出接收器上可能仍处于可访问状态。
如果您不太在意数据丢失,取消作业可确保尽快关闭与作业关联的 Google Cloud 资源。
排空 Dataflow 作业
当您排空作业时,Dataflow 服务会以当前状态完成作业。如果您希望在关闭流处理流水线时防止数据丢失,最好的方法是排空作业。
排空作业时会进行以下操作:
收到排空请求后,您的作业很快(通常在几分钟之内)就会停止从输入源提取新数据。
Dataflow 服务会保留所有现有资源(例如工作器实例),以完成流水线中任何已缓冲数据的处理和写入操作。
在完成所有等待中的处理操作和写入操作后,Dataflow 服务会关停与您的作业关联的 Google Cloud 资源。
如需排空作业,Dataflow 会停止读取新输入,用事件时间戳标记无限,然后通过流水线传播无限时间戳。因此,排空过程中的流水线可能具有无限水印。
有关排空作业的重要信息
批处理流水线不支持排空作业。
在完成所有处理和写入操作之前,您的流水线会因维护任何关联的 Google Cloud 资源而继续产生费用。
您可以更新正在排空的流水线。如果您的流水线卡住,请使用可修复导致问题的错误的代码来更新流水线,以便在不丢失数据的情况下成功排空。
您可以取消当前正在排空的作业。
排空作业可能需要花费很长时间才能完成,例如,当流水线中包含大量已缓冲数据。
如果流处理流水线包含可拆分 DoFn,则必须先截断结果,然后再运行排空选项。如需详细了解如何截断可拆分 DoFn,请参阅 Apache Beam 文档。
在某些情况下,Dataflow 作业可能无法完成排空操作。您可以查看作业日志以确定根本原因并采取相应措施。
数据保留
Dataflow 流式传输可容忍工作器重启,并且不会在出现错误时使流式传输作业失败。相反,Dataflow 服务会重试,直到您执行取消或重启作业等操作。当您排空作业时,Dataflow 会继续重试,这可能会导致流水线卡住。在这种情况下,如需启用成功的排空操作并避免数据丢失,请使用可修复导致问题的错误的代码来更新流水线。
在 Dataflow 服务持久提交消息之前,Dataflow 不会确认消息。例如,借助 Kafka,您可以将此过程视为从 Kafka 到 Dataflow 的消息所有权的安全移交,从而消除了数据丢失的风险。
作业卡住
- 排空无法解决流水线卡住问题。如果数据移动被阻止,则在执行排空命令之后,流水线仍会卡住。如需解决流水线卡住的问题,请使用更新命令更新流水线,其中包含解决导致问题的错误的代码。您也可以取消卡住的作业,但取消作业可能会导致数据丢失。
计时器
如果流处理流水线代码包含循环计时器,则作业可能会很慢或无法排空。由于排空直到所有计时器结束为止,因此具有无限循环计时器的流水线永远不会排空。
Dataflow 会等待所有处理时间计时器完成运行,而不是立即触发它们,这可能会导致排空缓慢。
排空作业的影响
当您排空流处理流水线时,Dataflow 会立即关闭所有正在处理的窗口并触发所有触发器。
在排空操作中,系统不会等待任何基于时间的未完成窗口结束运行。
例如,如果在您排空作业时,您的流水线已在一个时长两小时的窗口中运行了十分钟,则 Dataflow 不会等待窗口的剩余部分结束运行。它会立即关闭窗口并生成部分结果。Dataflow 通过将数据水印提升到无穷大来关闭未完成窗口。此功能同样适用于自定义数据源。
在排空使用自定义数据源类的流水线时,Dataflow 会停止发出获取新数据的请求,将数据水印提升到无穷大,并针对最后一个检查点调用源的 finalize()
方法。
排空可能会导致窗口部分填充。在这种情况下,如果您重启排空的流水线,则同一窗口可能会再次触发,这可能会导致数据出现问题。例如,在以下场景中,文件可能具有冲突的名称,并且数据可能会被覆盖:
如果您在 12:34 PM 排空具有每小时窗口的流水线,则 12:00 PM 到 1:00 PM 的窗口将仅关闭在该窗口的前 34 分钟内触发的数据。流水线在 12:34 PM 之后不会读取新数据。
如果您随后立即重启该流水线,则 12:00 PM 到 1:00 PM 窗口会再次触发,只有从 12:35 PM 读取到 1:00 PM 的数据。不会发送任何重复项,但如果重复文件名,数据会被覆盖。
在 Google Cloud 控制台中,您可以查看流水线转换的详细信息。下图展示了进行中的排空操作的效果。请注意,水印已提升为最大值。
图 1. 排空操作的步骤视图。
强制取消 Dataflow 作业
仅当您无法使用其他方法取消作业时才使用强制取消。强制取消会在不清理所有资源的情况下终止作业。如果您反复使用强制取消,泄漏的资源可能会累积,而泄漏的资源会占用您的配额。
当您强制取消作业时,Dataflow 服务会立即停止作业,从而泄露 Dataflow 作业创建的所有虚拟机。必须尝试至少 30 分钟的常规取消才能进行常强制取消。
强制取消作业时会发生以下行为:
- Dataflow 服务停止所有数据提取和数据处理操作。
有关强制取消作业的重要信息
强制取消作业会立即停止流水线的处理。
强制取消作业仅适用于卡在常规取消过程中的作业。
Dataflow 作业创建的所有工作器实例不一定会发布,这可能会导致工作器实例泄露。泄露的工作器实例不会产生作业费用,但可能会使用您的配额。作业取消完成后,您可以删除这些资源。
对于 Dataflow Prime 作业,您无法查看或删除泄露的虚拟机。在大多数情况下,这些虚拟机不会造成问题。不过,如果泄露的虚拟机导致问题(例如消耗虚拟机配额),请与支持团队联系。
停止 Dataflow 作业
控制台
转到 Dataflow 作业页面。
点击要停止的作业。
要停止作业,作业状态必须为正在运行。
在作业详情页面上,点击停止。
执行下列其中一项操作:
对于批处理流水线,请点击取消或强制取消。
对于流处理流水线,请点击取消、排空或强制取消。
要确认您的选择,请点击停止作业。
gcloud
要排空或取消 Dataflow 作业,您可以使用 Cloud Shell 中的 gcloud dataflow jobs
命令或随 gcloud CLI 安装的本地终端。
登录 shell。
列出当前正在运行的 Dataflow 作业的作业 ID,然后记下要停止的作业的作业 ID:
gcloud dataflow jobs list
如果未设置
--region
标志,则会显示所有可用区域中的 Dataflow 作业。执行下列其中一项操作:
排空流处理作业:
gcloud dataflow jobs drain JOB_ID
将
JOB_ID
替换为您之前复制的作业 ID。取消批处理或流处理作业:
gcloud dataflow jobs cancel JOB_ID
将
JOB_ID
替换为您之前复制的作业 ID。强制取消批处理或流处理作业:
gcloud dataflow jobs cancel JOB_ID --force
将
JOB_ID
替换为您之前复制的作业 ID。
API
如需使用 Dataflow REST API 取消或排空作业,您可以选择 projects.locations.jobs.update
或 projects.jobs.update
。在请求正文中,在所选 API 的作业实例的 requestedState
字段中传递所需的作业状态。
重要提示:建议使用 projects.locations.jobs.update
,因为 projects.jobs.update
只允许更新在 us-central1
中运行的作业的状态。
如需取消作业,请将作业状态设置为
JOB_STATE_CANCELLED
。如需排空作业,请将作业状态设置为
JOB_STATE_DRAINED
。要强制取消作业,请将作业状态设置为
JOB_STATE_CANCELLED
并使用标签"force_cancel_job": "true"
。请求正文如下:{ "requestedState": "JOB_STATE_CANCELLED", "labels": { "force_cancel_job": "true" } }
检测 Dataflow 作业的完成情况
如需检测作业取消或排空何时完成,请使用以下任一方法:
- 使用 Cloud Composer 等工作流编排服务监控 Dataflow 作业。
- 同步运行流水线,以便在流水线完成之前阻止任务运行。如需了解详情,请参阅“设置流水线选项”中的控制执行模式。
使用 Google Cloud CLI 中的命令行工具轮询作业状态。如需获取项目中所有 Dataflow 作业的列表,请在 shell 或终端中运行以下命令:
gcloud dataflow jobs list
输出会显示每个作业的作业 ID、名称、状态 (
STATE
) 以及其他信息。如需了解详情,请参阅使用 Dataflow 命令行界面。
归档 Dataflow 作业
归档 Dataflow 作业时,作业将从控制台的 Dataflow 作业页面的作业列表中移除。系统会将该作业移动到已归档作业列表。您只能归档已完成的作业,包括以下状态的作业:
JOB_STATE_CANCELLED
JOB_STATE_DRAINED
JOB_STATE_DONE
JOB_STATE_FAILED
JOB_STATE_UPDATED
如需了解详情,请参阅本文档中的检测 Dataflow 作业完成情况。如需了解问题排查信息,请参阅“排查 Dataflow 错误”中的归档作业错误。
所有归档作业在保留 30 天后都会被删除。
将作业归档
请按照以下步骤操作,从 Dataflow 作业页面的主作业列表中移除已完成的作业。
控制台
在 Google Cloud 控制台中,转到 Dataflow 作业页面。
此时系统将显示 Dataflow 作业及其状态的列表。
选择一个作业。
在作业详情页面上,点击归档。如果作业尚未完成,则归档选项将不可用。
API
如需使用 API 归档作业,请使用 JobMetadata
字段。在 JobMetadata
字段中,对于 userDisplayProperties
,请使用键值对 "archived":"true"
。
您的 API 请求还必须包含 updateMask 查询参数。
curl --request PUT \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
--data
'{"job_metadata":{"userDisplayProperties":{"archived":"true"}}}' \
--compressed
替换以下内容:
PROJECT_ID
:您的项目 IDREGION
:Dataflow 区域JOB_ID
:您的 Dataflow 作业的 ID
查看和恢复已归档的作业
请按照以下步骤操作,查看已归档的作业或将已归档的作业恢复到 Dataflow 作业页面上的主作业列表。
控制台
在 Google Cloud 控制台中,转到 Dataflow 作业页面。
点击已归档切换开关。此时将显示已归档的 Dataflow 作业列表。
选择一个作业。
如需将作业恢复到 Dataflow 作业页面上的主作业列表,请在作业详情页面上点击恢复。
API
如需使用 API 恢复作业,请使用 JobMetadata
字段。在 JobMetadata
字段中,对于 userDisplayProperties
,请使用键值对 "archived":"false"
。
您的 API 请求还必须包含 updateMask 查询参数。
curl --request PUT \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
--data
'{"job_metadata":{"userDisplayProperties":{"archived":"false"}}}' \
--compressed
替换以下内容:
PROJECT_ID
:您的项目 IDREGION
:Dataflow 区域JOB_ID
:您的 Dataflow 作业的 ID
后续步骤
- 探索 Dataflow 命令行。
- 探索 Dataflow REST API。
- 探索 Google Cloud 控制台中的 Dataflow 监控界面。
- 详细了解如何更新流水线。