优化机器学习模型

Colab 徽标 在 Colab 中以笔记本的形式运行本教程 GitHub 徽标在 GitHub 上查看笔记本

本教程将演示如何优化 AI Platform Optimizer 条件目标。

数据集

本示例中用于训练的人口普查收入数据集由 UC Irvine 机器学习代码库托管。

目标是利用个人的年龄、教育程度、婚姻状况和职业(特征)等人口普查数据,预测个人的年收入是否超过 5 万美元(目标标签)。您将训练一个逻辑回归模型,该模型根据给出的个人信息输出 0 到 1 之间的数字。此数字可以理解为个人年收入超过 5 万美元的概率。

目标

本教程演示如何使用 AI Platform Optimizer 优化机器学习模型的超参数搜索。

本示例演示了自动学习过程,即通过 AI Platform Optimizer 和 AI Platform Training 内置算法,使用人口普查数据集优化分类模型。您将使用 AI Platform Optimizer 获取建议的超参数值,并通过 AI Platform Training 内置算法提交包含这些建议的超参数值的模型训练作业。

费用

本教程使用 Google Cloud 的以下收费组件:

  • AI Platform Training
  • Cloud Storage

了解 AI Platform Training 价格Cloud Storage 价格,并使用价格计算器根据您的预计使用情况来估算费用。

PIP 安装软件包和依赖项

安装未在笔记本环境中安装的其他依赖项。

  • 使用最新的主要正式版框架。
! pip install -U google-api-python-client
! pip install -U google-cloud
! pip install -U google-cloud-storage
! pip install -U requests

# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

设置您的 Google Cloud 项目

无论您使用哪种笔记本环境,都必须执行以下步骤。

  1. 选择或创建 Google Cloud 项目。

  2. 确保您的项目已启用结算功能。

  3. 启用 AI Platform API。

  4. 如果您在本地运行此笔记本,则需要安装 Google Cloud SDK

  5. 在下面的单元中输入您的项目 ID。然后运行该单元,确保 Cloud SDK 将适当的项目用于此笔记本中的所有命令。

注意:Jupyter 将前面带 ! 的代码行作为 shell 命令运行,并将前面带 $ 的 Python 变量插入到这些命令中。

PROJECT_ID = "[project-id]" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

验证您的 Google Cloud 账号

如果您使用的是 AI Platform Notebooks,则您的环境已通过身份验证。请跳过这些步骤。

以下单元要求您进行两次身份验证。

import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your Google Cloud account. This provides access
# to your Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
    from google.colab import auth as google_auth
    google_auth.authenticate_user()

# If you are running this tutorial in a notebook locally, replace the string
# below with the path to your service account key and run this cell to
# authenticate your Google Cloud account.
else:
    %env GOOGLE_APPLICATION_CREDENTIALS your_path_to_credentials.json

# Log in to your account on Google Cloud
! gcloud auth application-default login
! gcloud auth login

导入库

import json
import time
import datetime
from googleapiclient import errors

教程

设置

本部分定义了用于调用 AI Platform Optimizer API 的一些参数和实用程序方法。首先,请填写以下信息。

# Update to your username
USER = '[user-id]' #@param {type: 'string'}

STUDY_ID = '{}_study_{}'.format(USER, datetime.datetime.now().strftime('%Y%m%d_%H%M%S')) #@param {type: 'string'}
REGION = 'us-central1'
def study_parent():
  return 'projects/{}/locations/{}'.format(PROJECT_ID, REGION)

def study_name(study_id):
  return 'projects/{}/locations/{}/studies/{}'.format(PROJECT_ID, REGION, study_id)

def trial_parent(study_id):
  return study_name(study_id)

def trial_name(study_id, trial_id):
  return 'projects/{}/locations/{}/studies/{}/trials/{}'.format(PROJECT_ID, REGION,
                                                                study_id, trial_id)

def operation_name(operation_id):
  return 'projects/{}/locations/{}/operations/{}'.format(PROJECT_ID, REGION, operation_id)

print('USER: {}'.format(USER))
print('PROJECT_ID: {}'.format(PROJECT_ID))
print('REGION: {}'.format(REGION))
print('STUDY_ID: {}'.format(STUDY_ID))

构建 API 客户端

以下单元使用 Google API 发现服务构建自动生成的 API 客户端。JSON 格式的 API 架构托管在 Cloud Storage 存储桶中。

