Cloud Composer 1 | Cloud Composer 2
Este instructivo es una modificación de la sección Ejecuta un DAG de análisis de datos en Google Cloud que muestra cómo conectar tu entorno de Cloud Composer a Microsoft Azure para usar los datos almacenados allí. Se muestra cómo usar Cloud Composer para crear un DAG de Apache Airflow. El DAG une los datos de un conjunto de datos públicos de BigQuery y un archivo CSV almacenado en Azure Blob Storage y, luego, ejecuta un trabajo por lotes sin servidores de Dataproc para procesar los datos unidos.
El conjunto de datos públicos de BigQuery en este instructivo es ghcn_d, una base de datos integrada de resúmenes climáticos de todo el mundo. El archivo CSV contiene información sobre las fechas y los nombres de los feriados de EE.UU. de 1997 a 2021.
La pregunta que queremos responder con el DAG es la siguiente: “¿Qué tanta temperatura fue en Chicago durante el Día de Acción de Gracias durante los últimos 25 años?”.
Objetivos
- Crea un entorno de Cloud Composer en la configuración predeterminada
- Crea un BLOB en Azure
- Crea un conjunto de datos vacío de BigQuery
- Cree un nuevo bucket de Cloud Storage
- Crea y ejecuta un DAG que incluya las siguientes tareas:
- Carga un conjunto de datos externo de Azure Blob Storage a Cloud Storage
- Carga un conjunto de datos externo de Cloud Storage a BigQuery
- Unir dos conjuntos de datos en BigQuery
- Ejecuta un trabajo de PySpark de análisis de datos
Antes de comenzar
Habilita las APIs
Habilita las siguientes APIs:
Consola
Habilita las API de Dataproc, Cloud Composer, BigQuery, Cloud Storage.
gcloud
Habilita las APIs de Dataproc, Cloud Composer, BigQuery, Cloud Storage:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Otorga permisos
Otorga los siguientes roles y permisos a tu cuenta de usuario:
Otorga roles para administrar entornos y buckets de entornos de Cloud Composer.
Otorga el rol Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) para crear un conjunto de datos de BigQuery.Otorga el rol Administrador de almacenamiento (
roles/storage.admin
) para crear un bucket de Cloud Storage.
Crea y prepara tu entorno de Cloud Composer
Crea un entorno de Cloud Composer con los parámetros predeterminados:
- Elige una región de EE.UU.
- Elige la versión de Cloud Composer más reciente.
Otorga los siguientes roles a la cuenta de servicio utilizada en tu entorno de Cloud Composer para que los trabajadores de Airflow ejecuten correctamente las tareas de DAG:
- Usuario de BigQuery (
roles/bigquery.user
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) - Editor de Dataproc (
roles/dataproc.editor
) - Trabajador de Dataproc (
roles/dataproc.worker
)
- Usuario de BigQuery (
Crear y modificar recursos relacionados en Google Cloud
Instala el paquete de PyPI
apache-airflow-providers-microsoft-azure
en tu entorno de Cloud Composer.Crea un conjunto de datos de BigQuery vacío con los siguientes parámetros:
- Nombre:
holiday_weather
- Región:
US
- Nombre:
Crea un bucket de Cloud Storage nuevo en la multirregión
US
.Ejecuta el siguiente comando para habilitar el acceso privado a Google en la subred predeterminada de la región en la que deseas ejecutar Dataproc Serverless a fin de cumplir con los requisitos de red. Recomendamos usar la misma región que tu entorno de Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crea recursos relacionados en Azure
Crea una cuenta de almacenamiento con la configuración predeterminada.
Obtén la clave de acceso y la string de conexión de tu cuenta de almacenamiento.
Crea un contenedor con opciones predeterminadas en la cuenta de almacenamiento recién creada.
Otorga la función Delegador de BLOB de Storage al contenedor que creaste en el paso anterior.
Sube holidays.csv para crear un BLOB en bloque con las opciones predeterminadas en el portal de Azure.
Crea un token SAS para el BLOB en bloque que creaste en el paso anterior en el portal de Azure.
- Método de firma: Clave de delegación de usuarios
- Permisos: Leer
- Dirección IP permitida: ninguna
- Protocolos permitidos: Solo HTTPS
Conéctate a Azure desde Cloud Composer
Agrega tu conexión de Microsoft Azure mediante la IU de Airflow:
Ve a Administrador > Conexiones.
Crea una conexión nueva con la siguiente configuración:
- ID de conexión:
azure_blob_connection
- Tipo de conexión:
Azure Blob Storage
- Blob Storage Login: el nombre de tu cuenta de almacenamiento
- Clave de almacenamiento de BLOB: La clave de acceso de tu cuenta de almacenamiento
- Blob Storage Account Connection String: Tu string de conexión de cuenta de almacenamiento
- Token de SAS: es el token de SAS generado a partir de tu BLOB.
- ID de conexión:
Procesamiento de datos con Dataproc Serverless
Explora el trabajo de PySpark de ejemplo
El código que se muestra a continuación es un ejemplo de trabajo de PySpark que convierte la temperatura de décimas de grado en Celsius a grados Celsius. Este trabajo convierte los datos de temperatura del conjunto de datos en un formato diferente.
Sube el archivo PySpark a Cloud Storage
Para subir el archivo PySpark a Cloud Storage, sigue estos pasos:
Guarda data_analytics_process.py en tu máquina local.
En la consola de Google Cloud, ve a la página Navegador de Cloud Storage:
Haz clic en el nombre del bucket que creaste anteriormente.
En la pestaña Objetos del bucket, haz clic en el botón Subir archivos, selecciona
data_analytics_process.py
en el cuadro de diálogo que aparece y haz clic en Abrir.
DAG de análisis de datos
Explore el DAG de ejemplo
El DAG usa varios operadores para transformar y unificar los datos:
El
AzureBlobStorageToGCSOperator
transfiere el archivo holidays.csv del BLOB en bloque de Azure al bucket de Cloud Storage.El
GCSToBigQueryOperator
transfiere el archivo holidays.csv de Cloud Storage a una tabla nueva en el conjunto de datosholidays_weather
de BigQuery que creaste antes.El
DataprocCreateBatchOperator
crea y ejecuta un trabajo por lotes de PySpark con Dataproc Serverless.El
BigQueryInsertJobOperator
une los datos de holidays.csv en la columna “Fecha” con los datos meteorológicos del conjunto de datos públicos de BigQuery ghcn_d. Las tareasBigQueryInsertJobOperator
se generan de forma dinámica mediante un bucle for, y estas tareas se encuentran en unTaskGroup
para mejorar la legibilidad en la Vista de gráfico de la IU de Airflow.
Usar la IU de Airflow para agregar variables
En Airflow, las variables son una forma universal de almacenar y recuperar parámetros de configuración arbitrarios como un almacén de clave-valor simple. Este DAG usa variables de Airflow para almacenar valores comunes. Para agregarlas a tu entorno, haz lo siguiente:
Accede a la IU de Airflow desde la consola de Cloud Composer.
Ve a Administrador > Variables.
Agrega las siguientes variables:
gcp_project
: el ID de tu proyecto.gcs_bucket
: Es el nombre del bucket que creaste antes (sin el prefijogs://
).gce_region
: Es la región en la que deseas que tu trabajo de Dataproc cumpla con los requisitos de herramientas de redes sin servidores de Dataproc. Esta es la región en la que habilitaste el acceso privado a Google anteriormente.dataproc_service_account
: Es la cuenta de servicio para el entorno de Cloud Composer. Puedes encontrar esta cuenta de servicio en la pestaña de configuración del entorno de tu entorno de Cloud Composer.azure_blob_name
: Es el nombre del BLOB que creaste antes.azure_blob_path
: Es la URL del BLOB que creaste antes. Para obtener la información, haz clic con el botón derecho en el BLOB y selecciona view/edit. La URL se encuentra en la pestaña de descripción general.azure_container_name
: Es el nombre del contenedor que creaste antes.
Sube el DAG al bucket de tu entorno
Cloud Composer programa los DAG que se encuentran en la carpeta /dags
del bucket de tu entorno. Para subir el DAG mediante la
consola de Google Cloud, sigue estos pasos:
En tu máquina local, guarda azureblobstoretogcsoperator_tutorial.py.
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, en la columna Carpeta de DAG, haz clic en el vínculo DAG. Se abrirá la carpeta de DAG de tu entorno.
Haga clic en Subir archivos.
Selecciona
azureblobstoretogcsoperator_tutorial.py
en tu máquina local y haz clic en Abrir.
Activa el DAG
En tu entorno de Cloud Composer, haz clic en la pestaña DAG.
Haz clic en el ID de DAG
azure_blob_to_gcs_dag
.Haz clic en Activar DAG.
Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indica que las tareas se completaron correctamente.
Valida el éxito del DAG
En la consola de Google Cloud, ve a la página de BigQuery.
En el panel Explorador, haz clic en el nombre de tu proyecto.
Haz clic en
holidays_weather_joined
.Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números en la columna de valores están en décimas de un grado Celsius.
Haz clic en
holidays_weather_normalized
.Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números en la columna de valores están en grados Celsius.
Limpieza
Borra los recursos individuales que creaste para este instructivo:
Borra el bucket de Cloud Storage que creaste para este instructivo.
Borra el entorno de Cloud Composer, incluida la eliminación manual del bucket del entorno.