Dataflow と Cloud Vision API を使用した ML 可視分析ソリューションの構築

Last reviewed 2024-04-16 UTC

このチュートリアルでは、Cloud Vision で大規模な画像ファイルを処理するために Dataflow パイプラインをデプロイする方法を学習します。Dataflow は結果を BigQuery に保存し、それを使用して BigQuery ML の事前構築モデルをトレーニングします。

このチュートリアルで作成する Dataflow パイプラインは、大量の画像を処理できます。これは Vision の割り当てによってのみ制限されます。スケール要件に基づいて Vision の割り当てを増やすことができます。

このチュートリアルは、データ エンジニアとデータ サイエンティストを対象としています。このチュートリアルは、Apache Beam の Java SDK、BigQuery 標準 SQL、基本的なシェル スクリプトを使用して Dataflow パイプラインをビルドすることに関する基本的な知識をお持ちであることを前提としています。また、Vision に精通していることも前提としています。

目標

  • Cloud Storage の Pub/Sub 通知を使用して、画像メタデータの取り込みパイプラインを作成します。
  • Dataflow を使用してリアルタイム可視分析パイプラインをデプロイします。
  • Vision を使用して、複数の特徴タイプの画像を分析します。
  • BigQuery ML でデータを分析し、トレーニングします。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Google Cloud プロジェクトで課金が有効になっていることを確認します

  3. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

  4. Cloud Shell で、Dataflow、Container Registry、Vision API を有効にします。

    gcloud services enable dataflow.googleapis.com \
    containerregistry.googleapis.com vision.googleapis.com
    
  5. 環境変数を設定します(REGION は、使用可能な Dataflow リージョンのいずれかに置き換えます。例: us-central1)。

    export PROJECT=$(gcloud config get-value project)
    export REGION=REGION
    
  6. チュートリアルの Git リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
    
  7. リポジトリのルートフォルダに移動します。

    cd dataflow-vision-analytics
    

リファレンス アーキテクチャ

次の図は、このチュートリアルで構築するシステムのフローを示しています。

取り込み / トリガー、処理、保存の情報のフローを示すワークフローの図

図で示すように、フローは次のようになります。

  1. クライアントは、Cloud Storage バケットに画像ファイルをアップロードします。

  2. ファイルをアップロードするたびに、Pub/Sub にメッセージを公開してクライアントに自動的に通知します。

  3. Dataflow パイプラインで、新しい通知ごとに次の処理を行います。

    1. Pub/Sub メッセージからファイル メタデータを読み取ります。
    2. 各セグメントをアノテーション処理のために Vision API に送信します。
    3. 詳細な分析のために、すべてのアノテーションを BigQuery テーブルに格納します。

Cloud Storage の Pub/Sub 通知を作成する

このセクションでは、Cloud Storage の Pub/Sub 通知を作成します。この通知は、バケットにアップロードされた画像ファイルのメタデータを公開します。メタデータに基づいて、Dataflow パイプラインがリクエストの処理を開始します。

  1. Cloud Shell で Pub/Sub トピックを作成します。

    export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
    gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
    
  2. トピックに Pub/Sub サブスクリプションを作成します。

    export  GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
    gcloud pubsub subscriptions create  ${GCS_NOTIFICATION_SUBSCRIPTION}  --topic=${GCS_NOTIFICATION_TOPIC}
    
  3. 入力画像ファイルを保存するバケットを作成します。

    export IMAGE_BUCKET=${PROJECT}-images
    gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
    
  4. バケットの Pub/Sub 通知を作成します。

    gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
      -f json gs://${IMAGE_BUCKET}
    

通知を構成したので、バケットにファイルをアップロードするたびに、作成したトピックに Pub/Sub メッセージがシステムから送信されます。

BigQuery データセットの作成

このセクションでは、Dataflow パイプラインが出力する結果を格納する BigQuery データセットを作成します。パイプラインは、Vision 機能のタイプに基づいて自動的にテーブルを作成します。

  • Cloud Shell で、BigQuery データセットを作成します。

    export BIGQUERY_DATASET="vision_analytics"
    bq mk -d --location=US ${BIGQUERY_DATASET}
    

Dataflow Flex テンプレートの作成

このセクションでは、Apache Beam パイプライン コードを作成し、Dataflow Flex テンプレートを使用して Dataflow パイプラインを Dataflow ジョブとして実行します。

  1. Cloud Shell で、Apache Beam パイプラインのコードをビルドします。

    gradle build
    
  2. Dataflow Flex テンプレートの Docker イメージを作成します。

    gcloud auth configure-docker
    gradle jib \
      --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
    
  3. Dataflow Flex テンプレートを保存する Cloud Storage バケットを作成します。

    export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config
    gsutil mb -c standard -l ${REGION} \
      gs://${DATAFLOW_TEMPLATE_BUCKET}
    
  4. テンプレートの JSON 構成ファイルをバケットにアップロードします。

    cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json
    {
      "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest",
      "sdk_info": {"language": "JAVA"}
    }
    EOF
    

