Ejecutar un trabajo de recuento de palabras de Hadoop en un clúster de Dataproc

En este instructivo, se muestra cómo usar Cloud Composer para crear un DAG de Apache Airflow (flujo de trabajo) que ejecuta un trabajo de recuento de palabras de Apache Hadoop en un clúster de Dataproc con Google Cloud Console.

Objetivos

  1. Acceder a tu entorno de Cloud Composer y usar la interfaz web de Airflow
  2. Crear y ver variables de entorno de Airflow
  3. Crear y ejecutar un DAG que incluya las siguientes tareas:
    1. Crea un clúster de Dataproc
    2. Ejecución de un trabajo de recuento de palabras de Apache Hadoop en el clúster
    3. Envío de los resultados del recuento de palabras a un depósito de Cloud Storage
    4. Eliminación del clúster

Costos

En este instructivo, se usan los componentes facturables de Google Cloud, incluidos los siguientes:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

El sistema demora hasta 25 minutos en crear el entorno. Este instructivo puede tardar aproximadamente 1 hora en completarse. Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Google Cloud pueden ser elegibles para obtener una prueba gratuita.

Antes de comenzar

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. En la página de selección de proyectos de Cloud Console, selecciona o crea un proyecto de Cloud.

    Ir a la página Selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud. Obtén información sobre cómo confirmar que tienes habilitada la facturación para tu proyecto.

  4. Habilita las API de Cloud Composer, Cloud Dataproc, and Cloud Storage.

    Habilita las API

  5. En tu proyecto, crea un depósito de Cloud Storage de cualquier clase de almacenamiento y región para almacenar los resultados del trabajo de recuento de palabras de Hadoop.
  6. Toma nota de la ruta del depósito que creaste, por ejemplo, gs://my-bucket. Definirás una variable de Airflow para esta ruta y la usarás en el DAG de ejemplo.

Crea un entorno

  1. En Cloud Console, ve a la página Crear entorno.

    Abrir la página Crear entorno

  2. En el campo Nombre, ingresa example-environment.

  3. En la lista desplegable Ubicación, selecciona una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.

  4. Para otras opciones de configuración del entorno, usa los valores predeterminados proporcionados.

  5. Para crear el entorno, haz clic en Crear.

  6. Espera hasta que se complete la creación del entorno. Cuando termine, aparecerá la marca de verificación verde a la izquierda del nombre del entorno.

Visualiza detalles del entorno

Después de completar la creación del entorno, puedes ver la información de implementación de tu entorno, como las versiones de Cloud Composer y Python, la URL de la interfaz web de Airflow y el ID del clúster de Google Kubernetes Engine.

Para ver la información de implementación, sigue estos pasos:

  1. En Cloud Console, ve a la página Entornos.

    Abrir la página Entornos

  2. Para ver la página de detalles de Entorno, haz clic en example-environment.

  3. Toma nota de la zona en la que creó tu entorno, por ejemplo, us-central-1c. Definirás una variable de Airflow para esta zona y la usarás en el DAG de ejemplo.

Configura variables de Airflow

Las variables de Airflow son un concepto específico de Airflow que difiere de las variables de entorno. En este paso, usarás la interfaz web de Airflow para configurar tres variables de Airflow que se usarán más adelante en el DAG de ejemplo.

Para configurar las variables, sigue estos pasos:

  1. Accede a la interfaz web de Airflow en Cloud Console:

    1. En Cloud Console, ve a la página Entornos.

      Abrir la página Entornos

    2. En la columna Servidor web de Airflow para example-environment, haz clic en el vínculo Airflow. La interfaz web de Airflow se abrirá en una ventana nueva.

  2. Configura las variables en la interfaz web de Airflow con estos pasos:

    1. En la barra de herramientas, haz clic en Administrador > Variables.
    2. Para crear una variable nueva, haz clic en Crear.
    3. Para cada una de las siguientes variables, ingresa el par clave-valor y haz clic en Guardar. Todas las variables de Airflow aparecen en la pestaña Lista.
      CLAVE VALOR
      gcp_project El ID del proyecto del proyecto de Google Cloud Platform que usas para este instructivo, como composer-test.
      gcs_bucket El depósito de Cloud Storage que creaste para este instructivo, como gs://my-bucket.
      gce_zone La zona de Compute Engine para tu entorno, como us-central1-c. Esta es la zona en la que se creará tu clúster de Dataproc. Consulta Regiones y zonas disponibles.

Visualiza el flujo de trabajo de ejemplo

Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python. El código que se muestra en hadoop_tutorial.py es el código del flujo de trabajo.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operadores

