Exécuter une tâche Hadoop de décompte de mots sur un cluster Dataproc

Ce tutoriel explique comment utiliser Cloud Composer pour créer un DAG (workflow) Apache Airflow qui exécute une tâche Apache Hadoop de décompte de mots sur un cluster Dataproc à l'aide de Google Cloud Console.

Objectifs

  1. Accéder à votre environnement Cloud Composer et utiliser l'interface Web Airflow
  2. Créer et afficher des variables d'environnement Airflow
  3. Créer et exécuter un DAG comprenant les tâches suivantes :
    1. Création d'un cluster Dataproc
    2. Exécution d'une tâche Apache Hadoop de décompte de mots sur le cluster
    3. Génération des résultats du décompte de mots dans un bucket Cloud Storage
    4. Suppression du cluster

Coûts

Ce tutoriel utilise les composants facturables de Google Cloud, dont :

  • Cloud Composer
  • Dataproc
  • Cloud Storage

La création de votre environnement par le système peut nécessiter jusqu'à 25 minutes. Ce tutoriel peut prendre environ une heure. Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder à la page de sélection du projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Cloud Composer, Cloud Dataproc, and Cloud Storage.

    Activer les API

  5. Dans votre projet, créez un bucket Cloud Storage de n'importe quelle région et classe de stockage pour stocker les résultats de la tâche Hadoop de décompte de mots.
  6. Notez le chemin du bucket que vous avez créé, par exemple gs://my-bucket. Vous allez définir une variable Airflow pour ce chemin et l'utiliser dans l'exemple de DAG.

Créer un environnement

  1. Dans Cloud Console, accédez à la page "Créer un environnement".

    Ouvrir la page "Créer un environnement"

  2. Dans le champ Nom, saisissez example-environment.

  3. Dans la liste déroulante Emplacement, sélectionnez une région pour l'environnement Cloud Composer. Pour plus d'informations sur la sélection d'une région, consultez la page Régions disponibles.

  4. Pour les autres options de configuration de l'environnement, utilisez les valeurs par défaut fournies.

  5. Pour créer l'environnement, cliquez sur Créer.

  6. Attendez la fin de la création de l'environnement. Lorsque vous avez terminé, une coche verte s'affiche à gauche du nom de l'environnement.

Afficher les détails de l'environnement

Une fois la création de l'environnement terminée, vous pouvez afficher les informations de déploiement de votre environnement, telles que les versions de Cloud Composer et de Python, l'URL de l'interface Web Airflow et l'ID de cluster Google Kubernetes Engine.

Pour afficher les informations de déploiement, procédez comme suit :

  1. Dans Cloud Console, accédez à la page "Environnements".

    Ouvrir la page "Environnements"

  2. Pour afficher la page "Détails de l'environnement", cliquez sur example-environment.

  3. Notez la zone dans laquelle vous avez créé votre environnement, par exemple us-central-1c. Vous allez définir une variable Airflow pour cette zone et l'utiliser dans l'exemple de DAG.

Définition des variables Airflow

Les variables Airflow constituent un concept propre à Airflow et sont à distinguer des variables d'environnement. Lors de cette étape, vous allez vous servir de l'interface Web Airflow pour définir trois variables Airflow à utiliser ultérieurement dans l'exemple de DAG.

Pour définir des variables, procédez comme suit :

  1. Accédez à l'interface Web Airflow dans Cloud Console :

    1. Dans Cloud Console, accédez à la page Environnements.

      Ouvrir la page "Environnements"

    2. Dans la colonne Serveur Web Airflow associée à example-environment, cliquez sur le lien Airflow. L'interface Web Airflow s'ouvre dans une nouvelle fenêtre.

  2. Définissez des variables dans l'interface Web Airflow :

    1. Dans la barre d'outils, cliquez sur Admin > Variables (Administration > Variables).
    2. Pour créer une variable, cliquez sur Create (Créer).
    3. Pour chacune des variables suivantes, saisissez la paire clé-valeur et cliquez sur Enregistrer. Toutes les variables Airflow s'affichent dans l'onglet "List" (Liste).
      CLÉ VALEUR
      gcp_project ID du projet Google Cloud Platform que vous utilisez pour ce tutoriel, tel que composer-test.
      gcs_bucket Bucket Cloud Storage que vous avez créé pour ce tutoriel, par exemple gs://my-bucket.
      gce_zone Zone Compute Engine pour votre environnement, par exemple us-central1-c. Il s'agit de la zone dans laquelle votre cluster Dataproc sera créé. Consultez la page Régions et zones disponibles.

Afficher l'exemple de workflow

Un DAG Airflow est un ensemble de tâches organisées que vous souhaitez programmer et exécuter. Les DAG sont définis dans des fichiers Python standards. Le code affiché dans hadoop_tutorial.py correspond au code du workflow.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Opérateurs

