Compila una solución de estadísticas de vision de AA con Dataflow y la API de Cloud Vision

Last reviewed 2021-02-10 UTC

En este instructivo, aprenderás a implementar una canalización de Dataflow para procesar archivos de imagen a gran escala con Cloud Vision. Dataflow almacena los resultados en BigQuery para que puedas usarlos a fin de entrenar modelos compilados con anterioridad de BigQuery ML.

La canalización de Dataflow que creas en el instructivo puede manejar imágenes en grandes cantidades. Solo se limita tu cuota de Vision. Puedes aumentar tu cuota de Vision según los requisitos de escalamiento.

El instructivo está dirigido a ingenieros y científicos de datos. Suponemos que tienes conocimientos básicos sobre la compilación de canalizaciones de Dataflow mediante el SDK de Java de Apache Beam, SQL estándar de BigQuery y secuencias de comandos de shell básicas. También se supone que estás familiarizado con Vision.

Objetivos

  • Crear una canalización de transferencia de metadatos de imágenes con notificaciones de Pub/Sub para Cloud Storage
  • Usar Dataflow para implementar una canalización de estadísticas de vision en tiempo real
  • Utilizar Vision para analizar imágenes en un conjunto de tipos de características
  • Analizar y entrenar datos con BigQuery ML

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

  1. En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  2. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  3. En la consola de Google Cloud, activa Cloud Shell.

    Activar Cloud Shell

  4. En Cloud Shell, habilita las API de Dataflow, Container Registry y Vision.

    gcloud services enable dataflow.googleapis.com \
    containerregistry.googleapis.com vision.googleapis.com
    
  5. Configura algunas variables de entorno (reemplaza REGION por una de las regiones de Dataflow disponibles, por ejemplo, us-central1).

    export PROJECT=$(gcloud config get-value project)
    export REGION=REGION
    
  6. Clona el repositorio de Git del instructivo:

    git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
    
  7. Ve a la carpeta raíz del repositorio:

    cd dataflow-vision-analytics
    

Arquitectura de referencia

En el siguiente diagrama, se ilustra el flujo del sistema que compilas en este instructivo.

Diagrama de flujo de trabajo que muestra el flujo de información para la transferencia o activación, el proceso y el almacenamiento.

Como se muestra en el diagrama, el flujo es el siguiente:

  1. Los clientes suben archivos de imagen a un bucket de Cloud Storage.

  2. Para cada carga de archivos, el sistema envía una notificación de forma automática al cliente mediante la publicación de un mensaje en Pub/Sub.

  3. Para cada notificación nueva, la canalización de Dataflow realiza las siguientes acciones:

    1. Lee los metadatos del archivo del mensaje de Pub/Sub.
    2. Envía cada segmento a la API de Visio  para el procesamiento de anotaciones.
    3. Almacena todas las anotaciones en una tabla de BigQuery para un análisis más detallado.

Crea una notificación de Pub/Sub para Cloud Storage

En esta sección, crearás una notificación de Pub/Sub para Cloud Storage. Esta notificación publica metadatos para el archivo de imagen que se sube en el bucket. Según los metadatos, la canalización de Dataflow comienza a procesar la solicitud.

  1. En Cloud Shell, crea un tema de Pub/Sub:

    export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
    gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
    
  2. Crea una suscripción a Pub/Sub para el tema.

    export  GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
    gcloud pubsub subscriptions create  ${GCS_NOTIFICATION_SUBSCRIPTION}  --topic=${GCS_NOTIFICATION_TOPIC}
    
  3. Crea un bucket para almacenar los archivos de imagen de entrada:

    export IMAGE_BUCKET=${PROJECT}-images
    gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
    
  4. Cree una notificación de Pub/Sub para el bucket:

    gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
      -f json gs://${IMAGE_BUCKET}
    

Ahora que configuraste las notificaciones, el sistema envía un mensaje de Pub/Sub al tema que creaste. Esta acción ocurre cada vez que subes un archivo al bucket.

Crea un conjunto de datos de BigQuery

