在本教程中,您将了解如何部署 Dataflow 流水线,以使用 Cloud Vision 处理大型图片文件。Dataflow 将结果存储在 BigQuery 中,以便将其用于训练 BigQuery ML 预构建模型。
您在教程中创建的 Dataflow 流水线可以处理大量图片。该 Dataflow 流水线仅受 Vision 配额的限制。您可以根据规模需求增加 Vision 配额。
本教程适用于数据工程师和数据科学家。本教程假定您具备使用 Apache Beam 的 Java SDK、BigQuery 标准 SQL 和基本 Shell 脚本构建 Dataflow 流水线的基础知识。此外还假定您熟悉 Vision。
目标
- 使用适用于 Cloud Storage 的 Pub/Sub 通知创建图片元数据提取流水线。
- 使用 Dataflow 部署实时视觉分析流水线。
- 使用 Vision 针对一组特征类型分析图片。
- 使用 BigQuery ML 分析和训练数据。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, activate Cloud Shell.
在 Cloud Shell 中,启用 Dataflow、Container Registry 和 Vision API。
gcloud services enable dataflow.googleapis.com \ containerregistry.googleapis.com vision.googleapis.com
设置一些环境变量。(将 REGION 替换为一个可用的 Dataflow 区域。例如
us-central1
)。export PROJECT=$(gcloud config get-value project) export REGION=REGION
克隆教程的 Git 代码库:
git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
转到代码库的根文件夹:
cd dataflow-vision-analytics
参考架构
下图展示了您将在本教程中构建的系统流程。
如图所示,流程如下:
客户端将图片文件上传到 Cloud Storage 存储桶。
对于每项文件上传操作,系统会通过向 Pub/Sub 发布消息自动通知客户端。
对于每条新通知,Dataflow 流水线都会执行以下操作:
- 从 Pub/Sub 消息中读取文件元数据。
- 将每个片段发送至 Vision API 进行注释处理。
- 将所有注释存储在 BigQuery 表中以供进一步分析。
为 Cloud Storage 创建 Pub/Sub 通知
在本部分中,您将创建适用于 Cloud Storage 的 Pub/Sub 通知。此通知会发布上传到存储桶中的图片文件的元数据。Dataflow 流水线根据元数据开始处理请求。
在 Cloud Shell 中,创建 Pub/Sub 主题:
export GCS_NOTIFICATION_TOPIC="gcs-notification-topic" gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
为该主题创建 Pub/Sub 订阅:
export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription" gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} --topic=${GCS_NOTIFICATION_TOPIC}
创建一个存储桶以存储输入图片文件:
export IMAGE_BUCKET=${PROJECT}-images gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
为存储桶创建 Pub/Sub 通知:
gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \ -f json gs://${IMAGE_BUCKET}
现在您已配置通知,接下来系统会向您创建的主题发送 Pub/Sub 消息。每次您将文件上传到存储桶时系统都会执行此操作。
创建 BigQuery 数据集
在本部分中,您将创建 BigQuery 数据集,以用于存储由 Dataflow 流水线输出的结果。该流水线会根据视觉特征类型自动创建表。
在 Cloud Shell 中,创建一个 BigQuery 数据集:
export BIGQUERY_DATASET="vision_analytics" bq mk -d --location=US ${BIGQUERY_DATASET}
创建 Dataflow Flex 模板
在本部分中,您将创建 Apache Beam 流水线代码,然后使用 Dataflow Flex 模板将 Dataflow 流水线作为 Dataflow 作业运行。
在 Cloud Shell 中,构建 Apache Beam 流水线的代码:
gradle build
为 Dataflow 柔性模板创建 Docker 映像:
gcloud auth configure-docker gradle jib \ --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
创建 Cloud Storage 存储桶以存储 Dataflow 柔性模板:
export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config gsutil mb -c standard -l ${REGION} \ gs://${DATAFLOW_TEMPLATE_BUCKET}
将模板的 JSON 配置文件上传到该存储桶中:
cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json { "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest", "sdk_info": {"language": "JAVA"} } EOF
针对一组 Vision 特征运行 Dataflow 流水线
下表中列出的参数特定于 Dataflow 流水线。
如需查看标准 Dataflow 执行参数的完整列表,请参阅 Dataflow 文档。
参数 | 说明 |
---|---|
|
将结果输出到 BigQuery 和 Pub/Sub 的时间间隔(以秒为单位)。默认值为 5。 |
|
要在对 Vision API 的请求中包含的图片的数量。默认值为 1。您最多可以将其增加到 16 个。 |
|
接收输入 Cloud Storage 通知的 Pub/Sub 订阅的 ID。 |
|
此参数可让您提高大型数据集的处理性能。值越高,工作器之间的并行性就越高。默认值为 1。 |
|
要用于 Vision API 的项目 ID。 |
|
输出 BigQuery 数据集的引用。 |
|
图片处理特征列表。 |
|
包含各种注释的表名称的字符串参数。系统会为每个表提供默认值。 |
在 Cloud Shell 中,为 Dataflow 流水线定义作业名称:
export JOB_NAME=vision-analytics-pipeline-1
创建一个包含 Dataflow 流水线的参数的文件:
PARAMETERS=params.yaml cat << EOF > ${PARAMETERS} --parameters: autoscalingAlgorithm: THROUGHPUT_BASED enableStreamingEngine: "true" subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} visionApiProjectId: ${PROJECT} features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION datasetName: ${BIGQUERY_DATASET} EOF
运行 Dataflow 流水线来处理以下特征类型的图片:
IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION
。gcloud dataflow flex-template run ${JOB_NAME} \ --project=${PROJECT} \ --region=${REGION} \ --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \ --flags-file ${PARAMETERS}
此命令使用上表中列出的参数。
检索正在运行的 Dataflow 作业的 ID:
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
显示 Dataflow 作业网页的网址:
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
在新浏览器标签页中打开显示的网址。几秒钟后,Dataflow 作业图将显示:
Dataflow 流水线现在正在运行,并等待接收来自 Pub/Sub 的输入通知。
在 Cloud Shell 中,通过将一些测试文件上传到输入存储桶来触发 Dataflow 流水线:
gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
在 Google Cloud Console 中,查看 Dataflow 中的自定义计数器(位于 Dataflow 作业的右侧面板中),并验证是否已处理所有这五张图片:
在 Cloud Shell 中,验证是否已自动创建表:
bq query "select table_name, table_type from \ ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
输出如下所示:
+----------------------+------------+ | table_name | table_type | +----------------------+------------+ | face_annotation | BASE TABLE | | label_annotation | BASE TABLE | | crop_hint_annotation | BASE TABLE | | landmark_annotation | BASE TABLE | | image_properties | BASE TABLE | +----------------------+------------+
查看
landmark_annotation
表的架构。如果已请求,则LANDMARK_DETECTION
特征会捕获从 API 调用返回的特性。bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
输出如下所示:
[ { "mode": "REQUIRED", "name": "gcs_uri", "type": "STRING" }, { "mode": "NULLABLE", "name": "mid", "type": "STRING" }, { "mode": "REQUIRED", "name": "description", "type": "STRING" }, { "mode": "REQUIRED", "name": "score", "type": "FLOAT" }, { "fields": [ { "fields": [ { "mode": "REQUIRED", "name": "x", "type": "FLOAT" }, { "mode": "REQUIRED", "name": "y", "type": "FLOAT" } ], "mode": "REPEATED", "name": "vertices", "type": "RECORD" } ], "mode": "NULLABLE", "name": "bounding_poly", "type": "RECORD" }, { "mode": "REPEATED", "name": "locations", "type": "GEOGRAPHY" }, { "mode": "REQUIRED", "name": "transaction_timestamp", "type": "TIMESTAMP" } ]
停止流水线:
gcloud dataflow jobs drain ${JOB_ID} \ --region ${REGION}
虽然没有更多要处理的 Pub/Sub 通知,但您创建的流处理流水线仍会运行,直到您输入此命令。
分析 Flickr30K 数据集
在本部分中,您将分析 flickr30K 数据集以进行标签和地标检测。
在 Cloud Shell 中,定义新的作业名称:
export JOB_NAME=vision-analytics-pipeline-2
更改 Dataflow 流水线参数,以便针对大型数据集优化流水线。增大
batchSize
和keyRange
以允许更高的吞吐量。Dataflow 会根据需要扩缩工作器数量:cat <<EOF > ${PARAMETERS} --parameters: autoscalingAlgorithm: THROUGHPUT_BASED enableStreamingEngine: "true" subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} visionApiProjectId: ${PROJECT} features: LABEL_DETECTION,LANDMARK_DETECTION datasetName: ${BIGQUERY_DATASET} batchSize: "16" windowInterval: "5" keyRange: "2" EOF
运行流水线:
gcloud dataflow flex-template run ${JOB_NAME} \ --project=${PROJECT} \ --region=${REGION} \ --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \ --flags-file ${PARAMETERS}
将数据集上传到输入存储桶:
gsutil -m cp gs://df-vision-ai-test-data/* gs://${IMAGE_BUCKET}
检索正在运行的 Dataflow 作业的 ID:
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
显示 Dataflow 作业网页的网址:
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
在新浏览器标签页中打开显示的网址。
在 Google Cloud Console 中,验证 Dataflow 中的自定义计数器,以确保所有文件均已得到处理。所有文件通常会在 30 分钟内完成处理。
按处理注释 (Process Annotations) 下的自定义计数器过滤。
输出如下所示:
processedFiles
指标 (31935) 匹配存储桶中已上传的图片总数(文件总数为 31936)。但是,numberOfRequests
指标 (1997) 低于已通过流水线的文件数。这种差异是因为流水线在每个请求中最多批处理 16 个文件,如batchSizeDistribution_*
指标的值所示。关停流水线:
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active) \ gcloud dataflow jobs drain ${JOB_ID} \ --region ${REGION}
在 Google Cloud 控制台中,转到 BigQuery 查询编辑器页面。
查找每个文件最可能的标签:
SELECT SPLIT(gcs_uri,'/')[OFFSET(3)] file, description, score FROM ( SELECT gcs_uri, description, score, ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC ) AS row_num FROM `vision_analytics.label_annotation`) WHERE row_num = 1 ORDER BY gcs_uri DESC
输出如下所示。您可以从响应中看出,地标是
st_basils.jpeg
文件最可能的说明。查找前 10 个标签及其最高得分:
SELECT description, COUNT(*) AS found, MAX(score) AS max_score FROM `vision_analytics.label_annotation` GROUP BY description ORDER BY found DESC LIMIT 10
最终输出类似于以下内容:
查找前 10 个热门地标:
SELECT description, COUNT(*) AS count, MAX(score) AS max_score FROM `vision_analytics.landmark_annotation` WHERE LENGTH(description)>0 GROUP BY description ORDER BY count DESC LIMIT 10
输出如下所示。您可以从响应中看到,时代广场似乎是最热门的目的地。
查找任何包含瀑布的图片:
SELECT SPLIT(gcs_uri,'/')[OFFSET(3)] file, description, score FROM `vision_analytics.landmark_annotation` WHERE LOWER(description) LIKE '%fall%' ORDER BY score DESC
输出如下所示。其中仅包含瀑布图片。
在罗马斗兽场 3 公里的范围内查找地标图片(
ST_GEOPOINT
函数使用斗兽场的经度和纬度):WITH landmarksWithDistances AS ( SELECT gcs_uri, description, location, ST_DISTANCE(location, ST_GEOGPOINT(12.492231, 41.890222)) distance_in_meters, FROM `vision_analytics.landmark_annotation` landmarks CROSS JOIN UNNEST(landmarks.locations) AS location ) SELECT SPLIT(gcs_uri,"/")[OFFSET(3)] file, description, ROUND(distance_in_meters) distance_in_meters, location, CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url FROM landmarksWithDistances WHERE distance_in_meters < 3000 ORDER BY distance_in_meters LIMIT 100
输出如下所示。您可以看到下图显示了多个热门目的地:
同一张图片可以包含同一地标的多个位置。Vision API 文档介绍了此功能。由于一个位置可能表示图片中的场景位置,因此可能存在多个
LocationInfo
元素。另一个位置可能表示图片的拍摄位置。地标通常存在位置信息。通过粘贴上一个查询,您可以在 BigQuery Geo Viz 中直观呈现数据。在地图上选择一个点后,即可看到其详细信息。
Image_url
特性包含可以在浏览器中打开的图片文件的链接。
清除数据
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除 Google Cloud 项目
避免产生费用的最简单的方法是删除您为本教程创建的 Google Cloud 项目。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
后续步骤
- 详细了解智能分析参考模式。
- 探索有关 Google Cloud 的参考架构、图表和最佳做法。查看我们的 Cloud Architecture Center。