機械学習モデルの最適化

Colab ロゴ このチュートリアルを Colab のノートブックとして実行 GitHub ロゴ GitHub のノートブックを表示

このチュートリアルでは、AI Platform Optimizer の条件付き目標の最適化について説明します。

データセット

このサンプルでトレーニングに使用している国勢調査所得データセットは、UC Irvine Machine Learning Repository でホストされています。

ある人物の年齢、教育、婚姻状況、職業が含まれる国勢調査データを使用して、その人物に 1 年に 50,000 ドル以上の収入があるかどうかを予測します。ロジスティック回帰モデルをトレーニングします。ロジスティック回帰モデルでは、個々の情報によって 0~1 の数値が出力されます。この数値は、個人の年収が 50,000 ドルを超える確率と解釈できます。

目的

このチュートリアルでは、AI Platform Optimizer を使用して機械学習モデルのハイパーパラメータ検索を最適化する方法を説明します。

このサンプルでは、AI Platform Optimizer と AI Platform Training の組み込みアルゴリズムを使用して、国勢調査データセットで分類モデルを最適化する自動学習デモを実装します。AI Platform Optimizer を使用して、推奨されるハイパーパラメータ値を取得し、推奨されるハイパーパラメータ値でモデル トレーニング ジョブを AI Platform Training の組み込みアルゴリズムに送信します。

費用

このチュートリアルでは、Google Cloud の課金対象となるコンポーネントを使用します。

  • AI Platform Training
  • Cloud Storage

詳しくは、AI Platform Training の料金Cloud Storage の料金をご覧ください。また、料金計算ツールを使用すると、予想される使用量に基づいて費用の見積もりを作成できます。

PIP インストール パッケージと依存関係

ノートブック環境にインストールされていない依存関係をインストールします。

  • フレームワークの最新の一般提供メジャー バージョンを使用します。
! pip install -U google-api-python-client
! pip install -U google-cloud
! pip install -U google-cloud-storage
! pip install -U requests

# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

Google Cloud プロジェクトを設定する

ノートブック環境に関係なく、次の手順が必要です。

  1. Google Cloud プロジェクトを選択または作成します。

  2. プロジェクトに対して課金が有効になっていることを確認します。

  3. AI Platform API を有効にします。

  4. このノートブックをローカルで実行する場合は、Google Cloud SDK をインストールする必要があります。

  5. 以下のセルにプロジェクト ID を入力します。次に、以下のセルを実行して、Cloud SDK がこのノートブックのすべてのコマンドに適切なプロジェクトを使用するようにします。

: Jupyter は先頭に ! が付いた行をシェルコマンドとして実行し、これらのコマンドに先頭に $ が付いた Python 変数を挿入します。

PROJECT_ID = "[project-id]" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

Google Cloud アカウントを認証する

AI Platform Notebooks を使用している場合、環境はすでに認証されています。以下の手順を省略します。

次のセルでは、自分を 2 回認証する必要があります。

import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your Google Cloud account. This provides access
# to your Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
    from google.colab import auth as google_auth
    google_auth.authenticate_user()

# If you are running this tutorial in a notebook locally, replace the string
# below with the path to your service account key and run this cell to
# authenticate your Google Cloud account.
else:
    %env GOOGLE_APPLICATION_CREDENTIALS your_path_to_credentials.json

# Log in to your account on Google Cloud
! gcloud auth application-default login
! gcloud auth login

ライブラリをインポートする

import json
import time
import datetime
from googleapiclient import errors

チュートリアル

設定

このセクションでは、AI Platform Optimizer API を呼び出すためのパラメータとユーティリティ メソッドを定義します。開始するには、次の情報を入力してください。

# Update to your username
USER = '[user-id]' #@param {type: 'string'}

STUDY_ID = '{}_study_{}'.format(USER, datetime.datetime.now().strftime('%Y%m%d_%H%M%S')) #@param {type: 'string'}
REGION = 'us-central1'
def study_parent():
  return 'projects/{}/locations/{}'.format(PROJECT_ID, REGION)

def study_name(study_id):
  return 'projects/{}/locations/{}/studies/{}'.format(PROJECT_ID, REGION, study_id)

def trial_parent(study_id):
  return study_name(study_id)

def trial_name(study_id, trial_id):
  return 'projects/{}/locations/{}/studies/{}/trials/{}'.format(PROJECT_ID, REGION,
                                                                study_id, trial_id)

def operation_name(operation_id):
  return 'projects/{}/locations/{}/operations/{}'.format(PROJECT_ID, REGION, operation_id)