En esta sección, crearás un conjunto de datos de BigQuery para almacenar los resultados que genera la canalización de Dataflow. La canalización crea tablas de forma automática según los tipos de atributos de vision.

  • En Cloud Shell, crea un conjunto de datos de BigQuery:

    export BIGQUERY_DATASET="vision_analytics"
    bq mk -d --location=US ${BIGQUERY_DATASET}
    

Crea una plantilla flexible de Dataflow

En esta sección, crearás un código de canalización de Apache Beam y, luego, ejecutarás la canalización de Dataflow como un trabajo de Dataflow con una plantilla flexible de Dataflow.

  1. En Cloud Shell, compila el código de la canalización de Apache Beam:

    gradle build
    
  2. Crea una imagen de Docker para la plantilla de Dataflow Flex:

    gcloud auth configure-docker
    gradle jib \
      --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
    
  3. Crea un bucket de Cloud Storage para almacenar la plantilla de Dataflow Flex:

    export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config
    gsutil mb -c standard -l ${REGION} \
      gs://${DATAFLOW_TEMPLATE_BUCKET}
    
  4. Sube el archivo de configuración JSON de la plantilla al bucket:

    cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json
    {
      "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest",
      "sdk_info": {"language": "JAVA"}
    }
    EOF
    

Ejecuta la canalización de Dataflow para un conjunto de características de Vision

Los parámetros enumerados en la siguiente tabla son específicos de esta canalización de Dataflow.

Consulta la documentación de Dataflow para obtener la lista completa de los parámetros de ejecución de Dataflow estándar.

Parámetro Descripción

windowInterval

El intervalo de tiempo del período (en segundos) para enviar los resultados a BigQuery y Pub/Sub. El valor predeterminado es 5.

batchSize

La cantidad de imágenes que se incluirán en una solicitud a la API de Vision. El puerto predeterminado es 1. Puedes aumentarlo a un máximo de 16.

subscriberId

El ID de la suscripción a Pub/Sub que recibe notificaciones de entrada de Cloud Storage.

keyRange

Parámetro que te permite mejorar el rendimiento de procesamiento de grandes conjuntos de datos. Un valor más alto significa un mayor paralelismo entre los trabajadores. El puerto predeterminado es 1.

visionApiProjectId

El ID del proyecto que se usará para la API de Vision.

datasetName

La referencia del conjunto de datos de BigQuery.

features

Una lista de características de procesamiento de imágenes.

labelAnnottationTable, landmarkAnnotationTable, logoAnnotationTable, faceAnnotationTable, imagePropertiesTable, cropHintAnnotationTable, errorLogTable

