In this tutorial, you'll learn how to deploy a Dataflow pipeline to process large-scale image files with Cloud Vision. Dataflow stores the results in BigQuery so that you can use them to train BigQuery ML pre-built models.
The Dataflow pipeline you create in the tutorial can handle images in large quantities. It's only limited by your Vision quota. You can increase your Vision quota based on your scale requirements.
The tutorial is intended for data engineers and data scientists. It assumes you have basic knowledge of building Dataflow pipelines using Apache Beam's Java SDK, BigQuery Standard SQL, and basic shell scripting. It also assumes that you are familiar with Vision.
Objectives
- Create an image metadata ingestion pipeline with Pub/Sub notifications for Cloud Storage.
- Use Dataflow to deploy a real-time vision analytics pipeline.
- Use Vision to analyze images for a set of feature types.
- Analyze and train data with BigQuery ML.
Costs
In this document, you use the following billable components of Google Cloud: Dataflow, Cloud Storage, Pub/Sub, Cloud Vision, BigQuery, and Container Registry.
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Google Cloud project.
- In the Google Cloud console, activate Cloud Shell.
In Cloud Shell, enable the Dataflow, Container Registry, and Vision APIs.
```shell
gcloud services enable dataflow.googleapis.com \
    containerregistry.googleapis.com vision.googleapis.com
```
Set some environment variables. Replace REGION with one of the available Dataflow regions, for example `us-central1`:

```shell
export PROJECT=$(gcloud config get-value project)
export REGION=REGION
```
Clone the tutorial's Git repository:
```shell
git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
```
Go to the repository's root folder:
```shell
cd dataflow-vision-analytics
```
Reference architecture
The following diagram illustrates the flow of the system that you build in this tutorial.
As shown in the diagram, the flow is as follows:
1. Clients upload image files to a Cloud Storage bucket.
2. For each file upload, Cloud Storage automatically publishes a notification message to Pub/Sub.
3. For each new notification, the Dataflow pipeline does the following:
   - Reads file metadata from the Pub/Sub message.
   - Sends each file to the Vision API for annotation processing.
   - Stores all annotations in a BigQuery table for further analysis.
Creating a Pub/Sub notification for Cloud Storage
In this section, you create a Pub/Sub notification for Cloud Storage. This notification publishes metadata for the image file that is uploaded in the bucket. Based on the metadata, the Dataflow pipeline starts processing the request.
In Cloud Shell, create a Pub/Sub topic:
```shell
export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
```
Create a Pub/Sub subscription for the topic:
```shell
export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} \
    --topic=${GCS_NOTIFICATION_TOPIC}
```
Create a bucket to store the input image files:
```shell
export IMAGE_BUCKET=${PROJECT}-images
gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
```
Create a Pub/Sub notification for the bucket:
```shell
gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
    -f json gs://${IMAGE_BUCKET}
```
Now that you have configured notifications, the system sends a Pub/Sub message to the topic that you created every time you upload a file to the bucket.
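For example, the payload of one of these notification messages looks roughly like the following. The values here are illustrative (the bucket name is hypothetical); the payload fields follow the Cloud Storage object metadata format:

```json
{
  "kind": "storage#object",
  "name": "bali.jpeg",
  "bucket": "my-project-images",
  "contentType": "image/jpeg",
  "size": "123456",
  "timeCreated": "2023-01-01T00:00:00.000Z"
}
```

The pipeline reads the `bucket` and `name` fields from this metadata to locate the uploaded image.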
Creating a BigQuery dataset
In this section, you create a BigQuery dataset to store the results output by the Dataflow pipeline. The pipeline automatically creates tables based on vision feature types.
In Cloud Shell, create a BigQuery dataset:
```shell
export BIGQUERY_DATASET="vision_analytics"
bq mk -d --location=US ${BIGQUERY_DATASET}
```
Creating a Dataflow Flex Template
In this section, you create Apache Beam pipeline code and then run the Dataflow pipeline as a Dataflow job using a Dataflow Flex Template.
In Cloud Shell, build the Apache Beam pipeline's code:
```shell
gradle build
```
Create a Docker image for the Dataflow Flex Template:
```shell
gcloud auth configure-docker
gradle jib \
    --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
```
Create a Cloud Storage bucket to store the Dataflow Flex Template:
```shell
export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config
gsutil mb -c standard -l ${REGION} \
    gs://${DATAFLOW_TEMPLATE_BUCKET}
```
Upload the template's JSON configuration file to the bucket:
```shell
cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json
{
  "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest",
  "sdk_info": {"language": "JAVA"}
}
EOF
```
Running the Dataflow pipeline for a set of Vision features
The parameters listed in the following table are specific to this Dataflow pipeline.
Refer to the Dataflow documentation for the complete list of standard Dataflow execution parameters.
| Parameter | Description |
|---|---|
| `windowInterval` | The window time interval (in seconds) for outputting results to BigQuery and Pub/Sub. The default is 5. |
| `batchSize` | The number of images to include in a request to the Vision API. The default is 1. You can increase it to a maximum of 16. |
| `subscriberId` | The ID of the Pub/Sub subscription that receives input Cloud Storage notifications. |
| `keyRange` | The parameter that enables you to improve processing performance for large datasets. A higher value means increased parallelism among workers. The default is 1. |
| `visionApiProjectId` | The project ID to use for the Vision API. |
| `datasetName` | The reference of the output BigQuery dataset. |
| Various table name parameters | String parameters with table names for the various annotations. Default values are provided for each table. |
| `features` | A list of image-processing features. |
In Cloud Shell, define a job name for the Dataflow pipeline:
```shell
export JOB_NAME=vision-analytics-pipeline-1
```
Create a file with parameters for the Dataflow pipeline:
```shell
PARAMETERS=params.yaml
cat << EOF > ${PARAMETERS}
--parameters:
  autoscalingAlgorithm: THROUGHPUT_BASED
  enableStreamingEngine: "true"
  subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
  visionApiProjectId: ${PROJECT}
  features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION
  datasetName: ${BIGQUERY_DATASET}
EOF
```
Run the Dataflow pipeline to process images for the IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS, and FACE_DETECTION feature types:

```shell
gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
```
This command uses the parameters listed in the preceding table.
Retrieve the ID of the running Dataflow job:
```shell
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
```
Display the URL of the Dataflow job's web page:
```shell
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
```
Open the displayed URL in a new browser tab. After a few seconds, the graph for the Dataflow job appears:
The Dataflow pipeline is now running and waiting to receive input notifications from Pub/Sub.
In Cloud Shell, trigger the Dataflow pipeline by uploading some test files into the input bucket:
```shell
gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET}
gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET}
gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET}
gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET}
gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
```
In the Google Cloud console, review the custom counters in the right panel of the Dataflow job page, and verify that the pipeline processed all five images:
In Cloud Shell, validate that the tables were automatically created:
```shell
bq query "select table_name, table_type from \
${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
```
The output is as follows:
```
+----------------------+------------+
|      table_name      | table_type |
+----------------------+------------+
| face_annotation      | BASE TABLE |
| label_annotation     | BASE TABLE |
| crop_hint_annotation | BASE TABLE |
| landmark_annotation  | BASE TABLE |
| image_properties     | BASE TABLE |
+----------------------+------------+
```
View the schema for the `landmark_annotation` table. When the LANDMARK_DETECTION feature is requested, the pipeline captures the attributes returned from the API call:

```shell
bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
```
The output is as follows:
```json
[
  {
    "mode": "REQUIRED",
    "name": "gcs_uri",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "mid",
    "type": "STRING"
  },
  {
    "mode": "REQUIRED",
    "name": "description",
    "type": "STRING"
  },
  {
    "mode": "REQUIRED",
    "name": "score",
    "type": "FLOAT"
  },
  {
    "fields": [
      {
        "fields": [
          {
            "mode": "REQUIRED",
            "name": "x",
            "type": "FLOAT"
          },
          {
            "mode": "REQUIRED",
            "name": "y",
            "type": "FLOAT"
          }
        ],
        "mode": "REPEATED",
        "name": "vertices",
        "type": "RECORD"
      }
    ],
    "mode": "NULLABLE",
    "name": "bounding_poly",
    "type": "RECORD"
  },
  {
    "mode": "REPEATED",
    "name": "locations",
    "type": "GEOGRAPHY"
  },
  {
    "mode": "REQUIRED",
    "name": "transaction_timestamp",
    "type": "TIMESTAMP"
  }
]
```
Stop the pipeline:
```shell
gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
```
Although there are no more Pub/Sub notifications to process, the streaming pipeline you created continues to run until you enter this command.
Analyzing a Flickr30K dataset
In this section, you analyze the Flickr30K dataset for label and landmark detection.
In Cloud Shell, define a new job name:
```shell
export JOB_NAME=vision-analytics-pipeline-2
```
Change the Dataflow pipeline parameters so that the pipeline is optimized for a large dataset. The `batchSize` and `keyRange` values are increased to allow higher throughput. Dataflow scales the number of workers as needed:

```shell
cat << EOF > ${PARAMETERS}
--parameters:
  autoscalingAlgorithm: THROUGHPUT_BASED
  enableStreamingEngine: "true"
  subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
  visionApiProjectId: ${PROJECT}
  features: LABEL_DETECTION,LANDMARK_DETECTION
  datasetName: ${BIGQUERY_DATASET}
  batchSize: "16"
  windowInterval: "5"
  keyRange: "2"
EOF
```
Run the pipeline:
```shell
gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
```
Upload the dataset into an input bucket:
```shell
gsutil -m cp gs://df-vision-ai-test-data/* gs://${IMAGE_BUCKET}
```
Retrieve the ID of the running Dataflow job:
```shell
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
```
Display the URL of the Dataflow job's web page:
```shell
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
```
Open the displayed URL in a new browser tab.
In the Google Cloud console, validate the custom counters for the Dataflow job to ensure that all files are processed. All the files are normally processed in less than 30 minutes.
Filter by custom counters under Process Annotations.
The output is as follows:
The `processedFiles` metric (31,935) closely matches the total number of images that were uploaded to the bucket (the total file count is 31,936). However, the `numberOfRequests` metric (1,997) is lower than the number of files that went through the pipeline. This difference occurs because the pipeline batches up to 16 files per request, as shown in the values of the `batchSizeDistribution_*` metrics.

Shut down the pipeline:
```shell
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
```
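As a rough sanity check of the batching arithmetic above, the minimum number of Vision API requests needed for 31,935 files at up to 16 images per request is a ceiling division:

```shell
# Ceiling division: minimum Vision API requests for 31,935 files
# when the pipeline batches up to 16 images per request.
FILES=31935
BATCH=16
echo $(( (FILES + BATCH - 1) / BATCH ))   # prints 1996
```

The observed `numberOfRequests` value of 1,997 is slightly above this minimum, which is plausible if some windows closed before their batches were completely full.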
In the Google Cloud console, go to the BigQuery Query editor page.
Find the most likely label for each file:
```sql
SELECT
  SPLIT(gcs_uri, '/')[OFFSET(3)] file,
  description,
  score
FROM (
  SELECT
    gcs_uri,
    description,
    score,
    ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC) AS row_num
  FROM
    `vision_analytics.label_annotation`)
WHERE
  row_num = 1
ORDER BY
  gcs_uri DESC
```
The output is as follows. You can see from the response that Landmark is the most likely description for the `st_basils.jpeg` file.

Find the top 10 labels and their max scores:
```sql
SELECT
  description,
  COUNT(*) AS found,
  MAX(score) AS max_score
FROM
  `vision_analytics.label_annotation`
GROUP BY
  description
ORDER BY
  found DESC
LIMIT 10
```
The final output looks similar to the following:
Find the top 10 popular landmarks:
```sql
SELECT
  description,
  COUNT(*) AS count,
  MAX(score) AS max_score
FROM
  `vision_analytics.landmark_annotation`
WHERE
  LENGTH(description) > 0
GROUP BY
  description
ORDER BY
  count DESC
LIMIT 10
```
The output is as follows. You can see from the response that Times Square appears to be the most popular destination.
Find any image that has a waterfall:
```sql
SELECT
  SPLIT(gcs_uri, '/')[OFFSET(3)] file,
  description,
  score
FROM
  `vision_analytics.landmark_annotation`
WHERE
  LOWER(description) LIKE '%fall%'
ORDER BY
  score DESC
```
The output is as follows. It only contains images of waterfalls.
Find an image of a landmark within 3 kilometers of the Colosseum in Rome (the `ST_GEOGPOINT` function uses the Colosseum's longitude and latitude):

```sql
WITH landmarksWithDistances AS (
  SELECT
    gcs_uri,
    description,
    location,
    ST_DISTANCE(location, ST_GEOGPOINT(12.492231, 41.890222)) distance_in_meters
  FROM
    `vision_analytics.landmark_annotation` landmarks
  CROSS JOIN
    UNNEST(landmarks.locations) AS location
)
SELECT
  SPLIT(gcs_uri, "/")[OFFSET(3)] file,
  description,
  ROUND(distance_in_meters) distance_in_meters,
  location,
  CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
FROM
  landmarksWithDistances
WHERE
  distance_in_meters < 3000
ORDER BY
  distance_in_meters
LIMIT 100
```
The output is as follows. You can see that several popular destinations appear in these images:
The same image can contain multiple locations of the same landmark. This functionality is described in the Vision API documentation. Because one location can indicate the location of the scene in the image, multiple `LocationInfo` elements can be present. Another location can indicate where the image was taken. Location information is usually present for landmarks.

You can visualize the data in BigQuery Geo Viz by pasting in the previous query. When you select a point on the map, you see its details. The `image_url` attribute contains a link to the image file that you can open in a browser.
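The `image_url` expression in the query maps a `gs://` URI to a browser-accessible URL by dropping the `gs://` prefix (the `SUBSTR(gcs_uri, 6)` part). A minimal shell sketch of the same transformation, using a hypothetical URI:

```shell
# Hypothetical gs:// URI; ${var#pattern} strips the gs:// prefix,
# mirroring SUBSTR(gcs_uri, 6) in the BigQuery query.
gcs_uri="gs://my-project-images/bali.jpeg"
image_url="https://storage.cloud.google.com/${gcs_uri#gs://}"
echo "${image_url}"   # prints https://storage.cloud.google.com/my-project-images/bali.jpeg
```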
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the Google Cloud project
The easiest way to eliminate billing is to delete the Google Cloud project you created for the tutorial.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
What's next
- Learn more about smart analytics reference patterns.
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.