コンテンツに移動
デベロッパー

ML ワークフローにおけるデータドリフトのイベント トリガー検出

2021年3月29日
https://storage.googleapis.com/gweb-cloudblog-publish/images/055-GC-AI-ML-Blog-Header-3.max-2800x2800.max-2600x2600.jpg
Google Cloud Japan Team

※この投稿は米国時間 2021 年 3 月 16 日に、Google Cloud blog に投稿されたものの抄訳です。

ML ワークフローでは、特定のモデルを 1 回トレーニングしデプロイするだけでは不十分なことがよくあります。最初はモデルの精度が望ましいレベルであっても、予測リクエストに使用されるデータが(おそらく、時間の経過に応じて)最初にモデルのトレーニングに使用したデータとまったく違うものになっている場合、精度が変わってくることがあります。

モデルの再トレーニングに使用できる新しいデータが登場した場合、データ「ドリフト」を分析する手法により、そのドリフトが再トレーニングを必要とする程度に異常であるかどうかを判断するとよいでしょう。

さらに、新しいデータが到着したときに、そのような分析(そして、必要に応じたトレーニング パイプラインの再実行)を自動的にトリガーできると便利です。

今回のブログ投稿では、Kubeflow Pipelines(KFP)を使用してこうしたシナリオに対応する方法を解説するサンプル ノートブックを取り上げます。

このサンプル ノートブックでは、データセットのバージョンが新しくなるたびにドリフトの統計情報をチェックし、その情報を使用してモデルの(再)トレーニングを行うかどうかを判断するパイプラインを構築する方法と、新しいデータが到着したときにパイプライン ジョブのイベント ドリブン デプロイを構成する方法を説明しています(今回のサンプルでは、新しいデータセットでのモデルの完全な再トレーニングをご紹介します。ここでは説明していませんが、別のシナリオとして、新しいデータで既存のモデルを調整するというものもあります)。

このノートブックの基盤となっているのは、KFP のトレーニングとサービング パイプラインについて説明した以前のブログ投稿で取り上げたサンプルです。これに加えて、主要なコンセプトが新たに 2 つ追加されています。

  • このサンプルでは、データセットの統計情報を取得し、新旧のバージョンのデータセット間のドリフトを検出するパイプライン コンポーネントを構築するための TensorFlow Data Validation(TFDV)ライブラリの使い方を説明しています。また、ドリフトに関する情報を使用して新しいデータでモデルを再トレーニングするかどうかを判断する方法も示しています。

  • このサンプルでは、Cloud Functions(GCF)の関数から Kubeflow Pipelines 実行の開始をイベントによってトリガーする方法を示しています。この関数の実行は、ファイルを特定の Cloud Storage(GCS)バケットに追加することでトリガーされます。

この機械学習タスクでは、表形式のデータセットを使用してロンドンの自転車レンタルに関する情報を気象データと結び付け、レンタル時間を予測するように Keras モデルをトレーニングします。データセットとモデルのアーキテクチャに関する背景情報については、こちらこちらのブログ投稿、ならびに関連する README を参照してください。

https://storage.googleapis.com/gweb-cloudblog-publish/images/CleanShot_2021-02-26_at_11.30.132x.max-210.max-2100x2100.png

TFDV ベースのコンポーネントを使用して「データドリフト」を検出するパイプライン実行。

サンプル ノートブックの実行

このサンプル ノートブックには、Google Cloud Platform(GCP)のアカウントとプロジェクト、そして理想的には GPU を使用するための割り当てが必要です。また、ノートブックで詳しく説明されているように、AI Platform Pipelines(ホストされている Kubeflow Pipelines)のインストールも必要です(つまり、KFP を Google Kubernetes Engine(GKE)にインストールします)。インストールの完了後に、いくつかの構成を追加で行います。

このノートブックの実行には、Colab直接開きます)または AI Platform Notebooks直接開きます)を使用します。

TFDV ベースの KFP コンポーネントの作成

最初のステップは、パイプラインで使用する TFDV コンポーネントの構築です。

注: このサンプルでは、トレーニング データは CSV 形式のファイルで GCS にあります。このため、CSV ファイルを処理する TFDV の機能を利用できます。TFDV ライブラリは TFRecords 形式のファイルも処理できます。

