En este instructivo, aprenderás a implementar una canalización de Dataflow para procesar archivos de imagen a gran escala con Cloud Vision. Dataflow almacena los resultados en BigQuery para que puedas usarlos a fin de entrenar modelos compilados con anterioridad de BigQuery ML.
La canalización de Dataflow que creas en el instructivo puede manejar imágenes en grandes cantidades. Solo se limita tu cuota de Vision. Puedes aumentar tu cuota de Vision según los requisitos de escalamiento.
El instructivo está dirigido a ingenieros y científicos de datos. Suponemos que tienes conocimientos básicos sobre la compilación de canalizaciones de Dataflow mediante el SDK de Java de Apache Beam, SQL estándar de BigQuery y secuencias de comandos de shell básicas. También se supone que estás familiarizado con Vision.
Objetivos
- Crear una canalización de transferencia de metadatos de imágenes con notificaciones de Pub/Sub para Cloud Storage
- Usar Dataflow para implementar una canalización de estadísticas de vision en tiempo real
- Utilizar Vision para analizar imágenes en un conjunto de tipos de características
- Analizar y entrenar datos con BigQuery ML
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.
Antes de comenzar
-
En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.
-
Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.
-
En la consola de Google Cloud, activa Cloud Shell.
En Cloud Shell, habilita las API de Dataflow, Container Registry y Vision.
gcloud services enable dataflow.googleapis.com \ containerregistry.googleapis.com vision.googleapis.com
Configura algunas variables de entorno (reemplaza REGION por una de las regiones de Dataflow disponibles, por ejemplo,
us-central1
).export PROJECT=$(gcloud config get-value project) export REGION=REGION
Clona el repositorio de Git del instructivo:
git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
Ve a la carpeta raíz del repositorio:
cd dataflow-vision-analytics
Arquitectura de referencia
En el siguiente diagrama, se ilustra el flujo del sistema que compilas en este instructivo.
Como se muestra en el diagrama, el flujo es el siguiente:
Los clientes suben archivos de imagen a un bucket de Cloud Storage.
Para cada carga de archivos, el sistema envía una notificación de forma automática al cliente mediante la publicación de un mensaje en Pub/Sub.
Para cada notificación nueva, la canalización de Dataflow realiza las siguientes acciones:
- Lee los metadatos del archivo del mensaje de Pub/Sub.
- Envía cada segmento a la API de Visio para el procesamiento de anotaciones.
- Almacena todas las anotaciones en una tabla de BigQuery para un análisis más detallado.
Crea una notificación de Pub/Sub para Cloud Storage
En esta sección, crearás una notificación de Pub/Sub para Cloud Storage. Esta notificación publica metadatos para el archivo de imagen que se sube en el bucket. Según los metadatos, la canalización de Dataflow comienza a procesar la solicitud.
En Cloud Shell, crea un tema de Pub/Sub:
export GCS_NOTIFICATION_TOPIC="gcs-notification-topic" gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
Crea una suscripción a Pub/Sub para el tema.
export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription" gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} --topic=${GCS_NOTIFICATION_TOPIC}
Crea un bucket para almacenar los archivos de imagen de entrada:
export IMAGE_BUCKET=${PROJECT}-images gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
Cree una notificación de Pub/Sub para el bucket:
gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \ -f json gs://${IMAGE_BUCKET}
Ahora que configuraste las notificaciones, el sistema envía un mensaje de Pub/Sub al tema que creaste. Esta acción ocurre cada vez que subes un archivo al bucket.
Crea un conjunto de datos de BigQuery
En esta sección, crearás un conjunto de datos de BigQuery para almacenar los resultados que genera la canalización de Dataflow. La canalización crea tablas de forma automática según los tipos de atributos de vision.
En Cloud Shell, crea un conjunto de datos de BigQuery:
export BIGQUERY_DATASET="vision_analytics" bq mk -d --location=US ${BIGQUERY_DATASET}
Crea una plantilla flexible de Dataflow
En esta sección, crearás un código de canalización de Apache Beam y, luego, ejecutarás la canalización de Dataflow como un trabajo de Dataflow con una plantilla flexible de Dataflow.
En Cloud Shell, compila el código de la canalización de Apache Beam:
gradle build
Crea una imagen de Docker para la plantilla de Dataflow Flex:
gcloud auth configure-docker gradle jib \ --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
Crea un bucket de Cloud Storage para almacenar la plantilla de Dataflow Flex:
export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config gsutil mb -c standard -l ${REGION} \ gs://${DATAFLOW_TEMPLATE_BUCKET}
Sube el archivo de configuración JSON de la plantilla al bucket:
cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json { "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest", "sdk_info": {"language": "JAVA"} } EOF
Ejecuta la canalización de Dataflow para un conjunto de características de Vision
Los parámetros enumerados en la siguiente tabla son específicos de esta canalización de Dataflow.
Consulta la documentación de Dataflow para obtener la lista completa de los parámetros de ejecución de Dataflow estándar.
Parámetro | Descripción |
---|---|
|
El intervalo de tiempo del período (en segundos) para enviar los resultados a BigQuery y Pub/Sub. El valor predeterminado es 5. |
|
La cantidad de imágenes que se incluirán en una solicitud a la API de Vision. El puerto predeterminado es 1. Puedes aumentarlo a un máximo de 16. |
|
El ID de la suscripción a Pub/Sub que recibe notificaciones de entrada de Cloud Storage. |
|
Parámetro que te permite mejorar el rendimiento de procesamiento de grandes conjuntos de datos. Un valor más alto significa un mayor paralelismo entre los trabajadores. El puerto predeterminado es 1. |
|
El ID del proyecto que se usará para la API de Vision. |
|
La referencia del conjunto de datos de BigQuery. |
|
Una lista de características de procesamiento de imágenes. |
|
Parámetros de string con nombres de tablas para varias anotaciones. Los valores predeterminados se proporcionan para cada tabla. |
En Cloud Shell, define el nombre de un trabajo para la canalización de Dataflow:
export JOB_NAME=vision-analytics-pipeline-1
Crea un archivo con parámetros para la canalización de Dataflow:
PARAMETERS=params.yaml cat << EOF > ${PARAMETERS} --parameters: autoscalingAlgorithm: THROUGHPUT_BASED enableStreamingEngine: "true" subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} visionApiProjectId: ${PROJECT} features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION datasetName: ${BIGQUERY_DATASET} EOF
Ejecuta la canalización de Dataflow para procesar imágenes en estos tipos de características:
IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS,FACE_DETECTION
.gcloud dataflow flex-template run ${JOB_NAME} \ --project=${PROJECT} \ --region=${REGION} \ --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \ --flags-file ${PARAMETERS}
Este comando usa los parámetros que se enumeran en la tabla anterior.
Recupera el ID del trabajo de Dataflow en ejecución:
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --format "value(id)" --status active)
Muestra la URL de la página web del trabajo de Dataflow:
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
Abre la URL que se mostró en una nueva pestaña del navegador. Después de unos segundos, aparecerá el grafo del trabajo de Dataflow:
La canalización de Dataflow ahora se ejecuta y espera recibir las notificaciones de entrada de Pub/Sub.
En Cloud Shell, activa la canalización de Dataflow mediante la carga de algunos archivos de prueba al bucket de entrada:
gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET} gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
En la consola de Google Cloud, revisa los contadores personalizados de Dataflow (en el panel derecho del trabajo de Dataflow) y verifica si procesó las cinco imágenes:
En Cloud Shell, verifica que las tablas se hayan creado de forma automática:
bq query "select table_name, table_type from \ ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
El resultado es el siguiente:
+----------------------+------------+ | table_name | table_type | +----------------------+------------+ | face_annotation | BASE TABLE | | label_annotation | BASE TABLE | | crop_hint_annotation | BASE TABLE | | landmark_annotation | BASE TABLE | | image_properties | BASE TABLE | +----------------------+------------+
Visualiza el esquema de la tabla
landmark_annotation
. Si se solicita, la funciónLANDMARK_DETECTION
captura los atributos que muestra la llamada a la API.bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
El resultado es el siguiente:
[ { "mode": "REQUIRED", "name": "gcs_uri", "type": "STRING" }, { "mode": "NULLABLE", "name": "mid", "type": "STRING" }, { "mode": "REQUIRED", "name": "description", "type": "STRING" }, { "mode": "REQUIRED", "name": "score", "type": "FLOAT" }, { "fields": [ { "fields": [ { "mode": "REQUIRED", "name": "x", "type": "FLOAT" }, { "mode": "REQUIRED", "name": "y", "type": "FLOAT" } ], "mode": "REPEATED", "name": "vertices", "type": "RECORD" } ], "mode": "NULLABLE", "name": "bounding_poly", "type": "RECORD" }, { "mode": "REPEATED", "name": "locations", "type": "GEOGRAPHY" }, { "mode": "REQUIRED", "name": "transaction_timestamp", "type": "TIMESTAMP" } ]
Detén la canalización:
gcloud dataflow jobs drain ${JOB_ID} \ --region ${REGION}
Aunque no hay más notificaciones de Pub/Sub para procesar, la canalización de transmisión que creaste continúa ejecutándose hasta que ingresas este comando.
Analiza un conjunto de datos Flickr30K
En esta sección, analizarás un conjunto de datos flickr30K para detectar etiquetas y puntos de referencia.
En Cloud Shell, define el nombre de trabajo nuevo:
export JOB_NAME=vision-analytics-pipeline-2
Cambia los parámetros de canalización de Dataflow a fin de que estén optimizados para un conjunto de datos grande.
batchSize
ykeyRange
aumentan para permitir una mayor capacidad de procesamiento. Dataflow escalará la cantidad de trabajadores según sea necesario:cat <<EOF > ${PARAMETERS} --parameters: autoscalingAlgorithm: THROUGHPUT_BASED enableStreamingEngine: "true" subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION} visionApiProjectId: ${PROJECT} features: LABEL_DETECTION,LANDMARK_DETECTION datasetName: ${BIGQUERY_DATASET} batchSize: "16" windowInterval: "5" keyRange: "2" EOF
Ejecuta la canalización:
gcloud dataflow flex-template run ${JOB_NAME} \ --project=${PROJECT} \ --region=${REGION} \ --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \ --flags-file ${PARAMETERS}
Sube el conjunto de datos a un bucket de entrada:
gsutil -m cp gs://df-vision-ai-test-data/* gs://${IMAGE_BUCKET}
Recupera el ID del trabajo de Dataflow en ejecución:
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
Muestra la URL de la página web del trabajo de Dataflow:
echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
Abre la URL que se mostró en una nueva pestaña del navegador.
En la consola de Google Cloud, valida los contadores personalizados en Dataflow para asegurarte de que se procesen todos los archivos. Generalmente, todos los archivos se procesan en menos de 30 minutos.
Filtra por contadores personalizados en Anotaciones de proceso.
El resultado es el siguiente:
La métrica
processedFiles
(31,935) coincide con la cantidad total de imágenes que se subieron al bucket (el recuento total de archivos es de 31,936). Sin embargo, la métricanumberOfRequests
(1,997) es menor que la cantidad de archivos que pasaron por la canalización. Esta diferencia se debe a que la canalización agrupa hasta 16 archivos por solicitud, como se muestra en los valores de las métricasbatchSizeDistribution_*
.Cierra la canalización:
JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active) \ gcloud dataflow jobs drain ${JOB_ID} \ --region ${REGION}
En la consola de Google Cloud, ve a la página Editor de consultas de BigQuery.
Busca la etiqueta más probable para cada archivo:
SELECT SPLIT(gcs_uri,'/')[OFFSET(3)] file, description, score FROM ( SELECT gcs_uri, description, score, ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC ) AS row_num FROM `vision_analytics.label_annotation`) WHERE row_num = 1 ORDER BY gcs_uri DESC
El resultado es el siguiente. A partir de la respuesta, puedes ver que Punto de referencia es la descripción más probable para el archivo
st_basils.jpeg
.Encuentra las 10 etiquetas principales y sus puntuaciones máximas:
SELECT description, COUNT(*) AS found, MAX(score) AS max_score FROM `vision_analytics.label_annotation` GROUP BY description ORDER BY found DESC LIMIT 10
El resultado final es similar al siguiente:
Encuentra los 10 puntos de referencia principales:
SELECT description, COUNT(*) AS count, MAX(score) AS max_score FROM `vision_analytics.landmark_annotation` WHERE LENGTH(description)>0 GROUP BY description ORDER BY count DESC LIMIT 10
El resultado es el siguiente. Se puede ver que la respuesta de Times Square parece ser el destino más popular.
Busca cualquier imagen que tenga una cascada:
SELECT SPLIT(gcs_uri,'/')[OFFSET(3)] file, description, score FROM `vision_analytics.landmark_annotation` WHERE LOWER(description) LIKE '%fall%' ORDER BY score DESC
El resultado es el siguiente. Solo contiene imágenes de cascada.
Busca una imagen de un punto de referencia en un radio de 3 kilómetros del Coliseo en Roma (la función
ST_GEOPOINT
usa la longitud y la latitud del Coliseo):WITH landmarksWithDistances AS ( SELECT gcs_uri, description, location, ST_DISTANCE(location, ST_GEOGPOINT(12.492231, 41.890222)) distance_in_meters, FROM `vision_analytics.landmark_annotation` landmarks CROSS JOIN UNNEST(landmarks.locations) AS location ) SELECT SPLIT(gcs_uri,"/")[OFFSET(3)] file, description, ROUND(distance_in_meters) distance_in_meters, location, CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url FROM landmarksWithDistances WHERE distance_in_meters < 3000 ORDER BY distance_in_meters LIMIT 100
El resultado es el siguiente. Puedes ver que aparecen varios destinos populares en estas imágenes:
La misma imagen puede contener varias ubicaciones del mismo punto de referencia. Esta funcionalidad se describe en la documentación de la API de Vision. Debido a que una ubicación puede indicar la ubicación de la escena en la imagen, pueden haber varios elementos
LocationInfo
. Otra ubicación puede indicar dónde se tomó la imagen. Por lo general, los puntos de referencia incluyen información de ubicación.Puedes visualizar los datos en BigQuery Geo Viz si pegas en la consulta anterior. Cuando seleccionas un punto en el mapa, puedes ver sus detalles. El atributo
Image_url
contiene el vínculo al archivo de imagen que puedes abrir en un navegador.
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto de Google Cloud
La manera más fácil de eliminar la facturación es borrar el proyecto de Google Cloud que creaste para el instructivo.
- En la consola de Google Cloud, ve a la página Administrar recursos.
- En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
- En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.
¿Qué sigue?
- Obtén más información sobre los patrones de referencia de analítica inteligente.
- Explora arquitecturas de referencia, diagramas y prácticas recomendadas sobre Google Cloud. Consulta nuestro Cloud Architecture Center.