print('USER: {}'.format(USER))
print('PROJECT_ID: {}'.format(PROJECT_ID))
print('REGION: {}'.format(REGION))
print('STUDY_ID: {}'.format(STUDY_ID))

API クライアントのビルド

次のセルは、Google API Discovery Service を使用して自動生成された API クライアントをビルドします。JSON 形式 API スキーマは、Cloud Storage バケットにホストされています。

from google.cloud import storage
from googleapiclient import discovery

_OPTIMIZER_API_DOCUMENT_BUCKET = 'caip-optimizer-public'
_OPTIMIZER_API_DOCUMENT_FILE = 'api/ml_public_google_rest_v1.json'

def read_api_document():
  client = storage.Client(PROJECT_ID)
  bucket = client.get_bucket(_OPTIMIZER_API_DOCUMENT_BUCKET)
  blob = bucket.get_blob(_OPTIMIZER_API_DOCUMENT_FILE)
  return blob.download_as_string()

ml = discovery.build_from_document(service=read_api_document())
print('Successfully built the client.')

スタディの構成

このチュートリアルでは、AI Platform Optimizer でスタディを作成し、トライアルをリクエストします。トライアルごとに、AI Platform Training の組み込みアルゴリズム ジョブを作成し、推奨されるハイパーパラメータを使用してモデル トレーニングを行います。各トライアルの測定値はモデル accuracy として報告されます。

AI Platform Optimizer によって提供される条件パラメータ機能では、ハイパーパラメータのツリー形式の検索空間を定義します。最上位のハイパーパラメータは model_type で、LINEAR~WIDE_AND_DEEP の値が指定されます。各モデルタイプには、対応する第 2 レベルのハイパーパラメータがあり、以下を調整します。

  • model_typeLINEAR の場合、learning_rate が調整されます。
  • model_typeWIDE_AND_DEEP の場合、learning_ratednn_learning_rate の両方が調整されます。

model_type が LINEAR または WIDE_AND_Deep のディシジョン ツリー。LINEAR は learning_rate をポイントし、WIDE_AND_DEEP は learning_rate と dnn_learning_rate をポイントします

以下は、階層的 Python 辞書として構築されたスタディ構成のサンプルです。これはすでに入力されています。セルを実行してスタディを構成します。

param_learning_rate = {
    'parameter': 'learning_rate',
    'type' : 'DOUBLE',
    'double_value_spec' : {
        'min_value' : 0.00001,
        'max_value' : 1.0
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['LINEAR', 'WIDE_AND_DEEP']
    },
}

param_dnn_learning_rate = {
    'parameter': 'dnn_learning_rate',
    'type' : 'DOUBLE',
    'double_value_spec' : {
        'min_value' : 0.00001,
        'max_value' : 1.0
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['WIDE_AND_DEEP']
    },
}

param_model_type = {
    'parameter': 'model_type',
    'type' : 'CATEGORICAL',
    'categorical_value_spec' : {'values': ['LINEAR', 'WIDE_AND_DEEP']},
    'child_parameter_specs' : [param_learning_rate, param_dnn_learning_rate,]
}

metric_accuracy = {
    'metric' : 'accuracy',
    'goal' : 'MAXIMIZE'
}

study_config = {
    'algorithm' : 'ALGORITHM_UNSPECIFIED',  # Let the service choose the `default` algorithm.
    'parameters' : [param_model_type,],
    'metrics' : [metric_accuracy,],
}

study = {'study_config': study_config}
print(json.dumps(study, indent=2, sort_keys=True))

スタディを作成する

次に、2 つの目標を最適化するために後で実行するスタディを作成します。

# Creates a study
req = ml.projects().locations().studies().create(
    parent=study_parent(), studyId=STUDY_ID, body=study)
try :
  print(req.execute())
except errors.HttpError as e:
  if e.resp.status == 409:
    print('Study already existed.')
  else:
    raise e

入出力パラメータを設定する

次の出力パラメータを設定します。

OUTPUT_BUCKETOUTPUT_DIR は、AI Platform Training ジョブの「job_dir」として使用される Cloud Storage バケットとディレクトリです。OUTPUT_BUCKET はプロジェクト内のバケットで、OUTPUT_DIR はバケット内の出力フォルダに付ける名前です。

job_dir の形式は「gs://$OUTPUT_BUCKET/$OUTPUT_DIR/」です。

TRAINING_DATA_PATH は、入力トレーニング データセットのパスです。

