Dataflow 组件

借助 DataflowPythonJobOp,您可以创建 Vertex AI Pipelines 组件,以通过将用 Python 编写的 Apache Beam 作业提交到 Dataflow 来执行数据准备。

Python Beam 代码使用 Dataflow Runner 运行。使用 Dataflow 服务运行流水线时,运行程序会将可执行代码(由 python_module_path 指定)和依赖项上传到 Cloud Storage 存储桶(由 temp_location 指定),并创建 Dataflow 作业以对 Google Cloud 中的代管式资源执行 Apache Beam 流水线。

如需详细了解 Dataflow Runner,请参阅使用 Dataflow Runner

Dataflow Python 组件接受通过 Beam 运行程序传递到 Apache Beam 代码的参数列表。这些参数由 args 指定。例如,您可以使用这些参数来设置 apache_beam.options.pipeline_options 以指定网络、子网、客户管理的加密密钥 (CMEK)。和其他选项。

WaitGcpResourcesOp

Dataflow 作业通常需要很长时间才能完成。busy-wait 容器(启动 Dataflow 作业并等待结果的容器)的费用可能会昂贵。

相反,通过 Beam 运行程序提交 Dataflow 作业后,DataflowPythonJobOp 组件将立即终止。Ajob_id将由组件以序列化gcp_resourcesproto。此输出可传递给 WaitGcpResourcesOp 组件以等待 Dataflow 作业完成。

    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
  
    dataflow_wait_op =  WaitGcpResourcesOp(
        gcp_resources = dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines 优化了 WaitGcpResourcesOp 以无服务器方式执行,并且费用为零。

如果 DataflowPythonJobOp 不符合您的要求(例如,如果您需要调用 Beam Runner 之前进行预处理,或使用 Beam Java 等其他语言),则还可以创建自己的组件来输出 gcp_resources 参数以利用 WaitGcpResourcesOp 组件。如需详细了解如何创建 gcp_resources 输出参数,请参阅编写组件以显示 Google Cloud 控制台链接

API 参考文档

如需了解组件参考,请参阅 google_cloud_pipeline_components SDK 参考

教程

版本历史记录和更改日志

日期 版本 备注
03/2022 GCPC v1.0 组件 1.0 版。
02/2022 GCPC v0.3 组件的新实验性版本。
11/2021 GCPC v0.2 组件的实验版本。

技术支持联系人

如果您有任何疑问,请联系 kubeflow-pipelines-components@google.com