Un operador es una plantilla para una sola tarea en un flujo de trabajo. Para organizar las tres tareas en el flujo de trabajo de ejemplo, el DAG importa los siguientes tres operadores:

  1. DataprocClusterCreateOperator: crea un clúster de Dataproc.
  2. DataProcHadoopOperator: envía un trabajo de recuento de palabras de Hadoop y escribe los resultados en un depósito de Cloud Storage.
  3. DataprocClusterDeleteOperator: borra el clúster para evitar incurrir en cargos constantes de Compute Engine.

Dependencias

Las tareas que desea ejecutar se organizan de una manera que refleje sus relaciones y dependencias. Las tareas de este DAG se ejecutan de forma secuencial. En este ejemplo, la relación se establece en la dirección que señala el operador bitshift de Python (>>).

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Programación

El nombre del DAG es composer_hadoop_tutorial, y el DAG se ejecuta una vez al día. Debido a que la start_date que se transfiere a default_dag_args se configura como yesterday, Cloud Composer programa el flujo de trabajo para comenzar de inmediato después de subir el DAG.

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Sube el DAG a Cloud Storage

Cloud Composer programa solo los DAG en la carpeta de DAG. La carpeta de DAG se encuentra en el depósito de Cloud Storage que Cloud Composer crea de forma automática para tu entorno.

Para subir el DAG, sigue estos pasos:

  1. En tu máquina local, guarda hadoop_tutorial.py.
  2. En Cloud Console, ve a la página Entornos.

    Abrir la página Entornos

  3. En la columna Carpeta de DAG para example-environment, haz clic en el vínculo DAG. Se abrirá la carpeta de DAG en Cloud Storage.

  4. Haz clic en Subir archivos.

  5. Selecciona hadoop_tutorial.py en tu máquina local y haz clic en Abrir.

Cloud Composer agrega el DAG a Airflow y lo programa de forma automática. Los cambios en el DAG tardan de 3 a 5 minutos.

Explora las ejecuciones del DAG

Visualiza el estado de la tarea

Cuando subes tu archivo DAG a la carpeta dags/ en Cloud Storage, Cloud Composer analiza el archivo. Cuando se completa correctamente, el nombre del flujo de trabajo aparece en la lista del DAG y el flujo de trabajo se coloca en cola para ejecutarse de inmediato.

  1. Para ver el estado de la tarea, ve a la interfaz web de Airflow y haz clic en DAGs en la barra de herramientas.

  2. Para abrir la página de detalles del DAG, haz clic en composer_hadoop_tutorial. En esta página, se incluye una representación gráfica de las dependencias y tareas del flujo de trabajo.

  3. Para ver el estado de cada tarea, haz clic en Vista del gráfico (Graph View) y, luego, desplaza el mouse sobre el gráfico para ver cada tarea.

Vuelve a poner en cola el flujo de trabajo

Para volver a ejecutar el flujo de trabajo desde la Vista del gráfico (Graph View), sigue estos pasos:

  1. En la Vista del gráfico (Graph View) de la IU de Airflow, haz clic en el gráfico create_dataproc_cluster.
  2. Para restablecer las tres tareas, haz clic en Borrar (Clear) y, luego, en Aceptar para confirmar.
  3. Vuelve a hacer clic en create_dataproc_cluster en la Vista del gráfico (Graph View).
  4. Para volver a poner en cola el flujo de trabajo, haz clic en Ejecutar (Run).

Visualiza los resultados de la tarea

También puedes consultar el estado y los resultados del flujo de trabajo composer_hadoop_tutorial en las siguientes páginas de Cloud Console:

  • Clústeres de Dataproc para supervisar la creación y eliminación de clústeres. Ten en cuenta que el clúster que crea el flujo de trabajo es efímero; solo existe durante el flujo de trabajo y se borra como parte de su última tarea.

  • Trabajos de Dataproc para ver o supervisar el trabajo de recuento de palabras de Apache Hadoop. Haz clic en el ID de la tarea para ver el resultado del registro de trabajos.

  • Navegador de Cloud Storage para ver los resultados del recuento de palabras en la carpeta wordcount del depósito de Cloud Storage que creaste para este instructivo.

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform por los recursos que usaste en este instructivo:

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a la página Administrar recursos

  2. Si el proyecto que deseas borrar está vinculado con una organización, selecciónala en la lista Organización, en la parte superior de la página.
  3. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar .
  4. En el cuadro de diálogo, escribe el ID del proyecto y haz clic en Cerrar para borrar el proyecto.

Como alternativa, puedes borrar los recursos que se usan en este instructivo de la siguiente manera:

  1. Borra el entorno de Cloud Composer.
  2. Borra el depósito de Cloud Storage para el entorno de Cloud Composer. Si borras el entorno de Cloud Composer, no se borra el depósito.
  3. Borra los temas de Pub/Sub para Cloud Composer (composer-agent y composer-backend).

Qué sigue