# `job_dir` will be `gs://${OUTPUT_BUCKET}/${OUTPUT_DIR}/${job_id}`
OUTPUT_BUCKET = '[output-bucket-name]' #@param {type: 'string'}
OUTPUT_DIR = '[output-dir]' #@param {type: 'string'}
TRAINING_DATA_PATH = 'gs://caip-optimizer-public/sample-data/raw_census_train.csv' #@param {type: 'string'}

print('OUTPUT_BUCKET: {}'.format(OUTPUT_BUCKET))
print('OUTPUT_DIR: {}'.format(OUTPUT_DIR))
print('TRAINING_DATA_PATH: {}'.format(TRAINING_DATA_PATH))

# Create the bucket in Cloud Storage
! gsutil mb -p $PROJECT_ID gs://$OUTPUT_BUCKET/

指標の評価

このセクションでは、トライアル評価を行う方法を定義します。

トライアルごとに、AI Platform Optimizer によって推奨されたハイパーパラメータを使用して、機械学習モデルをトレーニングするための AI Platform の組み込みアルゴリズム ジョブを送信します。各ジョブで、ジョブの完了時にモデルの概要ファイルを Cloud Storage に書き込みます。ジョブ ディレクトリからモデルの精度を取得し、トライアルの final_measurement として報告できます。

import logging
import math
import subprocess
import os
import yaml

from google.cloud import storage

_TRAINING_JOB_NAME_PATTERN = '{}_condition_parameters_{}_{}'
_IMAGE_URIS = {'LINEAR' : 'gcr.io/cloud-ml-algos/linear_learner_cpu:latest',
               'WIDE_AND_DEEP' : 'gcr.io/cloud-ml-algos/wide_deep_learner_cpu:latest'}
_STEP_COUNT = 'step_count'
_ACCURACY = 'accuracy'

def EvaluateTrials(trials):
  """Evaluates trials by submitting training jobs to AI Platform Training service.

     Args:
       trials: List of Trials to evaluate

     Returns: A dict of <trial_id, measurement> for the given trials.
  """
  trials_by_job_id = {}
  mesurement_by_trial_id = {}

  # Submits a AI Platform Training job for each trial.
  for trial in trials:
    trial_id = int(trial['name'].split('/')[-1])
    model_type = _GetSuggestedParameterValue(trial, 'model_type', 'stringValue')
    learning_rate = _GetSuggestedParameterValue(trial, 'learning_rate',
                                                'floatValue')
    dnn_learning_rate = _GetSuggestedParameterValue(trial, 'dnn_learning_rate',
                                                    'floatValue')
    job_id = _GenerateTrainingJobId(model_type=model_type,
                                    trial_id=trial_id)
    trials_by_job_id[job_id] = {
        'trial_id' : trial_id,
        'model_type' : model_type,
        'learning_rate' : learning_rate,
        'dnn_learning_rate' : dnn_learning_rate,
    }
    _SubmitTrainingJob(job_id, trial_id, model_type, learning_rate, dnn_learning_rate)

  # Waits for completion of AI Platform Training jobs.
  while not _JobsCompleted(trials_by_job_id.keys()):
    time.sleep(60)

  # Retrieves model training result(e.g. global_steps, accuracy) for AI Platform Training jobs.
  metrics_by_job_id = _GetJobMetrics(trials_by_job_id.keys())
  for job_id, metric in metrics_by_job_id.items():
    measurement = _CreateMeasurement(trials_by_job_id[job_id]['trial_id'],
                                     trials_by_job_id[job_id]['model_type'],
                                     trials_by_job_id[job_id]['learning_rate'],
                                     trials_by_job_id[job_id]['dnn_learning_rate'],
                                     metric)
    mesurement_by_trial_id[trials_by_job_id[job_id]['trial_id']] = measurement
  return mesurement_by_trial_id

def _CreateMeasurement(trial_id, model_type, learning_rate, dnn_learning_rate, metric):
  if not metric[_ACCURACY]:
    # Returns `none` for trials without metrics. The trial will be marked as `INFEASIBLE`.
    return None
  print(
      'Trial {0}: [model_type = {1}, learning_rate = {2}, dnn_learning_rate = {3}] => accuracy = {4}'.format(
          trial_id, model_type, learning_rate,
          dnn_learning_rate if dnn_learning_rate else 'N/A', metric[_ACCURACY]))
  measurement = {
      _STEP_COUNT: metric[_STEP_COUNT],
      'metrics': [{'metric': _ACCURACY, 'value': metric[_ACCURACY]},]}
  return measurement

