Google Cloud コンソールのリンクを表示するコンポーネントを作成する
コンポーネントを実行するときに、起動するコンポーネント ジョブへのリンクだけでなく、基盤となるクラウド リソース(Vertex バッチ予測ジョブや Dataflow ジョブなど)へのリンクも含めることが少なくありません。
gcp_resource
proto は、コンポーネントで使用する特別なパラメータです。これにより、Google Cloud コンソールで Vertex AI Pipelines コンソールにリソースのログとステータスをカスタマイズして表示できます。
gcp_resource
パラメータを出力する
コンテナベースのコンポーネントの使用
まず、コンポーネントで gcp_resource
パラメータを定義する必要があります。次の component.py
サンプル ファイルをご覧ください。
Vertex AI SDK for Python
Vertex AI SDK for Python のインストールまたは更新の方法については、Vertex AI SDK for Python をインストールするをご覧ください。詳細については、Vertex AI SDK for Python API のリファレンス ドキュメントをご覧ください。
次に、コンテナ内に Google Cloud パイプライン コンポーネント パッケージをインストールします。
pip install --upgrade google-cloud-pipeline-components
次に、Python コードで、リソースを gcp_resource
パラメータとして定義します。
Vertex AI SDK for Python
Vertex AI SDK for Python のインストールまたは更新の方法については、Vertex AI SDK for Python をインストールするをご覧ください。詳細については、Vertex AI SDK for Python API のリファレンス ドキュメントをご覧ください。
from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson
dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'
with open(gcp_resources, 'w') as f:
f.write(MessageToJson(dataflow_resources))
Python コンポーネントの使用
他の文字列出力パラメータと同様に、gcp_resources
出力パラメータを返すこともできます。
@dsl.component(
base_image='python:3.9',
packages_to_install=['google-cloud-pipeline-components==2.19.0'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs", [("gcp_resources", str)]):
# Launch the dataflow job
dataflow_job_id = [dataflow-id]
dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
gcp_resources=MessageToJson(dataflow_resources)
return gcp_resources
サポートされる resource_type
値
resource_type
は任意の文字列に設定できますが、Google Cloud コンソール内にリンクがあるのは次の型のみです。
- BatchPredictionJob
- BigQueryJob
- CustomJob
- DataflowJob
- HyperparameterTuningJob
基盤となるリソースをキャンセルするコンポーネントの作成
パイプライン ジョブがキャンセルされた場合、デフォルトの動作では、基盤となる Google Cloud リソースが実行を継続します。自動的にキャンセルされることはありません。この動作を変更するには、パイプライン ジョブに SIGTERM ハンドラを接続する必要があります。長時間実行可能なジョブのポーリング ループの直前に、このような処理を行うことをおすすめします。
キャンセルは、次のようないくつかの Google Cloud パイプライン コンポーネントに実装されています。
- バッチ予測ジョブ
- BigQuery ML ジョブ
- カスタムジョブ
- Dataproc Serverless バッチジョブ
- ハイパーパラメータ チューニング ジョブ
SIGTERM ハンドラの接続方法を示すサンプルコードの詳細については、次の GitHub リンクをご覧ください。
- https://github.com/kubeflow/pipelines/blob/google-cloud-pipeline-components-2.19.0/components/google-cloud/google_cloud_pipeline_components/container/utils/execution_context.py
- https://github.com/kubeflow/pipelines/blob/google-cloud-pipeline-components-2.19.0/components/google-cloud/google_cloud_pipeline_components/container/v1/gcp_launcher/job_remote_runner.py#L124
SIGTERM ハンドラを実装する際は、以下の点を考慮してください。
- キャンセルの伝播は、コンポーネントが数分間実行された後にのみ機能します。これは通常、Python シグナル ハンドラが呼び出される前にバックグラウンドの起動タスクが処理されることが原因です。
- 一部の Google Cloud リソースでキャンセルが実装されていない場合があります。たとえば、Vertex AI エンドポイントやモデルの作成または削除を行うと、REST API を介してキャンセル リクエストを受け入れ、キャンセル オペレーション自体は実装しない長時間実行オペレーションを作成できます。