Eseguire un job di conteggio parole di Hadoop su un cluster Dataproc

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questo tutorial mostra come utilizzare Cloud Composer per creare un DAG (Directed Acyclic Graph) Apache Airflow che esegue un job di conteggio parole di Apache Hadoop su un cluster Dataproc.

Obiettivi

  1. Accedere all'ambiente Cloud Composer e utilizzare la UI di Airflow.
  2. Crea e visualizza le variabili di ambiente Airflow.
  3. Crea ed esegui un DAG che includa le seguenti attività:
    1. Crea un cluster Dataproc.
    2. Esegue un job di conteggio parole Apache Hadoop nel cluster.
    3. Restituisce i risultati del conteggio delle parole in un bucket Cloud Storage.
    4. Elimina il cluster.

Costi

In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

  • Assicurati che nel progetto siano abilitate le API seguenti:

    Console

    Abilita le API Dataproc, Cloud Storage.

    Abilita le API

    gcloud

    Abilita le API Dataproc, Cloud Storage.

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • Nel tuo progetto, crea un bucket Cloud Storage di qualsiasi classe di archiviazione e regione per archiviare i risultati del job di conteggio parole di Hadoop.

  • Prendi nota del percorso del bucket che hai creato, ad esempio gs://example-bucket. Definirai una variabile Airflow per questo percorso e userai la variabile nel DAG di esempio più avanti in questo tutorial.

  • Crea un ambiente Cloud Composer con parametri predefiniti. Attendi il completamento della creazione dell'ambiente. Al termine, il segno di spunta verde viene visualizzato a sinistra del nome dell'ambiente.

  • Prendi nota della regione in cui hai creato il tuo ambiente, ad esempio us-central. Definirai una variabile Airflow per questa regione e la utilizzerai nel DAG di esempio per eseguire un cluster Dataproc nella stessa regione.

Imposta variabili Airflow

Imposta le variabili Airflow da utilizzare in seguito nel DAG di esempio. Ad esempio, puoi impostare le variabili Airflow nella UI di Airflow.

Variabile Airflow Valore
gcp_project L'ID progetto del progetto che stai utilizzando per questo tutorial, ad esempio example-project.
gcs_bucket L'URI del bucket Cloud Storage che hai creato per questo tutorial, ad esempio gs://example-bucket.
gce_region La regione in cui hai creato l'ambiente, ad esempio us-central1. Questa è la regione in cui verrà creato il cluster Dataproc.

Visualizza il flusso di lavoro di esempio

Un DAG Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti in file Python standard. Il codice mostrato in hadoop_tutorial.py è il codice del flusso di lavoro.

Flusso d'aria 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Flusso d'aria 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operatori

Per orchestrare le tre attività nel flusso di lavoro di esempio, il DAG importa i seguenti tre operatori Airflow:

  • DataprocClusterCreateOperator: crea un cluster Dataproc.

  • DataProcHadoopOperator: invia un job di conteggio parole di Hadoop e scrive i risultati in un bucket Cloud Storage.

  • DataprocClusterDeleteOperator: elimina il cluster per evitare addebiti continui per Compute Engine.

Dipendenze

Puoi organizzare le attività da eseguire in modo che riflettano le loro relazioni e dipendenze. Le attività in questo DAG vengono eseguite in sequenza.

Flusso d'aria 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Flusso d'aria 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Programmazione

Il nome del DAG è composer_hadoop_tutorial e il DAG viene eseguito una volta al giorno. Poiché l'start_date che viene passato a default_dag_args è impostato su yesterday, Cloud Composer pianifica l'avvio del flusso di lavoro immediatamente dopo il caricamento del DAG nel bucket dell'ambiente.

Flusso d'aria 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Flusso d'aria 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Carica il DAG nel bucket dell'ambiente

Cloud Composer archivia i DAG nella cartella /dags del bucket del tuo ambiente.

Per caricare il DAG:

  1. Sul computer locale, salva hadoop_tutorial.py.

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  3. Nell'elenco degli ambienti, nella colonna Cartella DAG del tuo ambiente, fai clic sul link DAG.

  4. Fai clic su Carica file.

  5. Seleziona hadoop_tutorial.py sulla macchina locale e fai clic su Apri.

Cloud Composer aggiunge il DAG ad Airflow e pianifica il DAG automaticamente. Le modifiche ai DAG vengono applicate entro 3-5 minuti.

Esplora le esecuzioni di DAG

Visualizza lo stato dell'attività

Quando carichi il file DAG nella cartella dags/ di Cloud Storage, Cloud Composer analizza il file. Una volta completato correttamente, il nome del flusso di lavoro viene visualizzato nell'elenco di DAG e il flusso di lavoro viene messo in coda per essere eseguito immediatamente.

  1. Per visualizzare lo stato dell'attività, vai all'interfaccia web di Airflow e fai clic su DAG nella barra degli strumenti.

  2. Per aprire la pagina dei dettagli del DAG, fai clic su composer_hadoop_tutorial. Questa pagina include una rappresentazione grafica delle attività e delle dipendenze del flusso di lavoro.

  3. Per visualizzare lo stato di ogni attività, fai clic su Visualizzazione grafico e passa il mouse sopra l'immagine di ogni attività.

Aggiungi di nuovo il flusso di lavoro in coda

Per eseguire nuovamente il flusso di lavoro dalla Visualizzazione grafico:

  1. Nella visualizzazione del grafico della UI di Airflow, fai clic sull'immagine create_dataproc_cluster.
  2. Per reimpostare le tre attività, fai clic su Cancella, quindi fai clic su OK per confermare.
  3. Fai di nuovo clic su create_dataproc_cluster in Visualizzazione grafico.
  4. Per inserire nuovamente il flusso di lavoro in coda, fai clic su Esegui.

Visualizza i risultati delle attività

Puoi anche controllare lo stato e i risultati del flusso di lavoro composer_hadoop_tutorial visitando le seguenti pagine della console Google Cloud:

  • Cluster Dataproc: per monitorare la creazione e l'eliminazione del cluster. Tieni presente che il cluster creato dal flusso di lavoro è temporaneo: esiste solo per la durata del flusso di lavoro e viene eliminato come parte dell'ultima attività del flusso di lavoro.

    Vai a Cluster Dataproc

  • Job Dataproc: per visualizzare o monitorare il job di conteggio parole di Apache Hadoop. Fai clic sull'ID job per visualizzarne l'output del log.

    Vai ai job Dataproc

  • Browser Cloud Storage: per visualizzare i risultati del conteggio parole nella cartella wordcount del bucket Cloud Storage creato per questo tutorial.

    Vai al browser di Cloud Storage

esegui la pulizia

Elimina le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer, inclusa l'eliminazione manuale del bucket dell'ambiente.

  2. Elimina il bucket Cloud Storage in cui sono archiviati i risultati del job di conteggio parole di Hadoop.