Workflow utilisant Cloud Composer

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataproc
  • Compute Engine
  • Cloud Composer

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Configurer votre projet

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Créer un modèle de workflow Dataproc

Copiez et exécutez les commandes répertoriées ci-dessous dans une fenêtre de terminal local ou dans Cloud Shell pour créer et définir un modèle de workflow.

  1. Créez le modèle de workflow sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Ajoutez la tâche spark au modèle de workflow sparkpi. L'option "compute" step-id identifie la tâche SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Utilisez un cluster géré à nœud unique pour exécuter le workflow. Dataproc crée le cluster, y exécute le workflow, puis le supprime une fois le workflow terminé.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirmez la création du modèle de workflow.

    Console

    Cliquez sur le nom sparkpi dans Dataproc. Processus de la console Google Cloud pour ouvrir Page Détails du modèle de workflow. Cliquez sur le nom de votre modèle de workflow pour confirmer les attributs du modèle sparkpi.

    Commande gcloud

    Exécutez la commande suivante :

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Créer et importer un DAG dans Cloud Storage

  1. Créez ou utilisez un environnement Cloud Composer existant.
  2. Définir des variables d'environnement.

    Interface utilisateur d'Airflow

    1. Dans la barre d'outils, cliquez sur Admin > Variables (Administration > Variables).
    2. Cliquez sur Créer.
    3. Saisissez les informations suivantes :
      • Key (Clé) : project_id
      • Valeur : PROJECT_ID, l'ID de votre projet Google Cloud
    4. Cliquez sur Enregistrer.

    Commande gcloud

    Saisissez les commandes suivantes :

    • ENVIRONMENT correspond au nom de l'environnement Cloud Composer.
    • LOCATION correspond à la région où se trouve l'environnement Cloud Composer.
    • PROJECT_ID est l'ID du projet contenant l'environnement Cloud Composer.
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copiez le code DAG suivant localement dans un fichier nommé "composer-dataproc-dag.py", qui utilise DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Importez votre DAG dans le dossier d'environnement de Cloud Storage. Une fois l'importation terminée, cliquez sur le lien Dossier DAGS sur la page de l'environnement Cloud Composer.

Afficher l'état d'une tâche

Interface utilisateur d'Airflow

  1. Ouvrez l'interface Web Airflow.
  2. Sur la page des DAG, cliquez sur le nom du DAG (par exemple, dataproc_workflow_dag).
  3. Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :
    • Échec : la tâche apparaît dans un encadré rouge. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention (État : Échec). La tâche apparaît dans un encadré rouge, indiquant qu'elle a échoué
    • Réussite : la tâche apparaît dans un encadré vert. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention (État : Réussite). La tâche apparaît dans un encadré vert, indiquant qu'elle a réussi

Console

Cliquez sur l'onglet "Workflows" pour afficher l'état du workflow.

Commande gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Nettoyer

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud, vous pouvez les supprimer :

  1. Supprimez l'environnement Cloud Composer.

  2. Supprimez le modèle de workflow.

Étape suivante