Cloud Dataflow パイプラインでの予測のための機械学習モデルの比較

このソリューションでは、Cloud Dataflow パイプラインで機械学習モデルを呼び出すためのさまざまな設計アプローチを説明して比較し、それぞれのアプローチを選択する際のトレードオフを検証します。バッチ処理パイプラインとストリーム処理パイプラインの両方で、さまざまなアプローチを検討し、これらのトレードオフを説明する一連のテストの結果を紹介します。このソリューションは、機械学習モデルを構築するデータ サイエンティストではなく、トレーニングされたモデルをデータ処理パイプラインに統合する担当者を念頭に置いて設計されています。

はじめに

この ML モデルを Cloud Dataflow パイプラインに統合する担当者は、さまざまなアプローチの内容や、システム要件に最適なアプローチを知りたいと思うでしょう。以下を検討する必要があります。

  • スループット
  • レイテンシ
  • 料金
  • 実装
  • メンテナンス

これらの考慮事項のバランスをとることは必ずしも容易ではありませんが、このソリューションは、優先順位に基づいて意思決定プロセスをナビゲートするのに役立ちます。このソリューションでは、バッチ データ パイプラインとストリーム データ パイプラインの TensorFlow でトレーニングされた機械学習(ML)モデルで予測を行うための 3 つのアプローチを比較します。

  • ストリーミング パイプライン用に REST / HTTP API としてデプロイされたモデルを使用する。
  • バッチ パイプライン用の Cloud Machine Learning Engine(Cloud ML Engine)バッチ予測ジョブを使用する。
  • バッチ パイプラインとストリーミング パイプラインの両方に Cloud Dataflow 直接モデル予測を使用する。

すべてのテストで、Natality データセットと呼ばれる既存のトレーニングされたモデルを使用しています。これは、さまざまな入力に基づいて赤ちゃんの体重を予測します。このソリューションの目標はモデルを構築することではないため、モデルの構築方法やトレーニング方法については説明しません。Natality データセットの詳細については、次のステップをご覧ください。

プラットフォーム

データ パイプラインを実行し、トレーニングされた ML モデルを呼び出すには、さまざまな方法があります。ただし、機能要件は常に同じです。

  1. 制限付き(バッチ)ソースまたは無制限(ストリーミング)ソースからのデータの取り込み。データを取り込むソースの例には、センサーデータ、ウェブサイトの相互作用、金融トランザクションなどがあります。
  2. 予測のために ML モデルを呼び出すことによって入力データを変換し、拡充する。たとえば、JSON ファイルを解析して関連するフィールドを抽出し、メンテナンス日の予測、プロダクトの推奨事項の作成、不正行為の検出を行います。
  3. 分析やバックアップのために変換されたデータと予測を保存するか、新しいイベントや追加のパイプラインをトリガーするキューシステムに渡す。たとえば、潜在的な不正行為をリアルタイムで検出する方法や、ダッシュボードからアクセス可能なストレージにメンテナンス スケジュール情報を保存する方法などがあります。

バッチ ETL プロセスでの予測を使用してデータを変換して拡充する場合は、データバッチ全体に必要な時間を短縮するために、スループットを最大化することを目標にします。一方、オンライン予測のためにストリーミング データを処理する場合は、各予測を(ほぼ)リアルタイムで受信するためにレイテンシを最小化することを目標にします。したがって、モデルの呼び出しがボトルネックになる可能性があります。

コア コンポーネント

このソリューションのバッチテストとストリーミング テストでは、次の 3 つの主要なテクノロジーが使用されます。

  • Cloud Dataflow で稼働し、データを処理する Apache Beam
  • ML モデルを実装してトレーニングする TensorFlow
  • 一部のテストでは、トレーニングされた ML モデル用のホスティング プラットフォームとしての Cloud ML Engine が、バッチ予測とオンライン予測を行います。

