Dataflow 组件

借助 Dataflow 组件,您可以将 Apache Beam 作业提交到 Dataflow 来执行。 在 Dataflow 中,Job 资源表示 Dataflow 作业。

Google Cloud 流水线组件 SDK 包含以下运算符,用于创建 Job 资源并监控其执行:

此外,Google Cloud 流水线组件 SDK 包含 WaitGcpResourcesOp 组件,可用于在运行 Dataflow 作业时降低费用。

DataflowFlexTemplateJobOp

借助 DataflowFlexTemplateJobOp 运算符,您可以创建 Vertex AI Pipelines 组件以启动 Dataflow Flex 模板

在 Dataflow 中,LaunchFlexTemplateParameter 资源表示要启动的 Flex 模板。此组件将创建一个 LaunchFlexTemplateParameter 资源,然后通过启动模板来请求 Dataflow 创建作业。如果模板成功启动,Dataflow 将返回 Job 资源。

Dataflow Flex 模板组件在收到来自 Dataflow 的 Job 资源时终止。该组件将 job_id 输出为序列化 gcp_resources proto。您可以将此参数传递给 WaitGcpResourcesOp 组件,以等待 Dataflow 作业完成。

DataflowPythonJobOp

借助 DataflowPythonJobOp 运算符,您可以创建 Vertex AI Pipelines 组件,以通过向 Dataflow 提交基于 Python 的 Apache Beam 作业来执行数据准备。

Apache Beam 作业的 Python 代码与 Dataflow Runner 一起运行。使用 Dataflow 服务运行流水线时,运行程序会将可执行代码上传到由 python_module_path 参数指定的位置和依赖项上传到 Cloud Storage 存储桶(由 temp_location 指定),然后创建一个 Dataflow 作业,以便对 Google Cloud 中的代管式资源执行 Apache Beam 流水线。

如需详细了解 Dataflow Runner,请参阅使用 Dataflow Runner

Dataflow Python 组件接受通过 Beam 运行程序传递到 Apache Beam 代码的参数列表。这些参数由 args 指定。例如,您可以使用这些参数设置 apache_beam.options.pipeline_options 以在运行数据流作业时指定网络、子网、客户管理的加密密钥 (CMEK) 和其他选项。

WaitGcpResourcesOp

Dataflow 作业通常需要很长时间才能完成。busy-wait 容器(启动 Dataflow 作业并等待结果的容器)的费用可能会昂贵。

通过 Beam 运行程序提交 Dataflow 作业后,DataflowPythonJobOp 组件会立即终止并返回 job_id 输出参数作为序列化 gcp_resources proto。您可以将此参数传递给 WaitGcpResourcesOp 组件,以等待 Dataflow 作业完成。

    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
  
    dataflow_wait_op =  WaitGcpResourcesOp(
        gcp_resources = dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines 优化了 WaitGcpResourcesOp 以无服务器方式执行,并且费用为零。

如果 DataflowPythonJobOpDataflowFlexTemplateJobOp 不符合您的要求,您还可以创建自己的组件,该组件会输出 gcp_resources 参数并将其传递给 WaitGcpResourcesOp 组件。

如需详细了解如何创建 gcp_resources 输出参数,请参阅编写组件以显示 Google Cloud 控制台链接

API 参考文档

教程

版本历史记录和版本说明

如需详细了解 Google Cloud 流水线组件 SDK 的版本历史记录和更改,请参阅 Google Cloud 流水线组件 SDK 版本说明

技术支持联系人

如果您有任何疑问,请联系 kubeflow-pipelines-components@google.com