一連の Vision 機能の Dataflow パイプラインを実行する

次の表に示すパラメータは、この Dataflow パイプラインに固有のものです。

標準 Dataflow 実行パラメータの全一覧については、Dataflow ドキュメントをご覧ください。

パラメータ 説明

windowInterval

BigQuery と Pub/Sub に結果を送信するウィンドウ間隔(秒単位)。デフォルトは 5 です。

batchSize

Vision API へのリクエストに含める画像の数。デフォルトは 1 です。最大 16 まで増やすことができます。

subscriberId

入力の Cloud Storage 通知を受信する Pub/Sub サブスクリプションの ID。

keyRange

このパラメータを使用すると、大規模なデータセットの処理パフォーマンスを向上させることができます。値が大きいほど、ワーカー間の並列処理が増えます。デフォルトは 1 です。

visionApiProjectId

Vision API で使用するプロジェクト ID。

datasetName

出力 BigQuery データセットの参照。

features

画像処理機能のリスト。

labelAnnottationTable, landmarkAnnotationTable, logoAnnotationTable, faceAnnotationTable, imagePropertiesTable, cropHintAnnotationTable, errorLogTable

さまざまなアノテーションのテーブル名を含む文字列パラメータ。各テーブルにはデフォルト値が用意されています。
  1. Cloud Shell で、Dataflow パイプラインのジョブ名を定義します。

    export JOB_NAME=vision-analytics-pipeline-1
    
  2. Dataflow パイプラインのパラメータを使用してファイルを作成します。

    PARAMETERS=params.yaml
    cat << EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION
      datasetName: ${BIGQUERY_DATASET}
    EOF
    
  3. これらの機能のタイプ IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION の画像を処理する Dataflow パイプラインを実行します。

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    

    このコマンドは、前掲の表に記載されているパラメータを使用します。

  4. 実行中の Dataflow ジョブの ID を取得します。

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
    
  5. Dataflow ジョブのウェブページの URL を表示します。

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  6. 表示された URL を新しいブラウザタブで開きます。数秒後、Dataflow ジョブのグラフが表示されます。

    Dataflow ジョブのワークフロー図。

    Dataflow パイプラインは現在実行中で、Pub/Sub から入力通知を受信しようとしています。

  7. Cloud Shell で、いくつかのテストファイルを入力バケットにアップロードして、Dataflow パイプラインをトリガーします。

    gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
    
  8. Google Cloud Console で、Dataflow 内のカスタム カウンタ(Dataflow ジョブの右側のパネル)を確認し、5 つの画像がすべて処理されていることを確認します。

    ファイル アップロードで返された画像のリスト。

  9. Cloud Shell で、テーブルが自動的に作成されたことを確認します。

    bq query "select table_name, table_type from \
    ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
    

    出力は次のとおりです。

    +----------------------+------------+
    |      table_name      | table_type |
    +----------------------+------------+
    | face_annotation      | BASE TABLE |
    | label_annotation     | BASE TABLE |
    | crop_hint_annotation | BASE TABLE |
    | landmark_annotation  | BASE TABLE |
    | image_properties     | BASE TABLE |
    +----------------------+------------+
    
  10. landmark_annotation テーブルのスキーマを表示します。リクエストされた場合、LANDMARK_DETECTION 機能は API 呼び出しから返された属性をキャプチャします。

    bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
    

    出力は次のとおりです。

    [
      {
        "mode": "REQUIRED",
        "name": "gcs_uri",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "mid",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "description",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "score",
        "type": "FLOAT"
      },
      {
        "fields": [
          {
            "fields": [
              {
                "mode": "REQUIRED",
                "name": "x",
                "type": "FLOAT"
              },
              {
                "mode": "REQUIRED",
                "name": "y",
                "type": "FLOAT"
              }
            ],
            "mode": "REPEATED",
            "name": "vertices",
            "type": "RECORD"
          }
        ],
        "mode": "NULLABLE",
        "name": "bounding_poly",
        "type": "RECORD"
      },
      {
        "mode": "REPEATED",
        "name": "locations",
        "type": "GEOGRAPHY"
      },
      {
        "mode": "REQUIRED",
        "name": "transaction_timestamp",
        "type": "TIMESTAMP"
      }
    ]
    
  11. パイプラインを停止します。

    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    

    処理する Pub/Sub 通知はありませんが、作成したストリーミング パイプラインはこのコマンドを入力するまで実行されます。

Flickr30K データセットの分析