このソリューションでデータ パイプラインを実行するために、Cloud Dataflow 上で動作する Apache Beam を選択しました。理由は次のとおりです。

  • Apache Beam は、ストリーミングとバッチの両方のデータ処理を行うオープンソースの統合プログラミング モデルです。
  • Cloud Dataflow は、サーバーなしで Apache Beam ジョブを実行できる Google Cloud Platform(GCP)プロダクトです。

TensorFlow は、機械学習フレームワークとして使用される Google のオープンソースの数学ライブラリです。TensorFlow を使用すると、単一のマシンまたは分散環境でモデルの構築、トレーニング、提供を行えます。モデルはさまざまなデバイスに移植でき、使用可能な CPUGPUTPU などのリソースを活用してトレーニングやサービスを提供できます。

Cloud ML Engine は、ハイパーパラメータの調整機能を使用してトレーニング、調整を行えるサーバーレス プラットフォームであり、DevOps で必要な最小限の管理で大量の TensorFlow モデルを処理します。Cloud ML Engine は、トレーニングされたモデルをオンライン予測用の REST API としてデプロイすること、バッチ予測ジョブを送信することをサポートしています。Cloud ML Engine は、モデルをマイクロサービスとして提供できるいくつかのオプションの 1 つです。

このソリューションで詳しく説明しているアプローチでは、データ処理パイプラインを用意するために Cloud Dataflow を、モデルを HTTP エンドポイントとしてホストするために Cloud ML Engine を使用しています。ただし、これらのアプローチは他のテクノロジーに置換できます。HTTP と直接的な TensorFlow モデルのパフォーマンス比較が大幅に変化することはありません。

バッチデータの処理とストリーミング データの処理

このソリューションのテストには、バッチとストリーミングの両方のユースケースが含まれます。各テストでは、無制限のソースと制限付きのソースでは操作要件が異なるため、入力と出力に異なる GCP プロダクトを利用しています。

制限付きデータセットのバッチ処理

図 1 は、典型的なバッチ処理パイプラインでは、生の入力データが Cloud Storage などのオブジェクト ストレージに格納されることを示しています。構造化データ ストレージ形式には、カンマ区切り値(CSV)、最適化された行列(ORC)、Parquet、または Avro があります。これらの形式は、データがデータベースまたはログから取得される場合によく使用されます。

一般的なバッチ処理パイプラインのアーキテクチャ
図 1. バッチ処理アーキテクチャ

BigQuery などの分析プラットフォームの中には、クエリ機能に加えてストレージを提供するものもあります。BigQuery ではストレージ用に Capacitor を使用します。Cloud Dataflow 上の Apache Beam では、バッチ処理パイプラインの他のストレージ オプションに加えて、BigQuery と Cloud Storage の両方から読み書きできます。

無制限のデータ ストリームのストリーム処理

ストリーミングの場合、データ処理パイプラインへの入力は通常、図 2 に示すようにメッセージング システムです。Cloud Pub/Sub や Kafka などのテクノロジーは、通常、JSON、CSV、protobuf などの形式の個々のデータポイントを取り込むために使用されます。

一般的なストリーム処理パイプラインのアーキテクチャ
図 2. ストリーム処理アーキテクチャ

データポイントは、ウィンドウ処理機能を使用して一時的なイベント処理を実行することによって、個別に、またはマイクロバッチでグループとして処理できます。処理されたデータは、以下を含めたいくつかの宛先に送信できます。

  1. ストリーミング API によるアドホック分析用の BigQuery。
  2. リアルタイム情報を提供するための Cloud Bigtable。
  3. 後続のプロセスやパイプラインをトリガーするための Cloud Pub/Sub トピック。

制限付きと無制限の両方のデータソース シンクのソースコネクタ(入力)とシンクコネクタ(出力)の完全なリストは、Apache Beam I/O ページにあります。

TensorFlow モデルの呼び出し

