特異点検知によるトレーニング / サービング スキューの特定

このドキュメントは、データ ドリフトを検知するために、AI Platform Prediction の ML モデルをモニタリングする方法について説明するシリーズの 5 番目のパートです。このガイドでは、特異点検知モデルを使用して、BigQuery 内のリクエスト レスポンス サービングのために、全体的なドリフト スコアを計算する方法について説明します。このガイドは、AI Platform Prediction を使用したサービング リクエストのロギングと、BigQuery での AI Platform Prediction ログの分析で説明されているコンセプトとタスクに基づいて作成されています。

このガイドは、サービング データが時間とともに変動しているかどうか確認し、変動があればその変動の程度の規模を計算して、本番環境における ML モデルのパフォーマンスを維持したいデータ サイエンティストと ML エンジニアを対象としています。

このシリーズは、次のドキュメントで構成されています。

このドキュメントで説明するプロセスのコードは、Jupyter ノートブックに組み込まれています。ノートブックは GitHub リポジトリにあります。

概要

本番環境での機械学習(ML)モデルの予測パフォーマンスの監視は、MLOps の重要な領域です。デプロイされたモデルの予測パフォーマンスは時間の経過とともに低下する可能性があります。この低下は、トレーニング データとサービング データの差異の増加、ビジネス コンテキストの進化、技術環境の変化など、いくつかの理由で発生します。

次の図は、このガイドで説明するプロセスを示したものです。

プロセスフロー

特徴量レベルのブレとデータセット レベルのブレ

このシリーズの以前のガイドでは、TensorFlow Data Validation(TFDV)を使用してデータセット内のスキーマと分布のスキューや異常を検出する方法を説明しました。TFDV は、トレーニング データから生成された参照スキーマとベースライン統計を使用して、新しいデータ内の異常を検出します。

TFDV を使用して、各特徴量に対して次のようなスキューや異常を特定できます。

  • 特徴量のデータ型が想定通りではない。
  • 数値特徴量の値が予想範囲外になっている。
  • 特定の特徴量の欠損値の割合が、定義済みのしきい値を下回っている。
  • 新しいカテゴリまたは欠損したカテゴリが、カテゴリ特徴量で検知される。
  • 特徴量の値の分布が想定通りではない。

実際には、トレーニング / サービング データのスキューと異常の多くは、個別の特徴量の問題が原因です。たとえば、データ取り込みと処理のパイプラインにおけるエラーや更新は、欠損値、新規の値、範囲外の値、誤ったデータ型を持つ値を生み出す可能性があります。また、次のような状況でも、データスキーマ型に不一致が生じることがあります。

  • データソースの変更。
  • 新しいデータソースの統合。
  • モデルを新しいドメイン(新しい地理的ロケーションなど)で運用する場合。

TFDV を使用すると、このような特徴量レベルのスキューと異常を検知し、それらに対処できるようになります。たとえば、前処理ロジックを更新して、カテゴリ特徴量に新しい語彙を追加し、誤ったデータを破棄して、必要に応じてモデルを再トレーニングできます。

ただし、データセット内の個別の特徴量には、リファレンス スキーマとベースライン統計に関連したかなりのスキューが生じる可能性があります。その場合も、特徴量の同時確率分布は変化を示します。この変化は共変量シフトのタイプを表します。このタイプのシフトは、通常、環境の動力の変化に応じて、新しいトレンドとパターンがデータに出現するにつれて発生します。たとえば、場所に比例した不動産価格の変化や、異なるユーザー層グループ間のファッション アイテムの人気の変化などです。入力の特徴量間の統計的関係は、異なる季節やイベントによって周期的に変化する可能性があるため、共変量シフトも季節的要因の影響を受けます。

したがって、共変量シフトは、すべての特徴量に関して、データセット レベルで全体的なドリフト スコアの計算に役立ちます。このドリフト スコアは、サービング データの歪みの程度を表します。全体的なドリフト スコアを計算すると、時間の経過に伴うデータセットのモニタリングのプロセスが簡略化され、特定のドリフト スコアのしきい値に基づいてアクションを定義して、そのアクションを実施できるようになります。

データセットのブレをスコア化する方法

特徴量レベルではなく、データセット レベルで変動の程度を計算するには、次の方法を使用できます。

  • 特異点と外れ値の検知
  • 2 標本統計テスト

特異点と外れ値の検知

