Esegui un DAG di Apache Airflow in Cloud Composer 2

Cloud Composer 1 | Cloud Composer 2

Questa pagina mostra come creare un ambiente Cloud Composer ed eseguire un DAG di Apache Airflow in Cloud Composer 2.

Se non hai mai utilizzato Airflow, guarda questo tutorial per ulteriori informazioni sui concetti, sugli oggetti e sul loro utilizzo di Airflow.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella console di Google Cloud Console, nella pagina del selettore dei progetti, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Nella console di Google Cloud Console, nella pagina del selettore dei progetti, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Attiva l'API Cloud Composer.

    Abilita l'API

Creazione di un ambiente

Console

  1. Nella console Google Cloud, vai alla pagina Crea ambiente.

    Vai a Crea ambiente

  2. Quando crei un ambiente nel tuo progetto, se l'agente di servizio Cloud Composer non dispone delle autorizzazioni necessarie per l'account di servizio dell'ambiente, viene visualizzata la sezione Concedi le autorizzazioni necessarie all'account di servizio di Cloud Composer.

    Aggiungi l'account agente di servizio Cloud Composer come nuova entità all'account di servizio dell'ambiente e concedigli il ruolo Estensione agente di servizio API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

    Conferma di utilizzare l'account di servizio previsto per il tuo ambiente e fai clic su Concedi.

  3. Nel campo Nome, inserisci example-environment.

  4. Nell'elenco a discesa Località, seleziona una regione per l'ambiente Cloud Composer. Consulta Regioni disponibili per informazioni sulla selezione di una regione.

  5. Per altre opzioni di configurazione dell'ambiente, utilizza i valori predefiniti forniti.

  6. Per creare l'ambiente, fai clic su Crea.

  7. Attendi che venga creato l'ambiente. Al termine, accanto al nome dell'ambiente viene visualizzato un segno di spunta verde.

gcloud

Aggiungi l'account agente di servizio Cloud Composer come nuova entità nell'account di servizio dell'ambiente e concedigli il ruolo Estensione agente di servizio API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

Per impostazione predefinita, il tuo ambiente utilizza l'account di servizio Compute Engine predefinito.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Crea un nuovo ambiente:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente. Questa guida rapida utilizza example-environment.
  • LOCATION con una regione per l'ambiente Cloud Composer. Consulta Regioni disponibili per informazioni sulla selezione di una regione.
  • IMAGE_VERSION con il nome dell'immagine Cloud Composer. Questa guida utilizza composer-2.5.1-airflow-2.6.3 per creare un ambiente con l'immagine di Cloud Composer 2 più recente.

Esempio:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-2.5.1-airflow-2.6.3

Terraform

Per configurare questo ambiente utilizzando Terraform, aggiungi il seguente blocco di risorse alla configurazione Terraform ed esegui terraform apply.

Per utilizzare questo blocco di risorse, l'account di servizio utilizzato da Terraform deve avere un ruolo con l'autorizzazione composer.environments.create abilitata. Per saperne di più sull'account di servizio per Terraform, consulta la pagina Riferimento per la configurazione del provider Google.

Per saperne di più sull'utilizzo di Terraform per creare un ambiente Cloud Composer, consulta la documentazione di Terraform.

resource "google_composer_environment" "example" {
  provider = google-beta
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME con il nome dell'ambiente. Questa guida rapida utilizza example-environment.

  • LOCATION con una regione per l'ambiente Cloud Composer. Consulta Regioni disponibili per informazioni sulla selezione di una regione.

  • IMAGE_VERSION con il nome dell'immagine Cloud Composer. Questa guida utilizza composer-2.5.1-airflow-2.6.3 per creare un ambiente con l'immagine di Cloud Composer 2 più recente.

Esempio:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.5.1-airflow-2.6.3"
    }
  }

}

Visualizza dettagli ambiente

Al termine della creazione dell'ambiente, puoi visualizzare le informazioni sull'ambiente, ad esempio la versione di Cloud Composer, l'URL dell'interfaccia web di Airflow e la cartella dei DAG in Cloud Storage.

Per visualizzare le informazioni sull'ambiente:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Per visualizzare la pagina Dettagli ambiente, fai clic sul nome del tuo ambiente, example-environment.

Crea un DAG

Un DAG Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti nei file Python standard.

Il codice Python in quickstart.py:

  1. Crea un DAG, composer_sample_dag. Il DAG viene eseguito una volta al giorno.
  2. Esegue un'attività, print_dag_run_conf. L'attività stampa la configurazione dell'esecuzione dell'esecuzione DAG utilizzando l'operatore bash.

