カスタム scikit-learn パイプラインによる予測

Colab のロゴ このチュートリアルを Colab のノートブックとして実行 GitHub ロゴ GitHub のノートブックを表示

このチュートリアルでは、AI Platform Prediction を使用して、カスタム変換器を使用する scikit-learn パイプラインをデプロイします。

scikit-learn パイプラインを使用すると、複数の推定器を構成できます。たとえば、変換器を使用してデータを前処理し、変換されたデータを分類器に渡すことができます。scikit-learn は、sklearn パッケージで複数の変換器を提供しています。

scikit-learn の FunctionTransformer または TransformerMixin クラスを使用して、独自のカスタム変換器を作成することもできます。カスタム変換器を使用するパイプラインを AI Platform Prediction にデプロイする場合は、そのコードをソース配布パッケージとして AI Platform Prediction に提供する必要があります。

このチュートリアルでは、国勢調査データに関連するサンプル問題を使用して、次の手順について説明します。

  • AI Platform Training でカスタム変換器を使った scikit-learn パイプラインをトレーニングする
  • トレーニング済みのパイプラインとカスタムコードを AI Platform Prediction にデプロイする
  • そのデプロイメントからの予測リクエストを処理する

データセット

このチュートリアルでは、UC Irvine Machine Learning Repository が提供する米国国勢調査の所得データセットを使用します。このデータセットには、1994 年の国勢調査データベースから得られた情報が収録されています。年齢、学歴、配偶者の有無、職種、年収が $50,000 以上あるかどうかなどの情報が含まれます。

このチュートリアルで使用するデータは、Cloud Storage の公開バケットで入手できます。gs://cloud-samples-data/ai-platform/sklearn/census_data/

目標

ここでの目的は、scikit-learn パイプラインをトレーニングすることにあります。この場合、対象者に関して国勢調査情報にある年収以外の情報(特徴)に基づき、その人の年収(ターゲット ラベル)が $50,000 以上になるかどうかを予測します。

このチュートリアルでは、モデル自体の設計よりも、AI Platform Prediction でこのモデルを使用することに重点を置きます。ただし、機械学習システムの構築では、問題や意図しない結果が生じる可能性があることを常に意識する必要があります。国勢調査のデータセットに偏りが生じる原因や、機械学習の公平性に関する一般的な説明については、公平性に関する機械学習集中講座をご覧ください。

費用

このチュートリアルでは、Google Cloud の課金対象となるコンポーネントを使用します。

  • AI Platform Training
  • AI Platform Prediction
  • Cloud Storage

詳細については、AI Platform Training の料金AI Platform Prediction の料金Cloud Storage の料金をご覧ください。また、料金計算ツールを使用すると、予想される使用量に基づいて費用を見積もることができます。

始める前に

AI Platform Prediction でモデルをトレーニングしてデプロイする前に、次の準備作業が必要です。

  • ローカル開発環境を設定します。
  • 課金と必要な API が有効になっている Google Cloud プロジェクトを設定します。
  • トレーニング パッケージとトレーニングしたモデルを保存する Cloud Storage バケットを作成します。

ローカル開発環境の設定

このチュートリアルを完了するには、以下のリソースが必要です。

  • Python 3
  • virtualenv
  • Cloud SDK

Python 開発環境を設定するための Google Cloud ガイドでは、上記の要件を満たすための詳しい手順を説明しています。要約した手順を次に示します。

  1. Python 3 をインストールします

  2. virtualenv をインストールして、Python 3 を使用する仮想環境を作成します。

  3. その環境をアクティブにします。

  4. 次のセクションにある手順に従い、Cloud SDK をインストールします。

Google Cloud プロジェクトの設定

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. AI Platform Training & Prediction and Compute Engine API を有効にします。

    API を有効にする

  5. Cloud SDK をインストールして初期化します。

GCP アカウントの認証

認証を設定するには、サービス アカウント キーを作成し、そのサービス アカウント キーへのファイルパスの環境変数を設定する必要があります。

  1. 認証に使用するサービス アカウント キーを作成します。
    1. Cloud Console で、[サービス アカウント キーの作成] ページに移動します。

      [サービス アカウント キーの作成] ページに移動
    2. [サービス アカウント] プルダウン リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [役割] プルダウン リストから、[Machine Learning Engine] > [ML Engine 管理者]、および [ストレージ] > [ストレージ オブジェクト管理者] を選択します。

      : [役割] フィールドの設定に応じて、サービス アカウントによるリソースへのアクセスが承認されます。このフィールドは、後で Cloud Console を使用して表示および変更できます。本番環境のアプリを開発している場合は、[Machine Learning Engine] > [ML Engine 管理者][ストレージ] > [ストレージ オブジェクト管理者] よりも詳細な権限を指定する必要があります。詳しくは、AI Platform Prediction のアクセス制御をご覧ください。
    5. [作成] をクリックします。キーを含む JSON ファイルがパソコンにダウンロードされます。
  2. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのファイルパスに設定します。この変数は現在の shell セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

