コンテンツに移動
デベロッパー

CI / CD システムとしてのモデル トレーニング: パート II

2021年11月8日
Google Cloud Japan Team

※この投稿は米国時間 2021 年 10 月 26 日に、Google Cloud blog に投稿されたものの抄訳です。

ブログ記事のパート I では、コードの変更をモニタリングし、TensorFlow Extended(TFX)のパイプラインを Vertex AI に送信して実行する方法について説明しました。パート I は、以下の質問で締めくくりました。

  • スケジュール(たとえば 1 時間ごと。通常はユースケースに依存)を維持して Vertex AI でパイプラインの実行をトリガーする場合はどうなるでしょうか。

  • テスト段階で、新しいアーキテクチャが Pub/Sub トピックとして公開されるたびに、同じパイプラインを実行させる(ただし異なるハイパーパラメータで)システムが必要な場合はどうなるでしょうか。

このパート II では、これらの状況の詳細を見てから、可能な対応策について考察します。

アプローチ

図 1 と図 2 に、実現するワークフローの概要を示します。まず、Cloud Build を使用して、以下を行います。

  • 実行可能な TFX パイプラインのビルドとコンパイルに必要なすべてのコードを含む GitHub のリポジトリのクローンを作成します。

  • パイプラインの実行に使用するカスタム Docker イメージをビルドして push します。

  • コンパイルしたパイプラインを Google Cloud Storage(GCS)のバケットにアップロードします。

これを絵で表現すると、図 1 のとおりとなります。

https://storage.googleapis.com/gweb-cloudblog-publish/images/image2_zJt72L0.max-900x900.max-900x900.png

図 1: コンパイルされた TFX パイプラインを生成するためのワークフロー

前述のパイプラインは、ランタイム パラメータを入力として受け取ることができます。これが特に役立つのは、たとえば、パイプラインのコンポーネントは同じまま、異なるハイパーパラメータ群を使用してさまざまなテストを行いたい場合で、パイプラインを再利用し、ハイパーパラメータを変えてテストを作成するだけで済ませることができます。この同じパイプラインは、新しいデータを利用したモデルの再トレーニングにも利用できます。この記事では、わかりやすいよう、モデルのハイパーパラメータ(エポック数とオプティマイザーの学習率)をパイプラインのランタイム パラメータとして渡すことにします。

図 2 は、コンパイルされた TFX パイプラインを取得し、Vertex AI に送信して実行させるワークフローの残り半分を示しています。

https://storage.googleapis.com/gweb-cloudblog-publish/original_images/image6_79NQRAT.max-1700x1700.png

図 2: コンパイルされた TFX のパイプラインを実行するためのワークフロー

コンパイルされたパイプラインの仕様を Vertex AI に送信して実行させることも、トリガー メカニズムを使用してパイプラインの実行を開始することもできます。後者のケースは、イベントとパイプライン実行の間の橋渡しを行いたい場合に、特に有効です。このイベントの例としては、新しいデータ、新しいモデル アーキテクチャ、新しいハイパーパラメータ群、新しい前処理ロジックなどの到着が挙げられます。このようなイベントを契機としてパイプラインの実行を自動的にトリガーしたり、実行をスケジュールしたりするメカニズムが必要になります。

ここでは、以下の 2 つの対応策を紹介します。

  • 一つ目は、Pub/Sub のトピックにメッセージを公開する方法で、Pub/Sub に登録される Cloud Functions の関数が、パイプラインの実行を開始する役割を果たします。なお、トピック メッセージには、モデルのハイパーパラメータとその値が含まれています。

  • もう一つの対応策は、Cloud Scheduler を使用してジョブのスケジュールを設定し、それによってパイプライン実行のための Cloud Functions の関数をトリガーする方法です。

BigQuery データベースに新しいトレーニング データが到着したときにモデル トレーニング パイプラインをトリガーしたい場合は、こちらのブログ記事をご覧ください。

実装の詳細

このセクションでは、先に紹介したアプローチの技術的な詳細について説明します。TFX 関連のコンポーネントにはあまり深く立ち入らず、ここでは、主に使用される具体例に焦点を当てます。また、ここで紹介していない部分の詳細を知りたい方のために、関連の参考資料を紹介します。

このセクションでお見せするコードは、こちらのリポジトリで入手できます。今回は、こちらの Google Cloud リポジトリを主な参考資料としました。

TFX パイプラインとコンパイル

この記事では、こちらの TFX チュートリアルで紹介されている TFX パイプラインを使用します。このパイプラインは Palmer Penguins データセットを使用し、ペンギンの種を予測できるシンプルなニューラルネットワークを TensorFlow でトレーニングします。パイプラインには、CsvExampleGenCloud AI TrainerPusher という TFX コンポーネントがあります。

