Vertex AI Pipelines で Vertex AI TensorBoard を使用する

トレーニング コードは、カスタム トレーニング コンポーネントにパッケージ化して、パイプライン ジョブで実行できます。TensorBoard のログは、Vertex AI TensorBoard テストに自動的にストリーミングされます。Vertex AI TensorBoard では、Cloud Storage に Vertex AI TensorBoard ログが書き込まれる際にそれをストリーミングするため、この統合を使用することで、ほぼリアルタイムにトレーニングをモニタリングできます。

初期設定については、Vertex AI TensorBoard を設定するをご覧ください。

トレーニング スクリプトの変更点

TensorBoard ログを Cloud Storage バケット(Vertex AI Training サービスで事前定義の環境変数 AIP_TENSORBOARD_LOG_DIR によって自動的に利用可能になる場所)に書き込むように、トレーニング スクリプトを構成する必要があります。

これを行うには、通常はオープンソースの TensorBoard ログ書き込み API にログ ディレクトリとして os.environ['AIP_TENSORBOARD_LOG_DIR'] を指定します。 AIP_TENSORBOARD_LOG_DIR の場所は、staging_bucket 変数で設定します。

TensorFlow 2.x でトレーニング スクリプトを構成するには、TensorBoard コールバックを作成して、log_dir 変数を os.environ['AIP_TENSORBOARD_LOG_DIR'] に設定します。TensorBoard コールバックは TensorFlow model.fit コールバックのリストに組み込まれます。

  tensorboard_callback = tf.keras.callbacks.TensorBoard(
       log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'],
       histogram_freq=1
  )
  
  model.fit(
       x=x_train,
       y=y_train,
       epochs=epochs,
       validation_data=(x_test, y_test),
       callbacks=[tensorboard_callback],
  )
  

詳しくは、Vertex AI がカスタム トレーニング環境で環境変数を設定する方法をご覧ください。

パイプラインを構築して実行する

次の例は、Kubeflow Pipelines DSL パッケージを使用してパイプラインを構築して実行する方法を示しています。その他の例と詳細については、Vertex AI Pipelines のドキュメントをご覧ください。

トレーニング コンポーネントを作成する

トレーニング コードをカスタム コンポーネントにパッケージ化します。そのコードでは、TensorBoard ログを Cloud Storage バケットに書き込むように構成してください。他の例については、独自のパイプライン コンポーネントの構築をご覧ください。

from kfp.v2.dsl import component

@component(
    base_image="tensorflow/tensorflow:latest",
    packages_to_install=["tensorflow_datasets"],
)
def train_tensorflow_model_with_tensorboard():
    import datetime, os
    import tensorflow as tf

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    def create_model():
        return tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(512, activation="relu"),
            ]
        )

    model = create_model()
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=os.environ['AIP_TENSORBOARD_LOG_DIR'],
        histogram_freq=1
    )

    model.fit(
        x=x_train,
        y=y_train,
        epochs=5,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard_callback],
    )

パイプラインを構築してコンパイルする

create_custom_training_job_op_from_component でコンポーネント仕様を指定し、作成したコンポーネントからカスタム トレーニング ジョブを作成します。tensorboard_resource_name を TensorBoard インスタンスに設定し、API 呼び出し中にアーティファクト(TensorBoard ログを含む)をステージングする場所に staging_bucket を設定します。

次に、このジョブを含めるパイプラインを構築し、そのパイプラインを JSON ファイルにコンパイルします。

その他の例と情報については、カスタムジョブ コンポーネントパイプラインを構築するをご覧ください。

from kfp.v2 import compiler
from google_cloud_pipeline_components.v1.custom_job.utils import \
    create_custom_training_job_op_from_component
from kfp.v2 import dsl

def create_tensorboard_pipeline_sample(
    project, location, staging_bucket, display_name, service_account, experiment, tensorboard_resource_name
):

    @dsl.pipeline(
        pipeline_root=f"{staging_bucket}/pipeline_root",
        name=display_name,
    )
    def pipeline():
        custom_job_op = create_custom_training_job_op_from_component(
            component_spec=train_tensorflow_model_with_tensorboard,
            tensorboard=tensorboard_resource_name,
            base_output_directory=staging_bucket,
            service_account=service_account,
        )
        custom_job_op(project=project, location=location)

    compiler.Compiler().compile(
        pipeline_func=pipeline, package_path=f"{display_name}.json"
    )

Vertex AI Pipelines を送信する

パイプラインは、Vertex AI SDK for Python を使用して送信します。詳細については、パイプラインを実行するをご覧ください。

Python

from typing import Any, Dict, Optional

from google.cloud import aiplatform


def log_pipeline_job_to_experiment_sample(
    experiment_name: str,
    pipeline_job_display_name: str,
    template_path: str,
    pipeline_root: str,
    project: str,
    location: str,
    parameter_values: Optional[Dict[str, Any]] = None,
):
    aiplatform.init(project=project, location=location)

    pipeline_job = aiplatform.PipelineJob(
        display_name=pipeline_job_display_name,
        template_path=template_path,
        pipeline_root=pipeline_root,
        parameter_values=parameter_values,
    )

    pipeline_job.submit(experiment=experiment_name)

  • experiment_name: テストの名前を入力します。
  • pipeline_job_display_name: パイプライン ジョブの表示名。
  • template_path: コンパイルされたパイプライン テンプレートへのパス。
  • pipeline_root: パイプライン サービス アカウントがアクセスできる Cloud Storage URI を指定します。パイプライン実行のアーティファクトはパイプライン ルート内に保存されます。
  • parameter_values: この実行に渡すパイプライン パラメータ。たとえば、パラメータ名にディクショナリ キーを使用し、パラメータ値にディクショナリ値を使用して、dict() を作成します。
  • project: 実際のプロジェクト ID。パイプラインを実行する Google Cloud プロジェクト。ID は、Google Cloud コンソールの [ようこそ] ページで確認できます。
  • location: パイプラインを実行するロケーション。これは、使用している TensorBoard インスタンスと同じロケーションにする必要があります。

次のステップ