特異点検知と外れ値検知のアルゴリズムでは、配信されていないサンプルを検出するタスクを模倣することを目的とします。参照(トレーニング)データを使用してモデルをビルドし、そのモデルを使用して、新しい(サービング)データポイントが外れ値であるかどうかを識別します。ドリフトスコアは、サービング データセット内の外れ値となることが予測されるデータポイントの割合として算出されます。この手法の利点は、0~100 の範囲でドリフトスコアが生成されるだけでなく、異常とみなされるデータポイントが識別されることです。

特異点検知と外れ値検知のアルゴリズムには次のようなものがあります。

時系列データでの外れ値の検知に使用されるアルゴリズムは、スペクトル残余シーケンス ツー シーケンス自動エンコーディングです。

2 標本統計テスト

2 標本統計テストでは、統計的仮説検定を実施して、トレーニングの分散と配信の分散の等価を検証します。これらのアルゴリズムの出力は、p(0~1)です。これは、2 つのデータセットが同じ配信に由来する確率を示しています。このデータセットにはいくつかの機能が含まれているため、単純な一変量の統計検定では不十分です。そのため、次のオプションが用意されています。

ドリフト検知の手法の詳細については、Fail Loudly: An Empirical Study of Methods for Detecting Dataset Shift をご覧ください。また、表形式のデータ、画像、時系列を対象とするドリフト検知のためのオープンソース Python ライブラリである、Alibi Detect をご覧ください。

目標

  • トレーニングとサービングのデータ分割をダウンロードする。
  • トレーニング データを使用して楕円エンベロープ モデルをトレーニングする。
  • 通常のデータセットと変更済みデータセットでモデルをテストする。
  • リクエスト / レスポンスの BigQuery データでドリフト スコアを計算する Apache Beam パイプラインを実装する。
  • パイプラインを実行し、ドリフト検知の出力を表示する。

料金

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

始める前に、このシリーズのパート 1パート 2 を完了する必要があります。

これらのパートを完了すると、以下の準備ができます。

  • TensorFlow 2.3 を使用する Notebooks インスタンス
  • このガイドで必要な Jupyter ノートブックを含む GitHub リポジトリのクローン。
  • リクエスト / レスポンスのログと、未加工のリクエスト / レスポンスのデータポイントを解析するビューを含む BigQuery テーブル。

このシナリオ用の Jupyter ノートブック

このワークフローのすべてのプロセスは、このドキュメントに関連付けられている GitHub リポジトリの Jupyter ノートブックに組み込まれています。ノートブックの手順は、UCI Machine Learning Repository の CoverType データセットに基づいています。このデータセットは、このシリーズの前のドキュメントのサンプルデータに使用したものと同じです。

ノートブック設定の構成

このセクションでは、シナリオのコードを実行する Python 環境を準備します。

  1. Cloud Console でパート 1 の AI Platform Notebooks インスタンスをまだ開いていない場合は、次の操作を行います。

    1. [Notebooks] ページに移動します。

      [Notebooks] ページに移動

    2. Notebooks インスタンス リストでノートブックを選択し、[Jupyterlab を開く] をクリックします。ブラウザで JupyterLab 環境が開きます。

    3. ファイル ブラウザで mlops-on-gcp ファイルを開き、skew-detection ディレクトリに移動します。

  2. 04-covertype-drift-detection_novelty_model.ipynb ノートブックを開きます。

  3. ノートブックの [設定] で、[パッケージと依存関係のインストール] セルを実行して、必要な Python パッケージをインストールし、環境変数を構成します。

  4. [Google Cloud 環境設定の構成] で、次の変数を設定します。

    • PROJECT_ID: リクエスト / レスポンス データの BigQuery データセットがロギングされる Google Cloud プロジェクトの ID。
    • BUCKET: 生成されたアーティファクトを保存する Cloud Storage バケットの名前。
    • BQ_DATASET_NAME: リクエスト / レスポンスのログが保存される BigQuery データセットの名前。
    • BQ_TABLE_NAME: リクエスト / レスポンスのログが保存される BigQuery テーブルの名前。
    • MODEL_NAME: AI Platform Prediction にデプロイされるモデルの名前。
    • MODEL_VERSION: AI Platform Prediction にデプロイされるモデルのバージョン名。バージョンは vN という形式です(例: v1)。
  5. [設定] の残りのセルを実行して、環境の構成を完了します。

    1. GCP アカウントの認証
    2. ローカル ワークスペースを作成する
  6. [データ分割のダウンロード] セクションのセルを実行して、データをノートブックで利用できるようにします。

特異点検知モデルのトレーニング