この記事ではパイプラインを詳しく説明しないため、上記のリンク先にある元のチュートリアルをご参照ください。

パイプライン コードは最初、GitHub レポジトリにホストされています。コードは BitBucket や GitLab のほか、Cloud Repository にもホストできます。図 1 を再度ご覧ください。このパイプラインをコンパイルして、GCS Bucket にアップロードします。

このパイプラインは、実行時にパラメータを受け取れるようにしたいので、TFX が提供する RuntimeParameter を使用します。ここでは、エポック数とオプティマイザーの学習率を使用します。これは以下のように行います。

読み込んでいます...

パイプラインの作成とコンパイルの全コードはここから参照できます。

ただし、話はここで終わりません。すべてのユーティリティ スクリプトやその他の Python パッケージを含むカスタム Docker イメージをビルドして push する必要もあります。この Docker イメージは、送信されたパイプラインを実行するために、最終的に Vertex AI によって使用されます。これに加えて、これまで説明したすべてのステップを一種のビルドプロセスとして自動化する必要があり、そのために Cloud Build を使用します。

Cloud Build は YAML 仕様で動作し、私たちの仕様は以下のようになっています。
読み込んでいます...

この仕様ファイルは、上記のリンク先にある YAML 仕様書を参照するとさらに読みやすくなります。先頭に「$」が付いている変数は、この YAML ファイルを呼び出して Cloud Build でビルドプロセスを開始するときに設定するものです。この仕様書ファイルの構成が終わったら、あとは Cloud Build で実行を開始するだけです。

読み込んでいます...

SUBSTITUTIONS は、パイプラインの仕様に関連するすべての変数を保持します。

読み込んでいます...

このノートブックでは、ビルドプロセス全体を実演しています。ビルドが Cloud Build に正常に送信されると、ダッシュボードにそのように表示されます。

https://storage.googleapis.com/gweb-cloudblog-publish/original_images/image3_j3FUueQ.max-1600x1600.png

図 3: Cloud Build でのデモビルド

ビルドの出力は、コンパイルされたパイプライン仕様ファイル(.json 形式)で、これを Vertex AI(または他のオーケストレーター)に送信して実行させることができます。

Pub/Sub と Cloud Functions の関数

次に、Pub/Sub トピックを作成し、この Pub/Sub トピックに登録される Cloud Functions の関数をデプロイします。

 $ gcloud pubsub topics create {PUBSUB_TOPIC}


このトピックにメッセージを公開し、これが行われると直ちに Cloud Functions の関数がトリガーされるようにします。もし、この部分で混乱しているとしても、心配はいりません。すぐに疑問は晴れます。

Cloud Functions の関数は、Pub/Sub トピックに公開されたメッセージを解析し、Vertex AI で実行されるパイプラインをトリガーする役割を担います。これは、以下のようになります。

読み込んでいます...

Python 関数(trigger_pipeline()) に注目してください。これは、Cloud Functions の関数をデプロイするときに重要な役割を果たします。Cloud Functions の関数のすべてのコンポーネントについては、こちらをご覧ください。

Cloud Functions の関数をデプロイするには、環境変数を指定してから関数のデプロイメントを実行します。

読み込んでいます...

trigger-top は Pub/Sub トピックの名前、source は Cloud Functions の関数に固有の関連ファイルがホストされているディレクトリ、entry-point は前述した Python 関数の名前です。さらに、source が指すディレクトリには、Cloud Functions の関数に必要な Python パッケージを指定した requirements.txt、trigger_pipeline() の定義を含む main.py といったファイルが格納されています。

Cloud Functions の関数がデプロイされたら、ダッシュボードで確認し、いくつかの重要な統計情報を得ることができます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/image4_vAKtWUp.max-1300x1300.max-1300x1300.png

図 4: Cloud Functions ダッシュボード

ここで、事前に作成しておいた Pub/Sub トピックにメッセージを公開します。その後すぐに、そのトピックに登録されている Cloud Functions の関数がトリガーされ、解析されたパラメータを含むパイプラインが Vertex AI に送信されます。

読み込んでいます...

パイプラインはこのように見えます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/image7_YQpBm5u.max-700x700.max-700x700.png

図 5: Vertex AI 上での TFX パイプラインのグラフィック表現

Cloud Functions の関数と Pub/Sub の統合の全体像は、このノートブックに記載されています。

Cloud Scheduler

パイプラインを定期的に実行したい状況はさまざまです。たとえば、十分なデータが得られるまで、一定期間待ちたい場合があります。これに基づいて、バッチ予測を実行してエンベディングを抽出したり、モデルのパフォーマンスをモニタリングしたりできます。

