Ejecutar una tarea de recuento de palabras de Hadoop en un clúster de Dataproc

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En este tutorial se muestra cómo usar Cloud Composer para crear un DAG (grafo acíclico dirigido) de Apache Airflow que ejecuta una tarea de recuento de palabras de Apache Hadoop en un clúster de Dataproc.

Objetivos

  1. Accede a tu entorno de Cloud Composer y usa la interfaz de usuario de Airflow.
  2. Crea y consulta variables de entorno de Airflow.
  3. Crea y ejecuta un DAG que incluya las siguientes tareas:
    1. Crea un clúster de Dataproc.
    2. Ejecuta una tarea de recuento de palabras de Apache Hadoop en el clúster.
    3. Genera los resultados del recuento de palabras en un segmento de Cloud Storage.
    4. Elimina el clúster.

Costes

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Antes de empezar

  • Asegúrate de que las siguientes APIs estén habilitadas en tu proyecto:

    Consola

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • En tu proyecto, crea un segmento de Cloud Storage de cualquier clase de almacenamiento y región para almacenar los resultados de la tarea de recuento de palabras de Hadoop.

  • Anota la ruta del segmento que has creado. Por ejemplo, gs://example-bucket. Definirás una variable de Airflow para esta ruta y usarás la variable en el DAG de ejemplo más adelante en este tutorial.

  • Crea un entorno de Cloud Composer con los parámetros predeterminados. Espera a que se complete la creación del entorno. Cuando termine, aparecerá una marca de verificación verde a la izquierda del nombre del entorno.

  • Anota la región en la que has creado tu entorno. Por ejemplo: us-central. Definirás una variable de Airflow para esta región y la usarás en el DAG de ejemplo para ejecutar un clúster de Dataproc en la misma región.

Definir variables de Airflow

Define las variables de Airflow para usarlas más adelante en el DAG de ejemplo. Por ejemplo, puedes definir variables de Airflow en la interfaz de usuario de Airflow.

Variable de Airflow Valor
gcp_project El ID de proyecto del proyecto que vas a usar en este tutorial, como example-project.
gcs_bucket El URI del segmento de Cloud Storage que has creado en este tutorial, como gs://example-bucket.
gce_region La región en la que has creado tu entorno, como us-central1. Es la región en la que se creará tu clúster de Dataproc.

Ver el flujo de trabajo de ejemplo

Un DAG de Airflow es una colección de tareas organizadas que quieres programar y ejecutar. Los DAGs se definen en archivos Python estándar. El código que se muestra en hadoop_tutorial.py es el código del flujo de trabajo.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operadores

Para orquestar las tres tareas del flujo de trabajo de ejemplo, el DAG importa los tres operadores de Airflow siguientes:

  • DataprocClusterCreateOperator: crea un clúster de Dataproc.

  • DataProcHadoopOperator: envía una tarea de recuento de palabras de Hadoop y escribe los resultados en un segmento de Cloud Storage.

  • DataprocClusterDeleteOperator: elimina el clúster para evitar que se apliquen cargos de Compute Engine.

Dependencias

Organiza las tareas que quieres ejecutar de forma que reflejen sus relaciones y dependencias. Las tareas de este DAG se ejecutan de forma secuencial.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Programación

El nombre del DAG es composer_hadoop_tutorial y se ejecuta una vez al día. Como el start_date que se transfiere a default_dag_args se ha definido como yesterday, Cloud Composer programa el flujo de trabajo para que se inicie inmediatamente después de que se suba el DAG al bucket del entorno.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Sube el DAG al segmento del entorno

Cloud Composer almacena los DAGs en la carpeta /dags del segmento de tu entorno.

Para subir el DAG, sigue estos pasos:

  1. En tu máquina local, guarda hadoop_tutorial.py.

  2. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, en la columna Carpeta DAGs de tu entorno, haz clic en el enlace DAGs.

  4. Haz clic en Subir archivos.

  5. Selecciona hadoop_tutorial.py en tu equipo local y haz clic en Abrir.

Cloud Composer añade el DAG a Airflow y lo programa automáticamente. Los cambios en el DAG se producen en un plazo de entre 3 y 5 minutos.

Consultar ejecuciones de DAG

Ver el estado de una tarea

Cuando subes el archivo DAG a la carpeta dags/ de Cloud Storage, Cloud Composer lo analiza. Si se completa correctamente, el nombre del flujo de trabajo aparecerá en la lista de DAGs y se pondrá en cola para ejecutarse inmediatamente.

  1. Para ver el estado de las tareas, ve a la interfaz web de Airflow y haz clic en DAGs (DAGs) en la barra de herramientas.

  2. Para abrir la página de detalles del DAG, haz clic en composer_hadoop_tutorial. Esta página incluye una representación gráfica de las tareas y las dependencias del flujo de trabajo.

  3. Para ver el estado de cada tarea, haz clic en Vista de gráfico y, a continuación, coloca el cursor sobre el gráfico de cada tarea.

Volver a poner en cola el flujo de trabajo

Para volver a ejecutar el flujo de trabajo desde la vista de gráfico, sigue estos pasos:

  1. En la vista de gráfico de la interfaz de usuario de Airflow, haz clic en el gráfico create_dataproc_cluster.
  2. Para restablecer las tres tareas, haz clic en Borrar y, a continuación, en Aceptar para confirmar.
  3. Vuelve a hacer clic en create_dataproc_cluster en la vista de gráfico.
  4. Para volver a poner en cola el flujo de trabajo, haz clic en Ejecutar.

Ver los resultados de las tareas

También puedes consultar el estado y los resultados del composer_hadoop_tutorial flujo de trabajo en las siguientes Google Cloud páginas de la consola:

  • Clústeres de Dataproc: para monitorizar la creación y eliminación de clústeres. Ten en cuenta que el clúster creado por el flujo de trabajo es efímero: solo existe durante la duración del flujo de trabajo y se elimina como parte de la última tarea del flujo de trabajo.

    Ir a Clústeres de Dataproc

  • Tareas de Dataproc: para ver o monitorizar la tarea de recuento de palabras de Apache Hadoop. Haz clic en el ID de trabajo para ver el resultado del registro de trabajo.

    Ir a tareas de Dataproc

  • Navegador de Cloud Storage: para ver los resultados del recuento de palabras en la carpeta wordcount del segmento de Cloud Storage que has creado en este tutorial.

    Ir al navegador de Cloud Storage

Limpieza

Elimina los recursos que has usado en este tutorial:

  1. Elimina el entorno de Cloud Composer, incluido el segmento del entorno, que debes eliminar manualmente.

  2. Elimina el segmento de Cloud Storage que almacena los resultados de la tarea de recuento de palabras de Hadoop.