このセクションでは、ラベルとランドマークを検出する Flickr30K データセットを分析します。

  1. Cloud Shell で、新しいジョブ名を定義します。

    export JOB_NAME=vision-analytics-pipeline-2
    
  2. Dataflow パイプライン パラメータを変更して、大規模なデータセット向けに最適化されるようにします。batchSizekeyRange の値を大きくすると、スループットが向上します。Dataflow は必要に応じてワーカーの数を調整します。

    cat <<EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: LABEL_DETECTION,LANDMARK_DETECTION
      datasetName: ${BIGQUERY_DATASET}
      batchSize: "16"
      windowInterval: "5"
      keyRange: "2"
    EOF
    
  3. パイプラインを実行します。

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    
  4. データセットを入力バケットにアップロードします。

    gsutil -m  cp gs://df-vision-ai-test-data/*  gs://${IMAGE_BUCKET}
    
  5. 実行中の Dataflow ジョブの ID を取得します。

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
    
  6. Dataflow ジョブのウェブページの URL を表示します。

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  7. 表示された URL を新しいブラウザタブで開きます。

  8. Google Cloud Console で Dataflow のカスタム カウンタを検証し、すべてのファイルが処理されていることを確認します。通常、ファイルはすべて 30 分未満で処理されます。

  9. [Process Annotations] で、カスタム カウンタを使ってフィルタします。

    出力は次のとおりです。

    カスタム カウンタでフィルタした後に返されるカウンタのリスト。カウンタ名、値、ステップが表示されます。

    processedFiles 指標(31,935)は、バケットにアップロードされた画像の合計数(合計ファイル数は 31,936)と一致します。ただし、numberOfRequests 指標(1,997)はパイプラインを通過したファイルの数より低くなります。この違いは、batchSizeDistribution_* 指標の値に示すように、パイプラインがリクエストごとに最大 16 個のファイルをバッチ処理するためです。

  10. パイプラインをシャットダウンします。

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}"
    --region ${REGION}
    --format "value(id)"
    --status active) \
    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    
  11. Google Cloud コンソールで、BigQuery の [クエリエディタ] ページに移動します。

    [クエリエディタ] に移動

  12. 各ファイルで最も可能性の高いラベルを見つけます。

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM (
      SELECT
        gcs_uri,
        description,
        score,
        ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC )
    AS row_num
      FROM
         `vision_analytics.label_annotation`)
    WHERE
      row_num = 1
    ORDER BY
      gcs_uri DESC
    

    出力は次のとおりです。レスポンスで、st_basils.jpeg ファイルがランドマークである可能性が高いことがわかります。

    画像ファイル名、説明、スコアのリスト。

  13. 上位 10 個のラベルとそれらの最大スコアが表示されます。

    SELECT
      description,
      COUNT(*) AS found,
      MAX(score) AS max_score
    FROM
      `vision_analytics.label_annotation`
    GROUP BY
      description
    ORDER BY
      found DESC
    LIMIT 10
    

    最終的な出力は次のようになります。

    検出された上位 10 件のラベルのリスト。リストには説明、検出された回数、最大スコアが含まれます。

  14. 人気のランドマークトップ 10 を見つける:

    SELECT
      description,
      COUNT(*) AS count,
      MAX(score) AS max_score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LENGTH(description)>0
    GROUP BY
      description
    ORDER BY
      count DESC
    LIMIT 10
    

    出力は次のとおりです。レスポンスからタイムズ スクエアが最も人気のある目的地のようだとわかります。

    クエリで返された上位 10 件のランドマークのリスト。説明、数、最大スコアが含まれます。

  15. 滝がある画像を探します。

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LOWER(description) LIKE '%fall%'
    ORDER BY score DESC
    

    出力は次のとおりです。滝の画像のみが含まれている。

    滝のリスト。ファイル名、説明、スコアが含まれます。

  16. ローマのコロセウムから 3 km 以内のランドマークの画像を探します(ST_GEOPOINT 関数は、コロセウムの経度と緯度を使用します)。

    WITH
      landmarksWithDistances AS (
      SELECT
        gcs_uri,
        description,
        location,
        ST_DISTANCE(location,
          ST_GEOGPOINT(12.492231,
            41.890222)) distance_in_meters,
      FROM
        `vision_analytics.landmark_annotation` landmarks
      CROSS JOIN
        UNNEST(landmarks.locations) AS location )
    SELECT
      SPLIT(gcs_uri,"/")[OFFSET(3)] file,
      description,
        ROUND(distance_in_meters) distance_in_meters,
      location,
      CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
    FROM
      landmarksWithDistances
    WHERE
      distance_in_meters < 3000
    ORDER BY
      distance_in_meters
    LIMIT
      100
    

    出力は次のとおりです。これらの画像には、いくつかの人気目的地がいくつか表示されます。

    ローマのコロセウムから 3 km 以内のすべての画像のリスト。ファイル名、説明、コロセウムからの距離(メートル単位)、位置が含まれます。

    同じ画像に、同じランドマークの位置を複数含めることができます。この機能については、Vision API のドキュメントをご覧ください。LocationInfo 要素は複数存在することがあります。これは、ある位置で画像内のシーンの場所を示し、別の位置で画像の撮影位置を示す可能性があるためです。位置情報は通常、ランドマークのために存在しています。

    前のクエリを貼り付けることで、BigQuery Geo Viz でデータを可視化できます。地図上のポイントを選択すると、その詳細が表示されます。Image_url 属性には、ブラウザで開ける画像ファイルへのリンクが含まれます。

    コロセウムからの場所と距離の地図。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

Google Cloud プロジェクトを削除する

課金を停止する最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

次のステップ