TensorFlow トレーニング モデルは、次の 3 つの方法で呼び出せます。

  1. オンライン予測用の HTTP エンドポイントを介して呼び出す。
  2. バッチ予測とオンライン予測のために保存されたモデルファイルを使用して直接呼び出す。
  3. バッチ予測用の Cloud ML Engine バッチ予測ジョブで呼び出す。

オンライン予測用の HTTP エンドポイント

TensorFlow モデルは、ストリーム データ処理パイプラインまたはクライアント アプリケーションを介して呼び出される HTTP エンドポイントとしてデプロイされ、リアルタイムで予測を行います。

TensorFlow ServingSeldon などの他のホスティング サービスを使用して、オンライン予測用の HTTP エンドポイントとして TensorFlow モデルをデプロイできます。図 3 に示すように、次のオプションのいずれかを選択できます。

  1. 1 つまたは複数の Compute Engine インスタンスにモデルをデプロイします。
  2. Compute Engine または Google Kubernetes EngineDocker イメージを使用します。
  3. Kubeflow を活用して、Kubernetes または Google Kubernetes Engine へのデプロイを容易にします。
  4. App Engine を Cloud Endpoints と併用して、ウェブアプリでモデルをホストします。
  5. フルマネージド ML トレーニングと GCP 上でのサービス提供を行う Cloud ML Engine を使用します。
モデルを HTTP エンドポイントとして提供する Cloud Dataflow のオプション
図 3. モデルを HTTP エンドポイントとして提供する Cloud Dataflow のさまざまなオプション

Cloud ML Engine はフルマネージド型のサービスであるため、他のオプションより実装が簡単です。したがって、このテストでは、モデルを HTTP エンドポイントとして提供するためのオプションとして、このサービスを使用します。さまざまな HTTP モデル提供オプションを比較する代わりに、直接モデルと Cloud ML Engine の HTTP エンドポイントとのパフォーマンスの違いに焦点を当てることができます。

ML Engine によるオンライン予測

オンライン予測を行うには、次の 2 つのタスクを実行する必要があります。

  1. モデルのデプロイ。
  2. 推論のためにデプロイされたモデルとの対話(つまり、予測を行う)。

Cloud ML Engine を使用して HTTP エンドポイントとしてモデルをデプロイするには、次の手順が必要です。

  1. トレーニング済みのモデルファイルを Cloud Storage で使用できることを確認します。
  2. gcloud ml-engine models create コマンドを使用してモデルを作成します。
  3. Cloud Storage のモデルファイルとともに、gcloud ml-engine versions create コマンドを使用してモデル バージョンをデプロイします。

モデルをデプロイするには、次のようなコマンドを使用します。

PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

このコードは、モデル バージョン v1 で、GCP プロジェクトに babyweight_estimator という Cloud ML Engine モデルを作成します。

モデルがデプロイされたら、そのモデルを呼び出せるようになります。次の Python コードは、Cloud ML Engine でモデル バージョンを REST API として呼び出す方法を示しています。

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

BigQuery や Cloud Storage などの大規模なデータセットがあり、プロセス全体のスループットを最大化する場合、バッチ予測用に ML モデルを HTTP エンドポイントとして提供することは推奨されていません。これにより、データポイントごとに 1 つの HTTP リクエストが生成され、膨大な量の HTTP リクエストが発生します。次のセクションでは、バッチ予測により適したオプションを示します。

バッチ予測とオンライン予測のための直接モデル

直接モデル予測テクノロジーでは、Cloud Dataflow インスタンスのローカル TensorFlow SavedModel を利用します。保存されたモデルは、TensorFlow モデルの構築とトレーニングを完了した後に作成された出力ファイルのコピーです。TensorFlow SavedModel は、次のいずれかです。

  • Cloud Dataflow ジョブとして送信されるパイプライン ソースコードの一部。
  • 図 4 に示すように、Cloud Storage からダウンロードされた内容。
Cloud Dataflow における直接モデル予測
図 4. Cloud Dataflow における直接モデル予測

