Cloud Dataflow パイプラインでの予測のための機械学習モデルの比較

Last reviewed 2018-09-07 UTC

このソリューションでは、Dataflow パイプラインで機械学習モデルを呼び出すためのさまざまな設計アプローチを説明して比較し、それぞれのアプローチを選択する際のトレードオフを検証します。バッチ処理パイプラインとストリーム処理パイプラインの両方で、さまざまなアプローチを検討し、これらのトレードオフを説明する一連のテストの結果を紹介します。このソリューションは、機械学習モデルを構築するデータ サイエンティストではなく、トレーニングされたモデルをデータ処理パイプラインに統合する担当者を念頭に置いて設計されています。

概要

この ML モデルを Dataflow パイプラインに統合する担当者は、さまざまなアプローチの内容や、システム要件に最適なアプローチを知りたいと思うでしょう。以下を検討する必要があります。

  • スループット
  • レイテンシ
  • 料金
  • 実装
  • メンテナンス

これらの考慮事項のバランスをとることは必ずしも容易ではありませんが、このソリューションは、優先順位に基づいて意思決定プロセスをナビゲートするのに役立ちます。このソリューションでは、バッチ データ パイプラインとストリーム データ パイプラインの TensorFlow でトレーニングされた機械学習(ML)モデルで予測を行うための 3 つのアプローチを比較します。

  • ストリーミング パイプラインに REST / HTTP API としてデプロイされたモデルを使用する。
  • バッチ パイプラインに AI Platform(AI Platform)バッチ予測ジョブを使用する。
  • バッチ パイプラインとストリーミング パイプラインの両方に Dataflow 直接モデル予測を使用する。

すべてのテストで、Natality データセットと呼ばれる既存のトレーニングされたモデルを使用しています。これは、さまざまな入力に基づいて赤ちゃんの体重を予測します。このソリューションの目標はモデルを構築することではないため、モデルの構築方法やトレーニング方法については説明しません。Natality データセットの詳細については、次のステップをご覧ください。

プラットフォーム

データ パイプラインを実行し、トレーニングされた ML モデルを呼び出すには、さまざまな方法があります。ただし、機能要件は常に同じです。

  1. 制限付き(バッチ)ソースまたは無制限(ストリーミング)ソースからのデータの取り込み。データを取り込むソースの例には、センサーデータ、ウェブサイトの相互作用、金融トランザクションなどがあります。
  2. 予測のために ML モデルを呼び出すことによって入力データを変換し、拡充する。たとえば、JSON ファイルを解析して関連するフィールドを抽出し、メンテナンス日の予測、プロダクトの推奨事項の作成、不正行為の検出を行います。
  3. 分析やバックアップのために変換されたデータと予測を保存するか、新しいイベントや追加のパイプラインをトリガーするキューシステムに渡す。たとえば、潜在的な不正行為をリアルタイムで検出する方法や、ダッシュボードからアクセス可能なストレージにメンテナンス スケジュール情報を保存する方法などがあります。

バッチ ETL プロセスでの予測を使用してデータを変換して拡充する場合は、データバッチ全体に必要な時間を短縮するために、スループットを最大化することを目標にします。一方、オンライン予測のためにストリーミング データを処理する場合は、各予測を(ほぼ)リアルタイムで受信するためにレイテンシを最小化することを目標にします。したがって、モデルの呼び出しがボトルネックになる可能性があります。

コア コンポーネント

このソリューションのバッチテストとストリーミング テストでは、次の 3 つの主要なテクノロジーが使用されます。

  • Dataflow で稼働し、データを処理する Apache Beam
  • ML モデルを実装してトレーニングする TensorFlow
  • トレーニングされた ML モデルでのバッチ予測とオンライン予測の実行にホスティング プラットフォームとして一部のテストで使用される AI Platform

このソリューションでは、以下の理由によりデータ パイプラインの実行に、Dataflow 上で動作する Apache Beam を選択しました。

  • Apache Beam は、ストリーミングとバッチの両方のデータ処理を行うオープンソースの統合プログラミング モデルです。
  • Dataflow は、サーバーなしで Apache Beam ジョブを実行できる Google Cloud プロダクトです。

