Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En este instructivo, se muestra cómo usar Cloud Composer para crear una DAG de Apache Airflow El El DAG une los datos de un conjunto de datos públicos de BigQuery y un archivo CSV almacenado en un bucket de Cloud Storage y, luego, ejecuta Trabajo por lotes de Dataproc Serverless para procesar los datos unidos.
El conjunto de datos públicos de BigQuery en este instructivo es ghcn_d, una base de datos integrada de resúmenes del clima de en el mundo. El archivo CSV contiene información sobre las fechas y los nombres de los días festivos de EE.UU. de 1997 a 2021.
La pregunta que queremos responder con el DAG es la siguiente: “¿Qué tan cálido fue en Chicago en el Día de Acción de Gracias durante los últimos 25 años?”.
Objetivos
- Crea un entorno de Cloud Composer en la configuración predeterminada
- Crea un conjunto de datos vacío de BigQuery
- Cree un nuevo bucket de Cloud Storage
- Crea y ejecuta un DAG que incluya las siguientes tareas:
- Cargar un conjunto de datos externo desde Cloud Storage a BigQuery
- Unir dos conjuntos de datos en BigQuery
- Ejecuta un trabajo de PySpark de análisis de datos
Antes de comenzar
Habilita las APIs
Habilita las siguientes APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Otorga permisos
Otorga los siguientes roles y permisos a tu cuenta de usuario:
Otorga funciones para administrar entornos de Cloud Composer y buckets de entorno.
Otorga el rol Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) para crear un conjunto de datos de BigQuery.Otorga el rol Administrador de almacenamiento (
roles/storage.admin
) para crear un bucket de Cloud Storage.
Crea y prepara tu entorno de Cloud Composer
Crea un entorno de Cloud Composer con la configuración parámetros:
- Elige una región de EE.UU.
- Elige la versión más reciente de Cloud Composer.
Otorga los siguientes roles a la cuenta de servicio que usaste en tu de Cloud Composer para que los trabajadores de Airflow ejecutar correctamente las tareas de DAG:
- Usuario de BigQuery (
roles/bigquery.user
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) - Editor de Dataproc (
roles/dataproc.editor
) - Trabajador de Dataproc (
roles/dataproc.worker
)
- Usuario de BigQuery (
Crea recursos relacionados
Crea un conjunto de datos vacío de BigQuery con los siguientes parámetros:
- Nombre:
holiday_weather
- Región:
US
- Nombre:
Crea un nuevo bucket de Cloud Storage en la multirregión
US
.Ejecuta el siguiente comando para habilitar el Acceso privado a Google en la subred predeterminada de la región en la que quieres ejecutar Dataproc sin servidores para entregar requisitos de red. Mié te recomendamos usar la misma región que tu Cloud Composer en un entorno de nube.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Procesamiento de datos con Dataproc Serverless
Explora el trabajo de PySpark de ejemplo
El código que se muestra a continuación es un ejemplo de trabajo de PySpark que convierte la temperatura de décimas de grado en grados Celsius a grados Celsius. Este trabajo convierte de temperatura del conjunto de datos a un formato diferente.
Sube archivos complementarios a Cloud Storage
Para subir el archivo de PySpark y el conjunto de datos almacenado en holidays.csv
, haz lo siguiente:
Guardar data_analytics_process.py a tu máquina local.
Guarda holidays.csv en tu máquina local.
En la consola de Google Cloud, ve a la página del navegador de Cloud Storage:
Haz clic en el nombre del bucket que creaste anteriormente.
En la pestaña Objetos del bucket, haz clic en el botón Subir archivos. selecciona
data_analytics_process.py
yholidays.csv
en el diálogo y haz clic en Abrir.
DAG de análisis de datos
Explora el DAG de ejemplo
El DAG usa múltiples operadores para transformar y unificar los datos:
El
GCSToBigQueryOperator
transfiere el archivo holidays.csv de Cloud Storage a una nueva tabla en BigQuery Conjunto de datosholidays_weather
que creaste antes.El
DataprocCreateBatchOperator
crea y ejecuta un trabajo por lotes de PySpark Dataproc sin servidores.El
BigQueryInsertJobOperator
une los datos de holidays.csv en el "Fecha" Columna con datos meteorológicos del conjunto de datos públicos de BigQuery ghcn_d. Las tareasBigQueryInsertJobOperator
se de forma dinámica con un bucle for, y estas tareas estánTaskGroup
para mejorar la legibilidad de la Vista de gráfico de la IU de Airflow.
Usa la IU de Airflow para agregar variables
En Airflow, variables son una forma universal de almacenar y recuperar parámetros de configuración arbitrarios o como un simple almacén de par clave-valor. Este DAG usa variables de Airflow para lo siguiente: almacenan valores comunes. Para agregarlos a tu entorno, sigue estos pasos:
Accede a la IU de Airflow desde la consola de Cloud Composer.
Ve a Administrador > Variables.
Agrega las siguientes variables:
gcp_project
: el ID de tu proyectogcs_bucket
: Es el nombre del bucket que creaste antes. (sin el prefijogs://
).gce_region
: Es la región en la que deseas que tu Trabajo de Dataproc que cumple con Requisitos de las herramientas de redes sin servidores de Dataproc Esta es la región donde habilitaste el Acceso privado a Google anteriormente.dataproc_service_account
: Es la cuenta de servicio de tu entorno de Cloud Composer. Puedes encontrar este servicio en la pestaña de configuración del entorno de tu entorno de Cloud Composer.
Sube el DAG al bucket de tu entorno
Cloud Composer programa los DAG que se encuentran en
/dags
en el bucket de tu entorno. Para subir el DAG usando el
Consola de Google Cloud:
En tu máquina local, guarda data_analytics_dag.py.
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, en la columna Carpeta de DAG, haz clic en el vínculo de los DAG. Se abrirá la carpeta DAG de tu entorno.
Haz clic en Subir archivos.
Selecciona
data_analytics_dag.py
en tu máquina local y haz clic en Abrir.
Activa el DAG
En tu entorno de Cloud Composer, haz clic en la pestaña DAG.
Haz clic en el ID del DAG
data_analytics_dag
.Haz clic en Activar DAG.
Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indique tareas se hayan completado correctamente.
Valida el éxito del DAG
En la consola de Google Cloud, ve a la página de BigQuery.
En el panel Explorador, haz clic en el nombre de tu proyecto.
Haz clic en
holidays_weather_joined
.Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números en están en décimas de un grado Celsius.
Haz clic en
holidays_weather_normalized
.Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números en están en grados Celsius.
Análisis detallado con Dataproc Serverless (opcional)
Puedes probar una versión avanzada de este DAG con una PySpark más compleja de procesamiento de datos. Consulta Extensión de Dataproc para el ejemplo de análisis de datos en GitHub.
Limpieza
Borra los recursos individuales que creaste para este instructivo:
Borra el bucket de Cloud Storage que para este instructivo.
Borra el entorno de Cloud Composer, incluido lo siguiente: borrar manualmente el bucket del entorno.
¿Qué sigue?
- Ejecuta un DAG de análisis de datos en Google Cloud mediante datos de AWS.
- Ejecuta un DAG de análisis de datos en Azure.