このチュートリアルでは、Cloud Vision で大規模な画像ファイルを処理するために Dataflow パイプラインをデプロイする方法を学習します。Dataflow は結果を BigQuery に保存し、それを使用して BigQuery ML の事前構築モデルをトレーニングします。
このチュートリアルで作成する Dataflow パイプラインは、大量の画像を処理できます。これは Vision の割り当てによってのみ制限されます。スケール要件に基づいて Vision の割り当てを増やすことができます。
このチュートリアルは、データ エンジニアとデータ サイエンティストを対象としています。このチュートリアルは、Apache Beam の Java SDK、BigQuery 標準 SQL、基本的なシェル スクリプトを使用して Dataflow パイプラインをビルドすることに関する基本的な知識をお持ちであることを前提としています。また、Vision に精通していることも前提としています。
目標
- Cloud Storage の Pub/Sub 通知を使用して、画像メタデータの取り込みパイプラインを作成します。
- Dataflow を使用してリアルタイム可視分析パイプラインをデプロイします。
- Vision を使用して、複数の特徴タイプの画像を分析します。
- BigQuery ML でデータを分析し、トレーニングします。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。
Cloud Shell で、Dataflow、Container Registry、Vision API を有効にします。
gcloud services enable dataflow.googleapis.com \ containerregistry.googleapis.com vision.googleapis.com
環境変数を設定します(REGION は、使用可能な Dataflow リージョンのいずれかに置き換えます。例:
us-central1
)。export PROJECT=$(gcloud config get-value project) export REGION=REGION
チュートリアルの Git リポジトリのクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
リポジトリのルートフォルダに移動します。
cd dataflow-vision-analytics
リファレンス アーキテクチャ
次の図は、このチュートリアルで構築するシステムのフローを示しています。
図で示すように、フローは次のようになります。
クライアントは、Cloud Storage バケットに画像ファイルをアップロードします。
ファイルをアップロードするたびに、Pub/Sub にメッセージを公開してクライアントに自動的に通知します。
Dataflow パイプラインで、新しい通知ごとに次の処理を行います。
- Pub/Sub メッセージからファイル メタデータを読み取ります。
- 各セグメントをアノテーション処理のために Vision API に送信します。
- 詳細な分析のために、すべてのアノテーションを BigQuery テーブルに格納します。
Cloud Storage の Pub/Sub 通知を作成する
このセクションでは、Cloud Storage の Pub/Sub 通知を作成します。この通知は、バケットにアップロードされた画像ファイルのメタデータを公開します。メタデータに基づいて、Dataflow パイプラインがリクエストの処理を開始します。
Cloud Shell で Pub/Sub トピックを作成します。
export GCS_NOTIFICATION_TOPIC="gcs-notification-topic" gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
トピックに Pub/Sub サブスクリプションを作成します。
export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription" gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} --topic=${GCS_NOTIFICATION_TOPIC}
入力画像ファイルを保存するバケットを作成します。
export IMAGE_BUCKET=${PROJECT}-images gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
バケットの Pub/Sub 通知を作成します。
gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \ -f json gs://${IMAGE_BUCKET}
通知を構成したので、バケットにファイルをアップロードするたびに、作成したトピックに Pub/Sub メッセージがシステムから送信されます。
BigQuery データセットの作成
このセクションでは、Dataflow パイプラインが出力する結果を格納する BigQuery データセットを作成します。パイプラインは、Vision 機能のタイプに基づいて自動的にテーブルを作成します。
Cloud Shell で、BigQuery データセットを作成します。
export BIGQUERY_DATASET="vision_analytics" bq mk -d --location=US ${BIGQUERY_DATASET}
Dataflow Flex テンプレートの作成
このセクションでは、Apache Beam パイプライン コードを作成し、Dataflow Flex テンプレートを使用して Dataflow パイプラインを Dataflow ジョブとして実行します。
Cloud Shell で、Apache Beam パイプラインのコードをビルドします。
gradle build
Dataflow Flex テンプレートの Docker イメージを作成します。
gcloud auth configure-docker gradle jib \ --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
Dataflow Flex テンプレートを保存する Cloud Storage バケットを作成します。
export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config gsutil mb -c standard -l ${REGION} \ gs://${DATAFLOW_TEMPLATE_BUCKET}
テンプレートの JSON 構成ファイルをバケットにアップロードします。
cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json { "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest", "sdk_info": {"language": "JAVA"} } EOF
一連の Vision 機能の Dataflow パイプラインを実行する
次の表に示すパラメータは、この Dataflow パイプラインに固有のものです。
標準 Dataflow 実行パラメータの全一覧については、Dataflow ドキュメントをご覧ください。
パラメータ | 説明 |
---|---|
|
BigQuery と Pub/Sub に結果を送信するウィンドウ間隔(秒単位)。デフォルトは 5 です。 |
|
Vision API へのリクエストに含める画像の数。デフォルトは 1 です。最大 16 まで増やすことができます。 |
|
入力の Cloud Storage 通知を受信する Pub/Sub サブスクリプションの ID。 |
|
このパラメータを使用すると、大規模なデータセットの処理パフォーマンスを向上させることができます。値が大きいほど、ワーカー間の並列処理が増えます。デフォルトは 1 です。 |
|
Vision API で使用するプロジェクト ID。 |
|
出力 BigQuery データセットの参照。 |
|
画像処理機能のリスト。 |
|
さまざまなアノテーションのテーブル名を含む文字列パラメータ。各テーブルにはデフォルト値が用意されています。 |
Cloud Shell で、Dataflow パイプラインのジョブ名を定義します。
export JOB_NAME=vision-analytics-pipeline-1
Dataflow パイプラインのパラメータを使用してファイルを作成します。
PARAMETERS=params.yaml cat << EOF > ${PARAMETERS} --parameters: autoscalingAlgorithm: THROUGHPUT_BASED enableStreamingEngine: "true" subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} visionApiProjectId: ${PROJECT} features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION datasetName: ${BIGQUERY_DATASET} EOF
これらの機能のタイプ
IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION
の画像を処理する Dataflow パイプラインを実行します。gcloud dataflow flex-template run ${JOB_NAME} \ --project=${PROJECT} \ --region=${REGION} \ --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \ --flags-file ${PARAMETERS}
このコマンドは、前掲の表に記載されているパラメータを使用します。
実行中の Dataflow ジョブの ID を取得します。
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
Dataflow ジョブのウェブページの URL を表示します。
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
表示された URL を新しいブラウザタブで開きます。数秒後、Dataflow ジョブのグラフが表示されます。
Dataflow パイプラインは現在実行中で、Pub/Sub から入力通知を受信しようとしています。
Cloud Shell で、いくつかのテストファイルを入力バケットにアップロードして、Dataflow パイプラインをトリガーします。
gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
Google Cloud Console で、Dataflow 内のカスタム カウンタ(Dataflow ジョブの右側のパネル)を確認し、5 つの画像がすべて処理されていることを確認します。
Cloud Shell で、テーブルが自動的に作成されたことを確認します。
bq query "select table_name, table_type from \ ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
出力は次のとおりです。
+----------------------+------------+ | table_name | table_type | +----------------------+------------+ | face_annotation | BASE TABLE | | label_annotation | BASE TABLE | | crop_hint_annotation | BASE TABLE | | landmark_annotation | BASE TABLE | | image_properties | BASE TABLE | +----------------------+------------+
landmark_annotation
テーブルのスキーマを表示します。リクエストされた場合、LANDMARK_DETECTION
機能は API 呼び出しから返された属性をキャプチャします。bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
出力は次のとおりです。
[ { "mode": "REQUIRED", "name": "gcs_uri", "type": "STRING" }, { "mode": "NULLABLE", "name": "mid", "type": "STRING" }, { "mode": "REQUIRED", "name": "description", "type": "STRING" }, { "mode": "REQUIRED", "name": "score", "type": "FLOAT" }, { "fields": [ { "fields": [ { "mode": "REQUIRED", "name": "x", "type": "FLOAT" }, { "mode": "REQUIRED", "name": "y", "type": "FLOAT" } ], "mode": "REPEATED", "name": "vertices", "type": "RECORD" } ], "mode": "NULLABLE", "name": "bounding_poly", "type": "RECORD" }, { "mode": "REPEATED", "name": "locations", "type": "GEOGRAPHY" }, { "mode": "REQUIRED", "name": "transaction_timestamp", "type": "TIMESTAMP" } ]
パイプラインを停止します。
gcloud dataflow jobs drain ${JOB_ID} \ --region ${REGION}
処理する Pub/Sub 通知はありませんが、作成したストリーミング パイプラインはこのコマンドを入力するまで実行されます。
Flickr30K データセットの分析
このセクションでは、ラベルとランドマークを検出する Flickr30K データセットを分析します。
Cloud Shell で、新しいジョブ名を定義します。
export JOB_NAME=vision-analytics-pipeline-2
Dataflow パイプライン パラメータを変更して、大規模なデータセット向けに最適化されるようにします。
batchSize
とkeyRange
の値を大きくすると、スループットが向上します。Dataflow は必要に応じてワーカーの数を調整します。cat <<EOF > ${PARAMETERS} --parameters: autoscalingAlgorithm: THROUGHPUT_BASED enableStreamingEngine: "true" subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} visionApiProjectId: ${PROJECT} features: LABEL_DETECTION,LANDMARK_DETECTION datasetName: ${BIGQUERY_DATASET} batchSize: "16" windowInterval: "5" keyRange: "2" EOF
パイプラインを実行します。
gcloud dataflow flex-template run ${JOB_NAME} \ --project=${PROJECT} \ --region=${REGION} \ --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \ --flags-file ${PARAMETERS}
データセットを入力バケットにアップロードします。
gsutil -m cp gs://df-vision-ai-test-data/* gs://${IMAGE_BUCKET}
実行中の Dataflow ジョブの ID を取得します。
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
Dataflow ジョブのウェブページの URL を表示します。
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
表示された URL を新しいブラウザタブで開きます。
Google Cloud Console で Dataflow のカスタム カウンタを検証し、すべてのファイルが処理されていることを確認します。通常、ファイルはすべて 30 分未満で処理されます。
[Process Annotations] で、カスタム カウンタを使ってフィルタします。
出力は次のとおりです。
processedFiles
指標(31,935)は、バケットにアップロードされた画像の合計数(合計ファイル数は 31,936)と一致します。ただし、numberOfRequests
指標(1,997)はパイプラインを通過したファイルの数より低くなります。この違いは、batchSizeDistribution_*
指標の値に示すように、パイプラインがリクエストごとに最大 16 個のファイルをバッチ処理するためです。パイプラインをシャットダウンします。
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active) \ gcloud dataflow jobs drain ${JOB_ID} \ --region ${REGION}
Google Cloud コンソールで、BigQuery の [クエリエディタ] ページに移動します。
各ファイルで最も可能性の高いラベルを見つけます。
SELECT SPLIT(gcs_uri,'/')[OFFSET(3)] file, description, score FROM ( SELECT gcs_uri, description, score, ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC ) AS row_num FROM `vision_analytics.label_annotation`) WHERE row_num = 1 ORDER BY gcs_uri DESC
出力は次のとおりです。レスポンスで、
st_basils.jpeg
ファイルがランドマークである可能性が高いことがわかります。上位 10 個のラベルとそれらの最大スコアが表示されます。
SELECT description, COUNT(*) AS found, MAX(score) AS max_score FROM `vision_analytics.label_annotation` GROUP BY description ORDER BY found DESC LIMIT 10
最終的な出力は次のようになります。
人気のランドマークトップ 10 を見つける:
SELECT description, COUNT(*) AS count, MAX(score) AS max_score FROM `vision_analytics.landmark_annotation` WHERE LENGTH(description)>0 GROUP BY description ORDER BY count DESC LIMIT 10
出力は次のとおりです。レスポンスからタイムズ スクエアが最も人気のある目的地のようだとわかります。
滝がある画像を探します。
SELECT SPLIT(gcs_uri,'/')[OFFSET(3)] file, description, score FROM `vision_analytics.landmark_annotation` WHERE LOWER(description) LIKE '%fall%' ORDER BY score DESC
出力は次のとおりです。滝の画像のみが含まれている。
ローマのコロセウムから 3 km 以内のランドマークの画像を探します(
ST_GEOPOINT
関数は、コロセウムの経度と緯度を使用します)。WITH landmarksWithDistances AS ( SELECT gcs_uri, description, location, ST_DISTANCE(location, ST_GEOGPOINT(12.492231, 41.890222)) distance_in_meters, FROM `vision_analytics.landmark_annotation` landmarks CROSS JOIN UNNEST(landmarks.locations) AS location ) SELECT SPLIT(gcs_uri,"/")[OFFSET(3)] file, description, ROUND(distance_in_meters) distance_in_meters, location, CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url FROM landmarksWithDistances WHERE distance_in_meters < 3000 ORDER BY distance_in_meters LIMIT 100
出力は次のとおりです。これらの画像には、いくつかの人気目的地がいくつか表示されます。
同じ画像に、同じランドマークの位置を複数含めることができます。この機能については、Vision API のドキュメントをご覧ください。
LocationInfo
要素は複数存在することがあります。これは、ある位置で画像内のシーンの場所を示し、別の位置で画像の撮影位置を示す可能性があるためです。位置情報は通常、ランドマークのために存在しています。前のクエリを貼り付けることで、BigQuery Geo Viz でデータを可視化できます。地図上のポイントを選択すると、その詳細が表示されます。
Image_url
属性には、ブラウザで開ける画像ファイルへのリンクが含まれます。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
Google Cloud プロジェクトを削除する
課金を停止する最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
次のステップ
- スマート アナリティクスのリファレンス パターンの詳細を確認する。
- Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud アーキテクチャ センター をご覧ください。