これは、Cloud Scheduler を既存システムにインテグレーションすることで実現できます。Cloud Scheduler は cron ジョブを扱うフルマネージド型エンタープライズ対応サービスで、Pub/Sub などの他の GCP サービスにも簡単に接続できます。

Cloud Scheduler のジョブを作成するには 2 つの方法があります。一つ目の方法は、gcloud CLI ツールを使用する方法です。サービス アカウントで Cloud Scheduler を使用するための資格情報を取得する必要があります。サービス アカウントの作成方法とサービス アカウント キーのダウンロード方法については、こちらの公式ドキュメントをご参照ください。サービス アカウント キーをダウンロードしたら、サービス アカウント キーの JSON ファイルを指す環境変数を以下のとおり設定する必要があります。

読み込んでいます...

gcloud コマンドは、この環境変数を自動的に認識します。gcloud scheduler jobs create pubsub は、任意のメッセージを含む Pub/Sub トピックを公開する定期ジョブを作成します。--schedule オプションの値は、標準的な cron ジョブの形式に合わせて設定します。たとえば、「*/3 * * *」は 3 分ごとにタスクを実行することを意味します。MLOps のパイプラインを 3 分ごとに実行するのは、Cloud Scheduler の動作を実証するためだけの設定で、実際の状況を反映するものではありません。

読み込んでいます...

--topic オプションの値は、以前に作成した Pub/Sub のトピック名と一致させる必要があります。--message-body オプションを使用すると、Pub/Sub に追加データを JSON 形式で配信できます。この例では、ハイパーパラメータを Cloud Functions の関数に push するために使用しています。Jupyter Notebook を使用するときの注意点として、JSON 形式の文字列は json.dumps メソッドでエンコードする必要があります。こうしておくと、CLI に渡された JSON 形式の文字列は壊れません。

読み込んでいます...

https://storage.googleapis.com/gweb-cloudblog-publish/images/image1_RgwtAvA.max-900x900.max-900x900.png

図 6: Vertex AI 上で定期的に起動される TFX パイプライン実行

2 つ目の方法は、Python API for Google Cloud Scheduler を使用する方法です。この API は言語に依存しない gRPC / プロトコル バッファ上に構築されているため、実際にはさまざまなプログラミング言語に対応した複数の API クライアント ライブラリが存在します。ここでは、Python での使用例のみ紹介します。

読み込んでいます...

gcloud コマンドとの違いは主に 3 点あります。まず、メッセージは UTF-8 でエンコードする必要があります。これにより、メッセージがバイト形式でエンコードされます。PubsubTarget の data パラメータには、メッセージをバイト形式で指定する必要があります。第二に、Pub/Sub トピックの名前は、「projects/<PROJECT-ID>/topics/<TOPIC-NAME>」の形式に従う必要があります。第三に、スケジューラのジョブ名は、「projects/<PROJECT-ID>/locations/<REGION-ID>/jobs/<JOB-NAME>」の形式に従う必要があります。これらの違いを理解すると、上記のコードを簡単に理解できるはずです。

Python API の詳細については、RPC 仕様や、Python API の公式ドキュメントをご覧ください。また、このノートブックで取り上げられている内容の完全なデモンストレーションもご覧いただけます。

費用

この記事では、Pub/Sub や Cloud Functions などの他のコンポーネントの使用量が非常に少ないため、費用算出は Vertex AI のみを対象としています。Vertex AI で実行されるパイプラインの費用は 1 回あたり $0.03 です。モデルのトレーニングには、1 時間あたり $0.19 の n1-standard-4 マシンタイプを選択し、GPU は使用しませんでした。そのため、Google の試算では、発生する費用の上限は $5 を超えないはずです。

いずれにしても、この GCP 料金計算ツールを使用して、GCP サービスを利用した場合の費用がいくらになるかをより正確に把握しておくことをおすすめします。

結論

この 2 回にわたるブログ記事では、モデル トレーニングを CI / CD システムとしてどのように扱うかを取り上げました。また、そのために必要なさまざまなツールを、特に GCP の観点から紹介しました。このアプローチがなぜ大規模運用の際に役立つのか、少しでもご理解いただけたなら幸いです。しかし、これはほんの氷山の一角です。Vertex AI のようなツールを使えば可能性は無限に広がりますので、ぜひ Vertex AI に独自のワークフローを実装してみてください。

謝辞

Google によるテストをサポートするために GCP クレジットを提供いただいた ML-GDE プログラムに感謝申し上げます。レビューに協力いただいた Google の Karl Weinmeister に感謝します。

- ML Google Developer Expert Chansung Park

- ML Google Developer Expert Sayak Paul

投稿先