使用 Dataflow 和 Cloud Vision API 构建机器学习视觉分析解决方案

Last reviewed 2024-04-16 UTC

在本教程中,您将了解如何部署 Dataflow 流水线,以使用 Cloud Vision 处理大型图片文件。Dataflow 将结果存储在 BigQuery 中,以便将其用于训练 BigQuery ML 预构建模型

您在教程中创建的 Dataflow 流水线可以处理大量图片。该 Dataflow 流水线仅受 Vision 配额的限制。您可以根据规模需求增加 Vision 配额。

本教程适用于数据工程师和数据科学家。本教程假定您具备使用 Apache Beam 的 Java SDK、BigQuery 标准 SQL 和基本 Shell 脚本构建 Dataflow 流水线的基础知识。此外还假定您熟悉 Vision。

目标

  • 使用适用于 Cloud Storage 的 Pub/Sub 通知创建图片元数据提取流水线。
  • 使用 Dataflow 部署实时视觉分析流水线。
  • 使用 Vision 针对一组特征类型分析图片。
  • 使用 BigQuery ML 分析和训练数据。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  2. 确保您的 Google Cloud 项目已启用结算功能

  3. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

  4. 在 Cloud Shell 中,启用 Dataflow、Container Registry 和 Vision API。

    gcloud services enable dataflow.googleapis.com \
    containerregistry.googleapis.com vision.googleapis.com
    
  5. 设置一些环境变量。(将 REGION 替换为一个可用的 Dataflow 区域。例如 us-central1)。

    export PROJECT=$(gcloud config get-value project)
    export REGION=REGION
    
  6. 克隆教程的 Git 代码库:

    git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
    
  7. 转到代码库的根文件夹:

    cd dataflow-vision-analytics
    

参考架构

下图展示了您将在本教程中构建的系统流程。

显示提取/触发、处理和存储的信息流的工作流图。

如图所示,流程如下:

  1. 客户端将图片文件上传到 Cloud Storage 存储桶。

  2. 对于每项文件上传操作,系统会通过向 Pub/Sub 发布消息自动通知客户端。

  3. 对于每条新通知,Dataflow 流水线都会执行以下操作:

    1. 从 Pub/Sub 消息中读取文件元数据。
    2. 将每个片段发送至 Vision API 进行注释处理。
    3. 将所有注释存储在 BigQuery 表中以供进一步分析。

为 Cloud Storage 创建 Pub/Sub 通知

在本部分中,您将创建适用于 Cloud Storage 的 Pub/Sub 通知。此通知会发布上传到存储桶中的图片文件的元数据。Dataflow 流水线根据元数据开始处理请求。

  1. 在 Cloud Shell 中,创建 Pub/Sub 主题:

    export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
    gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
    
  2. 为该主题创建 Pub/Sub 订阅:

    export  GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
    gcloud pubsub subscriptions create  ${GCS_NOTIFICATION_SUBSCRIPTION}  --topic=${GCS_NOTIFICATION_TOPIC}
    
  3. 创建一个存储桶以存储输入图片文件:

    export IMAGE_BUCKET=${PROJECT}-images
    gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
    
  4. 为存储桶创建 Pub/Sub 通知:

    gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
      -f json gs://${IMAGE_BUCKET}
    

现在您已配置通知,接下来系统会向您创建的主题发送 Pub/Sub 消息。每次您将文件上传到存储桶时系统都会执行此操作。

创建 BigQuery 数据集

在本部分中,您将创建 BigQuery 数据集,以用于存储由 Dataflow 流水线输出的结果。该流水线会根据视觉特征类型自动创建表。

  • 在 Cloud Shell 中,创建一个 BigQuery 数据集:

    export BIGQUERY_DATASET="vision_analytics"
    bq mk -d --location=US ${BIGQUERY_DATASET}
    

创建 Dataflow Flex 模板

在本部分中,您将创建 Apache Beam 流水线代码,然后使用 Dataflow Flex 模板将 Dataflow 流水线作为 Dataflow 作业运行。

  1. 在 Cloud Shell 中,构建 Apache Beam 流水线的代码:

    gradle build
    
  2. 为 Dataflow 柔性模板创建 Docker 映像:

    gcloud auth configure-docker
    gradle jib \
      --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
    
  3. 创建 Cloud Storage 存储桶以存储 Dataflow 柔性模板:

    export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config
    gsutil mb -c standard -l ${REGION} \
      gs://${DATAFLOW_TEMPLATE_BUCKET}
    
  4. 将模板的 JSON 配置文件上传到该存储桶中:

    cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json
    {
      "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest",
      "sdk_info": {"language": "JAVA"}
    }
    EOF
    

