Ejecuta un trabajo de recuento de palabras de Hadoop en un clúster de Dataproc

Cloud Composer 1 | Cloud Composer 2

En este instructivo, se muestra cómo usar Cloud Composer para crear un DAG (grafo acíclico dirigido) de Apache Airflow que ejecute un trabajo de conteo de palabras de Apache Hadoop en un clúster de Dataproc.

Objetivos

  1. Accede a tu entorno de Cloud Composer y usa la IU de Airflow.
  2. Crear y ver variables de entorno de Airflow
  3. Crea y ejecuta un DAG que incluya las siguientes tareas:
    1. Crear un clúster de Dataproc
    2. Ejecuta un trabajo de recuento de palabras de Apache Hadoop en el clúster.
    3. Envía los resultados del recuento de palabras a un bucket de Cloud Storage.
    4. Borra el clúster.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

  • Asegúrate de que las siguientes APIs estén habilitadas en tu proyecto:

    Console

    Habilita las API de Dataproc, Cloud Storage.

    Habilita las API

    gcloud

    Habilita las APIs de Dataproc, Cloud Storage:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • En tu proyecto, crea un bucket de Cloud Storage de cualquier clase de almacenamiento y región para almacenar los resultados del trabajo de recuento de palabras de Hadoop.

  • Toma nota de la ruta del bucket que creaste, por ejemplo, gs://example-bucket. Definirás una variable de Airflow para esta ruta de acceso y la usarás en el DAG de ejemplo más adelante en este instructivo.

  • Crea un entorno de Cloud Composer con parámetros predeterminados. Espera hasta que se complete la creación del entorno. Cuando hayas terminado, la marca de verificación verde se mostrará a la izquierda del nombre del entorno.

  • Anota la región en la que creaste tu entorno, por ejemplo, us-central. Definirás una variable de Airflow para esta región y la usarás en el DAG de ejemplo para ejecutar un clúster de Dataproc en la misma región.

Configura variables de Airflow

Configura las variables de Airflow para usarlas más adelante en el DAG de ejemplo. Por ejemplo, puedes configurar variables de Airflow en la IU de Airflow.

Variable de Airflow Valor
gcp_project El ID del proyecto que usas para este instructivo, como example-project.
gcs_bucket El URI del bucket de Cloud Storage que creaste para este instructivo, como gs://example-bucket.
gce_region La región en la que creaste el entorno, como us-central1. Esta es la región en la que se creará tu clúster de Dataproc.

Ver el flujo de trabajo de ejemplo

Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python. El código que se muestra en hadoop_tutorial.py es el código del flujo de trabajo.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operadores

Para organizar las tres tareas en el flujo de trabajo de ejemplo, el DAG importa los siguientes tres operadores de Airflow:

  • DataprocClusterCreateOperator: crea un clúster de Dataproc.

  • DataProcHadoopOperator: Envía un trabajo de conteo de palabras de Hadoop y escribe los resultados en un bucket de Cloud Storage.

  • DataprocClusterDeleteOperator: Borra el clúster para evitar que se generen cargos continuos de Compute Engine.

Dependencias

Las tareas que deseas ejecutar se organizan de una manera que refleje sus relaciones y dependencias. Las tareas de este DAG se ejecutan de forma secuencial.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Programación

El nombre del DAG es composer_hadoop_tutorial, y este se ejecuta una vez al día. Debido a que la start_date que se pasa a default_dag_args se configura como yesterday, Cloud Composer programa el flujo de trabajo para que comience inmediatamente después de que se suba el DAG al bucket del entorno.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Sube el DAG al bucket del entorno

Cloud Composer almacena los DAG en la carpeta /dags del bucket de tu entorno.

Sigue estos pasos para subir el DAG:

  1. En tu máquina local, guarda hadoop_tutorial.py.

  2. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, en la columna Carpeta de DAG de tu entorno, haz clic en el vínculo DAGs.

  4. Haz clic en Subir archivos.

  5. Selecciona hadoop_tutorial.py en tu máquina local y haz clic en Abrir.

Cloud Composer agrega el DAG a Airflow y lo programa de forma automática. Los cambios en el DAG tardan de 3 a 5 minutos.

Explore las ejecuciones del DAG

Ver el estado de la tarea

Cuando subes el archivo DAG a la carpeta dags/ en Cloud Storage, Cloud Composer analiza el archivo. Cuando se completa correctamente, el nombre del flujo de trabajo aparece en la lista del DAG y el flujo de trabajo se pone en cola para ejecutarse de inmediato.

  1. Para ver el estado de la tarea, ve a la interfaz web de Airflow y haz clic en DAGs en la barra de herramientas.

  2. Para abrir la página de detalles del DAG, haz clic en composer_hadoop_tutorial. En esta página, se incluye una representación gráfica de las dependencias y tareas del flujo de trabajo.

  3. Para ver el estado de cada tarea, haz clic en Vista del gráfico (Graph View) y, luego, desplaza el mouse sobre el gráfico para ver cada tarea.

Vuelve a poner el flujo de trabajo en cola

Para volver a ejecutar el flujo de trabajo desde la Vista del gráfico (Graph View), sigue estos pasos:

  1. En la Vista del gráfico (Graph View) de la IU de Airflow, haz clic en el gráfico create_dataproc_cluster.
  2. Para restablecer las tres tareas, haz clic en Borrar (Clear) y, luego, en Aceptar para confirmar.
  3. Vuelve a hacer clic en create_dataproc_cluster en la Vista del gráfico (Graph View).
  4. Para volver a poner en cola el flujo de trabajo, haz clic en Ejecutar (Run).

Ver los resultados de la tarea

También puedes verificar el estado y los resultados del flujo de trabajo de composer_hadoop_tutorial en las siguientes páginas de la consola de Google Cloud:

  • Clústeres de Dataproc: Para supervisar la creación y eliminación de clústeres. Ten en cuenta que el clúster que crea el flujo de trabajo es efímero: solo existe durante el flujo de trabajo y se borra como parte de la última tarea del flujo de trabajo.

    Ir a Clústeres de Dataproc

  • Trabajos de Dataproc: Para ver o supervisar el trabajo de recuento de palabras de Apache Hadoop. Haz clic en el ID del trabajo para ver el resultado del registro de trabajos.

    Ir a Trabajos de Dataproc

  • Navegador de Cloud Storage: Para ver los resultados del recuento de palabras en la carpeta wordcount del bucket de Cloud Storage que creaste para este instructivo.

    Ir al navegador de Cloud Storage

Limpieza

Borra los recursos que se usaron en este instructivo:

  1. Borra el entorno de Cloud Composer, lo que incluye borrar de forma manual el bucket del entorno.

  2. Borra el bucket de Cloud Storage que almacena los resultados del trabajo de recuento de palabras de Hadoop.