このソリューションでは、GitHub のソースコードの一部である SavedModel を使用します。インスタンスにモデルを読み込むには、次の操作を行います。

  1. Cloud Dataflow ジョブを作成するときに、モデルファイルを含めて、読み込むパイプラインの依存関係を指定します。次の Python コードは、Cloud Dataflow ジョブと一緒に送信されるモデルファイルが含まれる setup.py ファイルを示しています。

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. パイプラインでローカルモデル ファイルを呼び出します。これにより、指定されたインスタンスの予測が生成されます。以下の Python コードにその方法を示しています。

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

詳細については、Apache Beam の複数ファイルの依存関係ページをご覧ください。

Cloud ML Engine のバッチ予測ジョブ

Cloud ML Engine では、モデルを HTTP エンドポイントとしてデプロイするほか、デプロイされたモデル バージョンまたは Cloud Storage の TensorFlow SavedModel を使用してバッチ予測ジョブを実行できます。

Cloud ML Engine のバッチ予測ジョブは、入力データファイルの Cloud Storage の場所をパラメータとして取得します。モデルを使用してそのデータの予測を取得し、予測結果をパラメータとして指定された Cloud Storage の別の出力場所に保存します。次の例は、Cloud ML Engine のバッチ予測ジョブを送信する gcloud コマンドを示しています。

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

ポイントごとのオンライン予測とマイクロバッチのオンライン予測

リアルタイム予測パイプラインでは、モデルを HTTP エンドポイントとして提供している場合でも、モデルをワーカーから直接使用している場合でも、着信データポイントの予測を取得する 2 つのオプションがあります。

  • 個々のポイント。この基本的なオプションでは、各データポイントを個別にモデルに送信して予測を得ます。
  • マイクロバッチ。この最適化オプションでは、ウィンドウ処理機能を使用して、特定の期間内(たとえば、5 秒ごと)にデータポイントをグループ化する、マイクロバッチを作成します。その後、マイクロバッチをモデルに送信して、すべてのインスタンスの予測を同時に取得します。

次の Python コードは、Apache Beam パイプラインでウィンドウ処理機能を使用して時間ベースのマイクロバッチを作成する方法を示しています。

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

マイクロバッチ処理のアプローチでは、HTTP エンドポイントとしてデプロイされたモデルを使用します。これにより、HTTP リクエスト数が大幅に削減され、レイテンシが短縮されます。直接モデルで使用されるマイクロバッチ処理方法であっても、演算はベクトル化されているため、長さが 1 のテンソルを送信するよりも、N 個のインスタンスがあるテンソルを予測用に送信する方が効率的です。

バッチテスト

バッチテストでは、TensorFlow 回帰モデルを使用して BigQuery の Natality データセットで赤ちゃんの体重を予測します。Cloud Dataflow のバッチ パイプラインを使用して、Cloud Storage の予測結果を CSV ファイルとして保存する必要があります。次のセクションでは、この作業を達成するために試したさまざまなテストについて説明します。

アプローチ 1: 直接モデル予測を使用した Cloud Dataflow

このアプローチでは、Cloud Dataflow ワーカーが、各レコードのバッチ処理パイプラインで予測のために直接呼び出される TensorFlow SavedModel をホストします。図 5 は、このアプローチのアーキテクチャの概要を示しています。

バッチ アプローチ 1: 直接モデル予測を使用した Cloud Dataflow
図 5. バッチ アプローチ 1: 直接モデル予測を使用した Cloud Dataflow

Cloud Dataflow パイプラインでは、次の手順に従います。

  1. BigQuery からデータを読み取ります。
  2. 予測用の BigQuery レコードを準備します。
  3. 各レコードの予測を取得するために、ローカルの TensorFlow SavedModel を呼び出します。
  4. 結果(入力レコードと予測された赤ちゃんの体重)を CSV ファイルに変換します。
  5. CSV ファイルを Cloud Storage に書き込みます。