针对一组 Vision 特征运行 Dataflow 流水线

下表中列出的参数特定于 Dataflow 流水线。

如需查看标准 Dataflow 执行参数的完整列表,请参阅 Dataflow 文档

参数 说明

windowInterval

将结果输出到 BigQuery 和 Pub/Sub 的时间间隔(以秒为单位)。默认值为 5。

batchSize

要在对 Vision API 的请求中包含的图片的数量。默认值为 1。您最多可以将其增加到 16 个

subscriberId

接收输入 Cloud Storage 通知的 Pub/Sub 订阅的 ID。

keyRange

此参数可让您提高大型数据集的处理性能。值越高,工作器之间的并行性就越高。默认值为 1。

visionApiProjectId

要用于 Vision API 的项目 ID。

datasetName

输出 BigQuery 数据集的引用。

features

图片处理特征列表。

labelAnnottationTable, landmarkAnnotationTable, logoAnnotationTable, faceAnnotationTable, imagePropertiesTable, cropHintAnnotationTable, errorLogTable

包含各种注释的表名称的字符串参数。系统会为每个表提供默认值。
  1. 在 Cloud Shell 中,为 Dataflow 流水线定义作业名称:

    export JOB_NAME=vision-analytics-pipeline-1
    
  2. 创建一个包含 Dataflow 流水线的参数的文件:

    PARAMETERS=params.yaml
    cat << EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION
      datasetName: ${BIGQUERY_DATASET}
    EOF
    
  3. 运行 Dataflow 流水线来处理以下特征类型的图片:IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    

    此命令使用上表中列出的参数。

  4. 检索正在运行的 Dataflow 作业的 ID:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
    
  5. 显示 Dataflow 作业网页的网址:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  6. 在新浏览器标签页中打开显示的网址。几秒钟后,Dataflow 作业图将显示:

    Dataflow 作业的工作流图。

    Dataflow 流水线现在正在运行,并等待接收来自 Pub/Sub 的输入通知。

  7. 在 Cloud Shell 中,通过将一些测试文件上传到输入存储桶来触发 Dataflow 流水线:

    gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
    
  8. 在 Google Cloud Console 中,查看 Dataflow 中的自定义计数器(位于 Dataflow 作业的右侧面板中),并验证是否已处理所有这五张图片:

    从文件上传返回的图片列表。

  9. 在 Cloud Shell 中,验证是否已自动创建表:

    bq query "select table_name, table_type from \
    ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
    

    输出如下所示:

    +----------------------+------------+
    |      table_name      | table_type |
    +----------------------+------------+
    | face_annotation      | BASE TABLE |
    | label_annotation     | BASE TABLE |
    | crop_hint_annotation | BASE TABLE |
    | landmark_annotation  | BASE TABLE |
    | image_properties     | BASE TABLE |
    +----------------------+------------+
    
  10. 查看 landmark_annotation 表的架构。如果已请求,则 LANDMARK_DETECTION 特征会捕获从 API 调用返回的特性

    bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
    

    输出如下所示:

    [
      {
        "mode": "REQUIRED",
        "name": "gcs_uri",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "mid",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "description",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "score",
        "type": "FLOAT"
      },
      {
        "fields": [
          {
            "fields": [
              {
                "mode": "REQUIRED",
                "name": "x",
                "type": "FLOAT"
              },
              {
                "mode": "REQUIRED",
                "name": "y",
                "type": "FLOAT"
              }
            ],
            "mode": "REPEATED",
            "name": "vertices",
            "type": "RECORD"
          }
        ],
        "mode": "NULLABLE",
        "name": "bounding_poly",
        "type": "RECORD"
      },
      {
        "mode": "REPEATED",
        "name": "locations",
        "type": "GEOGRAPHY"
      },
      {
        "mode": "REQUIRED",
        "name": "transaction_timestamp",
        "type": "TIMESTAMP"
      }
    ]
    
  11. 停止流水线:

    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    

    虽然没有更多要处理的 Pub/Sub 通知,但您创建的流处理流水线仍会运行,直到您输入此命令。

分析 Flickr30K 数据集

