编写组件以显示 Google Cloud 控制台链接
通常,在运行组件时,您不仅会看到指向正在启动的组件作业的链接,而且还会看到底层云资源(如 Vertex 批量预测作业或 Dataflow 作业)的链接。
gcp_resource
proto 是一个特殊参数,您可以在组件中使用该参数,以便 Google Cloud 控制台在 Vertex AI Pipelines 控制台中提供资源日志和状态的自定义视图。
输出 gcp_resource
参数
使用基于容器的组件
首先,您需要在组件中定义 gcp_resource
参数,如下面的 component.py
文件示例所示:
Python
如需了解如何安装或更新 Python 版 Vertex AI SDK,请参阅安装 Python 版 Vertex AI SDK。如需了解详情,请参阅 Python API 参考文档。
接下来,在容器中安装 Google Cloud 流水线组件软件包:
pip install --upgrade google-cloud-pipeline-components
接下来,在 Python 代码中,将资源定义为 gcp_resource
参数:
Python
如需了解如何安装或更新 Python 版 Vertex AI SDK,请参阅安装 Python 版 Vertex AI SDK。如需了解详情,请参阅 Python API 参考文档。
from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson
dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'
with open(gcp_resources, 'w') as f:
f.write(MessageToJson(dataflow_resources))
使用 Python 组件
或者,您可以返回 gcp_resources
输出参数,就像返回任何字符串输出参数一样:
@dsl.component(
base_image='python:3.9',
packages_to_install=['google-cloud-pipeline-components==2.14.1'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs", [("gcp_resources", str)]):
# Launch the dataflow job
dataflow_job_id = [dataflow-id]
dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
gcp_resources=MessageToJson(dataflow_resources)
return gcp_resources
支持的 resource_type
值
您可以将 resource_type
设置为任意字符串,但只有以下类型在 Google Cloud 控制台中具有链接:
- BatchPredictionJob
- BigQueryJob
- CustomJob
- DataflowJob
- HyperparameterTuningJob
编写组件以取消底层资源
取消流水线作业后,Google Cloud 底层资源默认会继续运行。它们不会自动取消。要更改此行为,您应该将 SIGTERM 处理程序附加到流水线作业。建议在长时间运行作业的轮询循环之前执行此操作。
多个 Google Cloud 流水线组件已实现取消操作,包括:
- 批量预测作业
- BigQuery ML 作业
- 自定义作业
- Dataproc Serverless 批量作业
- 超参数调节作业
如需了解详情(包括说明如何附加 SIGTERM 处理程序的示例代码),请参阅以下 GitHub 链接:
- https://github.com/kubeflow/pipelines/blob/google-cloud-pipeline-components-2.14.1/components/google-cloud/google_cloud_pipeline_components/container/utils/execution_context.py
- https://github.com/kubeflow/pipelines/blob/google-cloud-pipeline-components-2.14.1/components/google-cloud/google_cloud_pipeline_components/container/v1/gcp_launcher/job_remote_runner.py#L124
实现 SIGTERM 处理程序时,请考虑以下事项:
- 取消传播仅在组件运行几分钟后才生效。这通常是因为需要在调用 Python 信号处理程序之前处理后台启动任务。
- 某些 Google Cloud 资源可能未实现取消操作。例如,创建或删除 Vertex AI 端点或模型可能会创建一个长时间运行的操作,以通过其 REST API 接受取消请求,但不会实现取消操作本身。