使用 Dataflow 和 Cloud Vision API 构建机器学习视觉分析解决方案

Last reviewed 2024-04-16 UTC

在本教程中,您将了解如何部署 Dataflow 流水线,以使用 Cloud Vision 处理大型图片文件。Dataflow 将结果存储在 BigQuery 中,以便将其用于训练 BigQuery ML 预构建模型

您在教程中创建的 Dataflow 流水线可以处理大量图片。该 Dataflow 流水线仅受 Vision 配额的限制。您可以根据规模需求增加 Vision 配额。

本教程适用于数据工程师和数据科学家。本教程假定您具备使用 Apache Beam 的 Java SDK、BigQuery 标准 SQL 和基本 Shell 脚本构建 Dataflow 流水线的基础知识。此外还假定您熟悉 Vision。

目标

  • 使用适用于 Cloud Storage 的 Pub/Sub 通知创建图片元数据提取流水线。
  • 使用 Dataflow 部署实时视觉分析流水线。
  • 使用 Vision 针对一组特征类型分析图片。
  • 使用 BigQuery ML 分析和训练数据。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. 在 Cloud Shell 中,启用 Dataflow、Container Registry 和 Vision API。

    gcloud services enable dataflow.googleapis.com \
    containerregistry.googleapis.com vision.googleapis.com
    
  5. 设置一些环境变量。(将 REGION 替换为一个可用的 Dataflow 区域。例如 us-central1)。

    export PROJECT=$(gcloud config get-value project)
    export REGION=REGION
    
  6. 克隆教程的 Git 代码库:

    git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
    
  7. 转到代码库的根文件夹:

    cd dataflow-vision-analytics
    

参考架构

下图展示了您将在本教程中构建的系统流程。

显示提取/触发、处理和存储的信息流的工作流图。

如图所示,流程如下:

  1. 客户端将图片文件上传到 Cloud Storage 存储桶。

  2. 对于每项文件上传操作,系统会通过向 Pub/Sub 发布消息自动通知客户端。

  3. 对于每条新通知,Dataflow 流水线都会执行以下操作:

    1. 从 Pub/Sub 消息中读取文件元数据。
    2. 将每个片段发送至 Vision API 进行注释处理。
    3. 将所有注释存储在 BigQuery 表中以供进一步分析。

为 Cloud Storage 创建 Pub/Sub 通知

在本部分中,您将创建适用于 Cloud Storage 的 Pub/Sub 通知。此通知会发布上传到存储桶中的图片文件的元数据。Dataflow 流水线根据元数据开始处理请求。

  1. 在 Cloud Shell 中,创建 Pub/Sub 主题:

    export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
    gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
    
  2. 为该主题创建 Pub/Sub 订阅:

    export  GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
    gcloud pubsub subscriptions create  ${GCS_NOTIFICATION_SUBSCRIPTION}  --topic=${GCS_NOTIFICATION_TOPIC}
    
  3. 创建一个存储桶以存储输入图片文件:

    export IMAGE_BUCKET=${PROJECT}-images
    gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
    
  4. 为存储桶创建 Pub/Sub 通知:

    gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
      -f json gs://${IMAGE_BUCKET}
    

现在您已配置通知,接下来系统会向您创建的主题发送 Pub/Sub 消息。每次您将文件上传到存储桶时系统都会执行此操作。

创建 BigQuery 数据集

在本部分中,您将创建 BigQuery 数据集,以用于存储由 Dataflow 流水线输出的结果。该流水线会根据视觉特征类型自动创建表。

  • 在 Cloud Shell 中,创建一个 BigQuery 数据集:

    export BIGQUERY_DATASET="vision_analytics"
    bq mk -d --location=US ${BIGQUERY_DATASET}
    

创建 Dataflow Flex 模板

在本部分中,您将创建 Apache Beam 流水线代码,然后使用 Dataflow Flex 模板将 Dataflow 流水线作为 Dataflow 作业运行。

  1. 在 Cloud Shell 中,构建 Apache Beam 流水线的代码:

    gradle build
    
  2. 为 Dataflow 柔性模板创建 Docker 映像:

    gcloud auth configure-docker
    gradle jib \
      --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
    
  3. 创建 Cloud Storage 存储桶以存储 Dataflow 柔性模板:

    export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config
    gsutil mb -c standard -l ${REGION} \
      gs://${DATAFLOW_TEMPLATE_BUCKET}
    
  4. 将模板的 JSON 配置文件上传到该存储桶中:

    cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json
    {
      "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest",
      "sdk_info": {"language": "JAVA"}
    }
    EOF
    

针对一组 Vision 特征运行 Dataflow 流水线