このアプローチでは、リモート サービス(HTTP エンドポイントとして Cloud ML Engine にデプロイされたモデルなど)への呼び出しはありません。予測は、TensorFlow SavedModel を使用して各 Cloud Dataflow ワーカー内でローカルに行われます。

アプローチ 2: Cloud ML Engine を使用した Cloud Dataflow でのバッチ予測

このアプローチでは、TensorFlow SavedModel が Cloud Storage に保存され、Cloud ML Engine によって予測に使用されます。ただし、前述のアプローチのように各レコードについて、デプロイされたモデルへの API 呼び出しを行う代わりに、予測用にデータが準備され、バッチとして送信されます。

このアプローチには 2 つのフェーズがあります。

  1. Cloud Dataflow は予測用に BigQuery からデータを準備し、そのデータを Cloud Storage に保存します。
  2. Cloud ML Engine のバッチ予測ジョブは、準備されたデータとともに送信され、予測結果は Cloud Storage に保存されます。

図 6 に、この 2 フェーズのアプローチのアーキテクチャ全体を示します。

バッチ アプローチ 2: Cloud ML Engine を使用した Cloud Dataflow のバッチ予測
図 6. バッチ アプローチ 2: Cloud ML Engine を使用した Cloud Dataflow でのバッチ予測

Cloud Dataflow パイプラインを含めたワークフローの手順は次のとおりです。

  1. BigQuery からデータを読み取ります。
  2. 予測用の BigQuery レコードを準備します。
  3. Cloud Storage に JSON データを書き込みます。モデル内の serving_fn 関数では、JSON インスタンスを入力として想定しています。
  4. 準備したデータを Cloud Storage に保存した後、Cloud ML Engine のバッチ予測ジョブを送信します。このジョブでは、予測結果も Cloud Storage に書き込みます。

Cloud Dataflow ジョブでは、Cloud ML Engine 予測ジョブを送信する代わりに、予測用のデータを準備します。つまり、データ準備タスクとバッチ予測タスクは密結合されていません。Cloud Functions、Airflow、または任意のスケジューラで、Cloud Dataflow ジョブを実行してから、バッチ予測用に Cloud ML Engine ジョブを送信することによって、ワークフローをオーケストレートできます。

データが次のケースに該当する場合、パフォーマンスと使いやすさの両方のために、Cloud ML Engine のバッチ予測が推奨されます。

  • 以前にデータの取り込みプロセスが実行されたため、Cloud Storage に予測用の形式でデータが存在する。
  • ワークフローの第 1 フェーズを制御できない。たとえば、Cloud Storage に保存される予測用データを準備する Cloud Dataflow パイプラインを制御できない場合です。

テスト構成

3 回のテストで以下の構成を使用しました。

  • データサイズ: 10K100K1M、および 10M
  • Cloud Storage クラス: Regional Storage
  • Cloud Storage のロケーション: europe-west1-b
  • Cloud Dataflow のリージョン: europe-west1-b
  • Cloud Dataflow ワーカー マシンタイプ: n1-standard-1
  • 最大 100 万レコードまでのバッチデータ用の Cloud Dataflow の自動スケーリング
  • Cloud Dataflow num_worker: バッチデータの場合は 20、最大 1,000 万レコード
  • Cloud ML Engine のバッチ予測 max-worker-count 設定: 20

Cloud Storage のロケーションと Cloud Dataflow のリージョンは同じにする必要があります。このソリューションでは、任意の値として europe-west1-b リージョンを使用します。

結果

次の表は、さまざまなサイズのデータセットでバッチ予測と直接モデル予測を行った結果(タイミング)をまとめたものです。

バッチ データ サイズ 指標 Cloud ML Engine を使用した Cloud Dataflow でのバッチ予測 直接モデル予測を使用した Cloud Dataflow
1 万行 実行時間 15 分 30 秒

(Cloud Dataflow: 7 分 47 秒 +
Cloud ML Engine: 7 分 43 秒)
8 分 24 秒
合計 vCPU 時間 0.301 時間

