使用 Dataflow 和 Cloud Vision API 部署机器学习视觉分析解决方案

Last reviewed 2024-05-16 UTC

本部署文档介绍了如何部署 Dataflow 流水线,以使用 Cloud Vision API 处理大规模图片文件。此流水线会将处理过的文件的结果存储在 BigQuery 中。您可以将这些文件用于分析目的或训练 BigQuery ML 模型

您在此部署中创建的 Dataflow 流水线每天可处理数百万张图片。唯一的限制是您的 Vision API 配额。您可以根据扩容需求增加 Vision API 配额。

以下说明适用于数据工程师和数据科学家。本文档假定您具备使用 Apache Beam 的 Java SDK、GoogleSQL for BigQuery 和基本 Shell 脚本构建 Dataflow 流水线的基础知识。此外还假定您熟悉 Vision API。

架构

下图展示了构建机器学习视觉分析解决方案的系统流程。

展示用于注入、触发、处理以及存储和分析流程的信息流的架构。

在上图中,信息按以下方式流经架构:

  1. 客户端将图片文件上传到 Cloud Storage 存储桶。
  2. Cloud Storage 向 Pub/Sub 发送有关数据上传的消息。
  3. Pub/Sub 向 Dataflow 发出有关上传的通知。
  4. Dataflow 流水线将图片发送到 Vision API。
  5. Vision API 处理图片,然后返回注解。
  6. 该流水线将带注解的文件发送到 BigQuery,以供您进行分析。

目标

  • 创建 Apache Beam 流水线,以对 Cloud Storage 中加载的图片进行图片分析。
  • 使用 Dataflow Runner v2 以流处理模式运行 Apache Beam 流水线,以便在图片上传后立即对其进行分析。
  • 使用 Vision API 针对一组特征类型分析图片。
  • 使用 BigQuery 分析注解。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成示例应用的构建后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  7. 克隆包含 Dataflow 流水线的源代码的 GitHub 代码库:
        git clone
        https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
        
  8. 找到该代码库的根文件夹:
        cd dataflow-vision-analytics
        
  9. 按照 GitHub 中 dataflow-vision-analytics 代码库的使用入门部分的说明来完成以下任务:
    • 启用多个 API。
    • 创建 Cloud Storage 存储分区。
    • 创建 Pub/Sub 主题和订阅。
    • 创建 BigQuery 数据集。
    • 为此部署设置多个环境变量。

对所有实现的 Vision API 特征运行 Dataflow 流水线

Dataflow 流水线会请求并处理带注解的文件中一组特定的 Vision API 特征和属性。

下表中列出的参数特定于此部署中的 Dataflow 流水线。如需查看标准 Dataflow 执行参数的完整列表,请参阅设置 Dataflow 流水线选项

参数名称 说明

batchSize

要在对 Vision API 的请求中包含的图片的数量。默认值为 1。您最多可以将此值增加到 16

datasetName

输出 BigQuery 数据集的名称。

features

图片处理特征列表。 该流水线支持标签、位置标记、徽标、人脸、剪裁提示和图片属性特征。

keyRange

此参数用于定义对 Vision API 的并行调用次数上限。默认值为 1。

labelAnnottationTable,
landmarkAnnotationTable,
logoAnnotationTable,
faceAnnotationTable,
imagePropertiesTable,
cropHintAnnotationTable,
errorLogTable

包含各种注释的表名称的字符串参数。系统会为每个表提供默认值,例如 label_annotation

maxBatchCompletionDurationInSecs

当图片批次不完整时,处理图片之前等待的时长。默认值为 30 秒。

subscriberId

接收输入 Cloud Storage 通知的 Pub/Sub 订阅的 ID。

visionApiProjectId

