Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial es una modificación de Ejecutar un DAG de analíticas de datos en Google Cloud, que muestra cómo conectar tu entorno de Cloud Composer a Microsoft Azure para utilizar los datos almacenados allí. En él se muestra cómo usar Cloud Composer para crear un DAG de Apache Airflow. El DAG combina datos de un conjunto de datos público de BigQuery y de un archivo CSV almacenado en Azure Blob Storage y, a continuación, ejecuta un trabajo por lotes de Dataproc Serverless para procesar los datos combinados.
El conjunto de datos público de BigQuery de este tutorial es ghcn_d, una base de datos integrada de resúmenes climáticos de todo el mundo. El archivo CSV contiene información sobre las fechas y los nombres de las festividades de EE. UU. entre 1997 y 2021.
La pregunta a la que queremos responder con el DAG es: "¿Qué temperatura hizo en Chicago el día de Acción de Gracias durante los últimos 25 años?".
Objetivos
- Crear un entorno de Cloud Composer con la configuración predeterminada
- Crear un blob en Azure
- Crear un conjunto de datos de BigQuery vacío
- Crear un segmento de Cloud Storage
- Crea y ejecuta un DAG que incluya las siguientes tareas:
- Cargar un conjunto de datos externo de Azure Blob Storage en Cloud Storage
- Cargar un conjunto de datos externo de Cloud Storage en BigQuery
- Combinar dos conjuntos de datos en BigQuery
- Ejecutar una tarea de PySpark de analíticas de datos
Antes de empezar
Habilitar APIs
Habilita las siguientes APIs:
Consola
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Conceder permisos
Concede los siguientes roles y permisos a tu cuenta de usuario:
Concede roles para gestionar los entornos de Cloud Composer y los segmentos de entorno.
Asigna el rol Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) para crear un conjunto de datos de BigQuery.Asigna el rol Administrador de Storage (
roles/storage.admin
) para crear un segmento de Cloud Storage.
Crear y preparar un entorno de Cloud Composer
Crea un entorno de Cloud Composer con los parámetros predeterminados:
- Elige una región de EE. UU.
- Elige la última versión de Cloud Composer.
Asigna los siguientes roles a la cuenta de servicio que se usa en tu entorno de Cloud Composer para que los workers de Airflow puedan ejecutar correctamente las tareas de los DAG:
- Usuario de BigQuery (
roles/bigquery.user
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) - Editor de Dataproc (
roles/dataproc.editor
) - Trabajador de Dataproc (
roles/dataproc.worker
)
- Usuario de BigQuery (
Crear y modificar recursos relacionados en Google Cloud
Instala el
apache-airflow-providers-microsoft-azure
paquete de PyPI en tu entorno de Cloud Composer.Crea un conjunto de datos de BigQuery vacío con los siguientes parámetros:
- Nombre:
holiday_weather
- Región:
US
- Nombre:
Crea un segmento de Cloud Storage en la multirregión
US
.Ejecuta el siguiente comando para habilitar el acceso privado a Google en la subred predeterminada de la región en la que quieras ejecutar Dataproc sin servidor para cumplir los requisitos de red. Te recomendamos que uses la misma región que tu entorno de Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crear recursos relacionados en Azure
Crea una cuenta de almacenamiento con la configuración predeterminada.
Obtén la clave de acceso y la cadena de conexión de tu cuenta de almacenamiento.
Crea un contenedor con las opciones predeterminadas en la cuenta de almacenamiento que acabas de crear.
Concede el rol Delegador de Storage Blob al contenedor creado en el paso anterior.
Sube holidays.csv para crear un blob de bloque con las opciones predeterminadas en el portal de Azure.
Crea un token de SAS para el blob de bloques que has creado en el paso anterior en Azure Portal.
- Método de firma: clave de delegación de usuario
- Permisos: lectura
- Dirección IP permitida: ninguna
- Protocolos permitidos: solo HTTPS
Conectarse a Azure desde Cloud Composer
Añade tu conexión de Microsoft Azurecon la interfaz de usuario de Airflow:
Vaya a Administrar > Conexiones.
Crea una conexión con la siguiente configuración:
- ID de conexión:
azure_blob_connection
- Tipo de conexión:
Azure Blob Storage
- Inicio de sesión de Blob Storage: nombre de tu cuenta de almacenamiento
- Clave de Blob Storage: la clave de acceso de tu cuenta de almacenamiento
- Cadena de conexión de la cuenta de almacenamiento de blobs: cadena de conexión de su cuenta de almacenamiento
- Token de SAS: token de SAS generado a partir de tu blob.
- ID de conexión:
Procesamiento de datos con Dataproc Serverless
Consultar el ejemplo de tarea de PySpark
El código que se muestra a continuación es un ejemplo de trabajo de PySpark que convierte la temperatura de décimas de grado Celsius a grados Celsius. Este trabajo convierte los datos de temperatura del conjunto de datos a otro formato.
Sube el archivo PySpark a Cloud Storage
Para subir el archivo PySpark a Cloud Storage, sigue estos pasos:
Guarda data_analytics_process.py en tu máquina local.
En la Google Cloud consola, ve a la página Navegador de Cloud Storage:
Haga clic en el nombre del segmento que ha creado anteriormente.
En la pestaña Objetos del contenedor, haga clic en el botón Subir archivos, seleccione
data_analytics_process.py
en el cuadro de diálogo que aparece y haga clic en Abrir.
DAG de analíticas de datos
Explorar el DAG de ejemplo
El DAG usa varios operadores para transformar y unificar los datos:
El comando
AzureBlobStorageToGCSOperator
transfiere el archivo holidays.csv de tu blob de bloques de Azure a tu segmento de Cloud Storage.El
GCSToBigQueryOperator
ingiere el archivo holidays.csv de Cloud Storage en una tabla nueva del conjunto de datosholidays_weather
de BigQuery que has creado anteriormente.El comando
DataprocCreateBatchOperator
crea y ejecuta una tarea por lotes de PySpark con Dataproc sin servidor.La
BigQueryInsertJobOperator
combina los datos de holidays.csv de la columna "Date" con los datos meteorológicos del conjunto de datos público de BigQuery ghcn_d. Las tareasBigQueryInsertJobOperator
se generan de forma dinámica mediante un bucle for y se encuentran en unTaskGroup
para que sean más fáciles de leer en la vista de gráfico de la interfaz de usuario de Airflow.
Usar la interfaz de usuario de Airflow para añadir variables
En Airflow, las variables son una forma universal de almacenar y recuperar ajustes o configuraciones arbitrarios como un simple almacén de valores clave. Este DAG usa variables de Airflow para almacenar valores comunes. Para añadirlos a tu entorno, sigue estos pasos:
Acceder a la interfaz de usuario de Airflow desde la consola de Cloud Composer
Vaya a Administrar > Variables.
Añade las siguientes variables:
gcp_project
: tu ID de proyecto.gcs_bucket
: el nombre del segmento que has creado anteriormente (sin el prefijogs://
).gce_region
: la región en la que quieres que se ejecute el trabajo de Dataproc que cumpla los requisitos de red de Dataproc Serverless. Es la región en la que has habilitado el acceso privado de Google anteriormente.dataproc_service_account
: la cuenta de servicio de tu entorno de Cloud Composer. Puedes encontrar esta cuenta de servicio en la pestaña de configuración del entorno de Cloud Composer.azure_blob_name
: el nombre del blob que has creado antes.azure_container_name
: el nombre del contenedor que has creado antes.
Subir el DAG al segmento de tu entorno
Cloud Composer programa los DAGs que se encuentran en la carpeta /dags
del segmento de tu entorno. Para subir el DAG con la consola, sigue estos pasos:
Google Cloud
En tu máquina local, guarda azureblobstoretogcsoperator_tutorial.py.
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, en la columna Carpeta DAG, haga clic en el enlace DAGs. Se abrirá la carpeta DAGs de tu entorno.
Haz clic en Subir archivos.
Selecciona
azureblobstoretogcsoperator_tutorial.py
en tu máquina local y haz clic en Abrir.
Activar el DAG
En tu entorno de Cloud Composer, haz clic en la pestaña DAGs (DAGs).
Haz clic en el ID de DAG
azure_blob_to_gcs_dag
.Haz clic en Activar DAG.
Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indica que las tareas se han completado correctamente.
Validar que el DAG se ha completado correctamente
En la consola, ve a la página BigQuery. Google Cloud
En el panel Explorador, haz clic en el nombre de tu proyecto.
Haz clic en
holidays_weather_joined
.Haz clic en Vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna de valores están en décimas de grado Celsius.
Haz clic en
holidays_weather_normalized
.Haz clic en Vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna "Valor" están en grados Celsius.
Limpieza
Elimina los recursos que hayas creado para este tutorial:
Elimina el segmento de Cloud Storage que has creado en este tutorial.
Elimina el entorno de Cloud Composer, incluido el segmento del entorno, que debes eliminar manualmente.
Siguientes pasos
- Ejecuta un DAG de analíticas de datos en Google Cloud.
- Ejecutar un DAG de analíticas de datos en AWS