両方の TFDV KFP パイプライン コンポーネントを「軽量」な Python 関数ベースのコンポーネントとして定義します。各コンポーネントについて関数を定義してから、その関数で kfp.components.func_to_container_op() を呼び出し、.yaml 形式で再利用可能なコンポーネントを構築します。

この仕組みについて、もう少し見てみましょう(詳細については、ノートブックを参照)。

以下に示しているのは、一連の csv ファイルから TFDV 統計情報を生成するために使用する Python 関数です。この関数(そして、それから作成するコンポーネント)は、生成された統計情報ファイルへのパスを出力します。このコンポーネントを使用するパイプラインを定義するとき、このステップの出力を別のパイプライン ステップへの入力として使用します。

TFDV は Beam パイプライン(KFP パイプラインとは別のもの)を使用して、統計情報の生成を実装します。構成に応じて、コンポーネントは Direct(ローカル)ランナーまたは Dataflow ランナーを使用できます。大きなデータセットの場合、ローカルではなく Dataflow で Beam パイプラインを実行するとよいでしょう。

読み込んでいます...

この関数を KFP コンポーネントに変換するには、kfp.components.func_to_container_op() を呼び出します。そして、gcr.io/google-samples/tfdv-tests:v1 のように、使用するベースコンテナ イメージを渡します。このベースイメージにはすでに TFDV ライブラリがインストールされているため、このコンポーネントに基づいてパイプライン ステップを実行する際に、それらを「インライン」でインストールする必要はありません。

読み込んでいます...

2 つ目の TFDV ベースのコンポーネントを構築する際も同じアプローチをとります。このコンポーネントは、統計情報を比較してデータセット間のドリフトを検出するものです。この場合、TFDV ライブラリを使用すると簡単です。回帰モデルに適したドリフト コンパレータを(サンプルのパイプラインで使用されているように)使用し、特定のフィールド セットでドリフトを探します(この場合は、サンプル用途のため 1 つだけです)。

その後、tensorflow_data_validation.validate_statistics() 呼び出しにより、そのフィールドのドリフトの異常が指定されたしきい値を超えているかどうかを判断します。詳細については、TFDV のドキュメントを参照してください。

読み込んでいます...

(この 2 つ目のコンポーネント定義の詳細は、サンプル ノートブックに記載されています)。

TFDV コンポーネントを使用するパイプラインの定義

両方の TFDV コンポーネント(データセットの統計情報を生成するものと、データセット間のドリフトを検出するもの)を定義したら、これらのコンポーネントを使用する Kubeflow Pipeline を、トレーニングとサービングのワークフロー向けに構築済みのコンポーネントと関連付けて構築できます。

コンポーネントからパイプライン オペレーションをインスタンス化する

yaml 形式の KFP コンポーネントは共有可能かつ再利用可能です。パイプライン構築のベースとして、いくつかの構築済みのコンポーネント(詳細についてはこちらを参照)を使用します。これらは、基本的な「トレーニング / 評価 / デプロイ」ワークフローをサポートするものです。

既存のコンポーネントを URL 経由で読み込むことで、これらのコンポーネントから以下のようにパイプライン オペレーションをインスタンス化します。

読み込んでいます...

次に、定義されたオペレーションから KFP パイプラインを定義します。ここではパイプライン全体については示しません。詳細については、ノートブックを参照してください。

2 つのパイプライン ステップは、統計情報を生成する tfdv_op に基づいています。tfdv1 はテストデータの統計情報、tfdv2 はトレーニング データの統計情報を生成します。

以下では、tfdv_drift ステップ(tfdv_drift_op に基づくもの)が、tfdv2(トレーニング データの統計情報)ステップからの出力を入力として受け入れています。

読み込んでいます...

パイプラインの詳細すべてが示されているわけではありませんが、このパイプライン定義にはいくつかの条件式が含まれていることがわかるでしょう。パイプラインの一部は、「上流」ステップの出力が特定の条件を満たした場合にのみ実行されます。ドリフトの異常が検出された場合、モデルのトレーニング ステップを開始します(また、トレーニングが完了したら、その評価指標が特定のしきい値を満たす場合にのみサービング用のモデルをデプロイします)。

以下は、このパイプラインの DAG です。条件式が反映されています。また、テスト データセットの統計情報を生成するためのステップが下流の依存関係を一切提供していない一方で、トレーニング セットの統計情報がドリフト検出ステップ向けの入力として使用されていることがわかります。

