Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En este tutorial se muestra cómo usar Cloud Composer para crear un DAG (grafo acíclico dirigido) de Apache Airflow que ejecuta una tarea de recuento de palabras de Apache Hadoop en un clúster de Dataproc.
Objetivos
- Accede a tu entorno de Cloud Composer y usa la interfaz de usuario de Airflow.
- Crea y consulta variables de entorno de Airflow.
- Crea y ejecuta un DAG que incluya las siguientes tareas:
- Crea un clúster de Dataproc.
- Ejecuta una tarea de recuento de palabras de Apache Hadoop en el clúster.
- Genera los resultados del recuento de palabras en un segmento de Cloud Storage.
- Elimina el clúster.
Costes
En este documento, se utilizan los siguientes componentes facturables de Google Cloud:
- Cloud Composer
- Dataproc
- Cloud Storage
Para generar una estimación de costes basada en el uso previsto,
utiliza la calculadora de precios.
Antes de empezar
Asegúrate de que las siguientes APIs estén habilitadas en tu proyecto:
Consola
Enable the Dataproc, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com
storage-component.googleapis.com En tu proyecto, crea un segmento de Cloud Storage de cualquier clase de almacenamiento y región para almacenar los resultados de la tarea de recuento de palabras de Hadoop.
Anota la ruta del segmento que has creado. Por ejemplo,
gs://example-bucket
. Definirás una variable de Airflow para esta ruta y usarás la variable en el DAG de ejemplo más adelante en este tutorial.Crea un entorno de Cloud Composer con los parámetros predeterminados. Espera a que se complete la creación del entorno. Cuando termine, aparecerá una marca de verificación verde a la izquierda del nombre del entorno.
Anota la región en la que has creado tu entorno. Por ejemplo:
us-central
. Definirás una variable de Airflow para esta región y la usarás en el DAG de ejemplo para ejecutar un clúster de Dataproc en la misma región.
Definir variables de Airflow
Define las variables de Airflow para usarlas más adelante en el DAG de ejemplo. Por ejemplo, puedes definir variables de Airflow en la interfaz de usuario de Airflow.
Variable de Airflow | Valor |
---|---|
gcp_project
|
El ID de proyecto del proyecto que vas a usar en este tutorial, como example-project . |
gcs_bucket
|
El URI del segmento de Cloud Storage que has creado en este tutorial, como gs://example-bucket |
gce_region
|
La región en la que has creado tu entorno, como us-central1 .
Es la región en la que se creará tu clúster de Dataproc. |
Ver el flujo de trabajo de ejemplo
Un DAG de Airflow es una colección de tareas organizadas que quieres programar y ejecutar. Los DAGs se definen en archivos Python estándar. El código que se muestra en
hadoop_tutorial.py
es el código del flujo de trabajo.
Airflow 2
Airflow 1
Operadores
Para orquestar las tres tareas del flujo de trabajo de ejemplo, el DAG importa los tres operadores de Airflow siguientes:
DataprocClusterCreateOperator
: crea un clúster de Dataproc.DataProcHadoopOperator
: envía una tarea de recuento de palabras de Hadoop y escribe los resultados en un segmento de Cloud Storage.DataprocClusterDeleteOperator
: elimina el clúster para evitar que se apliquen cargos de Compute Engine.
Dependencias
Organiza las tareas que quieres ejecutar de forma que reflejen sus relaciones y dependencias. Las tareas de este DAG se ejecutan de forma secuencial.
Airflow 2
Airflow 1
Programación
El nombre del DAG es composer_hadoop_tutorial
y se ejecuta una vez al día. Como el start_date
que se transfiere a default_dag_args
se ha definido como yesterday
, Cloud Composer programa el flujo de trabajo para que se inicie inmediatamente después de que se suba el DAG al bucket del entorno.
Airflow 2
Airflow 1
Sube el DAG al segmento del entorno
Cloud Composer almacena los DAGs en la carpeta /dags
del segmento de tu entorno.
Para subir el DAG, sigue estos pasos:
En tu máquina local, guarda
hadoop_tutorial.py
.En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, en la columna Carpeta DAGs de tu entorno, haz clic en el enlace DAGs.
Haz clic en Subir archivos.
Selecciona
hadoop_tutorial.py
en tu equipo local y haz clic en Abrir.
Cloud Composer añade el DAG a Airflow y lo programa automáticamente. Los cambios en el DAG se producen en un plazo de entre 3 y 5 minutos.
Consultar ejecuciones de DAG
Ver el estado de una tarea
Cuando subes el archivo DAG a la carpeta dags/
de Cloud Storage, Cloud Composer lo analiza. Si se completa correctamente, el nombre del flujo de trabajo aparecerá en la lista de DAGs y se pondrá en cola para ejecutarse inmediatamente.
Para ver el estado de las tareas, ve a la interfaz web de Airflow y haz clic en DAGs (DAGs) en la barra de herramientas.
Para abrir la página de detalles del DAG, haz clic en
composer_hadoop_tutorial
. Esta página incluye una representación gráfica de las tareas y las dependencias del flujo de trabajo.Para ver el estado de cada tarea, haz clic en Vista de gráfico y, a continuación, coloca el cursor sobre el gráfico de cada tarea.
Volver a poner en cola el flujo de trabajo
Para volver a ejecutar el flujo de trabajo desde la vista de gráfico, sigue estos pasos:
- En la vista de gráfico de la interfaz de usuario de Airflow, haz clic en el gráfico
create_dataproc_cluster
. - Para restablecer las tres tareas, haz clic en Borrar y, a continuación, en Aceptar para confirmar.
- Vuelve a hacer clic en
create_dataproc_cluster
en la vista de gráfico. - Para volver a poner en cola el flujo de trabajo, haz clic en Ejecutar.

Ver los resultados de las tareas
También puedes consultar el estado y los resultados del composer_hadoop_tutorial
flujo de trabajo en las siguientes Google Cloud páginas de la consola:
Clústeres de Dataproc: para monitorizar la creación y eliminación de clústeres. Ten en cuenta que el clúster creado por el flujo de trabajo es efímero: solo existe durante la duración del flujo de trabajo y se elimina como parte de la última tarea del flujo de trabajo.
Tareas de Dataproc: para ver o monitorizar la tarea de recuento de palabras de Apache Hadoop. Haz clic en el ID de trabajo para ver el resultado del registro de trabajo.
Navegador de Cloud Storage: para ver los resultados del recuento de palabras en la carpeta
wordcount
del segmento de Cloud Storage que has creado en este tutorial.
Limpieza
Elimina los recursos que has usado en este tutorial:
Elimina el entorno de Cloud Composer, incluido el segmento del entorno, que debes eliminar manualmente.
Elimina el segmento de Cloud Storage que almacena los resultados de la tarea de recuento de palabras de Hadoop.