from google.cloud import storage
from googleapiclient import discovery

_OPTIMIZER_API_DOCUMENT_BUCKET = 'caip-optimizer-public'
_OPTIMIZER_API_DOCUMENT_FILE = 'api/ml_public_google_rest_v1.json'

def read_api_document():
  client = storage.Client(PROJECT_ID)
  bucket = client.get_bucket(_OPTIMIZER_API_DOCUMENT_BUCKET)
  blob = bucket.get_blob(_OPTIMIZER_API_DOCUMENT_FILE)
  return blob.download_as_string()

ml = discovery.build_from_document(service=read_api_document())
print('Successfully built the client.')

研究配置

在本教程中,AI Platform Optimizer 会创建研究并请求试验。对于每个试验,您将创建一个 AI Platform Training 内置算法作业,以使用建议的超参数训练模型。每个试验的测量结果均作为 accuracy 模型上报。

AI Platform Optimizer 提供的条件参数功能为超参数定义树状搜索空间。顶级超参数是 model_type,由 LINEARWIDE_AND_DEEP 两者确定。每种模型类型都有对应的二级超参数用于调节:

  • 如果 model_typeLINEAR,则会调节 learning_rate
  • 如果 model_typeWIDE_AND_DEEP,则会同时调节 learning_ratednn_learning_rate

模型类型为 LINEAR 或 WIDE_AND_DEEP 的决策树;LINEAR 指向 learning_rate,WIDE_AND_DEEP 指向 learning_rate 和 dnn_learning_rate

下面是一个作为分层 python 字典构建的研究配置示例。它已经填好了。请运行该单元以配置研究。

param_learning_rate = {
    'parameter': 'learning_rate',
    'type' : 'DOUBLE',
    'double_value_spec' : {
        'min_value' : 0.00001,
        'max_value' : 1.0
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['LINEAR', 'WIDE_AND_DEEP']
    },
}

param_dnn_learning_rate = {
    'parameter': 'dnn_learning_rate',
    'type' : 'DOUBLE',
    'double_value_spec' : {
        'min_value' : 0.00001,
        'max_value' : 1.0
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['WIDE_AND_DEEP']
    },
}

param_model_type = {
    'parameter': 'model_type',
    'type' : 'CATEGORICAL',
    'categorical_value_spec' : {'values': ['LINEAR', 'WIDE_AND_DEEP']},
    'child_parameter_specs' : [param_learning_rate, param_dnn_learning_rate,]
}

metric_accuracy = {
    'metric' : 'accuracy',
    'goal' : 'MAXIMIZE'
}

study_config = {
    'algorithm' : 'ALGORITHM_UNSPECIFIED',  # Let the service choose the `default` algorithm.
    'parameters' : [param_model_type,],
    'metrics' : [metric_accuracy,],
}

study = {'study_config': study_config}
print(json.dumps(study, indent=2, sort_keys=True))

创建研究

接下来,创建研究,您稍后将运行该研究以优化目标。

# Creates a study
req = ml.projects().locations().studies().create(
    parent=study_parent(), studyId=STUDY_ID, body=study)
try :
  print(req.execute())
except errors.HttpError as e:
  if e.resp.status == 409:
    print('Study already existed.')
  else:
    raise e

设置输入/输出参数

接下来,设置以下输出参数。

OUTPUT_BUCKETOUTPUT_DIR 是用作 AI Platform Training 作业的 'job_dir' 的 Cloud Storage 存储桶和目录。OUTPUT_BUCKET 应该是项目中的存储桶,而 OUTPUT_DIR 是您要为存储桶中的输出文件夹指定的名称。

job_dir 格式为 'gs://$OUTPUT_BUCKET/$OUTPUT_DIR/'

TRAINING_DATA_PATH 是输入训练数据集的路径。

# `job_dir` will be `gs://${OUTPUT_BUCKET}/${OUTPUT_DIR}/${job_id}`
OUTPUT_BUCKET = '[output-bucket-name]' #@param {type: 'string'}
OUTPUT_DIR = '[output-dir]' #@param {type: 'string'}
TRAINING_DATA_PATH = 'gs://caip-optimizer-public/sample-data/raw_census_train.csv' #@param {type: 'string'}

