停止正在运行的流水线

您不能删除 Dataflow 作业,只能将其停止。

如需停止 Dataflow 作业,您可以使用 Google Cloud ConsoleCloud Shell(使用 Cloud SDK 安装的本地终端)或 Dataflow REST API

您可以通过以下两种方式停止 Dataflow 作业:

  • 取消作业。此方法适用于流处理流水线和批处理流水线。取消作业会阻止 Dataflow 服务处理任何数据,包括已缓冲的数据。如需了解详情,请参阅取消作业

  • 排空作业。此方法仅适用于流处理流水线。排空作业使 Dataflow 服务可完成缓冲数据的处理,同时停止提取新数据。如需了解详情,请参阅排空作业

取消作业

取消作业时,Dataflow 服务会立即停止作业。

取消作业时会进行以下操作:

  1. Dataflow 服务停止所有数据提取和数据处理操作。

  2. Dataflow 服务开始清理关联到您作业的 Google Cloud 资源。

    这可能包括关停 Compute Engine 工作器实例,并断开与 I/O 源或接收器的活跃连接。

有关取消作业的重要信息

  • 取消作业会立即停止流水线的处理。

  • 取消作业可能会丢失运行中数据。运行中数据是指已读取但仍在由流水线处理的数据。

  • 在取消作业之前已从流水线写入输出接收器的数据在输出接收器上可能仍处于可访问状态。

  • 如果您不太在意数据丢失,取消作业可确保尽快关闭与作业关联的 Google Cloud 资源。

排空作业

当您排空作业时,Dataflow 服务会以当前状态完成作业。如果您希望在关闭流处理流水线时防止数据丢失,最好的方法是排空作业。

排空作业时会进行以下操作:

  1. 收到排空请求后,您的作业很快(通常在几分钟之内)就会停止从输入源提取新数据。

  2. Dataflow 服务会保留所有现有资源(例如工作器实例),以完成流水线中任何已缓冲数据的处理和写入操作。

  3. 在完成所有等待中的处理操作和写入操作后,Dataflow 服务会关停与您的作业关联的 Google Cloud 资源。

有关排空作业的重要信息

  • 批处理流水线不支持排空作业。

  • 在完成所有处理和写入操作之前,您的流水线会因维护任何关联的 Google Cloud 资源而继续产生费用。

  • 如果流处理流水线代码包含循环计时器,则无法排空作业。

  • Dataflow 流式传输可容忍工作器重启,并且不会在出现错误时使流式传输作业失败。相反,Dataflow 服务将重试,直到用户执行排空、取消或重启作业等操作。建议排空作业,以避免数据丢失。

  • 在 Dataflow 服务持久提交消息之前,Dataflow 不会确认消息。例如,对于 Kafka,这可以视为从 Kafka 到 Dataflow 的消息所有权的安全移交,从而消除了数据丢失的风险。

  • 如果流处理流水线包含可拆分 DoFn,则必须先截断结果,然后再运行排空选项。如需详细了解如何截断可拆分 DoFn,请参阅 Apache Beam 文档。

  • 您可以更新正在排空的流水线。

  • 排空作业可能需要花费很长时间才能完成,例如,当流水线中包含大量已缓冲数据。

  • 您可以取消当前正在排空的作业。

  • 在某些情况下,Dataflow 作业可能无法完成排空操作。您可以查看作业日志以确定根本原因并采取相应措施。

排空作业的影响

当您排空流处理流水线时,Dataflow 会立即关闭所有正在处理的窗口并触发所有触发器

在排空操作中,系统不会等待任何基于时间的未完成窗口结束运行。

例如,如果在您排空作业时,您的流水线已在一个时长两小时的窗口中运行了十分钟,则 Dataflow 不会等待窗口的剩余部分结束运行。它会立即关闭窗口并生成部分结果。Dataflow 通过将系统水印提升到无穷大来关闭未完成窗口。此功能同样适用于自定义数据源。

在排空使用自定义数据源类的流水线时,Dataflow 会停止发出获取新数据的请求,将系统水印提升到无穷大,并针对最后一个检查点调用源的 finalize() 方法。

在 Google Cloud Console 中,您可以查看流水线转换的详细信息。下图展示了进行中的排空操作的效果。请注意,水印已提升为最大值。

排空操作的步骤视图。

图 1. 排空操作的步骤视图。

停止作业

在停止作业之前,您必须了解取消排空作业的影响。

控制台

  1. 转到 Dataflow 作业页面。

    转到作业

  2. 点击要停止的作业。

    要停止作业,作业状态必须为正在运行

  3. 在作业详情页面上,点击停止

  4. 执行下列其中一项操作:

    • 对于批处理流水线,点击取消

    • 对于流处理流水线,点击取消排空

  5. 要确认您的选择,请点击停止作业

gcloud

要排空或取消 Dataflow 作业,您可以使用 Cloud Shell 中的 gcloud dataflow jobs 命令或随 Cloud SDK 安装的本地终端。

  1. 登录 shell。

  2. 列出当前正在运行的 Dataflow 作业的作业 ID,然后记下要停止的作业的作业 ID:

    gcloud dataflow jobs list
    

    如果未设置 --region 标志,则会显示所有可用区域中的 Dataflow 作业。

  3. 执行下列其中一项操作:

    • 排空流处理作业:

       gcloud dataflow jobs drain JOB_ID
      

      JOB_ID 替换为您之前复制的作业 ID。

    • 取消批处理或流处理作业:

      gcloud dataflow jobs cancel JOB_ID
      

      JOB_ID 替换为您之前复制的作业 ID。

API

如需使用 Dataflow REST API 取消或排空作业,您可以选择 projects.locations.jobs.updateprojects.jobs.update。在请求正文中,在所选 API 的作业实例的 requestedState 字段中传递所需的作业状态

  • 如需取消作业,请将作业状态设置为 JOB_STATE_CANCELLED

  • 如需排空作业,请将作业状态设置为 JOB_STATE_DRAINED

后续步骤