Cloud Storage バケットの作成

このチュートリアルでは、Cloud Storage をいくつかの方法で使用します。

  • Cloud SDK を使用してトレーニング ジョブを送信するときは、トレーニング コードを収めた Python パッケージを Cloud Storage バケットにアップロードします。AI Platform Training では、このパッケージからコードが実行されます。

  • このチュートリアルでは、ジョブで得られたトレーニング済みモデルも、AI Platform Training によって同じバケットに保存されます。

  • カスタムコードを使用して予測を行う scikit-learn パイプラインを AI Platform Prediction にデプロイするには、パイプラインが使用するカスタム変換器を Cloud Storage にアップロードする必要があります。

予測を行う AI Platform Prediction バージョンのリソースを作成する場合は、トレーニング済みの scikit-learn パイプラインとカスタムコードを Cloud Storage URI として提供します。

Cloud Storage バケットの名前を環境変数として設定します。この名前は、すべての Cloud Storage バケットにわたって一意なものにする必要があります。

BUCKET_NAME="your-bucket-name"

AI Platform TrainingAI Platform Prediction を利用できるリージョンを選択して、別の環境変数を作成します。例:

REGION="us-central1"

このリージョンに Cloud Storage バケットを作成し、後のトレーニングと予測で同じリージョンを使用します。そのバケットが存在しない場合は、次のコマンドを実行して作成します。

gsutil mb -l $REGION gs://$BUCKET_NAME

トレーニング アプリケーションとカスタム パイプライン コードを作成する

国勢調査データを使用して scikit-learn パイプラインをトレーニングするアプリケーションを作成します。このチュートリアルでは、トレーニング済みのパイプラインが予測中に使用するカスタムコードがトレーニング パッケージに含まれています。通常、パイプラインはトレーニングと予測で同じ変換器を使用するように設計されているため、これは便利なパターンです。

次の構造に一致するディレクトリを作成します。このディレクトリの中には 3 つのファイルが存在します。

census_package/
    __init__.py
    my_pipeline.py
    train.py

まず、空の census_package/ ディレクトリを作成します。

mkdir census_package

census_package/ 内に __init__.py という名前の空のファイルを作成します。

touch ./census_package/__init__.py

これで、census_package/ を Python パッケージとしてインポートできます。

カスタム変換器を作成する

scikit-learn では、パイプラインの一部として使用できる複数の変換器が用意されていますが、独自のカスタム変換器を定義することもできます。これらの変換器は、トレーニング中に保存された状態を学習し、予測で使用します。

sklearn.base.TransformerMixin を展開して、3 つの変換器を定義します。

  • PositionalSelector インデックス C のリストと行列 M が与えられると、C で示される M の列のサブセットを含む行列を返します。

  • StripString 文字列の行列が与えられると、各文字列から空白を除去します。

  • SimpleOneHotEncoder: 文字列の行列に適用できるシンプルなワンホット エンコーダです。

census_package/my_pipeline.py というファイルに次のコードを記述します。

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class PositionalSelector(BaseEstimator, TransformerMixin):
    def __init__(self, positions):
        self.positions = positions

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return np.array(X)[:, self.positions]