print('OUTPUT_BUCKET: {}'.format(OUTPUT_BUCKET))
print('OUTPUT_DIR: {}'.format(OUTPUT_DIR))
print('TRAINING_DATA_PATH: {}'.format(TRAINING_DATA_PATH))

# Create the bucket in Cloud Storage
! gsutil mb -p $PROJECT_ID gs://$OUTPUT_BUCKET/

指标评估

本部分定义了用于执行试验评估的方法。

对于每个试验,请提交 AI Platform 内置算法作业,以使用 AI Platform Optimizer 建议的超参数训练机器学习模型。当作业完成时,每个作业都会将模型摘要文件写入 Cloud Storage。您可以从作业目录检索模型准确率,并将其作为试验的 final_measurement 上报。

import logging
import math
import subprocess
import os
import yaml

from google.cloud import storage

_TRAINING_JOB_NAME_PATTERN = '{}_condition_parameters_{}_{}'
_IMAGE_URIS = {'LINEAR' : 'gcr.io/cloud-ml-algos/linear_learner_cpu:latest',
               'WIDE_AND_DEEP' : 'gcr.io/cloud-ml-algos/wide_deep_learner_cpu:latest'}
_STEP_COUNT = 'step_count'
_ACCURACY = 'accuracy'

def EvaluateTrials(trials):
  """Evaluates trials by submitting training jobs to AI Platform Training service.

     Args:
       trials: List of Trials to evaluate

     Returns: A dict of <trial_id, measurement> for the given trials.
  """
  trials_by_job_id = {}
  mesurement_by_trial_id = {}

  # Submits a AI Platform Training job for each trial.
  for trial in trials:
    trial_id = int(trial['name'].split('/')[-1])
    model_type = _GetSuggestedParameterValue(trial, 'model_type', 'stringValue')
    learning_rate = _GetSuggestedParameterValue(trial, 'learning_rate',
                                                'floatValue')
    dnn_learning_rate = _GetSuggestedParameterValue(trial, 'dnn_learning_rate',
                                                    'floatValue')
    job_id = _GenerateTrainingJobId(model_type=model_type,
                                    trial_id=trial_id)
    trials_by_job_id[job_id] = {
        'trial_id' : trial_id,
        'model_type' : model_type,
        'learning_rate' : learning_rate,
        'dnn_learning_rate' : dnn_learning_rate,
    }
    _SubmitTrainingJob(job_id, trial_id, model_type, learning_rate, dnn_learning_rate)

  # Waits for completion of AI Platform Training jobs.
  while not _JobsCompleted(trials_by_job_id.keys()):
    time.sleep(60)

  # Retrieves model training result(e.g. global_steps, accuracy) for AI Platform Training jobs.
  metrics_by_job_id = _GetJobMetrics(trials_by_job_id.keys())
  for job_id, metric in metrics_by_job_id.items():
    measurement = _CreateMeasurement(trials_by_job_id[job_id]['trial_id'],
                                     trials_by_job_id[job_id]['model_type'],
                                     trials_by_job_id[job_id]['learning_rate'],
                                     trials_by_job_id[job_id]['dnn_learning_rate'],
                                     metric)
    mesurement_by_trial_id[trials_by_job_id[job_id]['trial_id']] = measurement
  return mesurement_by_trial_id

def _CreateMeasurement(trial_id, model_type, learning_rate, dnn_learning_rate, metric):
  if not metric[_ACCURACY]:
    # Returns `none` for trials without metrics. The trial will be marked as `INFEASIBLE`.
    return None
  print(
      'Trial {0}: [model_type = {1}, learning_rate = {2}, dnn_learning_rate = {3}] => accuracy = {4}'.format(
          trial_id, model_type, learning_rate,
          dnn_learning_rate if dnn_learning_rate else 'N/A', metric[_ACCURACY]))
  measurement = {
      _STEP_COUNT: metric[_STEP_COUNT],
      'metrics': [{'metric': _ACCURACY, 'value': metric[_ACCURACY]},]}
  return measurement

def _SubmitTrainingJob(job_id, trial_id, model_type, learning_rate, dnn_learning_rate=None):
  """Submits a built-in algo training job to AI Platform Training Service."""
  try:
    if model_type == 'LINEAR':
      subprocess.check_output(_LinearCommand(job_id, learning_rate), stderr=subprocess.STDOUT)
    elif model_type == 'WIDE_AND_DEEP':
      subprocess.check_output(_WideAndDeepCommand(job_id, learning_rate, dnn_learning_rate), stderr=subprocess.STDOUT)
    print('Trial {0}: Submitted job [https://console.cloud.google.com/ai-platform/jobs/{1}?project={2}].'.format(trial_id, job_id, PROJECT_ID))
  except subprocess.CalledProcessError as e:
    logging.error(e.output)