Parámetros de string con nombres de tablas para varias anotaciones. Los valores predeterminados se proporcionan para cada tabla.
  1. En Cloud Shell, define el nombre de un trabajo para la canalización de Dataflow:

    export JOB_NAME=vision-analytics-pipeline-1
    
  2. Crea un archivo con parámetros para la canalización de Dataflow:

    PARAMETERS=params.yaml
    cat << EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION
      datasetName: ${BIGQUERY_DATASET}
    EOF
    
  3. Ejecuta la canalización de Dataflow para procesar imágenes en estos tipos de características: IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION.

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    

    Este comando usa los parámetros que se enumeran en la tabla anterior.

  4. Recupera el ID del trabajo de Dataflow en ejecución:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
    
  5. Muestra la URL de la página web del trabajo de Dataflow:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  6. Abre la URL que se mostró en una nueva pestaña del navegador. Después de unos segundos, aparecerá el grafo del trabajo de Dataflow:

    Diagrama de flujo de trabajo del trabajo de Dataflow

    La canalización de Dataflow ahora se ejecuta y espera recibir las notificaciones de entrada de Pub/Sub.

  7. En Cloud Shell, activa la canalización de Dataflow mediante la carga de algunos archivos de prueba al bucket de entrada:

    gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
    
  8. En la consola de Google Cloud, revisa los contadores personalizados de Dataflow (en el panel derecho del trabajo de Dataflow) y verifica si procesó las cinco imágenes:

    Lista de imágenes que se muestran a partir de la carga de archivos.

  9. En Cloud Shell, verifica que las tablas se hayan creado de forma automática:

    bq query "select table_name, table_type from \
    ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
    

    El resultado es el siguiente:

    +----------------------+------------+
    |      table_name      | table_type |
    +----------------------+------------+
    | face_annotation      | BASE TABLE |
    | label_annotation     | BASE TABLE |
    | crop_hint_annotation | BASE TABLE |
    | landmark_annotation  | BASE TABLE |
    | image_properties     | BASE TABLE |
    +----------------------+------------+
    
  10. Visualiza el esquema de la tabla landmark_annotation. Si se solicita, la función LANDMARK_DETECTION captura los atributos que muestra la llamada a la API.

    bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
    

    El resultado es el siguiente:

    [
      {
        "mode": "REQUIRED",
        "name": "gcs_uri",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "mid",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "description",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "score",
        "type": "FLOAT"
      },
      {
        "fields": [
          {
            "fields": [
              {
                "mode": "REQUIRED",
                "name": "x",
                "type": "FLOAT"
              },
              {
                "mode": "REQUIRED",
                "name": "y",
                "type": "FLOAT"
              }
            ],
            "mode": "REPEATED",
            "name": "vertices",
            "type": "RECORD"
          }
        ],
        "mode": "NULLABLE",
        "name": "bounding_poly",
        "type": "RECORD"
      },
      {
        "mode": "REPEATED",
        "name": "locations",
        "type": "GEOGRAPHY"
      },
      {
        "mode": "REQUIRED",
        "name": "transaction_timestamp",
        "type": "TIMESTAMP"
      }
    ]
    
  11. Detén la canalización:

    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    

    Aunque no hay más notificaciones de Pub/Sub para procesar, la canalización de transmisión que creaste continúa ejecutándose hasta que ingresas este comando.

Analiza un conjunto de datos Flickr30K

