编排流水线

本页面介绍了如何使用 Cloud Composer 编排流水线,以及 触发器。Cloud Data Fusion 建议使用 Cloud Composer 来编排流水线。如果您需要更简单的方式来管理编排,请使用 触发器。

Composer

使用 Cloud Composer 编排流水线

在 Cloud Data Fusion 中编排流水线执行, Cloud Composer 具有以下优势:

  • 集中式工作流管理:统一管理 多个 Cloud Data Fusion 流水线。
  • 依赖项管理:为确保适当的执行顺序,定义 流水线之间的依赖项。
  • 监控和提醒:Cloud Composer 提供 监控功能和故障提醒。
  • 与其他服务的集成:借助 Cloud Composer,您可以编排跨 Cloud Data Fusion 和其他 Google Cloud 服务的工作流。

使用以下代码编排 Cloud Data Fusion 流水线 Cloud Composer,请按以下流程操作:

  1. 设置 Cloud Composer 环境。

    • 创建 Cloud Composer 环境。如果您还没有环境,请在 Google Cloud 项目中预配环境。此环境就是您的编排工作区。
    • 授予权限。确保 Cloud Composer 拥有访问 Cloud Data Fusion(例如启动、停止和列出权限) 流水线)。
  2. 定义用于编排的有向无环图 (DAG)。

    • 创建 DAG:在 Cloud Composer 中,创建一个 DAG 定义了 Cloud Data Fusion 的编排工作流 流水线。
    • Cloud Data Fusion 运维人员:使用 Cloud Composer 的 DAG 中的 Cloud Data Fusion Operator。这些运算符 可让您以程序化方式与 Cloud Data Fusion 交互。

Cloud Data Fusion 运算符

Cloud Data Fusion 流水线编排具有以下运算符:

CloudDataFusionStartPipelineOperator

按 ID 触发 Cloud Data Fusion 流水线的执行。它 具有以下参数:

  • 流水线 ID
  • 位置(Google Cloud 区域)
  • 流水线命名空间
  • 运行时参数(可选)
  • 等待完成(可选)
  • 超时(可选)
CloudDataFusionStopPipelineOperator

允许您停止正在运行的 Cloud Data Fusion 流水线。

CloudDataFusionDeletePipelineOperator

删除 Cloud Data Fusion 流水线。

构建 DAG 工作流

构建 DAG 工作流时,请考虑以下事项:

  • 定义依赖项:使用 DAG 结构定义依赖项 任务之间。例如,您可能有一个 流水线在一个命名空间中成功完成,然后才能触发 另一个命名空间中的另一个流水线。
  • 时间安排:安排 DAG 以特定时间间隔运行,例如 每天或每小时一次,也可将其设为手动触发。

有关详情,请参阅 Cloud Composer 概览

触发器

使用触发器编排流水线

借助 Cloud Data Fusion 触发器,您可以自动执行下行 在完成(成功、失败或任何指定条件)时触发的流水线 一个或多个上游流水线

触发器对于以下任务非常有用:

  • 清理数据一次,然后提供给 下游流水线以供使用。
  • 共享信息,例如运行时参数和插件 和流水线之间的各种配置此任务称为载荷 配置
  • 拥有一组使用小时、天、周或月的数据运行的动态流水线,而不是每次运行时都必须更新的静态流水线。

例如,假设您有一个数据集,其中包含与您的产品 送货。根据这些数据,您想要回答多个企业的问题。 问题。为此,您需要创建一个流水线,用于清理原始数据 装运数据清理。然后创建第二个 流水线,Delayed Shipments USA,用于读取经过清理的数据并查找 美国境内延迟超过指定值的订单 阈值。延迟发货(美国)渠道可在 上游 Shipments Data Cleaning 流水线成功完成。

此外,由于下游流水线会使用 您必须指定在下游流水线运行时 使用此触发器,它还会接收要读取的输入目录(即 是上游流水线生成其输出的目录)。这个 过程称为传递载荷配置,您可以使用 运行时参数。它支持你有一组动态流水线, 使用小时、天、周或月(而非静态流水线、 每次运行时必须更新该值)。

如需使用触发器编排流水线,请按以下流程操作:

  1. 创建上游和下游流水线。

    • 在 Cloud Data Fusion Studio 中,设计并部署 构成了编排链的各种流水线
    • 考虑哪个流水线的完成将激活下一个 流水线(下游)。
  2. 可选:为上游流水线传递运行时参数。

  3. 在下游流水线上创建入站触发器。

    • 在 Cloud Data Fusion Studio 中,转到列表页面。在 在已部署标签页中,点击下游流水线的名称。通过 此时将显示该流水线的部署视图。
    • 在页面左侧中间位置,点击入站触发器。 系统会显示可用流水线列表。
    • 点击上游流水线。选择一种或多种上游流水线完成状态(成功失败停止)作为下游流水线应运行的时间条件。
    • 如果您希望上游流水线共享信息(称为 载荷配置)和下游流水线,请点击 触发器配置,然后按照相关步骤 将载荷配置作为运行时参数传递。 否则,请点击启用触发器
  4. 测试触发器。

    • 启动上游流水线运行。
    • 如果触发器配置正确,下游流水线 在上游流水线完成后自动执行, 确定合适的出价。

将载荷配置作为运行时参数传递

载荷配置允许从上游共享信息 流水线。例如,这些信息可能是 输出目录、数据格式或流水线运行日期。这个 信息随后会被下游管道用于决策,例如 确定要从哪个数据集读取数据。

要将信息从上游流水线传递到下游流水线, 您可以使用 任何插件的运行时参数或配置 上游流水线。

每当下游流水线触发并运行时,其载荷配置将使用触发下游流水线的特定上游流水线运行的运行时参数进行设置。

如需将载荷配置作为运行时参数传递,请按以下步骤操作:

  1. 从您在创建入站触发器中上次停下的地方继续 点击触发器配置后,您之前记录的任何运行时参数 之前设置的对话框将会显示。选择 从上游流水线传递到 下游流水线。
  2. 点击插件配置标签页,查看将会传递的内容列表 从上游流水线发送到下游流水线 触发。
  3. 点击配置并启用触发器