このガイドでは、BigQuery のログに記録されたリクエストの配信 / レスポンス データの総合的なドリフトスコアを算出するため、円エンベロープを画期的な検出モデルとして使用する方法について説明します。このセクションでは、次の操作を行う方法について説明します。

  • トレーニング データを使用して、楕円エンベロープをトレーニングする。
  • 通常のデータセットと変換されたデータセットの両方でエンベロープをテストする。
  • ドリフト スコアを比較する。

楕円エンベロープ アルゴリズムは、トレーニング データが既知の分布(例: ガウス分布)に由来することを前提とするパラメータ アルゴリズムです。このアルゴリズムは、ロバスト共分散推定を使用して、データの「形状」を定義しようとします。この推定を使用すると、モデルは、適した形状から指定された距離にある、離れたデータポイントを特定できます。共分散推定からマハラノビス距離を測定できます。この距離は順番に、外れ値らしさの測定に使用されます。

データの前処理

楕円エンベロープ モデルをトレーニングするために、データには前処理が 1 種類のみ必要です。カテゴリ特徴量は、特徴量の数値表現を生成するために、ワンホット エンコーディングされる必要があります。まず、ノートブックのコードは、トレーニング データ内のカテゴリを使用して、一連の OneHotEncoder オブジェクト(カテゴリ特徴量ごとに 1 つ)に適合します。次に、このコードは、適合するエンコーダを使用して、入力データセットのカテゴリ特徴量を変換します。この変換を行うには、次のコード スニペットに示すように、このコードは prepare_data メソッドを使用します。

from sklearn.preprocessing import OneHotEncoder

encoders = dict()

for feature_name in CATEGORICAL_FEATURE_NAMES:
  encoder = OneHotEncoder(handle_unknown='ignore')
  encoder.fit(train_data[[feature_name]])
  encoders[feature_name] = encoder

def prepare_data(data_frame):

  if type(data_frame) != pd.DataFrame:
    data_frame = pd.DataFrame(data_frame)
  data_frame = data_frame.reset_index()
  for feature_name, encoder in encoders.items():
    encoded_feature = pd.DataFrame(
      encoder.transform(data_frame[[feature_name]]).toarray())
    data_frame = data_frame.drop(feature_name, axis=1)
    encoded_feature.columns = [feature_name+"_"+str(column)
                              for column in encoded_feature.columns]
    data_frame = data_frame.join(encoded_feature)
   return data_frame

高次元データセット(多くの特徴量を含むデータセット、大量の画像を含むデータセット、大規模な語彙セットを含むテキストデータなど)を扱う際は、多くの場合、モデルをトレーニングする前に次元の削減を行います。一般的な手法には、プリンシパル コンポーネント分析(PCA)オートエンコーダランダム プロジェクションなどがあります。

モデル トレーニング

楕円エンベロープ モデルをトレーニングするには、コードで次の処理を行います。

  1. prepare_data メソッドを使用してトレーニング データを準備します。

  2. トレーニング データに外れ値がないと仮定して、EllipticEnvelope オブジェクトをインスタンス化し、contamination 値を 0 に設定します。

  3. model.fit メソッドを呼び出して、prepared_training_data オブジェクトを使用しモデルをトレーニングします。

次のコード スニペットは、この手順を示したものです。

from sklearn.covariance import EllipticEnvelope

prepared_training_data = prepare_data(train_data)
model = EllipticEnvelope(contamination=0.)
model.fit(prepared_training_data)

モデルのテスト

楕円エンベロープ モデルをトレーニングするために、このコードを使用すると、特定のデータポイントとトレーニング データの同時分布を表すモデル間のマハラノビス距離を計算できます。距離を計算するには、このコードは model.mahalanobis メソッドを呼び出し、特定のデータポイントを渡します。ただし、データポイントが外れ値と見なされる距離を特定するには、コードで次の手順を行う必要があります。

  1. モデルに関連するトレーニング データポイントのマハラノビス距離の平均と標準偏差を計算します。これらの値は、model._mean 値および model._stdv 値としてモデルに格納されます。

    平均に N 個の標準偏差ユニットを足したものよりも大きいマハラノビス距離にある新しいデータポイントはいずれも、外れ値と見なされます。

  2. 入力(サービング)データセットのサイズに対する外れ値の数の比率として、ドリフト スコアを計算します。

    このロジックは次のコード スニペットに表示されます。

    def compute_drift_score(model, data_frame, stdv_units=2):
      distances = model.mahalanobis(data_frame)
      threshold = model._mean + (stdv_units * model._stdv)
      score = len([v for v in distances if v >= threshold]) / len(data_frame.index)
      return score
    