要用于 Vision API 的项目 ID。
  1. 在 Cloud Shell 中,运行以下命令,针对 Dataflow 数据流支持的所有特征类型处理图片:

    ./gradlew run --args=" \
    --jobName=test-vision-analytics \
      --streaming \
      --runner=DataflowRunner \
      --enableStreamingEngine \
      --diskSizeGb=30 \
      --project=${PROJECT} \
      --datasetName=${BIGQUERY_DATASET} \
      --subscriberId=projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} \
      --visionApiProjectId=${PROJECT} \
      --features=IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION"
    

    专用服务账号需要拥有包含图片的存储桶的读取权限。换句话说,该账号必须具有针对该存储桶授予的 roles/storage.objectViewer 角色。

    如需详细了解如何使用专用服务账号,请参阅 Dataflow 安全性和权限

  2. 在新的浏览器标签页中打开显示的网址,或进入 Dataflow 作业页面,然后选择 test-vision-analytics 流水线。

    几秒钟后,Dataflow 作业图会显示:

    Dataflow 作业的工作流图。

    Dataflow 流水线现在正在运行,并等待接收来自 Pub/Sub 订阅的输入通知。

  3. 通过将六个示例文件上传到输入存储桶来触发 Dataflow 图片处理:

    gcloud storage cp data-sample/* gs://${IMAGE_BUCKET}
    
  4. 在 Google Cloud 控制台中,找到“自定义计数器”面板,然后使用该面板查看 Dataflow 中的自定义计数器,并验证 Dataflow 是否已处理所有六张图片。您可以使用面板的过滤功能找到正确的指标。如需仅显示以 numberOf 前缀开头的计数器,请在过滤条件中输入 numberOf

    经过过滤的计数器列表,仅显示以“numberof”开头的计数器。

  5. 在 Cloud Shell 中,验证是否已自动创建表:

    bq query --nouse_legacy_sql "SELECT table_name FROM ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES ORDER BY table_name"
    

    输出如下所示:

    +----------------------+
    |      table_name      |
    +----------------------+
    | crop_hint_annotation |
    | face_annotation      |
    | image_properties     |
    | label_annotation     |
    | landmark_annotation  |
    | logo_annotation      |
    +----------------------+
    
  6. 查看 landmark_annotation 表的架构。LANDMARK_DETECTION 特征会捕获从 API 调用返回的属性

    bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
    

    输出如下所示:

    [
       {
          "name":"gcs_uri",
          "type":"STRING"
       },
       {
          "name":"feature_type",
          "type":"STRING"
       },
       {
          "name":"transaction_timestamp",
          "type":"STRING"
       },
       {
          "name":"mid",
          "type":"STRING"
       },
       {
          "name":"description",
          "type":"STRING"
       },
       {
          "name":"score",
          "type":"FLOAT"
       },
       {
          "fields":[
             {
                "fields":[
                   {
                      "name":"x",
                      "type":"INTEGER"
                   },
                   {
                  "name":"y",
                  "type":"INTEGER"
               }
            ],
            "mode":"REPEATED",
            "name":"vertices",
            "type":"RECORD"
         }
      ],
      "name":"boundingPoly",
      "type":"RECORD"
    },
    {
      "fields":[
         {
            "fields":[
               {
                  "name":"latitude",
                  "type":"FLOAT"
               },
               {
                  "name":"longitude",
                  "type":"FLOAT"
               }
            ],
                "name":"latLon",
                "type":"RECORD"
              }
            ],
          "mode":"REPEATED",
          "name":"locations",
          "type":"RECORD"
       }
    ]
    
  7. 运行以下 bq query 命令,查看 API 生成的注解数据,以查看在这六张图片中发现的所有位置标记,并按最可能的得分对其进行排序:

    bq query --nouse_legacy_sql "SELECT SPLIT(gcs_uri, '/')[OFFSET(3)] file_name, description, score, locations FROM ${BIGQUERY_DATASET}.landmark_annotation ORDER BY score DESC"
    

    输出类似于以下内容:

    +------------------+-------------------+------------+---------------------------------+
    |    file_name     |    description    |   score    |            locations            |
    +------------------+-------------------+------------+---------------------------------+
    | eiffel_tower.jpg | Eiffel Tower      |  0.7251996 | ["POINT(2.2944813 48.8583701)"] |
    | eiffel_tower.jpg | Trocadéro Gardens | 0.69601923 | ["POINT(2.2892823 48.8615963)"] |
    | eiffel_tower.jpg | Champ De Mars     |  0.6800974 | ["POINT(2.2986304 48.8556475)"] |
    +------------------+-------------------+------------+---------------------------------+
    

    如需详细了解特定于注解的所有列,请参阅 AnnotateImageResponse

  8. 如需停止流处理流水线,请运行以下命令。即使没有更多的 Pub/Sub 通知要处理,流水线仍会继续运行。

      gcloud dataflow jobs cancel
        --region ${REGION} $(gcloud dataflow jobs list
        --region ${REGION} --filter="NAME:test-vision-analytics AND STATE:Running"
        --format="get(JOB_ID)")
    

    以下部分包含更多分析图片不同图片特征的示例查询。

分析 Flickr30K 数据集

在本部分中,您将检测 Kaggle 上托管的公开 Flickr30k 图片数据集中的标签和位置标记。

  1. 在 Cloud Shell 中,更改 Dataflow 流水线参数,以便针对大型数据集优化流水线。如需提高吞吐量,请同时增加 batchSizekeyRange 值。Dataflow 会根据需要扩缩工作器数量:

    ./gradlew run --args=" \
      --runner=DataflowRunner \
      --jobName=vision-analytics-flickr \
      --streaming \
      --enableStreamingEngine \
      --diskSizeGb=30 \
      --autoscalingAlgorithm=THROUGHPUT_BASED \
      --maxNumWorkers=5 \
      --project=${PROJECT} \
      --region=${REGION} \
      --subscriberId=projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} \
      --visionApiProjectId=${PROJECT} \
      --features=LABEL_DETECTION,LANDMARK_DETECTION \
      --datasetName=${BIGQUERY_DATASET} \
      --batchSize=16 \
      --keyRange=5"
    

    由于数据集很大,因此您无法使用 Cloud Shell 从 Kaggle 检索图片并将其发送到 Cloud Storage 存储桶。您必须使用具有较大磁盘大小的虚拟机执行此操作。

  2. 如需检索基于 Kaggle 的图片并将其发送到 Cloud Storage 存储桶,请按照 GitHub 代码库中的模拟上传到存储桶的图片部分的说明操作。

  3. 如需通过查看 Dataflow 界面中可用的自定义指标来观察复制过程的进度,请进入 Dataflow 作业页面,然后选择 vision-analytics-flickr 流水线。客户计数器应定期更改,直到 Dataflow 流水线处理所有文件。

    输出类似于“自定义计数器”面板的以下屏幕截图。数据集中的一个文件的类型错误,并且 rejectedFiles 计数器反映了这一点。这些计数器值为近似值。您可能会看到更高的数字。此外,由于 Vision API 的处理准确性提高,注解数量很可能会发生变化。

    与处理基于 Kaggle 的图片关联的计数器列表。

    如需确定是接近还是超过可用资源,请参阅 Vision API 配额页面。

    在我们的示例中,Dataflow 流水线仅使用了其配额的大约 50%。根据您使用的配额百分比,您可以通过增加 keyRange 参数的值来决定提高流水线的并行性。

  4. 关停流水线:

    gcloud dataflow jobs list --region $REGION --filter="NAME:vision-analytics-flickr AND STATE:Running" --format="get(JOB_ID)"
    

在 BigQuery 中分析注解

在此部署中,您已经处理了超过 3 万张图片进行标签和位置标记注解。在本部分中,您将收集有关这些文件的统计信息。您可以在 GoogleSQL for BigQuery 工作区中运行这些查询,也可以使用 bq 命令行工具。

请注意,您看到的数字可能与此部署中的示例查询结果不同。Vision API 会不断提高分析的准确性;在您最初测试解决方案后,Vision API 可以通过分析同一张图片来生成更丰富的结果。

  1. 在 Google Cloud 控制台中,进入 BigQuery 查询编辑器页面,然后运行以下命令查看数据集中的前 20 个标签:

    转到查询编辑器

    SELECT  description, count(*)ascount \
      FROM vision_analytics.label_annotation
      GROUP BY description ORDER BY count DESC LIMIT 20
    

    输出类似于以下内容:

    +------------------+-------+
    |   description    | count |
    +------------------+-------+
    | Leisure          |  7663 |
    | Plant            |  6858 |
    | Event            |  6044 |
    | Sky              |  6016 |
    | Tree             |  5610 |
    | Fun              |  5008 |
    | Grass            |  4279 |
    | Recreation       |  4176 |
    | Shorts           |  3765 |
    | Happy            |  3494 |
    | Wheel            |  3372 |
    | Tire             |  3371 |
    | Water            |  3344 |
    | Vehicle          |  3068 |
    | People in nature |  2962 |
    | Gesture          |  2909 |
    | Sports equipment |  2861 |
    | Building         |  2824 |
    | T-shirt          |  2728 |
    | Wood             |  2606 |
    +------------------+-------+
    
  2. 确定具有特定标签的图片上存在哪些其他标签,按频率排序:

    DECLARE label STRING DEFAULT 'Plucked string instruments';
    
    WITH other_labels AS (
       SELECT description, COUNT(*) count
    FROM vision_analytics.label_annotation
    WHERE gcs_uri IN (
        SELECT gcs_uri FROM vision_analytics.label_annotation WHERE description = label )
      AND description != label
    GROUP BY description)
    SELECT description, count, RANK() OVER (ORDER BY count DESC) rank
    FROM other_labels ORDER BY rank LIMIT 20;
    

    输出如下所示。对于上述命令中使用的 Papped string instrumentation 标签,您应该会看到:

    +------------------------------+-------+------+
    |         description          | count | rank |
    +------------------------------+-------+------+
    | String instrument            |   397 |    1 |
    | Musical instrument           |   236 |    2 |
    | Musician                     |   207 |    3 |
    | Guitar                       |   168 |    4 |
    | Guitar accessory             |   135 |    5 |
    | String instrument accessory  |    99 |    6 |
    | Music                        |    88 |    7 |
    | Musical instrument accessory |    72 |    8 |
    | Guitarist                    |    72 |    8 |
    | Microphone                   |    52 |   10 |
    | Folk instrument              |    44 |   11 |
    | Violin family                |    28 |   12 |
    | Hat                          |    23 |   13 |
    | Entertainment                |    22 |   14 |
    | Band plays                   |    21 |   15 |
    | Jeans                        |    17 |   16 |
    | Plant                        |    16 |   17 |
    | Public address system        |    16 |   17 |
    | Artist                       |    16 |   17 |
    | Leisure                      |    14 |   20 |
    +------------------------------+-------+------+
    
  3. 查看检测到的前 10 个位置标记:

      SELECT description, COUNT(description) AS count
      FROM vision_analytics.landmark_annotation
      GROUP BY description ORDER BY count DESC LIMIT 10
    

    输出如下所示:

      +--------------------+-------+
      |    description     | count |
      +--------------------+-------+
      | Times Square       |    55 |
      | Rockefeller Center |    21 |
      | St. Mark's Square  |    16 |
      | Bryant Park        |    13 |
      | Millennium Park    |    13 |
      | Ponte Vecchio      |    13 |
      | Tuileries Garden   |    13 |
      | Central Park       |    12 |
      | Starbucks          |    12 |
      | National Mall      |    11 |
      +--------------------+-------+
      

  4. 确定最有可能包含瀑布的图片:

    SELECT SPLIT(gcs_uri, '/')[OFFSET(3)] file_name, description, score
    FROM vision_analytics.landmark_annotation
    WHERE LOWER(description) LIKE '%fall%'
    ORDER BY score DESC LIMIT 10
    

    输出如下所示:

    +----------------+----------------------------+-----------+
    |   file_name    |        description         |   score    |
    +----------------+----------------------------+-----------+
    | 895502702.jpg  | Waterfall Carispaccha      |  0.6181358 |
    | 3639105305.jpg | Sahalie Falls Viewpoint    | 0.44379658 |
    | 3672309620.jpg | Gullfoss Falls             | 0.41680416 |
    | 2452686995.jpg | Wahclella Falls            | 0.39005348 |
    | 2452686995.jpg | Wahclella Falls            |  0.3792498 |
    | 3484649669.jpg | Kodiveri Waterfalls        | 0.35024035 |
    | 539801139.jpg  | Mallela Thirtham Waterfall | 0.29260656 |
    | 3639105305.jpg | Sahalie Falls              |  0.2807213 |
    | 3050114829.jpg | Kawasan Falls              | 0.27511594 |
    | 4707103760.jpg | Niagara Falls              | 0.18691841 |
    +----------------+----------------------------+-----------+
    
  5. 在罗马斗兽场 3 公里的范围内查找位置标记图片(ST_GEOPOINT 函数使用斗兽场的经度和纬度):

    WITH
      landmarksWithDistances AS (
      SELECT
        gcs_uri,
        description,
        location,
        ST_DISTANCE(location,
          ST_GEOGPOINT(12.492231,
            41.890222)) distance_in_meters,
      FROM
        `vision_analytics.landmark_annotation` landmarks
      CROSS JOIN
        UNNEST(landmarks.locations) AS location )
    SELECT
      SPLIT(gcs_uri,"/")[OFFSET(3)] file,
      description,
        ROUND(distance_in_meters) distance_in_meters,
      location,
      CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
    FROM
      landmarksWithDistances
    WHERE
      distance_in_meters < 3000
    ORDER BY
      distance_in_meters
    LIMIT
      100
    

    运行查询时,您会看到多张斗兽场图片,但也包括了君士坦丁拱门、帕拉蒂诺山和许多其他经常拍摄的地点的图片。

    通过粘贴上一个查询,您可以在 BigQuery Geo Viz 中直观呈现数据。在地图上选择一个点即可查看其详细信息。Image_url 属性包含图片文件的链接。

    位置及其与斗兽场的距离地图。

关于查询结果,需要注意一点。位置标记通常存在位置信息。同一张图片可以包含同一地标的多个位置。 AnnotateImageResponse 类型中介绍了此功能。

由于一个位置可能表示图片中的场景位置,因此可能存在多个 LocationInfo 元素。另一个位置可能表示图片的拍摄位置。

清理

为避免因本指南中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除 Google Cloud 项目

避免产生费用的最简单的方法是删除您为本教程创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

如果您决定逐个删除资源,请按照 GitHub 代码库的清理部分中的步骤操作。

后续步骤

  • 如需查看更多参考架构、图表和最佳实践,请浏览云架构中心

贡献者

作者:

其他贡献者:

如需查看非公开的 LinkedIn 个人资料,请登录 LinkedIn。