class StripString(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        strip = np.vectorize(str.strip)
        return strip(np.array(X))

class SimpleOneHotEncoder(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        self.values = []
        for c in range(X.shape[1]):
            Y = X[:, c]
            values = {v: i for i, v in enumerate(np.unique(Y))}
            self.values.append(values)
        return self

    def transform(self, X):
        X = np.array(X)
        matrices = []
        for c in range(X.shape[1]):
            Y = X[:, c]
            matrix = np.zeros(shape=(len(Y), len(self.values[c])), dtype=np.int8)
            for i, x in enumerate(Y):
                if x in self.values[c]:
                    matrix[i][self.values[c][x]] = 1
            matrices.append(matrix)
        res = np.concatenate(matrices, axis=1)
        return res

パイプラインを定義してトレーニング モジュールを作成する

次に、国勢調査データで scikit-learn パイプラインをトレーニングするトレーニング モジュールを作成します。このコードの一部でパイプラインを定義します。

このトレーニング モジュールは次のような処理を行います。

  • トレーニング データをダウンロードし、scikit-learn で使用できる pandas DataFrameに読み込みます。
  • トレーニングする scikit-learn パイプラインを定義します。この例では、入力データから 3 つの数値特徴('age''education-num''hours-per-week')と 3 つのカテゴリ特徴('workclass''marital-status''relationship')を使用します。scikit-learn の組み込み StandardScaler を使用して数値特徴を変換し、my_pipeline.py で定義したカスタム ワンホット エンコーダでカテゴリ特徴を変換します。次に、前処理されたデータを分類器への入力として結合します。
  • 最後に、scikit-learn に含まれる joblib のバージョンを使用してモデルをエクスポートし、Cloud Storage バケットに保存します。

次のコードを census_package/train.py に記述します。

import warnings
import argparse
from google.cloud import storage

import pandas as pd
import numpy as np
from sklearn.externals import joblib
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline, FeatureUnion, make_pipeline
import census_package.my_pipeline as mp
warnings.filterwarnings('ignore')

def download_data(bucket_name, gcs_path, local_path):
    bucket = storage.Client().bucket(bucket_name)
    blob = bucket.blob(gcs_path)
    blob.download_to_filename(local_path)

def upload_data(bucket_name, gcs_path, local_path):
    bucket = storage.Client().bucket(bucket_name)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(local_path)

def get_features_target(local_path):
    strip = np.vectorize(str.strip)
    raw_df = pd.read_csv(local_path, header=None)
    target_index = len(raw_df.columns) - 1  # Last columns, 'income-level', is the target

    features_df = raw_df.drop(target_index, axis=1)
    features = features_df.as_matrix()
    target = strip(raw_df[target_index].values)
    return features, target

def create_pipeline():
    # We want to use 3 categorical and 3 numerical features in this sample.
    # Categorical features: age, education-num, and hours-per-week
    # Numerical features: workclass, marital-status, and relationship
    numerical_indices = [0, 4, 12]  # age, education-num, and hours-per-week
    categorical_indices = [1, 5, 7]  # workclass, marital-status, and relationship

    p1 = make_pipeline(mp.PositionalSelector(categorical_indices), mp.StripString(), mp.SimpleOneHotEncoder())
    p2 = make_pipeline(mp.PositionalSelector(numerical_indices), StandardScaler())

    feats = FeatureUnion([
        ('numericals', p1),
        ('categoricals', p2),
    ])

    pipeline = Pipeline([
        ('pre', feats),
        ('estimator', GradientBoostingClassifier(max_depth=4, n_estimators=100))
    ])
    return pipeline

def get_bucket_path(gcs_uri):
    if not gcs_uri.startswith('gs://'):
        raise Exception('{} does not start with gs://'.format(gcs_uri))
    no_gs_uri = gcs_uri[len('gs://'):]
    first_slash_index = no_gs_uri.find('/')
    bucket_name = no_gs_uri[:first_slash_index]
    gcs_path = no_gs_uri[first_slash_index + 1:]
    return bucket_name, gcs_path

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_data_path', action="store", required=True)
    parser.add_argument('--gcs_model_path', action="store", required=True)

    arguments, others = parser.parse_known_args()

    local_path = '/tmp/adul.data'
    data_bucket, data_path = get_bucket_path(arguments.gcs_data_path)
    print('Downloading the data...')
    download_data(data_bucket, data_path, local_path)
    features, target = get_features_target(local_path)
    pipeline = create_pipeline()

    print('Training the model...')
    pipeline.fit(features, target)

    joblib.dump(pipeline, './model.joblib')

    model_bucket, model_path = get_bucket_path(arguments.gcs_model_path)
    upload_data(model_bucket, model_path, './model.joblib')
    print('Model was successfully uploaded.')

AI Platform Training でパイプラインをトレーニングする

gcloud を使用して AI Platform Training にトレーニング ジョブを送信します。次のコマンドは、トレーニング アプリケーションをパッケージ化して Cloud Storage にアップロードし、AI Platform Training にトレーニング モジュールの実行を指示します。

-- 引数は区切り文字です。AI Platform Training サービスは、区切り文字に続く引数を使用しませんが、トレーニング モジュールはこれらの引数にアクセスできます。

gcloud ai-platform jobs submit training census_training_$(date +"%Y%m%d_%H%M%S") \
  --job-dir gs://$BUCKET_NAME/custom_pipeline_tutorial/job \
  --package-path ./census_package \
  --module-name census_package.train \
  --region $REGION \
  --runtime-version 1.13 \
  --python-version 3.5 \
  --scale-tier BASIC \
  --stream-logs \
  -- \
  --gcs_data_path gs://cloud-samples-data/ai-platform/census/data/adult.data.csv \
  --gcs_model_path gs://$BUCKET_NAME/custom_pipeline_tutorial/model/model.joblib

パイプラインをデプロイして予測を提供する

AI Platform Prediction から予測を提供するには、モデルリソースとバージョン リソースをデプロイする必要があります。パイプラインを複数回変更してトレーニングする場合に、モデルを使用してデプロイメントを整理します。バージョンは、トレーニング済みのモデルとカスタムコードを使用して、予測を提供します。

これらのリソースをデプロイするには、2 つのアーティファクトを提供する必要があります。

  • トレーニング済みのパイプラインを含む Cloud Storage ディレクトリ。このファイルは、前の手順のトレーニング ジョブで model.joblib をバケットにエクスポートしたときに作成されています。
  • パイプラインが使用するカスタム変換器を含む Cloud Storage の .tar.gz ソース配布パッケージ。これは次のステップで作成します。

カスタム変換器をパッケージ化する

my_pipeline.py からコードを提供せずにバージョンをデプロイした場合、AI Platform Prediction はカスタム変換器(たとえば、mp.SimpleOneHotEncoder)をインポートできません。このため、予測を提供できなくなります。

以下の setup.py を作成して、コードのソース配布パッケージを定義します。

import setuptools
setuptools.setup(name='census_package',
      packages=['census_package'],
      version="1.0",
      )

次に、次のコマンドを実行して dist/census_package-1.0.tar.gz を作成します。

python setup.py sdist --formats=gztar

最後に、この tarball を Cloud Storage バケットにアップロードします。

gsutil cp ./dist/census_package-1.0.tar.gz gs://$BUCKET_NAME/custom_pipeline_tutorial/code/census_package-1.0.tar.gz

モデルリソースとバージョン リソースを作成する

まず、モデル名とバージョン名を定義します。

MODEL_NAME='CensusPredictor'
VERSION_NAME='v1'

次のコマンドを使用して、モデルリソースを作成します。

gcloud ai-platform models create $MODEL_NAME \
  --regions $REGION

最後に、モデル ディレクトリ(model.joblib を含むディレクトリ)とカスタムコード(census_package-1.0.tar.gz)への Cloud Storage のパスを指定して、バージョン リソースを作成します。

gcloud components install beta

gcloud beta ai-platform versions create $VERSION_NAME --model $MODEL_NAME \
  --origin gs://$BUCKET_NAME/custom_pipeline_tutorial/model/ \
  --runtime-version 1.13 \
  --python-version 3.5 \
  --framework SCIKIT_LEARN \
  --package-uris gs://$BUCKET_NAME/custom_pipeline_tutorial/code/census_package-1.0.tar.gz

オンライン予測を提供する

オンライン予測リクエストを送信してデプロイメントを試してみましょう。まず、Python 用の Google API クライアント ライブラリをインストールします。

pip install --upgrade google-api-python-client

次に、デプロイしたバージョンに国勢調査データの 2 つのインスタンスを送信します。

import googleapiclient.discovery

instances = [
  [39, 'State-gov', 77516, ' Bachelors .  ', 13, 'Never-married', 'Adm-clerical', 'Not-in-family',
   'White', 'Male', 2174, 0, 40, 'United-States', '<=50K'],
  [50, 'Self-emp-not-inc', 83311, 'Bachelors', 13, 'Married-civ-spouse', 'Exec-managerial', 'Husband',
   'White', 'Male', 0, 0, 13, 'United-States', '<=50K']
]

service = googleapiclient.discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(PROJECT_ID, MODEL_NAME, VERSION_NAME)

response = service.projects().predict(
    name=name,
    body={'instances': instances}
).execute()

if 'error' in response:
    raise RuntimeError(response['error'])
else:
  print(response['predictions'])

バージョンは、トレーニング済みのパイプラインを介して入力データを渡し、分類器の結果を返します。結果は、個人の収入範囲の予測に応じて <=50K または >50K になります。

クリーンアップ

このプロジェクトで使用しているすべての Google Cloud リソースをクリーンアップするには、チュートリアルで使用した Google Cloud プロジェクトを削除します。

また、以下のコマンドを実行して、個々のリソースを別々にクリーンアップすることもできます。

# Delete version resource
gcloud ai-platform versions delete $VERSION_NAME --quiet --model $MODEL_NAME

# Delete model resource
gcloud ai-platform models delete $MODEL_NAME --quiet

# Delete Cloud Storage objects that were created
gsutil -m rm -r gs://$BUCKET_NAME/custom_pipeline_tutorial

次のステップ