TensorFlow は、機械学習フレームワークとして使用される Google のオープンソースの数学ライブラリです。TensorFlow を使用すると、単一のマシンまたは分散環境でモデルの構築、トレーニング、提供を行えます。モデルは、さまざまなデバイスに移植でき、使用可能な CPUGPUTPU などのリソースをトレーニングやサービスに利用できます。

AI Platform は、ハイパーパラメータの調整機能を使用してトレーニング、調整を行えるサーバーレス プラットフォームであり、DevOps で必要な最小限の管理で大量の TensorFlow モデルを処理します。AI Platform は、トレーニングされたモデルをオンライン予測用の REST API としてのデプロイと、バッチ予測ジョブの送信をサポートしています。AI Platform は、モデルをマイクロサービスとして提供できるいくつかのオプションの 1 つです。

このソリューションで詳しく説明しているアプローチでは、データ処理パイプラインに Dataflow を使用し、モデルを HTTP エンドポイントとしてホストするために AI Platform を使用しています。ただし、これらのアプローチは他のテクノロジーに置換できます。HTTP と直接的な TensorFlow モデルのパフォーマンス比較が大幅に変化することはありません。

バッチデータの処理とストリーミング データの処理

このソリューションのテストには、バッチとストリーミングの両方のユースケースが含まれます。各テストでは、無制限のソースと制限付きのソースでは操作要件が異なるため、入力と出力に異なる Google Cloud プロダクトを利用しています。

制限付きデータセットのバッチ処理

図 1 は、典型的なバッチ処理パイプラインでは、生の入力データが Cloud Storage などのオブジェクト ストレージに格納されることを示しています。構造化データ ストレージ形式には、カンマ区切り値(CSV)、最適化された行列(ORC)、Parquet、または Avro があります。これらの形式は、データがデータベースまたはログから取得される場合によく使用されます。

一般的なバッチ処理パイプラインのアーキテクチャ
図 1. バッチ処理アーキテクチャ

BigQuery などの分析プラットフォームの中には、クエリ機能に加えてストレージを提供するものもあります。BigQuery ではストレージ用に Capacitor を使用します。Dataflow 上の Apache Beam では、バッチ処理パイプラインの他のストレージ オプションに加えて、BigQuery と Cloud Storage の両方から読み書きできます。

無制限のデータ ストリームのストリーム処理

ストリーミングの場合、データ処理パイプラインへの入力は通常、図 2 に示すようにメッセージング システムです。Pub/Sub や Kafka などのテクノロジーは、通常、JSON、CSV、protobuf などの形式の個々のデータポイントを取り込むために使用されます。

一般的なストリーム処理パイプラインのアーキテクチャ
図 2. ストリーム処理アーキテクチャ

データポイントは、ウィンドウ処理機能を使用して一時的なイベント処理を実行することによって、個別に、またはマイクロバッチでグループとして処理できます。処理されたデータは、以下を含めたいくつかの宛先に送信できます。

  1. ストリーミング API によるアドホック分析用の BigQuery。
  2. リアルタイム情報を提供するための Bigtable。
  3. 後続のプロセスやパイプラインをトリガーするための Pub/Sub トピック。

制限付きと無制限の両方のデータソース シンクのソースコネクタ(入力)とシンクコネクタ(出力)の完全なリストは、Apache Beam I/O ページにあります。

TensorFlow モデルの呼び出し

TensorFlow トレーニング モデルは、次の 3 つの方法で呼び出せます。

  1. オンライン予測用の HTTP エンドポイントを介して呼び出す。
  2. バッチ予測とオンライン予測の際に、保存されたモデルファイルを使用して直接呼び出す。
  3. バッチ予測の際に AI Platform バッチ予測ジョブで呼び出す。

オンライン予測用の HTTP エンドポイント

TensorFlow モデルは、ストリーム データ処理パイプラインまたはクライアント アプリを介して呼び出される HTTP エンドポイントとしてデプロイされ、リアルタイムで予測を行います。

TensorFlow モデルは、TensorFlow Serving や他の Seldon などのホスティング サービスを使用して、オンライン予測の HTTP エンドポイントとしてデプロイできます。図 3 に示すように、次のオプションのいずれかを選択できます。

  1. 1 つまたは複数の Compute Engine インスタンスにモデルをデプロイします。
  2. Compute Engine または Google Kubernetes EngineDocker イメージを使用します。
  3. Kubeflow を利用して、Kubernetes または Google Kubernetes Engine へのデプロイを容易にします。
  4. App Engine をエンドポイントとともに使用して、ウェブアプリでモデルをホストします。
  5. フルマネージド ML トレーニングと Google Cloud 上でのサービス提供を行う AI Platform を使用します。
