Flusso di lavoro con Cloud Composer

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono beneficiare di una prova gratuita.

Prima di iniziare

Configura il progetto

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

  4. Abilita le API Dataproc, Compute Engine, and Cloud Composer .

    Abilita le API

  5. Installa Google Cloud CLI.
  6. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

    gcloud init
  7. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  8. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

  9. Abilita le API Dataproc, Compute Engine, and Cloud Composer .

    Abilita le API

  10. Installa Google Cloud CLI.
  11. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

    gcloud init

Crea un modello di flusso di lavoro Dataproc

Copia ed esegui i comandi elencati di seguito in una finestra di terminale locale o in Cloud Shell per creare e definire un modello del flusso di lavoro.

  1. Crea il modello di flusso di lavoro sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Aggiungi il job Spark al modello di flusso di lavoro sparkpi. Il flag step-id "compute" identifica il job SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Utilizza un cluster gestito e a nodo singolo per eseguire il flusso di lavoro. Dataproc creerà il cluster, vi eseguirà il flusso di lavoro, quindi eliminerà il cluster al termine dell'operazione.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Conferma la creazione del modello di flusso di lavoro.

    Console

    Fai clic sul nome sparkpi nella pagina Flussi di lavoro di Dataproc nella console Google Cloud per aprire la pagina Dettagli modello di flusso di lavoro. Fai clic sul nome del modello di flusso di lavoro per confermare gli attributi del modello sparkpi.

    Comando gcloud

    Esegui questo comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Crea e carica un DAG su Cloud Storage

  1. Crea o utilizza un ambiente Cloud Composer esistente.
  2. Imposta le variabili di ambiente.

    UI di Airflow

    1. Nella barra degli strumenti, fai clic su Amministrazione > Variabili.
    2. Fai clic su Crea.
    3. Inserisci le seguenti informazioni:
      • Chiave:project_id
      • Val: PROJECT_ID — il tuo ID progetto Google Cloud
    4. Fai clic su Salva.

    Comando gcloud

    Inserisci i seguenti comandi:

    • ENVIRONMENT è il nome dell'ambiente Cloud Composer
    • LOCATION è la regione in cui si trova l'ambiente di Cloud Composer
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. Copia il seguente codice DAG in locale in un file intitolato "composer-dataproc-dag.py", che utilizza Dataproc InstantiateWorkflowTemplateOperator.

    Flusso d'aria 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Flusso d'aria 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Carica il tuo Dag nella cartella dell'ambiente in Cloud Storage. Una volta completato il caricamento, fai clic sul link Cartella DAG nella pagina dell'ambiente Cloud Composer.

Visualizzare lo stato di un'attività

UI di Airflow

  1. Apri l'interfaccia web di Airflow.
  2. Nella pagina DAG, fai clic sul nome del DAG (ad esempio dataproc_workflow_dag).
  3. Nella pagina dei dettagli di DAG, fai clic su Visualizzazione grafica.
  4. Controlla lo stato:
    • Non riuscita: l'attività è circondata da una casella rossa. Puoi anche tenere il puntatore sopra l'attività e cercare Stato: non riuscito. l'attività è circondata da una casella rossa che indica che l'attività non è riuscita
    • Operazione riuscita: l'attività è circondata da una casella verde. Puoi anche tenere il puntatore sull'attività e controllare lo stato: operazione riuscita. l'attività ha una casella verde intorno che indica che l'attività è riuscita

Console

Fai clic sulla scheda Flussi di lavoro per visualizzare lo stato del flusso di lavoro.

Comando gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Eseguire la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi, puoi eliminare le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer.

  2. Elimina il modello del flusso di lavoro.

Passaggi successivi