在本部分中,您将分析 flickr30K 数据集以进行标签和地标检测。

  1. 在 Cloud Shell 中,定义新的作业名称:

    export JOB_NAME=vision-analytics-pipeline-2
    
  2. 更改 Dataflow 流水线参数,以便针对大型数据集优化流水线。增大 batchSizekeyRange 以允许更高的吞吐量。Dataflow 会根据需要扩缩工作器数量:

    cat <<EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: LABEL_DETECTION,LANDMARK_DETECTION
      datasetName: ${BIGQUERY_DATASET}
      batchSize: "16"
      windowInterval: "5"
      keyRange: "2"
    EOF
    
  3. 运行流水线:

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    
  4. 将数据集上传到输入存储桶:

    gsutil -m  cp gs://df-vision-ai-test-data/*  gs://${IMAGE_BUCKET}
    
  5. 检索正在运行的 Dataflow 作业的 ID:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
    
  6. 显示 Dataflow 作业网页的网址:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  7. 在新浏览器标签页中打开显示的网址。

  8. 在 Google Cloud Console 中,验证 Dataflow 中的自定义计数器,以确保所有文件均已得到处理。所有文件通常会在 30 分钟内完成处理。

  9. 处理注释 (Process Annotations) 下的自定义计数器过滤。

    输出如下所示:

    按自定义计数器过滤后返回的计数器列表。显示计数器名称、值和步骤。

    processedFiles 指标 (31935) 匹配存储桶中已上传的图片总数(文件总数为 31936)。但是,numberOfRequests 指标 (1997) 低于已通过流水线的文件数。这种差异是因为流水线在每个请求中最多批处理 16 个文件,如 batchSizeDistribution_* 指标的值所示。

  10. 关停流水线:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}"
    --region ${REGION}
    --format "value(id)"
    --status active) \
    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    
  11. 在 Google Cloud 控制台中,转到 BigQuery 查询编辑器页面。

    转到查询编辑器

  12. 查找每个文件最可能的标签:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM (
      SELECT
        gcs_uri,
        description,
        score,
        ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC )
    AS row_num
      FROM
         `vision_analytics.label_annotation`)
    WHERE
      row_num = 1
    ORDER BY
      gcs_uri DESC
    

    输出如下所示。您可以从响应中看出,地标st_basils.jpeg 文件最可能的说明。

    图片文件名、说明和得分列表。

  13. 查找前 10 个标签及其最高得分:

    SELECT
      description,
      COUNT(*) AS found,
      MAX(score) AS max_score
    FROM
      `vision_analytics.label_annotation`
    GROUP BY
      description
    ORDER BY
      found DESC
    LIMIT 10
    

    最终输出类似于以下内容:

    找到的前 10 个标签的列表。其中包括说明、发现的次数和最高得分。

  14. 查找前 10 个热门地标:

    SELECT
      description,
      COUNT(*) AS count,
      MAX(score) AS max_score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LENGTH(description)>0
    GROUP BY
      description
    ORDER BY
      count DESC
    LIMIT 10
    

    输出如下所示。您可以从响应中看到,时代广场似乎是最热门的目的地。

    查询返回的前 10 个最热门地标列表。包括说明、计数和最高得分。

  15. 查找任何包含瀑布的图片:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LOWER(description) LIKE '%fall%'
    ORDER BY score DESC
    

    输出如下所示。其中仅包含瀑布图片。

    瀑布列表。包括文件名、说明和得分。

  16. 在罗马斗兽场 3 公里的范围内查找地标图片(ST_GEOPOINT 函数使用斗兽场的经度和纬度):

    WITH
      landmarksWithDistances AS (
      SELECT
        gcs_uri,
        description,
        location,
        ST_DISTANCE(location,
          ST_GEOGPOINT(12.492231,
            41.890222)) distance_in_meters,
      FROM
        `vision_analytics.landmark_annotation` landmarks
      CROSS JOIN
        UNNEST(landmarks.locations) AS location )
    SELECT
      SPLIT(gcs_uri,"/")[OFFSET(3)] file,
      description,
        ROUND(distance_in_meters) distance_in_meters,
      location,
      CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
    FROM
      landmarksWithDistances
    WHERE
      distance_in_meters < 3000
    ORDER BY
      distance_in_meters
    LIMIT
      100
    

    输出如下所示。您可以看到下图显示了多个热门目的地:

    罗马斗兽场 3 公里范围内的所有图片列表。包括文件名、说明、距离斗兽场的距离(以米为单位)以及位置。

    同一张图片可以包含同一地标的多个位置。Vision API 文档介绍了此功能。由于一个位置可能表示图片中的场景位置,因此可能存在多个 LocationInfo 元素。另一个位置可能表示图片的拍摄位置。地标通常存在位置信息。

    通过粘贴上一个查询,您可以在 BigQuery Geo Viz 中直观呈现数据。在地图上选择一个点后,即可看到其详细信息。Image_url 特性包含可以在浏览器中打开的图片文件的链接。

    位置及其与斗兽场的距离地图。

清除数据

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除 Google Cloud 项目

避免产生费用的最简单的方法是删除您为本教程创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