モデルを HTTP エンドポイントとして提供する Dataflow のオプション
図 3. モデルを HTTP エンドポイントとして提供する Dataflow の各種のオプション

AI Platform はフルマネージド型のサービスであるため、他のオプションより実装が簡単です。このため、このテストでは、モデルを HTTP エンドポイントとして提供するためのオプションとして、このサービスを使用します。これにより、さまざまな HTTP モデル提供オプションを比較するのではなく、直接モデルと AI Platform の HTTP エンドポイントとのパフォーマンスの比較を中心に扱うことができます。

AI Platform 予測によるオンライン予測の提供

オンライン予測を提供するには、次の 2 つのタスクを実行する必要があります。

  1. モデルのデプロイ。
  2. デプロイされたモデルとの推論のための対話(つまり、予測の実施)。

AI Platform を使用して HTTP エンドポイントとしてモデルをデプロイするには、次の手順が必要です。

  1. トレーニング済みのモデルファイルを Cloud Storage で使用できることを確認します。
  2. gcloud ml-engine models create コマンドを使用してモデルを作成します。
  3. Cloud Storage のモデルファイルとともに、gcloud ml-engine versions create コマンドを使用してモデル バージョンをデプロイします。

モデルをデプロイするには、次のようなコマンドを使用します。


PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

このコードは、モデル バージョン v1 で、Google Cloud プロジェクトに babyweight_estimator という AI Platform Prediction モデルを作成します。

モデルのデプロイ後は、そのモデルを呼び出せるようになります。次の Python コードは、AI Platform でモデル バージョンを REST API として呼び出す方法を示しています。

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

BigQuery や Cloud Storage などの大規模なデータセットがあり、プロセス全体のスループットを最大化する場合、バッチ予測用に ML モデルを HTTP エンドポイントとして提供することは推奨されていません。これにより、データポイントごとに 1 つの HTTP リクエストが生成され、膨大な量の HTTP リクエストが発生します。次のセクションでは、バッチ予測により適したオプションを示します。

バッチ予測とオンライン予測のための直接モデル

直接モデル予測テクノロジーでは、Dataflow インスタンスのローカル TensorFlow SavedModel を利用します。保存されたモデルは、TensorFlow モデルの構築とトレーニングを完了した後に作成された出力ファイルのコピーです。TensorFlow SavedModel は、次のいずれかです。

  • Dataflow ジョブとして送信されるパイプライン ソースコードの一部。
  • 図 4 に示すように、Cloud Storage からダウンロードされた内容。
Dataflow における直接モデル予測
図 4. Dataflow における直接モデル予測

このソリューションでは、GitHub のソースコードの一部である SavedModel を使用しています。インスタンスにモデルを読み込むには、次のように行います。

  1. Dataflow ジョブを作成するときに、モデルファイルを含めて、読み込むパイプラインの依存関係を指定します。次の Python コードは、Dataflow ジョブと一緒に送信されるモデルファイルが含まれる setup.py ファイルを示しています。

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. パイプラインでローカルモデル ファイルを呼び出します。これにより、指定されたインスタンスの予測が生成されます。以下の Python コードにその方法を示しています。

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

詳細については、Apache Beam の複数ファイルの依存関係ページをご覧ください。

AI Platform バッチ予測ジョブ

モデルを HTTP エンドポイントとしてデプロイする以外に、AI Platform では、デプロイされたモデル バージョンや Cloud Storage の TensorFlow SavedModel を使用してバッチ予測ジョブを実行できます。

AI Platform のバッチ予測ジョブでは、入力データファイルの Cloud Storage のロケーションがパラメータとして渡されます。モデルを使用してそのデータの予測を取得し、予測結果をパラメータとして指定された Cloud Storage の別の出力場所に保存します。次の例は、AI Platform バッチ予測ジョブを送信する gcloud コマンドを示しています。

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

ポイントごとのオンライン予測とマイクロバッチのオンライン予測

