Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En este instructivo, se muestra cómo usar Cloud Composer para crear una DAG de Apache Airflow (grafo acíclico dirigido) que ejecuta un trabajo de conteo de palabras de Apache Hadoop en Dataproc clúster.
Objetivos
- Accede a tu entorno de Cloud Composer y usa IU de Airflow.
- Crear y ver variables de entorno de Airflow
- Crear y ejecutar un DAG que incluya las siguientes tareas:
- Crea un clúster de Dataproc.
- Ejecuta un trabajo de recuento de palabras de Apache Hadoop en el clúster.
- Envía los resultados del recuento de palabras a un bucket de Cloud Storage.
- Borra el clúster.
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Cloud Composer
- Dataproc
- Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Asegúrate de que las siguientes APIs estén habilitadas en tu proyecto:
Console
Enable the Dataproc, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com
storage-component.googleapis.com En tu proyecto, crea un bucket de Cloud Storage de cualquier clase de almacenamiento y región para almacenar los resultados de conteo de palabras.
Toma nota de la ruta del bucket que creaste, por ejemplo,
gs://example-bucket
. Definirás una variable de Airflow para esta ruta y la usarás en el DAG de ejemplo más adelante en este instructivo.Crea un entorno de Cloud Composer con la configuración parámetros. Espera hasta que se complete la creación del entorno. Cuando hayas terminado, una marca de verificación verde a la izquierda del nombre del entorno.
Toma nota de la región en la que creaste tu entorno, por ejemplo
us-central
Definirás una variable de Airflow para esta región y la usarás en el DAG de ejemplo para ejecutar un clúster de Dataproc en la misma región.
Configura variables de Airflow
Configura las variables de Airflow para usar más adelante en el DAG de ejemplo. Por ejemplo, puedes configurar variables de Airflow en la IU de Airflow.
Variable de Airflow | Valor |
---|---|
gcp_project
|
El ID del proyecto que usas para este instructivo, como example-project . |
gcs_bucket
|
El URI del bucket de Cloud Storage que creaste para este instructivo, como gs://example-bucket |
gce_region
|
La región en la que creaste tu entorno, como us-central1 .
Esta es la región en la que se creará tu clúster de Dataproc. |
Ver el flujo de trabajo de ejemplo
Un DAG de Airflow es una colección de tareas organizadas que deseas programar
y ejecutar. Los DAG se definen en archivos estándares de Python. El código que se muestra en hadoop_tutorial.py
es el código del flujo de trabajo.
Operadores
Para organizar las tres tareas en el flujo de trabajo de ejemplo, el DAG importa los siguientes tres operadores de Airflow:
DataprocClusterCreateOperator
: crea un clúster de Dataproc.DataProcHadoopOperator
: Envía un trabajo de recuento de palabras de Hadoop y escribe los resultados en un bucket de Cloud Storage.DataprocClusterDeleteOperator
: Borra el clúster para evitar que se generen. cargos continuos de Compute Engine.
Dependencias
Las tareas que deseas ejecutar se organizan de una manera que refleje sus relaciones y dependencias. Las tareas de este DAG se ejecutan de forma secuencial.
Programación
El nombre del DAG es composer_hadoop_tutorial
, y este se ejecuta una vez cada uno
día. Debido a que la start_date
que se transfiere a default_dag_args
se configura como yesterday
, Cloud Composer programa el flujo de trabajo para que comience de inmediato después de subir el DAG al bucket del entorno.
Sube el DAG al bucket del entorno
Cloud Composer almacena los DAG en la carpeta /dags
de tu
en el bucket de tu entorno.
Sigue estos pasos para subir el DAG:
En tu máquina local, guarda
hadoop_tutorial.py
.En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, en la columna Carpeta de DAG de tu haz clic en el vínculo DAG.
Haz clic en Subir archivos.
Selecciona
hadoop_tutorial.py
en tu máquina local y haz clic en Abrir.
Cloud Composer agrega el DAG a Airflow y lo programa de forma automática. Los cambios en el DAG tardan de 3 a 5 minutos.
Explora las ejecuciones de DAG
Ver estado de la tarea
Cuando subes tu archivo DAG a la carpeta dags/
en Cloud Storage, Cloud Composer analiza el archivo. Cuando se completa correctamente, el nombre
del flujo de trabajo aparece en la lista del DAG, y este se pone en cola para ejecutarse
de inmediato.
Para ver el estado de la tarea, ve a la interfaz web de Airflow y haz clic en DAGs en la barra de herramientas.
Para abrir la página de detalles del DAG, haz clic en
composer_hadoop_tutorial
. Esta incluye una representación gráfica de las tareas del flujo de trabajo y las dependencias.Para ver el estado de cada tarea, haz clic en Vista del gráfico (Graph View) y, luego, desplaza el mouse sobre el gráfico para ver cada tarea.
Vuelve a poner en cola el flujo de trabajo
Para volver a ejecutar el flujo de trabajo desde la Vista del gráfico (Graph View), sigue estos pasos:
- En la Vista del gráfico (Graph View) de la IU de Airflow, haz clic en el gráfico
create_dataproc_cluster
. - Para restablecer las tres tareas, haz clic en Borrar (Clear) y, luego, en Aceptar para confirmar.
- Vuelve a hacer clic en
create_dataproc_cluster
en la Vista del gráfico (Graph View). - Para volver a poner en cola el flujo de trabajo, haz clic en Ejecutar (Run).
Cómo ver los resultados de las tareas
También puedes verificar el estado y los resultados de composer_hadoop_tutorial
de trabajo en las siguientes páginas de la consola de Google Cloud:
Clústeres de Dataproc: Para supervisar la creación de clústeres y y la eliminación de datos. Ten en cuenta que el clúster que crea el flujo de trabajo es efímero; solo existe durante el flujo de trabajo y se borra como parte de su última tarea.
Trabajos de Dataproc: para ver o supervisar Apache Hadoop de conteo de palabras. Haz clic en el ID del trabajo para ver el resultado del registro de trabajos.
Navegador de Cloud Storage: Para ver los resultados del recuento de palabras en la carpeta
wordcount
del bucket de Cloud Storage que creaste para este instructivo.
Limpieza
Borra los recursos que se usaron en este instructivo:
Borra el entorno de Cloud Composer, incluido lo siguiente: borrar manualmente el bucket del entorno.
Borra el bucket de Cloud Storage que almacena los resultados del trabajo de recuento de palabras de Hadoop.