编排流水线

本页介绍了如何使用 Cloud Composer 和触发器进行流水线编排。Cloud Data Fusion 建议使用 Cloud Composer 来编排流水线。如果您需要更简单的方式来管理编排,请使用触发器。

Composer

使用 Cloud Composer 编排流水线

使用 Cloud Composer 在 Cloud Data Fusion 中编排流水线执行具有以下优势:

  • 集中式工作流管理:统一管理多个 Cloud Data Fusion 流水线的执行。
  • 依赖项管理:为确保正确的执行顺序,请定义流水线之间的依赖项。
  • 监控和提醒:Cloud Composer 提供监控功能和故障提醒。
  • 与其他服务集成:借助 Cloud Composer,您可以编排跨 Cloud Data Fusion 和其他Google Cloud 服务的工作流。

如需使用 Cloud Composer 编排 Cloud Data Fusion 流水线,请按以下步骤操作:

  1. 设置 Cloud Composer 环境。

    • 创建一个 Cloud Composer 环境。如果您还没有环境,请在 Google Cloud 项目中预配环境。此环境是您的编排工作区。
    • 授予权限。确保 Cloud Composer 服务账号拥有访问 Cloud Data Fusion 的必要权限(例如启动、停止和列出数据流的权限)。
  2. 定义有向无环图 (DAG) 以进行编排。

    • 创建 DAG:在 Cloud Composer 中,创建一个 DAG,用于定义 Cloud Data Fusion 流水线的编排工作流。
    • Cloud Data Fusion 运算符:在 DAG 中使用 Cloud Composer 的 Cloud Data Fusion 运算符。借助这些运算符,您可以以编程方式与 Cloud Data Fusion 交互。

Cloud Data Fusion 运算符

Cloud Data Fusion 流水线编排有以下运算符:

CloudDataFusionStartPipelineOperator

根据 Cloud Data Fusion 流水线的 ID 触发其执行。它具有以下参数:

  • 流水线 ID
  • 位置(Google Cloud 区域)
  • 流水线命名空间
  • 运行时参数(可选)
  • 等待完成(可选)
  • 超时(可选)
CloudDataFusionStopPipelineOperator

用于停止正在运行的 Cloud Data Fusion 流水线。

CloudDataFusionDeletePipelineOperator

删除 Cloud Data Fusion 流水线。

构建 DAG 工作流

构建 DAG 工作流时,请考虑以下事项:

  • 定义依赖项:使用 DAG 结构定义任务之间的依赖项。例如,您可以创建一个任务,让其等待一个命名空间中的流水线成功完成,然后再触发另一个命名空间中的另一个流水线。
  • 安排时间表:将 DAG 安排为按特定时间间隔(例如每天或每小时)运行,或将其设置为手动触发。

如需了解详情,请参阅 Cloud Composer 概览

触发器

使用触发器编排流水线

借助 Cloud Data Fusion 触发器,您可以在一个或多个上游流水线完成(成功、失败或满足任何指定条件)后自动执行下游流水线。

触发器非常适用于以下任务:

  • 清理一次数据,然后将其提供给多个下游流水线。
  • 在流水线之间共享信息,例如运行时参数和插件配置。此任务称为载荷配置
  • 拥有一组使用小时、天、周或月的数据运行的动态流水线,而不是每次运行时都必须更新的静态流水线。

例如,您有一个数据集,其中包含与贵公司运输相关的所有信息。您希望根据这些数据回答几个业务问题。为此,您需要创建一个用于清理有关配送的原始数据的流水线,称为配送数据清理。然后,您创建第二个流水线“美国境内延迟送货”,该流水线会读取经过清理的数据,并找出美国境内延迟时间超过指定阈值的货物。上游送货数据清理流水线成功完成后,系统即可触发美国延迟送货流水线。

此外,由于下游流水线会使用上游流水线的输出,因此您必须指定,当下游流水线使用此触发器运行时,它还会收到要读取的输入目录(即上游流水线生成输出的目录)。此过程称为传递载荷配置,您可以使用运行时参数定义此配置。借助此功能,您可以拥有一组使用小时、天、周或月的数据运行的动态流水线(而不是每次运行时都必须更新的静态流水线)。

如需使用触发器编排流水线,请按以下流程操作:

  1. 创建上游和下游流水线。

    • 在 Cloud Data Fusion Studio 中,设计并部署构成编排链的流水线。
    • 考虑哪个流水线的完成会激活工作流中的下一个流水线(下游)。
  2. 可选:为上游流水线传递运行时参数。

  3. 在下游流水线上创建入站触发器。

    • 在 Cloud Data Fusion Studio 中,前往列表页面。在已部署标签页中,点击下游流水线的名称。您将看到该流水线的“部署”视图。
    • 点击页面中间的入站触发器。 系统会显示可用流水线的列表。
    • 点击上游流水线。选择一种或多种上游流水线完成状态(成功失败停止)作为下游流水线应运行的时间条件。
    • 如果您希望上游流水线与下游流水线共享信息(称为载荷配置),请点击触发器配置,然后按照将载荷配置作为运行时参数传递的步骤操作。否则,请点击启用触发器
  4. 测试触发器。

    • 启动上游流水线的运行。
    • 如果触发器配置正确,下游流水线会在上游流水线完成后根据您配置的条件自动执行。

将载荷配置作为运行时参数传递

载荷配置允许将来自上游流水线的信息共享到下游流水线。例如,此信息可以是输出目录、数据格式或流水线的运行日期。然后,下游流水线会使用此信息做出决策,例如确定要读取的正确数据集。

如需将信息从上游流水线传递给下游流水线,请使用下游流水线的运行时参数或任何插件的配置设置下游流水线的运行时参数。

每当下游流水线触发并运行时,其载荷配置将使用触发下游流水线的特定上游流水线运行的运行时参数进行设置。

如需将载荷配置作为运行时参数传递,请按以下步骤操作:

  1. 从您在创建入站触发器部分,点击触发器配置,任何运行时参数之前设置。选择在此触发器执行时要从上游流水线传递到下游流水线的运行时参数。
  2. 点击插件配置标签页以查看在上游流水线被触发时将传递给下游流水线的列表。
  3. 点击配置并启用触发器