リアルタイム予測パイプラインでは、モデルを HTTP エンドポイントとして提供している場合でも、モデルをワーカーから直接使用している場合でも、着信データポイントの予測を取得する 2 つのオプションがあります。

  • 個々のポイント。この基本的なオプションでは、各データポイントを個別にモデルに送信して予測を得ます。
  • マイクロバッチ。この最適化オプションでは、ウィンドウ処理機能を使用して、特定の期間内(たとえば、5 秒ごと)にデータポイントをグループ化する、マイクロバッチを作成します。その後、マイクロバッチをモデルに送信して、すべてのインスタンスの予測を同時に取得します。

次の Python コードは、Apache Beam パイプラインでウィンドウ処理機能を使用して時間ベースのマイクロバッチを作成する方法を示しています。

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

マイクロバッチ処理のアプローチでは、HTTP エンドポイントとしてデプロイされたモデルを使用します。これにより、HTTP リクエスト数が大幅に削減され、レイテンシが短縮されます。直接モデルで使用されるマイクロバッチ処理方法であっても、演算はベクトル化されているため、長さが 1 のテンソルを送信するよりも、N 個のインスタンスがあるテンソルを予測用に送信する方が効率的です。

バッチテスト

バッチテストでは、TensorFlow 回帰モデルを使用して BigQuery の Natality データセットで赤ちゃんの体重を予測します。Dataflow のバッチ パイプラインを使用して、Cloud Storage の予測結果を CSV ファイルとして保存する必要があります。次のセクションでは、この作業を達成するために試したさまざまなテストについて説明します。

アプローチ 1: 直接モデル予測を使用した Dataflow

このアプローチでは、Dataflow ワーカーが、各レコードのバッチ処理パイプラインで予測のために直接呼び出される TensorFlow SavedModel をホストします。図 5 は、このアプローチのアーキテクチャの概要を示しています。

バッチ アプローチ 1: 直接モデル予測を使用した Dataflow
図 5. バッチ アプローチ 1: 直接モデル予測を使用した Dataflow

Dataflow パイプラインでは、次の手順に従います。

  1. BigQuery からデータを読み取ります。
  2. 予測用の BigQuery レコードを準備します。
  3. 各レコードの予測を取得するために、ローカルの TensorFlow SavedModel を呼び出します。
  4. 結果(入力レコードと予測された赤ちゃんの体重)を CSV ファイルに変換します。
  5. CSV ファイルを Cloud Storage に書き込みます。

このアプローチでは、HTTP エンドポイントとして AI Platform にデプロイされたモデルなど、リモート サービスへの呼び出しはありません。予測は、TensorFlow SavedModel を使用して各 Dataflow ワーカー内でローカルに行われます。

アプローチ 2: AI Platform バッチ予測を使用した Dataflow

このアプローチでは、TensorFlow SavedModel が Cloud Storage に格納され、AI Platform で予測に使用されます。ただし、前述のアプローチのように各レコードについて、デプロイされたモデルへの API 呼び出しを行う代わりに、予測用にデータが準備され、バッチとして送信されます。

このアプローチには 2 つのフェーズがあります。

  1. Dataflow は BigQuery から予測用のデータを作成し、そのデータを Cloud Storage に格納します。
  2. AI Platform バッチ予測ジョブは作成されたデータとともに送信され、予測結果は Cloud Storage に格納されます。

図 6 に、この 2 フェーズのアプローチのアーキテクチャ全体を示します。

バッチ アプローチ 2: AI Platform バッチ予測を使用した Dataflow
図 6. バッチ アプローチ 2: AI Platform バッチ予測を使用した Dataflow

Dataflow パイプラインを含めたワークフローの手順は次のとおりです。

  1. BigQuery からデータを読み取ります。
  2. 予測用の BigQuery レコードを準備します。
  3. Cloud Storage に JSON データを書き込みます。モデル内の serving_fn 関数では、JSON インスタンスを入力として想定しています。
  4. Cloud Storage で作成したデータとともに AI Platform バッチ予測ジョブを送信します。このジョブでは、予測結果も Cloud Storage に書き込みます。

Dataflow ジョブでは、AI Platform の予測ジョブが送信されるのではなく、予測のためのデータが作成されます。つまり、データ準備タスクとバッチ予測タスクは密結合されていません。Cloud Functions、Airflow、または任意のスケジューラでは、Dataflow ジョブを実行してから、AI Platform ジョブを送信してバッチ予測を行うことによって、ワークフローをオーケストレートできます。