Per creare un DAG, crea una copia del file quickstart.py sulla tua macchina locale.

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Carica il DAG in Cloud Storage

Cloud Composer pianifica solo i DAG che si trovano nella cartella /dags nel bucket Cloud Storage dell'ambiente.

Per pianificare il DAG, carica quickstart.py dalla macchina locale nella cartella /dags del tuo ambiente.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Per aprire la cartella /dags, segui il link della cartella DAG per example-environment.

  3. Nella pagina dei dettagli del bucket, fai clic su Carica file e seleziona la tua copia locale di quickstart.py.

  4. Per caricare il file, fai clic su Apri.

    Dopo aver caricato il DAG, Cloud Composer aggiunge il DAG a Airflow e pianifica l'esecuzione immediata di un DAG. Potrebbero essere necessari alcuni minuti prima che il DAG venga visualizzato nell'interfaccia web di Airflow.

gcloud

Per caricare quickstart.py con gcloud, esegui questo comando:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Visualizza il DAG nella UI di Airflow

Ogni ambiente Cloud Composer dispone di un server web che esegue l'interfaccia web di Airflow. Puoi gestire i DAG dall'interfaccia web di Airflow.

Per visualizzare il DAG nell'interfaccia web di Airflow:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Per aprire l'interfaccia web di Airflow, fai clic sul link Airflow per example-environment. La UI di Airflow si apre in una nuova finestra del browser.

  3. Nella barra degli strumenti di Airflow, vai alla pagina DAG.

  4. Per aprire la pagina dei dettagli del DAG, fai clic su composer_sample_dag.

    Pagina DAG nella UI di Airflow
    Figura 1. Pagina dei DAG nell'interfaccia utente di Airflow (fai clic per ingrandire)

    La pagina del DAG mostra la visualizzazione ad albero, una rappresentazione grafica delle attività e delle dipendenze del flusso di lavoro.

    Visualizzazione ad albero del DAG Composer_sample_dags
    Figura 2. Visualizzazione ad albero del DAG Composer_sample_dags

Visualizza i dettagli dell'istanza dell'attività nei log di Airflow

Il DAG che hai pianificato include l'attività print_dag_run_conf. L'attività stampa la configurazione dell'esecuzione di DAG, che puoi vedere nei log di Airflow per l'istanza dell'attività.

Per visualizzare i dettagli dell'istanza dell'attività:

  1. Nella visualizzazione ad albero del DAG nell'interfaccia web di Airflow, fai clic su Visualizzazione grafico.

    Se tieni il puntatore del mouse sull'attività print_dag_run_conf, viene visualizzato il relativo stato.

    Visualizzazione ad albero del DAG Composer_sample_dags
    Figura 3. Stato dell'attività print_dag_run_conf
  2. Fai clic sull'attività print_dag_run_conf.

    Nel menu contestuale Istanza attività, puoi ottenere i metadati ed eseguire alcune azioni.

    Menu contestuale dell'istanza di attività per l'attività Composer_sample_dags
    Figura 4. Menu contestuale dell'istanza dell'attività per l'attività Composer_sample_dags
  3. Nel menu contestuale Istanza attività, fai clic su Log.

  4. Nel log, cerca Running command: ['bash' per vedere l'output dell'operatore bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

Elimina le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Seleziona example-environment e fai clic su Elimina.

    3. Attendi che l'ambiente venga eliminato.

  2. Eliminare il bucket del tuo ambiente. L'eliminazione dell'ambiente Cloud Composer non elimina il relativo bucket.

    1. Nella console Google Cloud, vai alla pagina Storage > Browser.

      Vai a Spazio di archiviazione > Browser

    2. Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato us-central1-example-environ-c1616fe8-bucket.

  3. Elimina il disco permanente della coda Redis del tuo ambiente. L'eliminazione dell'ambiente Cloud Composer non elimina il relativo disco permanente.

    1. Nella console Google Cloud, vai a Compute Engine > Dischi.

      Vai a Dischi

    2. Seleziona il disco permanente della coda Redis dell'ambiente e fai clic su Elimina.

      Ad esempio, questo disco può essere denominato pvc-02bc4842-2312-4347-8519-d87bdcd31115. I dischi per Cloud Composer 2 hanno sempre il tipo Balanced persistent disk e le dimensioni di 2 GB.

Passaggi successivi