def _SubmitTrainingJob(job_id, trial_id, model_type, learning_rate, dnn_learning_rate=None):
  """Submits a built-in algo training job to AI Platform Training Service."""
  try:
    if model_type == 'LINEAR':
      subprocess.check_output(_LinearCommand(job_id, learning_rate), stderr=subprocess.STDOUT)
    elif model_type == 'WIDE_AND_DEEP':
      subprocess.check_output(_WideAndDeepCommand(job_id, learning_rate, dnn_learning_rate), stderr=subprocess.STDOUT)
    print('Trial {0}: Submitted job [https://console.cloud.google.com/ai-platform/jobs/{1}?project={2}].'.format(trial_id, job_id, PROJECT_ID))
  except subprocess.CalledProcessError as e:
    logging.error(e.output)

def _GetTrainingJobState(job_id):
  """Gets a training job state."""
  cmd = ['gcloud', 'ai-platform', 'jobs', 'describe', job_id,
         '--project', PROJECT_ID,
         '--format', 'json']
  try:
    output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, timeout=3)
  except subprocess.CalledProcessError as e:
    logging.error(e.output)
  return json.loads(output)['state']

def _JobsCompleted(jobs):
  """Checks if all the jobs are completed."""
  all_done = True
  for job in jobs:
    if _GetTrainingJobState(job) not in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
      print('Waiting for job[https://console.cloud.google.com/ai-platform/jobs/{0}?project={1}] to finish...'.format(job, PROJECT_ID))
      all_done = False
  return all_done

def _RetrieveAccuracy(job_id):
  """Retrices the accuracy of the trained model for a built-in algorithm job."""
  storage_client = storage.Client(project=PROJECT_ID)
  bucket = storage_client.get_bucket(OUTPUT_BUCKET)
  blob_name = os.path.join(OUTPUT_DIR, job_id, 'model/deployment_config.yaml')
  blob = storage.Blob(blob_name, bucket)
  try:
    blob.reload()
    content = blob.download_as_string()
    accuracy = float(yaml.safe_load(content)['labels']['accuracy']) / 100
    step_count = int(yaml.safe_load(content)['labels']['global_step'])
    return {_STEP_COUNT: step_count, _ACCURACY: accuracy}
  except:
    # Returns None if failed to load the built-in algo output file.
    # It could be due to job failure and the trial will be `INFEASIBLE`
    return None

def _GetJobMetrics(jobs):
  accuracies_by_job_id = {}
  for job in jobs:
    accuracies_by_job_id[job] = _RetrieveAccuracy(job)
  return accuracies_by_job_id

def _GetSuggestedParameterValue(trial, parameter, value_type):
  param_found = [p for p in trial['parameters'] if p['parameter'] == parameter]
  if param_found:
    return param_found[0][value_type]
  else:
    return None

def _GenerateTrainingJobId(model_type, trial_id):
  return _TRAINING_JOB_NAME_PATTERN.format(STUDY_ID, model_type, trial_id)

def _GetJobDir(job_id):
  return os.path.join('gs://', OUTPUT_BUCKET, OUTPUT_DIR, job_id)

def _LinearCommand(job_id, learning_rate):
  return ['gcloud', 'ai-platform', 'jobs', 'submit', 'training', job_id,
          '--scale-tier', 'BASIC',
          '--region', 'us-central1',
          '--master-image-uri', _IMAGE_URIS['LINEAR'],
          '--project', PROJECT_ID,
          '--job-dir', _GetJobDir(job_id),
          '--',
          '--preprocess',
          '--model_type=classification',
          '--batch_size=250',
          '--max_steps=1000',
          '--learning_rate={}'.format(learning_rate),
          '--training_data_path={}'.format(TRAINING_DATA_PATH)]

def _WideAndDeepCommand(job_id, learning_rate, dnn_learning_rate):
  return ['gcloud', 'ai-platform', 'jobs', 'submit', 'training', job_id,
          '--scale-tier', 'BASIC',
          '--region', 'us-central1',
          '--master-image-uri', _IMAGE_URIS['WIDE_AND_DEEP'],
          '--project', PROJECT_ID,
          '--job-dir', _GetJobDir(job_id),
          '--',
          '--preprocess',
          '--test_split=0',
          '--use_wide',
          '--embed_categories',
          '--model_type=classification',
          '--batch_size=250',
          '--learning_rate={}'.format(learning_rate),
          '--dnn_learning_rate={}'.format(dnn_learning_rate),
          '--max_steps=1000',
          '--training_data_path={}'.format(TRAINING_DATA_PATH)]

候補やトライアルのリクエストを構成する

client_id - 候補をリクエストしているクライアントの ID。複数の SuggestTrialsRequest に同じ client_id が指定されていて、トライアルが PENDING であれば、サービスは同じ推奨トライアルを返します。また、最後の推奨トライアルが完了すると新しいトライアルを入力します。