(Cloud Dataflow: 0.151 時間 +
Cloud ML Engine: 0.15 時間)
0.173 時間
10 万行 実行時間 16 分 37 秒

(Cloud Dataflow: 8 分 39 秒 +
Cloud ML Engine: 7 分 58 秒)
10 分 31 秒
合計 vCPU 時間 0.334 時間

(Cloud Dataflow: 0.184 時間 +
Cloud ML Engine: 0.15 時間)
0.243 時間
100 万行 実行時間 21 分 11 秒
(Cloud Dataflow: 11 分 07 秒 +
Cloud ML Engine: 10 分 04 秒)
17 分 12 秒
合計 vCPU 時間 0.446 時間

(Cloud Dataflow: 0.256 時間 +
Cloud ML Engine: 0.19 時間)
1.115 時間
1,000 万行 実行時間 33 分 08 秒
(Cloud Dataflow: 12 分 15 秒 +
Cloud ML Engine: 20 分 53 秒)
25 分 02 秒
合計 vCPU 時間 5.251 時間

(Cloud Dataflow: 3.581 時間 +
Cloud ML Engine: 1.67 時間)
7.878 時間

図 7 にこれらの結果のグラフを示します。

4 つの異なるデータセット サイズに対する 3 つのアプローチのタイミングを示すグラフ
図 7. 4 つの異なるデータセット サイズに対する 3 つのアプローチのタイミングを示すグラフ

結果が示すように、Cloud Storage に予測に使用される形式のデータがすでに存在しているため、Cloud ML Engine のバッチ予測ジョブでは、入力データの予測を生成するための時間が少なくて済みます。ただし、バッチ予測ジョブを前処理ステップ(BigQuery から Cloud Storage に予測用のデータを抽出して準備する)および後処理ステップ(データを BigQuery に保存する)と組み合わせると、直接モデルアプローチのほうが全体的な実行時間は短くなります。さらに、(ストリーミング テストに関して後で説明する)マイクロバッチ処理を使用すると、直接モデル予測アプローチのパフォーマンスをさらに改善することができます。

ストリーム テスト

ストリーミング テストでは、Cloud Dataflow パイプラインで Cloud Pub/Sub トピックからデータポイントを読み取り、ストリーミング API を使用して BigQuery にデータを書き込みます。Cloud Dataflow ストリーミング パイプラインでは、TensorFlow の赤ちゃんの体重の推定モデルを使用してデータを処理し、予測を取得します。

このトピックでは、データポイントを生成するストリーム シミュレータからデータを受け取ります。データポイントは、事前に定義された毎秒のイベントレートで、赤ちゃんの体重を予測するインスタンスです。これは、無制限のデータソースの実際の例をシミュレートします。次の Python コードは、Cloud Pub/Sub トピックに送信されるデータ ストリームをシミュレートします。

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

アプローチ 1: Cloud ML Engine を使用した Cloud Dataflow オンライン予測

このアプローチでは、TensorFlow モデルが、Cloud ML Engine で REST API としてデプロイされてホストされます。Cloud Dataflow ストリーミング パイプラインでは、Cloud Pub/Sub で取得した予測の各メッセージの API を呼び出します。図 8 は、このアプローチのアーキテクチャの概要を示しています。

ストリーム アプローチ 1: Cloud ML Engine を使用した Cloud Dataflow オンライン予測
図 8. ストリーム アプローチ 1: Cloud ML Engine を使用した Cloud Dataflow でのオンライン予測。HTTP リクエストには、マイクロバッチ内に単一のデータポイントまたはデータポイントのグループが含まれる場合があります。

このアプローチでは、Cloud Dataflow パイプラインで次の手順に従います。

  1. Cloud Pub/Sub トピックからメッセージを読み込みます。
  2. Cloud ML Engine モデルの API に HTTP リクエストを送信して、各メッセージの予測を取得します。
  3. ストリーミング API を使用して BigQuery に結果を書き込みます。