データが次のケースに該当する場合、パフォーマンスと使いやすさの両方のために、AI Platform のバッチ予測をおすすめします。

  • 以前にデータの取り込みプロセスが実行されたため、Cloud Storage に予測用の形式でデータが存在する。
  • ワークフローの第 1 フェーズを制御できない。たとえば、Cloud Storage に保存される予測用データを準備する Dataflow パイプラインを制御できない場合です。

テスト構成

3 回のテストで以下の構成を使用しました。

  • データサイズ: 10K100K1M、および 10M
  • Cloud Storage クラス: Regional Storage
  • Cloud Storage のロケーション: europe-west1-b
  • Dataflow のリージョン: europe-west1-b
  • Dataflow ワーカーのマシンタイプ: n1-standard-1
  • 最大 100 万レコードまでのバッチデータ用の Dataflow の自動スケーリング
  • Dataflow num_worker: バッチデータの場合は 20、最大 1,000 万レコード
  • AI Platform バッチ予測 max-worker-count の設定: 20

Cloud Storage のロケーションと Dataflow リージョンは同じにする必要があります。このソリューションでは、任意の値として europe-west1-b リージョンを使用します。

結果

次の表は、さまざまなサイズのデータセットでバッチ予測と直接モデル予測を行った結果(タイミング)をまとめたものです。

バッチ データ サイズ 指標 Dataflow、AI Platform バッチ予測 直接モデル予測を使用した Dataflow
1 万行 実行時間 15 分 30 秒

(Dataflow: 7 分 47 秒 +
AI Platform: 7 分 43 秒)
8 分 24 秒
合計 vCPU 時間 0.301 時間

(Dataflow: 0.151 時間 +
AI Platform: 0.15 時間)
0.173 時間
10 万行 実行時間 16 分 37 秒

(Dataflow: 8 分 39 秒 +
AI Platform: 7 分 58 秒)
10 分 31 秒
合計 vCPU 時間 0.334 時間

(Dataflow: 0.184 時間 +
AI Platform: 0.15 時間)
0.243 時間
100 万行 実行時間 21 分 11 秒
(Dataflow: 11 分 7 秒 +
AI Platform: 10 分 4 秒)
17 分 12 秒
合計 vCPU 時間 0.446 時間

(Dataflow: 0.256 時間 +
AI Platform: 0.19 時間)
1.115 時間
1,000 万行 実行時間 33 分 08 秒
(Dataflow: 12 分 15 秒 +
AI Platform: 20 分 53 秒)
25 分 02 秒
合計 vCPU 時間 5.251 時間

(Dataflow: 3.581 時間 +
AI Platform: 1.67 時間)
7.878 時間

図 7 にこれらの結果のグラフを示します。

4 つの異なるデータセット サイズに対する 3 つのアプローチのタイミングを示すグラフ
図 7. 4 つの異なるデータセット サイズに対する 3 つのアプローチのタイミングを示すグラフ

この結果が示すように、AI Platform のバッチ予測ジョブは、データがすでに予測に使用される形式で Cloud Storage に存在する場合、入力データに対する予測の生成に要する時間が短くなります。ただし、バッチ予測ジョブを前処理ステップ(予測データの BigQuery から Cloud Storage への抽出と作成)および後処理ステップ(データを BigQuery に保存する)と組み合わせると、直接モデルのアプローチのほうが全体的な実行時間は短くなります。さらに、(ストリーミング テストに関して後で説明する)マイクロバッチ処理を使用すると、直接モデル予測アプローチのパフォーマンスをさらに改善することができます。

ストリーム テスト

ストリーミング テストでは、Dataflow パイプラインで Pub/Sub トピックからデータポイントを読み取り、ストリーミング API を使用して BigQuery にデータを書き込みます。Dataflow ストリーミング パイプラインでは、TensorFlow の赤ちゃんの体重の推定モデルを使用してデータを処理し、予測を取得します。

このトピックでは、データポイントを生成するストリーム シミュレータからデータを受け取ります。データポイントは、事前に定義された毎秒のイベントレートで、赤ちゃんの体重を予測するインスタンスです。これは、無制限のデータソースの実際の例をシミュレートします。次の Python コードは、Pub/Sub トピックに送信されるデータ ストリームをシミュレートします。

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