suggestion_count_per_request - 1 回のリクエストでリクエストされた候補(トライアル)の数。

max_trial_id_to_stop - 停止するまでのトライアル数。コードを実行する時間を短縮するために 4 に設定されているため、収束は期待できません。収束を行うには、20 程度に設定する必要があります(次元数の合計の 10 倍をおすすめします)。

client_id = 'client1' #@param {type: 'string'}
suggestion_count_per_request =   2 #@param {type: 'integer'}
max_trial_id_to_stop =   4 #@param {type: 'integer'}

print('client_id: {}'.format(client_id))
print('suggestion_count_per_request: {}'.format(suggestion_count_per_request))
print('max_trial_id_to_stop: {}'.format(max_trial_id_to_stop))

AI Platform Optimizer のトライアルをリクエストして実行する

トライアルを実行します。

current_trial_id = 0
while current_trial_id < max_trial_id_to_stop:
  # Request trials
  resp = ml.projects().locations().studies().trials().suggest(
    parent=trial_parent(STUDY_ID),
    body={'client_id': client_id, 'suggestion_count': suggestion_count_per_request}).execute()
  op_id = resp['name'].split('/')[-1]

  # Polls the suggestion long-running operations.
  get_op = ml.projects().locations().operations().get(name=operation_name(op_id))
  while True:
      operation = get_op.execute()
      if 'done' in operation and operation['done']:
        break
      time.sleep(1)

  # Featches the suggested trials.
  trials = []
  for suggested_trial in get_op.execute()['response']['trials']:
    trial_id = int(suggested_trial['name'].split('/')[-1])
    trial = ml.projects().locations().studies().trials().get(name=trial_name(STUDY_ID, trial_id)).execute()
    if trial['state'] not in ['COMPLETED', 'INFEASIBLE']:
      print("Trial {}: {}".format(trial_id, trial))
      trials.append(trial)

  # Evaluates trials - Submit model training jobs using AI Platform Training built-in algorithms.
  measurement_by_trial_id = EvaluateTrials(trials)

  # Completes trials.
  for trial in trials:
    trial_id = int(trial['name'].split('/')[-1])
    current_trial_id = trial_id
    measurement = measurement_by_trial_id[trial_id]
    print(("=========== Complete Trial: [{0}] =============").format(trial_id))
    if measurement:
      # Completes trial by reporting final measurement.
      ml.projects().locations().studies().trials().complete(
        name=trial_name(STUDY_ID, trial_id),
        body={'final_measurement' : measurement}).execute()
    else:
      # Marks trial as `infeasbile` if when missing final measurement.
      ml.projects().locations().studies().trials().complete(
        name=trial_name(STUDY_ID, trial_id),
        body={'trial_infeasible' : True}).execute()

[オプション] 独自のパラメータを使用してトライアルを作成する

サービスからのパラメータの候補(suggest メソッド)のリクエスト以外にも、AI Platform Optimizer の API では独自のパラメータを使用してトライアル(create メソッド)を作成できます。AI Platform Optimizer は、ユーザーが行ったテストを記録し、知識を利用して新しい候補を生成するために役立ちます。

たとえば、AI Platform Optimizer によって推奨されるものではなく、独自の model_typelearning_rate を使用してモデル トレーニング ジョブを実行する場合、スタディの一部としてトライアルを作成できます。

# User has to leave `trial.name` unset in CreateTrial request, the service will
# assign it.
custom_trial = {
  "clientId": "client1",
  "finalMeasurement": {
    "metrics": [
      {
        "metric": "accuracy",
        "value": 0.86
      }
    ],
    "stepCount": "1000"
  },
  "parameters": [
    {
      "parameter": "model_type",
      "stringValue": "LINEAR"
    },
    {
      "floatValue": 0.3869103706121445,
      "parameter": "learning_rate"
    }
  ],
  "state": "COMPLETED"
}

trial = ml.projects().locations().studies().trials().create(
    parent=trial_parent(STUDY_ID), body=custom_trial).execute()

print(json.dumps(trial, indent=2, sort_keys=True))

トライアルを一覧表示する

各最適化トライアル トレーニングの結果を一覧表示します。

resp = ml.projects().locations().studies().trials().list(parent=trial_parent(STUDY_ID)).execute()
print(json.dumps(resp, indent=2, sort_keys=True))

クリーンアップ

このプロジェクトで使用しているすべての Google Cloud リソースをクリーンアップするには、チュートリアルで使用した Google Cloud プロジェクトを削除します。