マイクロバッチ処理はより優れたアプローチです。つまり、Cloud Pub/Sub から読み取られる各メッセージについてモデルの REST API に対して HTTP リクエストを送信する代わりに、Cloud Dataflow では 1 秒間に受信したメッセージをグループ化します。次に、このグループのメッセージを、単一の HTTP リクエスト内のマイクロバッチとしてモデルの API に送信します。このアプローチでは、Cloud Dataflow パイプラインで次の手順に従います。

  1. Cloud Pub/Sub トピックからメッセージを読み込みます。
  2. 1 秒間のウィンドウ処理オペレーションを適用して、メッセージのマイクロバッチを作成します。
  3. マイクロバッチが含まれる HTTP リクエストを Cloud ML Engine モデルの API に送信して、メッセージの予測を取得します。
  4. ストリーミング API を使用して BigQuery に結果を書き込みます。

このアプローチには、以下の論理的根拠があります。

  1. Cloud ML Engine モデルなどのリモート サービスへの呼び出し回数を減らします。
  2. 各メッセージを処理する平均レイテンシを短縮します。
  3. パイプライン全体の処理時間を短縮します。

このテストでは、時間枠を 1 秒に設定しました。ただし、バッチとして Cloud ML Engine モードに送信されるメッセージの数であるマイクロバッチ サイズは一定ではありません。マイクロバッチ サイズは、メッセージの生成頻度(1 秒あたりのメッセージ数)によって異なります。

次のセクションでは、メッセージ数が毎秒 50、100、500 の 3 つの異なる頻度でのテストについて説明します。つまり、マイクロバッチ サイズは 50、100、500 です。

アプローチ 2: 直接モデル予測を使用した Cloud Dataflow

このアプローチは、バッチテストで使用されたアプローチと同様です。TensorFlow SavedModel は Cloud Dataflow ワーカーでホストされ、各レコードのストリーム処理パイプラインでの予測のために呼び出されます。図 9 は、このアプローチのアーキテクチャの概要を示しています。

ストリーム アプローチ 2: 直接モデル予測を使用した Cloud Dataflow
図 9. ストリーム アプローチ 2: 直接モデル予測を使用した Cloud Dataflow

このアプローチでは、Cloud Dataflow パイプラインで次の手順に従います。

  1. Cloud Pub/Sub トピックからメッセージを読み込みます。
  2. 各レコードの予測を取得するために、ローカルの TensorFlow SavedModel を呼び出します。
  3. ストリーミング API を使用して BigQuery に結果を書き込みます。

マイクロバッチ処理テクノロジーは、直接モデル予測アプローチを使用したストリーム パイプラインでも使用できます。1 つのデータ インスタンスのテンソルをモデルに送信する代わりに、N 個のデータ インスタンスのテンソルを送信できます。ここで、N は Cloud Dataflow の時間枠内にモデルで受信したメッセージ数です。このアプローチでは、TensorFlow モデルのベクトル化された演算を使用して、いくつかの予測を並行して取得します。

テスト構成

これらのテストには次の構成を使用しました。

  • ストリーム データ サイズ: 10K records (messages)
  • シミュレートされた 1 秒あたりのメッセージ数(MPS): 50100500
  • 時間枠のサイズ(マイクロバッチ テスト): 1 second
  • Cloud Dataflow のリージョン: europe-west1-b
  • Cloud Dataflow ワーカー マシンタイプ: n1-standard-1
  • Cloud Dataflow num_worker : 5 (自動スケーリングなし)
  • Cloud ML Engine モデルの API ノード: 3 (manualScale)

結果

次の表は、さまざまな量のデータ(1 秒あたりのメッセージ数)でストリーミング テストを実行した結果をまとめたものです。メッセージの頻度とは、1 秒あたりに送信されるメッセージ数を意味し、シミュレーション時間とは、すべてのメッセージを送信するための時間を意味します。