アプローチ 1: AI Platform オンライン予測を使用した Dataflow

このアプローチでは、TensorFlow モデルが、AI Platform で REST API としてデプロイされてホストされます。Dataflow ストリーミング パイプラインでは、Pub/Sub で取得した予測の各メッセージの API を呼び出します。図 8 は、このアプローチのアーキテクチャの概要を示しています。

ストリーム アプローチ 1: AI Platform オンライン予測を使用した Dataflow
図 8. ストリーム アプローチ 1: AI Platform オンライン予測を使用した Dataflow。HTTP リクエストには、マイクロバッチ内の単一のデータポイントまたはデータポイントのグループが含まれる場合があります。

このアプローチでは、Dataflow パイプラインで次の手順に従います。

  1. Pub/Sub トピックからメッセージを読み込みます。
  2. AI Platform モデルの API に HTTP リクエストを送信して、各メッセージの予測を取得します。
  3. ストリーミング API を使用して BigQuery に結果を書き込みます。

マイクロバッチ処理はより優れたアプローチです。つまり、Pub/Sub から読み取られる各メッセージについてモデルの REST API に対して HTTP リクエストを送信する代わりに、Dataflow では 1 秒間に受信したメッセージをグループ化します。次に、このグループのメッセージを、単一の HTTP リクエスト内のマイクロバッチとしてモデルの API に送信します。このアプローチでは、Dataflow パイプラインで次の手順に従います。

  1. Pub/Sub トピックからメッセージを読み込みます。
  2. 1 秒間のウィンドウ処理オペレーションを適用して、メッセージのマイクロバッチを作成します。
  3. HTTP リクエストをマイクロバッチとともに AI Platform モデルの API に送信して、メッセージの予測を取得します。
  4. ストリーミング API を使用して BigQuery に結果を書き込みます。

このアプローチの理論的基礎は以下のとおりです。

  1. AI Platform モデルなどのリモートサービスへの呼び出し数を減らします。
  2. 各メッセージを処理する平均レイテンシを短縮します。
  3. パイプライン全体の処理時間を短縮します。

このテストでは、時間枠を 1 秒に設定しました。ただし、マイクロバッチサイズ(AI Platform モードにバッチとして送信されるメッセージ数)はさまざまです。マイクロバッチサイズは、メッセージの生成頻度(1 秒あたりのメッセージ数)によって異なります。

次のセクションでは、メッセージ数が毎秒 50、100、500 の 3 つの異なる頻度でのテストについて説明します。つまり、マイクロバッチ サイズは 50、100、500 です。

アプローチ 2: 直接モデル予測を使用した Dataflow

このアプローチは、バッチテストで使用されたアプローチと同様です。TensorFlow SavedModel は Dataflow ワーカーでホストされ、各レコードのストリーム処理パイプラインでの予測のために呼び出されます。図 9 は、このアプローチのアーキテクチャの概要を示しています。

ストリーム アプローチ 2: 直接モデル予測を使用した Dataflow
図 9. ストリーム アプローチ 2: 直接モデル予測を使用した Dataflow

このアプローチでは、Dataflow パイプラインで次の手順に従います。

  1. Pub/Sub トピックからメッセージを読み込みます。
  2. 各レコードの予測を取得するために、ローカルの TensorFlow SavedModel を呼び出します。
  3. ストリーミング API を使用して BigQuery に結果を書き込みます。

マイクロバッチ処理テクノロジーは、直接モデル予測アプローチを使用したストリーム パイプラインでも使用できます。1 つのデータ インスタンスのテンソルをモデルに送信する代わりに、N 個のデータ インスタンスのテンソルを送信できます。ここで、N は Dataflow の時間枠内にモデルで受信したメッセージ数です。このアプローチでは、TensorFlow モデルのベクトル化された演算を使用して、いくつかの予測を並行して取得します。

テスト構成

これらのテストには次の構成を使用しました。

  • ストリーム データ サイズ: 10K records (messages)
  • シミュレートされた 1 秒あたりのメッセージ数(MPS): 50100500
  • 時間枠のサイズ(マイクロバッチ テスト): 1 second
  • Dataflow のリージョン: europe-west1-b
  • Dataflow ワーカーのマシンタイプ: n1-standard-1
  • Dataflow num_worker: 5(自動スケーリングなし)
  • AI Platform モデルの API ノード: 3 (manualScale)

