编排流水线

本页面介绍了如何使用 Cloud Composer 和触发器进行流水线编排。Cloud Data Fusion 建议使用 Cloud Composer 来编排流水线。如果您需要以更简单的方式管理编排,请使用触发器。

Composer

使用 Cloud Composer 编排流水线

使用 Cloud Composer 在 Cloud Data Fusion 中编排流水线执行具有以下优势:

  • 集中式工作流管理:统一管理多个 Cloud Data Fusion 流水线的执行。
  • 依赖项管理:为确保执行正确的执行顺序,请定义流水线之间的依赖关系。
  • 监控和提醒:Cloud Composer 提供监控功能并针对故障发出提醒。
  • 与其他服务集成:借助 Cloud Composer,您可以编排跨越 Cloud Data Fusion 和其他 Google Cloud 服务的工作流。

如需使用 Cloud Composer 编排 Cloud Data Fusion 流水线,请按以下流程操作:

  1. 设置 Cloud Composer 环境。

    • 创建 Cloud Composer 环境。如果没有,请在 Google Cloud 项目中预配环境。此环境就是您的编排工作区。
    • 授予权限。确保 Cloud Composer 服务帐号具有访问 Cloud Data Fusion 的必要权限(例如启动、停止和列出流水线的权限)。
  2. 定义用于编排的有向无环图 (DAG)。

    • 创建 DAG:在 Cloud Composer 中,创建一个 DAG,用于定义 Cloud Data Fusion 流水线的编排工作流。
    • Cloud Data Fusion 运算符:在 DAG 中使用 Cloud Composer 的 Cloud Data Fusion 运算符。这些运算符可让您以编程方式与 Cloud Data Fusion 交互。

Cloud Data Fusion 运算符

Cloud Data Fusion 流水线编排具有以下运算符:

CloudDataFusionStartPipelineOperator

按 ID 触发 Cloud Data Fusion 流水线的执行。它具有以下参数:

  • 流水线 ID
  • 位置(Google Cloud 区域)
  • 流水线命名空间
  • 运行时参数(可选)
  • 等待完成(可选)
  • 超时(可选)
CloudDataFusionStopPipelineOperator

允许您停止正在运行的 Cloud Data Fusion 流水线。

CloudDataFusionDeletePipelineOperator

删除 Cloud Data Fusion 流水线。

构建 DAG 工作流

构建 DAG 工作流时,请考虑以下事项:

  • 定义依赖项:使用 DAG 结构定义任务之间的依赖关系。例如,您可能有一个任务等待一个命名空间中的流水线成功完成,然后再触发另一个命名空间中的另一个流水线。
  • 时间安排:安排 DAG 以特定时间间隔(例如每天或每小时)运行,或设置为手动触发。

如需了解详情,请参阅 Cloud Composer 概览

触发器

使用触发器编排流水线

借助 Cloud Data Fusion 触发器,您可以在一个或多个上游流水线完成(成功、失败或任何指定条件)后自动执行下游流水线。

触发器对于以下任务非常有用:

  • 清理一次数据,然后将其提供给多个下游流水线以供使用。
  • 在流水线之间共享信息,例如运行时参数和插件配置。此任务称为载荷配置
  • 拥有一组使用每小时、每天、每周或每月的数据运行的动态流水线,而不是每次运行时必须更新的静态流水线。

例如,假设您有一个数据集,其中包含公司货物的所有相关信息。根据这些数据,您想要回答几个业务问题。为此,您需要创建一个名为“运单数据清理”的流水线,用于清理有关运单的原始数据。然后,创建第二条流水线“Delayed Shipments USA”,该流水线读取经过清理的数据,并查找美国境内延迟超过指定阈值的货物。上游 Shipments Data Cleaning 流水线成功完成后,可以立即触发 Delayed Shipments USA 流水线。

此外,由于下游流水线使用上游流水线的输出,因此您必须指定:当下游流水线使用此触发器运行时,它还会接收要读取的输入目录(即上游流水线生成其输出的目录)。此过程称为“传递载荷配置”,您可以使用运行时参数对其进行定义。它可让您拥有一组使用每小时、每天、每周或每月的数据运行的动态流水线(而不是静态流水线,每次运行时都必须更新其数据)。

如需使用触发器编排流水线,请按以下流程操作:

  1. 创建上游和下游流水线。

    • 在 Cloud Data Fusion Studio 中,设计和部署形成编排链的流水线。
    • 考虑哪个流水线的完成将激活工作流中的下一个流水线(下游)。
  2. 可选:为上游流水线传递运行时参数。

  3. 在下游流水线上创建入站触发器。

    • 在 Cloud Data Fusion Studio 中,转到列表页面。在已部署标签页中,点击下游流水线的名称。此时将显示该流水线的“部署”视图。
    • 在页面左侧中间位置,点击入站触发器。 系统会显示可用流水线列表。
    • 点击上游流水线。选择一个或多个上游流水线完成状态(成功失败停止)作为下游流水线应运行的时间条件。
    • 如果您希望上游流水线与下游流水线共享信息(称为载荷配置),请点击触发器配置,然后按照相应步骤将载荷配置作为运行时参数传递。否则,请点击启用触发器
  4. 测试触发器。

    • 启动上游流水线运行。
    • 如果触发器配置正确,则下游流水线会根据您配置的条件,在上游流水线完成后自动执行。

将载荷配置作为运行时参数传递

载荷配置允许将信息从上游流水线共享到下游流水线。例如,此信息可以是输出目录、数据格式或流水线运行日期。然后,下游流水线会使用这些信息来做出决策,例如确定要从哪个数据集读取数据。

如需将信息从上游流水线传递到下游流水线,您可以使用运行时参数的值或上游流水线中任何插件的配置值来设置下游流水线的运行时参数。

每当下行流水线触发并运行时,系统都会使用触发下行流水线的特定上行流水线运行作业的运行时参数来设置其载荷配置。

如需将载荷配置作为运行时参数传递,请按以下步骤操作:

  1. 点击触发器配置后,系统会显示您之前为上游流水线设置的所有运行时参数。选择执行此触发器时要从上游流水线传递到下游流水线的运行时参数。
  2. 点击插件配置标签页,以查看在上游流水线触发时将从上游流水线传递到下游流水线的内容列表。
  3. 点击配置并启用触发器