下表中列出的参数特定于 Dataflow 流水线。

如需查看标准 Dataflow 执行参数的完整列表,请参阅 Dataflow 文档

参数 说明

windowInterval

将结果输出到 BigQuery 和 Pub/Sub 的时间间隔(以秒为单位)。默认值为 5。

batchSize

要在对 Vision API 的请求中包含的图片的数量。默认值为 1。您最多可以将其增加到 16 个

subscriberId

接收输入 Cloud Storage 通知的 Pub/Sub 订阅的 ID。

keyRange

此参数可让您提高大型数据集的处理性能。值越高,工作器之间的并行性就越高。默认值为 1。

visionApiProjectId

要用于 Vision API 的项目 ID。

datasetName

输出 BigQuery 数据集的引用。

features

图片处理特征列表。

labelAnnottationTable, landmarkAnnotationTable, logoAnnotationTable, faceAnnotationTable, imagePropertiesTable, cropHintAnnotationTable, errorLogTable

包含各种注释的表名称的字符串参数。系统会为每个表提供默认值。
  1. 在 Cloud Shell 中,为 Dataflow 流水线定义作业名称:

    export JOB_NAME=vision-analytics-pipeline-1
    
  2. 创建一个包含 Dataflow 流水线的参数的文件:

    PARAMETERS=params.yaml
    cat << EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION
      datasetName: ${BIGQUERY_DATASET}
    EOF
    
  3. 运行 Dataflow 流水线来处理以下特征类型的图片:IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    

    此命令使用上表中列出的参数。

  4. 检索正在运行的 Dataflow 作业的 ID:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
    
  5. 显示 Dataflow 作业网页的网址:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  6. 在新浏览器标签页中打开显示的网址。几秒钟后,Dataflow 作业图将显示:

    Dataflow 作业的工作流图。

    Dataflow 流水线现在正在运行,并等待接收来自 Pub/Sub 的输入通知。

  7. 在 Cloud Shell 中,通过将一些测试文件上传到输入存储桶来触发 Dataflow 流水线:

    gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
    
  8. 在 Google Cloud Console 中,查看 Dataflow 中的自定义计数器(位于 Dataflow 作业的右侧面板中),并验证是否已处理所有这五张图片:

    从文件上传返回的图片列表。

  9. 在 Cloud Shell 中,验证是否已自动创建表:

    bq query "select table_name, table_type from \
    ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
    

    输出如下所示:

    +----------------------+------------+
    |      table_name      | table_type |
    +----------------------+------------+
    | face_annotation      | BASE TABLE |
    | label_annotation     | BASE TABLE |
    | crop_hint_annotation | BASE TABLE |
    | landmark_annotation  | BASE TABLE |
    | image_properties     | BASE TABLE |
    +----------------------+------------+
    
  10. 查看 landmark_annotation 表的架构。如果已请求,则 LANDMARK_DETECTION 特征会捕获从 API 调用返回的特性

    bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
    

    输出如下所示:

    [
      {
        "mode": "REQUIRED",
        "name": "gcs_uri",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "mid",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "description",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "score",
        "type": "FLOAT"
      },
      {
        "fields": [
          {
            "fields": [
              {
                "mode": "REQUIRED",
                "name": "x",
                "type": "FLOAT"
              },
              {
                "mode": "REQUIRED",
                "name": "y",
                "type": "FLOAT"
              }
            ],
            "mode": "REPEATED",
            "name": "vertices",
            "type": "RECORD"
          }
        ],
        "mode": "NULLABLE",
        "name": "bounding_poly",
        "type": "RECORD"
      },
      {
        "mode": "REPEATED",
        "name": "locations",
        "type": "GEOGRAPHY"
      },
      {
        "mode": "REQUIRED",
        "name": "transaction_timestamp",
        "type": "TIMESTAMP"
      }
    ]
    
  11. 停止流水线:

    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    

    虽然没有更多要处理的 Pub/Sub 通知,但您创建的流处理流水线仍会运行,直到您输入此命令。

分析 Flickr30K 数据集