ドリフト検知用にモデル動作をテストするために、このコードは評価データ分割を使用します。評価データ分割はトレーニング データと同様の分布を持つことを前提としています。このコードにより、各列の値がシャッフルされて、このデータセットの変換バージョンが作成されます。この場合、各特徴量の単変量分布は変更されません。ただし、各レコードの特徴量値の組み合わせはランダムであるため、データセット全体の多変量(同時)分布は変更されます。

変換データセットは、次のコードを使用して生成されます。

def shuffle_values(dataframe):
 shuffeld_dataframe = dataframe.copy()
 for column_name in dataframe.columns:
   shuffeld_dataframe[column_name] = shuffeld_dataframe[column_name].sample(
       frac=1.0).reset_index(drop=True)

通常の評価データセットのドリフト スコアを計算する際、データポイントの 1% 未満が外れ値として検出されます。これに対して、変換データセットでドリフト スコアを計算すると、データポイントの約 62% が外れ値として検出されます。

サービング データでブレの程度を計算する

トレーニング済みの楕円エンベロープ モデルを使用して、BigQuery にログが記録されたリクエスト / レスポンスのサービング データにおけるドリフト スコアを確認できます。Jupyter ノートブックのコードには、Apache Beam パイプラインの実装(Dataflow で大規模に実行が可能)が用意されているので、次の手順を行います。

  1. args.bq_table_fullname パラメータで示された表から未加工のリクエスト データを読み取ります。この表は、次のパラメータでフィルタリングされています。

    • args.model_name
    • args.model_version
    • args.start_time
    • args.end_time
  2. beam.BatchElements メソッドを使用してリクエスト レコードをグループ化し、ベクトル化を改善します。リクエスト レコードは JSON 形式で取得されます。

  3. parse_batch_data メソッドを使用して、取得したレコードバッチを解析し、構造化されたサンプルに変換します。

  4. prepare_data メソッドを使用して、データ(ワンホット エンコーディングのカテゴリ特徴量を使用する)を前処理します。

  5. args.drift_model インスタンスを使用して、データ内の各レコードをスコア化します。このステップでは、現在のバッチ内で外れ値とみなされるレコード数と、このバッチ内のレコードの合計数が返されます。

  6. beam.CombineGlobally メソッドを使用して結果を集約します。

  7. outlier_countrecord_countdrift_score の変数の値を含む最終的な結果オブジェクトを生成します。

  8. args.output_file_path 変数で指定された場所に JSON ファイルとして結果を書き込みます。

次のコード スニペットは、BigQuery でリクエスト / レスポンス データのドリフト スコアを計算する手順用の Apache Beam パイプライン実装を示したものです。

def run_pipeline(args):

 options = beam.options.pipeline_options.PipelineOptions(**args)
 args = namedtuple("options", args.keys())(*args.values())
 query = get_query(
     args.bq_table_fullname, args.model_name, args.model_version,
     args.start_time, args.end_time)

 print("Starting the Beam pipeline...")

 with beam.Pipeline(options=options) as pipeline:
   (
       pipeline
       | 'ReadBigQueryData' >> beam.io.Read(
           beam.io.BigQuerySource(query=query, use_standard_sql=True))
       | 'BatchRecords' >> beam.BatchElements(
           min_batch_size=100, max_batch_size=1000)
       | 'InstancesToBeamExamples' >> beam.Map(parse_batch_data)
       | 'PrepareData' >> beam.Map(prepare_data)
       | 'ScoreData' >> beam.Map(
           lambda data: score_data(data, args.drift_model, stdv_units=1))
       | 'CombineResults' >> beam.CombineGlobally(aggregate_scores)
       | 'ComputeRatio' >> beam.Map(
           lambda result: {
               "outlier_count": result['outlier_count'],
               "records_count": result['records_count'],
               "drift_score": result['outlier_count'] / result['records_count']
               })
        | 'WriteOutput' >> beam.io.WriteToText(
            file_path_prefix=args.output_file_path, num_shards=1, shard_name_template='')
   )

このドキュメントに関連したノートブックで生成されたスキュー データで、このパイプラインを実行すると、次のような結果が得られます。

outlier_count : 103
records_count : 1000
drift_score : 10.3%

偏りのあるデータポイントを使用してリクエスト / レスポンスのログテーブルにデータを入力する方法の詳細は、BigQuery での AI Platform Prediction ログの分析ガイドのサービング データのシミュレーションをご覧ください。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