Exécuter une tâche Hadoop de décompte de mots sur un cluster Dataproc

Cloud Composer 1 | Cloud Composer 2

Ce tutoriel explique comment utiliser Cloud Composer pour créer un DAG (graphe orienté acyclique) Apache Airflow qui exécute une tâche de décompte de mots Apache Hadoop sur un cluster Dataproc.

Objectifs

  1. Accédez à votre environnement Cloud Composer et utilisez l'interface utilisateur Airflow.
  2. Créer et afficher des variables d'environnement Airflow
  3. Créez et exécutez un DAG comprenant les tâches suivantes :
    1. crée un cluster Dataproc ;
    2. Exécute une tâche de décompte de mots Apache Hadoop sur le cluster.
    3. Génère les résultats du décompte de mots dans un bucket Cloud Storage.
    4. Supprime le cluster.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  • Assurez-vous que les API suivantes sont activées dans votre projet:

    Console

    Activer les API Dataproc, Cloud Storage.

    Activer les API

    gcloud

    Activer les API Dataproc, Cloud Storage :

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • Dans votre projet, créez un bucket Cloud Storage de n'importe quelle classe de stockage et région pour stocker les résultats de la tâche Hadoop de décompte de mots.

  • Notez le chemin d'accès au bucket que vous avez créé, par exemple gs://example-bucket. Vous définirez une variable Airflow pour ce chemin et l'utiliserez dans l'exemple de DAG plus loin dans ce tutoriel.

  • Créez un environnement Cloud Composer avec les paramètres par défaut. Attendez la fin de la création de l'environnement. Lorsque vous avez terminé, une coche verte s'affiche à gauche du nom de l'environnement.

  • Notez la région dans laquelle vous avez créé votre environnement, par exemple us-central. Vous allez définir une variable Airflow pour cette région et l'utiliser dans l'exemple de DAG afin d'exécuter un cluster Dataproc dans la même région.

Définir les variables Airflow

Définissez les variables Airflow à utiliser ultérieurement dans l'exemple de DAG. Par exemple, vous pouvez définir des variables Airflow dans l'interface utilisateur Airflow.

Variable Airflow Valeur
gcp_project L'ID du projet que vous utilisez pour ce tutoriel, par exemple example-project.
gcs_bucket L'URI du bucket Cloud Storage que vous avez créé pour ce tutoriel, tel que gs://example-bucket.
gce_region Région dans laquelle vous avez créé votre environnement, par exemple us-central1. Il s'agit de la région dans laquelle votre cluster Dataproc sera créé.

Afficher l'exemple de workflow

Un DAG Airflow est un ensemble de tâches organisées que vous souhaitez planifier et exécuter. Les DAG sont définis dans des fichiers Python standards. Le code affiché dans hadoop_tutorial.py est le code du workflow.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Opérateurs

Pour orchestrer les trois tâches de l'exemple de workflow, le DAG importe les trois opérateurs Airflow suivants:

  • DataprocClusterCreateOperator : crée un cluster Dataproc.

  • DataProcHadoopOperator: envoie un job de décompte de mots Hadoop et écrit les résultats dans un bucket Cloud Storage.

  • DataprocClusterDeleteOperator: supprime le cluster pour éviter que des frais Compute Engine continuent de vous être facturés.

Dépendances

Vous organisez les tâches que vous souhaitez exécuter de manière à refléter leurs relations et leurs dépendances. Les tâches de ce DAG s'exécutent de manière séquentielle.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Planification

Le nom du DAG est composer_hadoop_tutorial, et il s'exécute une fois par jour. Étant donné que la valeur start_date transmise à default_dag_args est définie sur yesterday, Cloud Composer planifie le démarrage du workflow immédiatement après l'importation du DAG dans le bucket de l'environnement.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Importer le DAG dans le bucket de l'environnement

Cloud Composer stocke les DAG dans le dossier /dags du bucket de votre environnement.

Pour importer le DAG :

  1. Sur votre ordinateur local, enregistrez hadoop_tutorial.py.

  2. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  3. Dans la liste des environnements, dans la colonne Dossier des DAG de votre environnement, cliquez sur le lien DAG.

  4. Cliquez sur Importer des fichiers.

  5. Sélectionnez hadoop_tutorial.py sur votre machine locale, puis cliquez sur Ouvrir.

Cloud Composer ajoute le DAG à Airflow et le planifie automatiquement. Les modifications sont appliquées au DAG après trois à cinq minutes.

Explorer les exécutions du DAG

Afficher l'état de la tâche

Lorsque vous importez votre fichier DAG dans le dossier dags/ de Cloud Storage, Cloud Composer l'analyse. Une fois l'opération terminée, le nom du workflow apparaît dans la liste des DAG, et le workflow est mis en file d'attente pour s'exécuter immédiatement.

  1. Pour connaître l'état des tâches, accédez à l'interface Web Airflow, puis cliquez sur DAGs (DAG) dans la barre d'outils.

  2. Pour ouvrir la page de détails des DAG, cliquez sur composer_hadoop_tutorial. Cette page comprend une représentation graphique des tâches et des dépendances du workflow.

  3. Pour connaître l'état de chaque tâche, cliquez sur Graph View (Vue graphique), puis sur le graphique de chaque tâche.

Ajouter à nouveau le workflow à la file d'attente

Pour exécuter de nouveau le workflow à partir de la vue graphique, procédez comme suit :

  1. Dans la vue graphique de l'interface utilisateur Airflow, cliquez sur le graphique create_dataproc_cluster.
  2. Pour réinitialiser les trois tâches, cliquez sur Clear (Effacer), puis sur OK pour confirmer.
  3. Cliquez de nouveau sur create_dataproc_cluster dans la vue graphique.
  4. Pour remettre le workflow en file d'attente, cliquez sur Run (Exécuter).

Afficher les résultats des tâches

Vous pouvez également vérifier l'état et les résultats du workflow composer_hadoop_tutorial en accédant aux pages suivantes de la console Google Cloud:

  • Clusters Dataproc: surveiller la création et la suppression des clusters. Notez que le cluster créé par le workflow est éphémère: il n'existe que pour la durée du workflow et est supprimé lors de la dernière tâche.

    Accéder aux clusters Dataproc

  • Tâches Dataproc: afficher ou surveiller le job de décompte de mots Apache Hadoop Cliquez sur l'ID de job pour afficher la sortie du journal associée au job.

    Accéder aux jobs Dataproc

  • Navigateur Cloud Storage: affichez les résultats du décompte de mots dans le dossier wordcount du bucket Cloud Storage que vous avez créé pour ce tutoriel.

    Accéder au navigateur Cloud Storage

Nettoyage

Supprimez les ressources utilisées dans ce tutoriel:

  1. Supprimez l'environnement Cloud Composer, y compris en supprimant manuellement le bucket de l'environnement.

  2. Supprimez le bucket Cloud Storage qui stocke les résultats de la tâche Hadoop de décompte de mots.