https://storage.googleapis.com/gweb-cloudblog-publish/images/bw_tfdv_pipeline.max-1500x1500.max-1200x1200.png

パイプラインの DAG

以下に、進行中のパイプライン実行を示します。

https://storage.googleapis.com/gweb-cloudblog-publish/images/bw_tfdv_pipeline_run.max-2000x2000.max-2000x2000.png

進行中のパイプライン実行。

このパイプラインの実行方法の詳細については、サンプル ノートブックを参照してください。

イベントによってトリガーされるパイプライン実行

このパイプラインを定義したら、次に、データセットの更新があったときに自動的にパイプラインを実行するようにすると便利です。これによって、データセットの更新があるたびに、データドリフトと必要に応じたモデルの(再)トレーニングの分析がトリガーされます。

Cloud Functions(GCF)を使用してこれを行う方法をご紹介します。新しいデータが GCS バケットに追加されたときにトリガーされる関数を設定します。

データセットが更新されたときにパイプライン実行をトリガーする GCF 関数を設定する

新しいトレーニング データを利用できるようになった時点でこのパイプラインの実行を開始する Cloud Functions(GCF)の関数を定義し、デプロイします。GCS の「トリガー」バケットでファイルが作成または修正されるとトリガーされるようにします。

ほとんどの場合、新しいファイルがデータセットに追加されるたびに新しいパイプライン実行を開始する必要はありません。これは、一般に、データセットを構成するファイルのコレクションについては、複数のファイルを一括で追加または更新するためです。よって、「トリガー バケット」をデータセットのバケットにすることはおすすめしません(データが GCS にある場合)。これによって、不要なパイプライン実行がトリガーされてしまうためです。

代わりに、新しいデータのバッチのアップロードが完了した後にパイプライン実行をトリガーします。

このためには、「トリガー」バケットがデータセット ファイルの保存に使用されるバケットとは異なるものとするアプローチをとります。このバケットにアップロードされた「トリガー ファイル」には、更新されたデータセットのパスと、最後にトレーニングされたモデル向けに生成されたデータ統計情報ファイルへのパスが含まれているはずです。

トリガー ファイルは新しいデータのアップロードが完了した後でアップロードされ、このアップロードにより GCF 関数の実行がトリガーされます。その後、この関数がトリガー ファイルから新しいデータパスの情報を読み取り、パイプライン ジョブを開始します。

GCF 関数を定義する

このプロセスを設定するにはまず、main.py というファイルと、関数の実行前に読み込むライブラリを指定する、同じディレクトリ内の関連要件ファイルで GCF 関数を定義します。この要件ファイルは、KFP SDK をインストールするように求めます。

kfp==1.4

コードは以下のようになります(一部の詳細は省略)。トリガー ファイルのコンテンツを解析し、その情報を使用してパイプライン実行を開始します。このコードでは、GCF 関数のアップロード時に設定する複数の環境変数の値を使用します。

読み込んでいます...

続けて、以下のように GCF 関数をデプロイします。gcs_update 定義を(main.py から)使用し、トリガー バケットを指定している点に注意してください。また、デプロイの一部として環境変数を設定する方法にも注目してください。

読み込んでいます...

新しいデータを利用できるようになった時点でパイプライン実行をトリガーする

設定した GCF 関数は、ファイルがトリガー バケットに追加(またはトリガー バケットで修正)されたときに実行されます。シンプルな例を挙げると、GCF 関数は以下の形式のトリガー ファイルを予期します。1 行目は更新されたデータセットへのパスで、2 行目は以前にトレーニングされたモデルに使用されるデータセットの TFDV 統計情報へのパスです。一般的には、このようなトリガー ファイルには、パイプライン実行をパラメータ化する方法を決定するために必要なあらゆる情報を含めることができます。

読み込んでいます...

次のステップ

今回のブログ投稿では、TFDV ライブラリを使用して Kubeflow Pipeline コンポーネントを構築し、データセットの分析とデータドリフトの検出を行う方法をご紹介しました。また、Cloud Functions を介してイベントによってトリガーされるパイプライン実行をサポートする方法もご説明しました。

TFDV を使用して生成された統計情報を可視化し検討する方法については、この投稿では説明していませんが、こちらのサンプル ノートブックでその方法が説明されています。

また、Kubeflow Pipelines GitHub リポジトリでサンプルを参照することもできます。

-スタッフ デベロッパー アドボケイト Amy Unruh

投稿先