Dataflow 组件

借助 DataflowPythonJobOp,您可以创建 Vertex AI Pipelines 组件,以通过将用 Python 编写的 Apache Beam 作业提交到 Dataflow 来执行数据准备。

Python Beam 代码使用 Dataflow Runner 运行。使用 Dataflow 服务运行流水线时,运行程序会将可执行代码(由 python_module_path 指定)和依赖项上传到 Cloud Storage 存储分区(由 temp_location 指定)创建 Dataflow 作业,该作业对 Google Cloud 中的托管资源执行 Apache Beam 流水线。

如需详细了解 Dataflow Runner,请参阅使用 Dataflow Runner

Dataflow Python 组件接受通过 Beam 运行程序传递到 Apache Beam 代码的参数列表。这些参数由 args 指定。例如,您可以使用这些参数来设置 apache_beam.options.pipeline_options 以指定网络、子网、客户管理的加密密钥 (CMEK)。和其他选项。

WaitGcpResourcesOp

Dataflow 作业通常需要很长时间才能完成。busy-wait 容器(启动 Dataflow 作业并等待结果的容器)的费用可能会昂贵。

相反,通过 Beam 运行程序提交 Dataflow 作业后,DataflowPythonJobOp 组件将立即终止。Ajob_id将由组件以序列化gcp_resourcesproto。此输出可传递给 WaitGcpResourcesOp 组件以等待 Dataflow 作业完成。

    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
  
    dataflow_wait_op =  WaitGcpResourcesOp(
        gcp_resources = dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines 优化了 WaitGcpResourcesOp 以无服务器方式执行,并且费用为零。

如果 DataflowPythonJobOp 不符合您的要求(例如,如果您需要调用 Beam 运行程序之前进行预处理,或使用 Beam 等其他语言),则可以创建自己的输出组件: gcp_resources 参数以利用 WaitGcpResourcesOp 组件。如需详细了解如何创建 gcp_resources 输出参数,请参阅 Kubeflow 流水线代码库的 GCP 资源原型部分。

API 参考文档

如需了解组件参考,请参阅 google_cloud_pipeline_components SDK 参考

教程

版本历史记录和更改日志

日期 版本 备注
11/2021 GCPC v0.2 组件的实验版本。

技术支持联系人

如果您有任何疑问,请联系 kubeflow-pipelines-components@google.com