在本部分中,您将分析 flickr30K 数据集以进行标签和地标检测。

  1. 在 Cloud Shell 中,定义新的作业名称:

    export JOB_NAME=vision-analytics-pipeline-2
    
  2. 更改 Dataflow 流水线参数,以便针对大型数据集优化流水线。增大 batchSizekeyRange 以允许更高的吞吐量。Dataflow 会根据需要扩缩工作器数量:

    cat <<EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: LABEL_DETECTION,LANDMARK_DETECTION
      datasetName: ${BIGQUERY_DATASET}
      batchSize: "16"
      windowInterval: "5"
      keyRange: "2"
    EOF
    
  3. 运行流水线:

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    
  4. 将数据集上传到输入存储桶:

    gsutil -m  cp gs://df-vision-ai-test-data/*  gs://${IMAGE_BUCKET}
    
  5. 检索正在运行的 Dataflow 作业的 ID:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
    
  6. 显示 Dataflow 作业网页的网址:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  7. 在新浏览器标签页中打开显示的网址。

  8. 在 Google Cloud Console 中,验证 Dataflow 中的自定义计数器,以确保所有文件均已得到处理。所有文件通常会在 30 分钟内完成处理。

  9. 处理注释 (Process Annotations) 下的自定义计数器过滤。

    输出如下所示:

    按自定义计数器过滤后返回的计数器列表。显示计数器名称、值和步骤。

    processedFiles 指标 (31935) 匹配存储桶中已上传的图片总数(文件总数为 31936)。但是,numberOfRequests 指标 (1997) 低于已通过流水线的文件数。这种差异是因为流水线在每个请求中最多批处理 16 个文件,如 batchSizeDistribution_* 指标的值所示。

  10. 关停流水线:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}"
    --region ${REGION}
    --format "value(id)"
    --status active) \
    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    
  11. 在 Google Cloud 控制台中,转到 BigQuery 查询编辑器页面。

    转到查询编辑器

  12. 查找每个文件最可能的标签:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM (
      SELECT
        gcs_uri,
        description,
        score,
        ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC )
    AS row_num
      FROM
         `vision_analytics.label_annotation`)
    WHERE
      row_num = 1
    ORDER BY
      gcs_uri DESC
    

    输出如下所示。您可以从响应中看出,地标st_basils.jpeg 文件最可能的说明。

    图片文件名、说明和得分列表。

  13. 查找前 10 个标签及其最高得分:

    SELECT
      description,
      COUNT(*) AS found,
      MAX(score) AS max_score
    FROM
      `vision_analytics.label_annotation`
    GROUP BY
      description
    ORDER BY
      found DESC
    LIMIT 10
    

    最终输出类似于以下内容:

    找到的前 10 个标签的列表。其中包括说明、发现的次数和最高得分。

  14. 查找前 10 个热门地标:

    SELECT
      description,
      COUNT(*) AS count,
      MAX(score) AS max_score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LENGTH(description)>0
    GROUP BY
      description
    ORDER BY
      count DESC
    LIMIT 10
    

    输出如下所示。您可以从响应中看到,时代广场似乎是最热门的目的地。

    查询返回的前 10 个最热门地标列表。包括说明、计数和最高得分。

  15. 查找任何包含瀑布的图片:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LOWER(description) LIKE '%fall%'
    ORDER BY score DESC
    

    输出如下所示。其中仅包含瀑布图片。

    瀑布列表。包括文件名、说明和得分。

  16. 在罗马斗兽场 3 公里的范围内查找地标图片(ST_GEOPOINT 函数使用斗兽场的经度和纬度):

    WITH
      landmarksWithDistances AS (
      SELECT
        gcs_uri,
        description,
        location,
        ST_DISTANCE(location,
          ST_GEOGPOINT(12.492231,
            41.890222)) distance_in_meters,
      FROM
        `vision_analytics.landmark_annotation` landmarks
      CROSS JOIN
        UNNEST(landmarks.locations) AS location )
    SELECT
      SPLIT(gcs_uri,"/")[OFFSET(3)] file,
      description,
        ROUND(distance_in_meters) distance_in_meters,
      location,
      CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
    FROM
      landmarksWithDistances
    WHERE
      distance_in_meters < 3000
    ORDER BY
      distance_in_meters
    LIMIT
      100
    

    输出如下所示。您可以看到下图显示了多个热门目的地:

    罗马斗兽场 3 公里范围内的所有图片列表。包括文件名、说明、距离斗兽场的距离(以米为单位)以及位置。

    同一张图片可以包含同一地标的多个位置。Vision API 文档介绍了此功能。由于一个位置可能表示图片中的场景位置,因此可能存在多个 LocationInfo 元素。另一个位置可能表示图片的拍摄位置。地标通常存在位置信息。

    通过粘贴上一个查询,您可以在 BigQuery Geo Viz 中直观呈现数据。在地图上选择一个点后,即可看到其详细信息。Image_url 特性包含可以在浏览器中打开的图片文件的链接。

    位置及其与斗兽场的距离地图。

清除数据

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除 Google Cloud 项目

避免产生费用的最简单的方法是删除您为本教程创建的 Google Cloud 项目。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

后续步骤