def _GetTrainingJobState(job_id):
  """Gets a training job state."""
  cmd = ['gcloud', 'ai-platform', 'jobs', 'describe', job_id,
         '--project', PROJECT_ID,
         '--format', 'json']
  try:
    output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, timeout=3)
  except subprocess.CalledProcessError as e:
    logging.error(e.output)
  return json.loads(output)['state']

def _JobsCompleted(jobs):
  """Checks if all the jobs are completed."""
  all_done = True
  for job in jobs:
    if _GetTrainingJobState(job) not in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
      print('Waiting for job[https://console.cloud.google.com/ai-platform/jobs/{0}?project={1}] to finish...'.format(job, PROJECT_ID))
      all_done = False
  return all_done

def _RetrieveAccuracy(job_id):
  """Retrices the accuracy of the trained model for a built-in algorithm job."""
  storage_client = storage.Client(project=PROJECT_ID)
  bucket = storage_client.get_bucket(OUTPUT_BUCKET)
  blob_name = os.path.join(OUTPUT_DIR, job_id, 'model/deployment_config.yaml')
  blob = storage.Blob(blob_name, bucket)
  try:
    blob.reload()
    content = blob.download_as_string()
    accuracy = float(yaml.safe_load(content)['labels']['accuracy']) / 100
    step_count = int(yaml.safe_load(content)['labels']['global_step'])
    return {_STEP_COUNT: step_count, _ACCURACY: accuracy}
  except:
    # Returns None if failed to load the built-in algo output file.
    # It could be due to job failure and the trial will be `INFEASIBLE`
    return None

def _GetJobMetrics(jobs):
  accuracies_by_job_id = {}
  for job in jobs:
    accuracies_by_job_id[job] = _RetrieveAccuracy(job)
  return accuracies_by_job_id

def _GetSuggestedParameterValue(trial, parameter, value_type):
  param_found = [p for p in trial['parameters'] if p['parameter'] == parameter]
  if param_found:
    return param_found[0][value_type]
  else:
    return None

def _GenerateTrainingJobId(model_type, trial_id):
  return _TRAINING_JOB_NAME_PATTERN.format(STUDY_ID, model_type, trial_id)

def _GetJobDir(job_id):
  return os.path.join('gs://', OUTPUT_BUCKET, OUTPUT_DIR, job_id)

def _LinearCommand(job_id, learning_rate):
  return ['gcloud', 'ai-platform', 'jobs', 'submit', 'training', job_id,
          '--scale-tier', 'BASIC',
          '--region', 'us-central1',
          '--master-image-uri', _IMAGE_URIS['LINEAR'],
          '--project', PROJECT_ID,
          '--job-dir', _GetJobDir(job_id),
          '--',
          '--preprocess',
          '--model_type=classification',
          '--batch_size=250',
          '--max_steps=1000',
          '--learning_rate={}'.format(learning_rate),
          '--training_data_path={}'.format(TRAINING_DATA_PATH)]

def _WideAndDeepCommand(job_id, learning_rate, dnn_learning_rate):
  return ['gcloud', 'ai-platform', 'jobs', 'submit', 'training', job_id,
          '--scale-tier', 'BASIC',
          '--region', 'us-central1',
          '--master-image-uri', _IMAGE_URIS['WIDE_AND_DEEP'],
          '--project', PROJECT_ID,
          '--job-dir', _GetJobDir(job_id),
          '--',
          '--preprocess',
          '--test_split=0',
          '--use_wide',
          '--embed_categories',
          '--model_type=classification',
          '--batch_size=250',
          '--learning_rate={}'.format(learning_rate),
          '--dnn_learning_rate={}'.format(dnn_learning_rate),
          '--max_steps=1000',
          '--training_data_path={}'.format(TRAINING_DATA_PATH)]

请求建议/试验的配置

client_id - 请求建议的客户的标识符。如果多个 SuggestTrialsRequests 具有相同的 client_id,则服务将返回相同的建议试验(如果该试验 PENDING),并在建议的最后一个试验已完成的情况下提供一个新试验。

