Flusso di lavoro con Cloud Composer

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono essere idonei a una prova senza costi aggiuntivi.

Prima di iniziare

Configura il progetto

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Dataproc, Compute Engine, and Cloud Composer .

    Abilita le API

  5. Installa Google Cloud CLI.
  6. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  7. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  8. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  9. Abilita le API Dataproc, Compute Engine, and Cloud Composer .

    Abilita le API

  10. Installa Google Cloud CLI.
  11. Per initialize gcloud CLI, esegui questo comando:

    gcloud init

Crea un modello di flusso di lavoro Dataproc

Copia ed esegui i comandi elencati di seguito in una finestra del terminale locale o in Cloud Shell per creare e definire un modello di flusso di lavoro.

  1. Crea il modello di flusso di lavoro sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Aggiungi il job spark al modello di flusso di lavoro sparkpi. Il flag step-id "compute" identifica il job SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Utilizza un cluster gestito a nodo singolo per eseguire il flusso di lavoro. Dataproc creerà il cluster, eseguirà il flusso di lavoro ed eliminerà il cluster al termine del flusso di lavoro.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Conferma la creazione del modello di flusso di lavoro.

    Console

    Fai clic sul nome sparkpi nella pagina Workflows di Dataproc nella console Google Cloud per aprire la pagina Dettagli del modello di flusso di lavoro. Fai clic sul nome del modello di flusso di lavoro per confermare gli attributi del modello sparkpi.

    Comando g-cloud

    Esegui questo comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Crea e carica un DAG in Cloud Storage

  1. Crea o utilizza un ambiente Cloud Composer esistente.
  2. Imposta le variabili di ambiente.

    UI di Airflow

    1. Nella barra degli strumenti, fai clic su Amministrazione > Variabili.
    2. Fai clic su Crea.
    3. Inserisci le seguenti informazioni:
      • Chiave:project_id
      • Val: PROJECT_ID - ID progetto Google Cloud
    4. Fai clic su Salva.

    Comando g-cloud

    Inserisci i seguenti comandi:

    • ENVIRONMENT è il nome dell'ambiente Cloud Composer
    • LOCATION è la regione in cui si trova l'ambiente Cloud Composer
    • PROJECT_ID è l'ID del progetto che contiene l'ambiente Cloud Composer
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copia localmente il seguente codice DAG in un file denominato "composer-dataproc-dag.py", che utilizza il file DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Carica il tuo DAG nella cartella dell'ambiente in Cloud Storage. Una volta completato il caricamento, fai clic sul link Cartella DAG nella pagina dell'ambiente di Cloud Composer.

Visualizzare lo stato di un'attività

UI di Airflow

  1. Apri l'interfaccia web di Airflow.
  2. Nella pagina dei DAG, fai clic sul nome del DAG (ad esempio, dataproc_workflow_dag).
  3. Nella pagina dei dettagli dei DAG, fai clic su Visualizzazione grafico.
  4. Controlla lo stato:
    • Non riuscita: intorno all'attività è presente un riquadro rosso. Puoi anche tenere il puntatore sull'attività e cercare lo stato Stato: non riuscito. l'attività è circondata da un riquadro rosso che indica che l'attività non è riuscita
    • Operazione riuscita: l'attività è circondata da una casella verde. Puoi anche tenere il puntatore del mouse sull'attività e verificare lo stato: riuscito. l'attività è circondata da un riquadro verde che indica che l'attività è riuscita

Console

Fai clic sulla scheda Workflows per visualizzare lo stato del flusso di lavoro.

Comando g-cloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi, puoi eliminare le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer.

  2. Elimina il modello di flusso di lavoro.

Passaggi successivi