結果

次の表は、さまざまな量のデータ(1 秒あたりのメッセージ数)でストリーミング テストを実行した結果をまとめたものです。メッセージの頻度とは、1 秒あたりに送信されるメッセージ数を意味し、シミュレーション時間とは、すべてのメッセージを送信するための時間を意味します。

メッセージのストリーミング頻度 指標 AI Platform オンライン予測を使用した Dataflow   直接モデル予測を使用した Dataflow  
    単一のメッセージ マイクロバッチ処理 単一のメッセージ マイクロバッチ処理
毎秒 50 メッセージ

(シミュレーション時間: 3 分 20 秒)
合計時間 9 分 34 秒 7 分 44 秒 3 分 43 秒 3 分 22 秒
毎秒 100 メッセージ

(シミュレーション時間: 1 分 40 秒)
合計時間 6 分 03 秒 4 分 34 秒 1 分 51 秒 1 分 41 秒
毎秒 500 メッセージ

(シミュレーション時間: 20 秒)
合計時間 NA - デフォルトの AI Platform オンライン予測割り当て 2 分 47 秒 1 分 23 秒 48 秒

図 10 にこれらの結果のグラフを示します。

さまざまなアプローチと頻度のタイミングを示すグラフ
図 10. さまざまなアプローチと頻度のタイミングを示すグラフ

結果に示すように、マイクロバッチ処理テクノロジーは、AI Platform のオンライン予測と直接モデル予測の双方で実行パフォーマンスを向上させます。さらに、ストリーミング パイプラインで直接モデルを使用すると、オンライン予測で外部 REST/HTTP API を呼び出す場合と比べて、パフォーマンスが 2 倍から 4 倍向上します。

まとめ

説明したアプローチとテスト結果に従って、以下のアプローチをおすすめします。

バッチ処理

  • バッチデータ処理パイプラインを構築し、パイプラインの一部として予測を行う場合、最高のパフォーマンスを得るには直接モデル アプローチを使用します。
  • 予測のローカルモデルを呼び出す前にデータポイントのマイクロバッチを作成して、ベクトル化された操作の並列化を利用することによって、直接モデル アプローチのパフォーマンスを向上させます。
  • 予測に適した形式でデータが Cloud Storage に入力されている場合、AI Platform のバッチ予測を使用することで最高のパフォーマンスを得ることができます。
  • バッチ予測に GPU の能力を使用したい場合は、AI Platform を使用します。
  • バッチ予測には AI Platform オンライン予測を使用しないでください。

ストリーム処理

  • 最高のパフォーマンスを実現し、平均レイテンシを短縮するには、ストリーミング パイプラインで直接モデルを使用します。予測はローカルで実施され、リモート サービスへの HTTP 呼び出しは行われません。
  • モデルをデータ処理パイプラインから切り離すことで、オンライン予測で使用されるモデルの保守性が向上します。最良のアプローチは、AI Platform またはその他のウェブ ホスティング サービスを使用して、モデルを独立したマイクロサービスとして提供することです。
  • モデルを独立したウェブサービスとしてデプロイすることで、複数のデータ処理パイプラインとオンライン アプリはモデルサービスをエンドポイントとして使用できます。モデルに対する変更は、そのモデルを使用するアプリやパイプラインに対して透過的です。
  • 負荷分散を使用して複数のサービス インスタンスをデプロイし、モデルのウェブサービスのスケーラビリティと可用性を改善します。AI Platform では、モデル バージョンをデプロイするときに必要なのは、yaml 構成ファイル内のノード数(manualScaling)または minNodesautoScaling)を指定することだけです。
  • モデルを別のマイクロサービスにデプロイする場合、基盤となるサービス インフラストラクチャに応じて追加コストが発生します。AI Platform のオンライン予測については、料金に関するよくある質問をご覧ください。
  • 直接モデルと HTTP モデルの両方のサービスでパフォーマンスを向上させるには、ストリーミング データ処理パイプラインでマイクロバッチ処理を使用します。マイクロバッチ処理は、モデルサービスへの HTTP リクエストの数を減らし、TensorFlow モデルのベクトル化された演算を使用して予測を取得します。

次のステップ