独自のパイプライン コンポーネントを構築する

コンポーネントを実行するときに、起動するコンポーネント ジョブへのリンクだけでなく、基盤となるクラウド リソース(Vertex バッチ予測ジョブや Dataflow ジョブなど)へのリンクも含めることが少なくありません。

gcp_resource proto は、コンポーネントで使用する特別なパラメータです。これにより、Google Cloud コンソールで Vertex AI Pipelines コンソールにリソースのログとステータスをカスタマイズして表示できます。

gcp_resource パラメータを出力する

コンテナベースのコンポーネントの使用

まず、コンポーネントで gcp_resource パラメータを定義する必要があります。次の component.py サンプル ファイルをご覧ください。

Python

Vertex AI SDK for Python のインストールまたは更新の方法については、Vertex AI SDK for Python をインストールするをご覧ください。 詳細については、Python API リファレンス ドキュメントをご覧ください。

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

次に、コンテナ内に Google Cloud Pipeline コンポーネント パッケージをインストールします。

pip install --upgrade google-cloud-pipeline-components

次に、Python コードで、リソースを gcp_resource パラメータとして定義します。

Python

Vertex AI SDK for Python のインストールまたは更新の方法については、Vertex AI SDK for Python をインストールするをご覧ください。 詳細については、Python API リファレンス ドキュメントをご覧ください。

from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
from google.protobuf.json_format import MessageToJson

dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type='DataflowJob'
dr.resource_uri='https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))

Python コンポーネントの使用

他の文字列出力パラメータと同様に、gcp_resources 出力パラメータを返すこともできます。

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.16.1'],
)
def launch_dataflow_component(project: str, location:str) -> NamedTuple("Outputs",  [("gcp_resources", str)]):
  # Launch the dataflow job
  dataflow_job_id = [dataflow-id]
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type='DataflowJob'
  dr.resource_uri=f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources=MessageToJson(dataflow_resources)
  return gcp_resources

サポートされる resource_type

resource_type は任意の文字列に設定できますが、Google Cloud コンソール内にリンクがあるのは次の型のみです。

  • BatchPredictionJob
  • BigQueryJob
  • CustomJob
  • DataflowJob
  • HyperparameterTuningJob

基盤となるリソースをキャンセルするコンポーネントの作成

パイプライン ジョブがキャンセルされた場合、デフォルトの動作では、基盤となる Google Cloud リソースが実行を継続します。自動的にキャンセルされることはありません。この動作を変更するには、パイプライン ジョブに SIGTERM ハンドラを接続する必要があります。長時間実行可能なジョブのポーリング ループの直前に、このような処理を行うことをおすすめします。

キャンセルは、次のようないくつかの Google Cloud パイプライン コンポーネントに実装されています。

  • バッチ予測ジョブ
  • BigQuery ML ジョブ
  • カスタムジョブ
  • Dataproc Serverless バッチジョブ
  • ハイパーパラメータ チューニング ジョブ

SIGTERM ハンドラの接続方法を示すサンプルコードの詳細などについては、次の GitHub リンクをご覧ください。

SIGTERM ハンドラを実装する際は、以下の点を考慮してください。

  • キャンセルの伝播は、コンポーネントが数分間実行された後にのみ機能します。これは通常、Python シグナル ハンドラが呼び出される前にバックグラウンドの起動タスクが処理されることが原因です。
  • 一部の Google Cloud リソースでキャンセルが実装されていない場合があります。たとえば、Vertex AI エンドポイントやモデルの作成または削除を行うと、REST API を介してキャンセル リクエストを受け入れ、キャンセル オペレーション自体は実装しない長時間実行オペレーションを作成できます。