メッセージのストリーミング頻度 指標 Cloud ML Engine を使用した Cloud Dataflow でのオンライン予測   直接モデル予測を使用した Cloud Dataflow  
    単一のメッセージ マイクロバッチ処理 単一のメッセージ マイクロバッチ処理
毎秒 50 メッセージ

(シミュレーション時間: 3 分 20 秒)
合計時間 9 分 34 秒 7 分 44 秒 3 分 43 秒 3 分 22 秒
毎秒 100 メッセージ

(シミュレーション時間: 1 分 40 秒)
合計時間 6 分 03 秒 4 分 34 秒 1 分 51 秒 1 分 41 秒
毎秒 500 メッセージ

(シミュレーション時間: 20 秒)
合計時間 該当なし - デフォルト Cloud ML Engine のオンライン予測割り当て 2 分 47 秒 1 分 23 秒 48 秒

図 10 にこれらの結果のグラフを示します。

さまざまなアプローチと頻度のタイミングを示すグラフ
図 10. さまざまなアプローチと頻度のタイミングを示すグラフ

結果に示すように、マイクロバッチ処理テクノロジーは、Cloud ML Engine のオンライン予測と直接モデル予測の両方で実施パフォーマンスを向上させます。さらに、ストリーミング パイプラインで直接モデルを使用すると、オンライン予測のために外部 REST/HTTP API を呼び出す場合と比べて、パフォーマンスが 2 倍から 4 倍向上します。

まとめ

説明したアプローチとテスト結果に従って、以下のアプローチをおすすめします。

バッチ処理

  • バッチデータ処理パイプラインを構築し、パイプラインの一部として予測を行う場合、最高のパフォーマンスを得るには直接モデル アプローチを使用します。
  • ベクトル化された操作の並列化を利用するために予測用のローカルモデルを呼び出す前に、データポイントのマイクロバッチを作成することによって直接モデル アプローチのパフォーマンスを向上させます。
  • 予測に適した形式でデータが Cloud Storage に入力される場合、最高のパフォーマンスを得るには Cloud ML Engine のバッチ予測を使用します。
  • バッチ予測に GPU の機能を利用する場合は、Cloud ML Engine を使用します。
  • バッチ予測に Cloud ML Engine のオンライン予測を使用することは避けてください。

ストリーム処理

  • 最高のパフォーマンスを実現し、平均レイテンシを短縮するには、ストリーミング パイプラインで直接モデルを使用します。予測はローカルで実施され、リモート サービスへの HTTP 呼び出しは行われません。
  • モデルをデータ処理パイプラインから切り離して、オンライン予測で使用されるモデルの保守性を向上させます。最良のアプローチは、Cloud ML Engine またはその他のウェブ ホスティング サービスを使用して、モデルを独立したマイクロサービスとして提供することです。
  • モデルを独立したウェブサービスとしてデプロイし、複数のデータ処理パイプラインとオンライン アプリがモデルサービスをエンドポイントとして使用できるようにします。モデルに対する変更は、そのモデルを使用するアプリやパイプラインに対して透過的です。
  • 負荷分散を使用して複数のサービス インスタンスをデプロイし、モデルのウェブサービスのスケーラビリティと可用性を改善します。Cloud ML Engine では、モデル バージョンをデプロイするときに必要なのは、yaml 構成ファイル内のノード数(manualScaling)または minNodesautoScaling)を指定することだけです。
  • モデルを別のマイクロサービスにデプロイする場合、基盤となるサービス インフラストラクチャに応じて追加コストが発生します。Cloud ML Engine のオンライン予測については、料金に関するよくある質問をご覧ください。
  • 直接モデルと HTTP モデルの両方のサービスでパフォーマンスを向上させるには、ストリーミング データ処理パイプラインでマイクロバッチ処理を使用します。マイクロバッチ処理は、モデルサービスへの HTTP リクエストの数を減らし、TensorFlow モデルのベクトル化された演算を使用して予測を取得します。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...