En esta sección, analizarás un conjunto de datos flickr30K para detectar etiquetas y puntos de referencia.

  1. En Cloud Shell, define el nombre de trabajo nuevo:

    export JOB_NAME=vision-analytics-pipeline-2
    
  2. Cambia los parámetros de canalización de Dataflow a fin de que estén optimizados para un conjunto de datos grande. batchSize y keyRange aumentan para permitir una mayor capacidad de procesamiento. Dataflow escalará la cantidad de trabajadores según sea necesario:

    cat <<EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: LABEL_DETECTION,LANDMARK_DETECTION
      datasetName: ${BIGQUERY_DATASET}
      batchSize: "16"
      windowInterval: "5"
      keyRange: "2"
    EOF
    
  3. Ejecuta la canalización:

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    
  4. Sube el conjunto de datos a un bucket de entrada:

    gsutil -m  cp gs://df-vision-ai-test-data/*  gs://${IMAGE_BUCKET}
    
  5. Recupera el ID del trabajo de Dataflow en ejecución:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
    
  6. Muestra la URL de la página web del trabajo de Dataflow:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  7. Abre la URL que se mostró en una nueva pestaña del navegador.

  8. En la consola de Google Cloud, valida los contadores personalizados en Dataflow para asegurarte de que se procesen todos los archivos. Generalmente, todos los archivos se procesan en menos de 30 minutos.

  9. Filtra por contadores personalizados en Anotaciones de proceso.

    El resultado es el siguiente:

    Lista de contadores que se muestran después de filtrar por contadores personalizados. Muestra el nombre del contador, el valor y el paso.

    La métrica processedFiles (31,935) coincide con la cantidad total de imágenes que se subieron al bucket (el recuento total de archivos es de 31,936). Sin embargo, la métrica numberOfRequests (1,997) es menor que la cantidad de archivos que pasaron por la canalización. Esta diferencia se debe a que la canalización agrupa hasta 16 archivos por solicitud, como se muestra en los valores de las métricas batchSizeDistribution_*.

  10. Cierra la canalización:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}"
    --region ${REGION}
    --format "value(id)"
    --status active) \
    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    
  11. En la consola de Google Cloud, ve a la página Editor de consultas de BigQuery.

    Ir a Editor de consultas

  12. Busca la etiqueta más probable para cada archivo:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM (
      SELECT
        gcs_uri,
        description,
        score,
        ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC )
    AS row_num
      FROM
         `vision_analytics.label_annotation`)
    WHERE
      row_num = 1
    ORDER BY
      gcs_uri DESC
    

    El resultado es el siguiente. A partir de la respuesta, puedes ver que Punto de referencia es la descripción más probable para el archivo st_basils.jpeg.

    Lista de nombres de archivos de imagen, descripciones y puntuaciones.

  13. Encuentra las 10 etiquetas principales y sus puntuaciones máximas:

    SELECT
      description,
      COUNT(*) AS found,
      MAX(score) AS max_score
    FROM
      `vision_analytics.label_annotation`
    GROUP BY
      description
    ORDER BY
      found DESC
    LIMIT 10
    

    El resultado final es similar al siguiente:

    Lista de las 10 etiquetas principales encontradas. Incluye la descripción, la cantidad de veces que se encontró y una puntuación máxima.

  14. Encuentra los 10 puntos de referencia principales:

    SELECT
      description,
      COUNT(*) AS count,
      MAX(score) AS max_score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LENGTH(description)>0
    GROUP BY
      description
    ORDER BY
      count DESC
    LIMIT 10
    

    El resultado es el siguiente. Se puede ver que la respuesta de Times Square parece ser el destino más popular.

    Lista de los 10 puntos de referencia más populares que muestra la consulta. Incluye la descripción, el recuento y la puntuación máxima.

  15. Busca cualquier imagen que tenga una cascada:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LOWER(description) LIKE '%fall%'
    ORDER BY score DESC
    

    El resultado es el siguiente. Solo contiene imágenes de cascada.

    Lista de cascadas. Incluye el nombre, la descripción y la puntuación del archivo.

  16. Busca una imagen de un punto de referencia en un radio de 3 kilómetros del Coliseo en Roma (la función ST_GEOPOINT usa la longitud y la latitud del Coliseo):

    WITH
      landmarksWithDistances AS (
      SELECT
        gcs_uri,
        description,
        location,
        ST_DISTANCE(location,
          ST_GEOGPOINT(12.492231,
            41.890222)) distance_in_meters,
      FROM
        `vision_analytics.landmark_annotation` landmarks
      CROSS JOIN
        UNNEST(landmarks.locations) AS location )
    SELECT
      SPLIT(gcs_uri,"/")[OFFSET(3)] file,
      description,
        ROUND(distance_in_meters) distance_in_meters,
      location,
      CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
    FROM
      landmarksWithDistances
    WHERE
      distance_in_meters < 3000
    ORDER BY
      distance_in_meters
    LIMIT
      100
    

    El resultado es el siguiente. Puedes ver que aparecen varios destinos populares en estas imágenes:

    Lista de todas las imágenes a 3 km del Coliseo de Roma. Incluye el nombre del archivo, la descripción, la distancia en metros del Coliseo y la ubicación.

    La misma imagen puede contener varias ubicaciones del mismo punto de referencia. Esta funcionalidad se describe en la documentación de la API de Vision. Debido a que una ubicación puede indicar la ubicación de la escena en la imagen, pueden haber varios elementos LocationInfo. Otra ubicación puede indicar dónde se tomó la imagen. Por lo general, los puntos de referencia incluyen información de ubicación.

    Puedes visualizar los datos en BigQuery Geo Viz si pegas en la consulta anterior. Cuando seleccionas un punto en el mapa, puedes ver sus detalles. El atributo Image_url contiene el vínculo al archivo de imagen que puedes abrir en un navegador.

    Mapa de ubicaciones y su distancia del Coliseo.

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto de Google Cloud

La manera más fácil de eliminar la facturación es borrar el proyecto de Google Cloud que creaste para el instructivo.

  1. En la consola de Google Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

¿Qué sigue?