suggestion_count_per_request - 每个请求中请求的建议数(试验数)。

max_trial_id_to_stop - 停止前要探索的试验次数。将其设置为 4 可以缩短运行代码的时间,因此预计不会收敛。如需达到收敛状态,试验次数可能需要大约 20 次(一个好的做法是用总维度乘以 10)。

client_id = 'client1' #@param {type: 'string'}
suggestion_count_per_request =   2 #@param {type: 'integer'}
max_trial_id_to_stop =   4 #@param {type: 'integer'}

print('client_id: {}'.format(client_id))
print('suggestion_count_per_request: {}'.format(suggestion_count_per_request))
print('max_trial_id_to_stop: {}'.format(max_trial_id_to_stop))

请求并运行 AI Platform Optimizer 试验

运行试验。

current_trial_id = 0
while current_trial_id < max_trial_id_to_stop:
  # Request trials
  resp = ml.projects().locations().studies().trials().suggest(
    parent=trial_parent(STUDY_ID),
    body={'client_id': client_id, 'suggestion_count': suggestion_count_per_request}).execute()
  op_id = resp['name'].split('/')[-1]

  # Polls the suggestion long-running operations.
  get_op = ml.projects().locations().operations().get(name=operation_name(op_id))
  while True:
      operation = get_op.execute()
      if 'done' in operation and operation['done']:
        break
      time.sleep(1)

  # Featches the suggested trials.
  trials = []
  for suggested_trial in get_op.execute()['response']['trials']:
    trial_id = int(suggested_trial['name'].split('/')[-1])
    trial = ml.projects().locations().studies().trials().get(name=trial_name(STUDY_ID, trial_id)).execute()
    if trial['state'] not in ['COMPLETED', 'INFEASIBLE']:
      print("Trial {}: {}".format(trial_id, trial))
      trials.append(trial)

  # Evaluates trials - Submit model training jobs using AI Platform Training built-in algorithms.
  measurement_by_trial_id = EvaluateTrials(trials)

  # Completes trials.
  for trial in trials:
    trial_id = int(trial['name'].split('/')[-1])
    current_trial_id = trial_id
    measurement = measurement_by_trial_id[trial_id]
    print(("=========== Complete Trial: [{0}] =============").format(trial_id))
    if measurement:
      # Completes trial by reporting final measurement.
      ml.projects().locations().studies().trials().complete(
        name=trial_name(STUDY_ID, trial_id),
        body={'final_measurement' : measurement}).execute()
    else:
      # Marks trial as `infeasbile` if when missing final measurement.
      ml.projects().locations().studies().trials().complete(
        name=trial_name(STUDY_ID, trial_id),
        body={'trial_infeasible' : True}).execute()

[可选] 使用您自己的参数创建试验

除了从服务请求建议参数(suggest 方法)之外,AI Platform Optimizer 的 API 还允许用户使用自己的参数创建试验(create 方法)。AI Platform Optimizer 将帮助记录用户完成的实验,并利用这些知识生成新建议。

例如,如果您使用自己的 model_typelearning_rate(而非 AI Platform Optimizer 建议的参数)运行模型训练作业,则可以在研究中为其创建试验。

# User has to leave `trial.name` unset in CreateTrial request, the service will
# assign it.
custom_trial = {
  "clientId": "client1",
  "finalMeasurement": {
    "metrics": [
      {
        "metric": "accuracy",
        "value": 0.86
      }
    ],
    "stepCount": "1000"
  },
  "parameters": [
    {
      "parameter": "model_type",
      "stringValue": "LINEAR"
    },
    {
      "floatValue": 0.3869103706121445,
      "parameter": "learning_rate"
    }
  ],
  "state": "COMPLETED"
}

trial = ml.projects().locations().studies().trials().create(
    parent=trial_parent(STUDY_ID), body=custom_trial).execute()

print(json.dumps(trial, indent=2, sort_keys=True))

列出试验

列出每次优化试验训练的结果。

resp = ml.projects().locations().studies().trials().list(parent=trial_parent(STUDY_ID)).execute()
print(json.dumps(resp, indent=2, sort_keys=True))

清理

如需清理此项目中使用的所有 Google Cloud 资源,您可以删除用于本教程的 Google Cloud 项目