Flusso di lavoro con Cloud Composer

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Prima di iniziare

Configura il progetto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  7. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  13. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  14. Crea un modello di workflow Dataproc

    Copia ed esegui i seguenti comandi in una finestra del terminale locale o in Cloud Shell per creare e definire un modello di flusso di lavoro.

    1. Crea il modello di workflow sparkpi.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Aggiungi il job Spark al modello di workflow sparkpi. Il flag "compute" step-id identifica il job SparkPi.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Utilizza un cluster gestito a un solo nodo per eseguire il workflow. Dataproc creerà il cluster, eseguirà il workflow e lo eliminerà al termine del workflow.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Conferma la creazione del modello di workflow.

      Console

      Fai clic sul nome sparkpi nella pagina Workflows di Dataproc nella console Google Cloud per aprire la pagina Dettagli modello di flusso di lavoro. Fai clic sul nome del modello di workflow per confermare gli attributi del modello sparkpi.

      Comando g-cloud

      Esegui questo comando:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
          

    Crea e carica un DAG in Cloud Storage

    1. Crea o utilizza un ambiente Cloud Composer esistente.
    2. Imposta le variabili di ambiente.

      UI di Airflow

      1. Nella barra degli strumenti, fai clic su Amministrazione > Variabili.
      2. Fai clic su Crea.
      3. Inserisci le seguenti informazioni:
        • Chiave:project_id
        • Val: PROJECT_ID: il tuo ID progetto Google Cloud
      4. Fai clic su Salva.

      Comando g-cloud

      Inserisci i seguenti comandi:

      • ENVIRONMENT è il nome dell'ambiente Cloud Composer
      • LOCATION è la regione in cui si trova l'ambiente Cloud Composer
      • PROJECT_ID è l'ID progetto del progetto che contiene l'ambiente Cloud Composer
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
          
    3. Copia il seguente codice DAG localmente in un file denominato "composer-dataproc-dag.py", che utilizza DataprocInstantiateWorkflowTemplateOperator.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Airflow 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataflow is available see:
              # https://cloud.google.com/dataflow/docs/resources/locations
              region="us-central1",
          )
      
    4. Carica il DAG nella cartella dell'ambiente in Cloud Storage. Una volta completato il caricamento, fai clic sul link Cartella DAG nella pagina dell'ambiente Cloud Composer.

    Visualizzare lo stato di un'attività

    UI di Airflow

    1. Apri l'interfaccia web di Airflow.
    2. Nella pagina DAG, fai clic sul nome del DAG (ad esempio, dataproc_workflow_dag).
    3. Nella pagina Dettagli DAG, fai clic su Visualizzazione grafico.
    4. Controlla lo stato:
      • Non riuscita: l'attività è racchiusa in un riquadro rosso. Puoi anche tenere il puntatore sopra l'attività e cercare Stato: non riuscito. L'attività è racchiusa in un riquadro rosso, che indica che non è riuscita
      • Riuscita: l'attività è racchiusa in un riquadro verde. Puoi anche tenere il puntatore sopra l'attività e verificare la presenza di Stato: riuscita. L'attività è racchiusa in una casella verde, a indicare che è stata completata correttamente

    Console

    Fai clic sulla scheda Workflows per visualizzare lo stato del flusso di lavoro.

    Comando g-cloud

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
        

    Esegui la pulizia

    Per evitare che al tuo account Google Cloud vengano addebitati costi, puoi eliminare le risorse utilizzate in questo tutorial:

    1. Elimina l'ambiente Cloud Composer.

    2. Elimina il modello di workflow.

    Passaggi successivi