Un opérateur est un modèle pour une seule tâche d'un workflow. Pour orchestrer les trois tâches dans l'exemple de workflow, le DAG importe les trois opérateurs suivants :

  1. DataprocClusterCreateOperator : crée un cluster Dataproc.
  2. DataProcHadoopOperator : envoie une tâche Hadoop de décompte de mots et écrit les résultats dans un bucket Cloud Storage.
  3. DataprocClusterDeleteOperator : supprime le cluster pour éviter que des frais Compute Engine ne continuent d'être facturés.

Dépendances

Vous organisez les tâches que vous souhaitez exécuter de façon à refléter leurs relations et leurs dépendances. Les tâches de ce DAG sont exécutées de manière séquentielle. Dans cet exemple, la relation est définie dans la direction vers laquelle pointe l'opérateur de changement de bit de Python (>>).

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Planification

Le nom du DAG est composer_hadoop_tutorial, et le DAG s'exécute une fois par jour. Comme la valeur start_date transmise à default_dag_args est définie sur yesterday, Cloud Composer programme le workflow de façon qu'il commence immédiatement après l'importation du DAG.

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Importation du DAG dans Cloud Storage

Cloud Composer ne programme que les DAG figurant dans le dossier des DAG. Ce dossier se trouve dans le bucket Cloud Storage que Cloud Composer crée automatiquement pour votre environnement.

Pour importer le DAG :

  1. Sur votre machine locale, enregistrez hadoop_tutorial.py.
  2. Dans Cloud Console, accédez à la page Environnements.

    Ouvrir la page "Environnements"

  3. Dans la colonne Dossier des DAG pour example-environment, cliquez sur le lien DAG. Le dossier des DAG de Cloud Storage s'ouvre.

  4. Cliquez sur Importer des fichiers.

  5. Sélectionnez hadoop_tutorial.py sur votre machine locale, puis cliquez sur Ouvrir.

Cloud Composer ajoute le DAG à Airflow et le planifie automatiquement. Les modifications sont appliquées au DAG après 3 à 5 minutes.

Exploration des exécutions du DAG

Afficher l'état des tâches

Lorsque vous importez votre fichier de DAG dans le dossier dags/ de Cloud Storage, Cloud Composer l'analyse. Une fois la procédure terminée, le nom du workflow apparaît dans la liste des DAG et le workflow est mis en file d'attente pour être exécuté immédiatement.

  1. Pour connaître l'état des tâches, accédez à l'interface Web Airflow, puis cliquez sur DAGs (DAG) dans la barre d'outils.

  2. Pour ouvrir la page de détails des DAG, cliquez sur composer_hadoop_tutorial. Cette page comprend une représentation graphique des tâches et des dépendances du workflow.

  3. Pour connaître l'état de chaque tâche, cliquez sur Graph View (Vue graphique), puis sur le graphique de chaque tâche.

Remettre le workflow en file d'attente

Pour exécuter de nouveau le workflow à partir de la vue graphique, procédez comme suit :

  1. Dans la vue graphique de l'interface utilisateur Airflow, cliquez sur le graphique create_dataproc_cluster.
  2. Pour réinitialiser les trois tâches, cliquez sur Clear (Effacer), puis sur OK pour confirmer.
  3. Cliquez de nouveau sur create_dataproc_cluster dans la vue graphique.
  4. Pour remettre le workflow en file d'attente, cliquez sur Run (Exécuter).

Afficher les résultats des tâches

Vous pouvez également vérifier l'état et les résultats du workflow composer_hadoop_tutorial en accédant aux pages suivantes de Cloud Console :

  • Clusters Dataproc, pour surveiller la création et la suppression des clusters. Notez que le cluster créé par le workflow est éphémère : il n'existe que pour la durée du workflow et est supprimé dans le cadre de sa dernière tâche.

  • Tâches Dataproc, pour afficher ou surveiller la tâche Apache Hadoop de décompte de mots. Cliquez sur l'ID de tâche pour afficher le résultat du journal associé à la tâche.

  • Navigateur Cloud Storage, pour afficher les résultats du décompte de mots dans le dossier wordcount du bucket Cloud Storage que vous avez créé pour ce tutoriel.

Nettoyer

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud Platform :

  1. Dans Cloud Console, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Si le projet que vous envisagez de supprimer est associé à une organisation, sélectionnez-la dans la liste des organisations en haut de la page.
  3. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer .
  4. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Vous pouvez également supprimer les ressources utilisées dans ce tutoriel :

  1. Supprimez l'environnement Cloud Composer.
  2. Supprimez le bucket Cloud Storage pour l'environnement Cloud Composer. La suppression de l'environnement Cloud Composer ne supprime pas son bucket.
  3. Supprimez les sujets Pub/Sub pour Cloud Composer (composer-agent et composer-backend)).

Étape suivante