Esegui un DAG Apache Airflow in Cloud Composer 1

Cloud Composer 1 | Cloud Composer 2

Questa pagina mostra come creare un ambiente Cloud Composer ed eseguire un DAG Apache Airflow in Cloud Composer.

Se non hai mai utilizzato Airflow, consulta questo tutorial per ulteriori informazioni sui concetti, sugli oggetti e sul loro utilizzo di Airflow.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Attiva l'API Cloud Composer.

    Abilita l'API

Creazione di un ambiente

Console

  1. Nella console Google Cloud, vai alla pagina Crea ambiente.

    Vai a Crea ambiente

  2. Nel campo Nome, inserisci example-environment.

  3. Nell'elenco a discesa Località, seleziona una regione per l'ambiente Cloud Composer. Per informazioni su come selezionare una regione, consulta Regioni disponibili.

  4. Per altre opzioni di configurazione dell'ambiente, utilizza i valori predefiniti forniti.

  5. Per creare l'ambiente, fai clic su Crea.

  6. Attendi finché l'ambiente non viene creato. Al termine, viene visualizzato un segno di spunta verde accanto al nome dell'ambiente.

gcloud

Aggiungi l'account agente di servizio Cloud Composer come nuova entità nell'account di servizio del tuo ambiente e concedigli il ruolo Estensione agente di servizio API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

Per impostazione predefinita, il tuo ambiente utilizza l'account di servizio predefinito di Compute Engine.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Crea un nuovo ambiente:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente. Questa guida rapida utilizza example-environment.
  • LOCATION con una regione per l'ambiente Cloud Composer. Per informazioni su come selezionare una regione, consulta Regioni disponibili.
  • IMAGE_VERSION con il nome dell'immagine Cloud Composer. Questa guida utilizza composer-1.20.12-airflow-1.10.15 per creare un ambiente con l'immagine di Cloud Composer più recente.

Esempio:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-1.20.12-airflow-1.10.15

Terraform

Per configurare questo ambiente utilizzando Terraform, aggiungi il blocco di risorse seguente alla configurazione Terraform ed esegui terraform apply.

Per utilizzare questo blocco di risorse, l'account di servizio Terraform usa deve avere un ruolo con l'autorizzazione composer.environments.create abilitata. Per saperne di più sull'account di servizio per Terraform, consulta la pagina Riferimento per la configurazione del provider Google.

Per saperne di più sull'utilizzo di Terraform per creare un ambiente Cloud Composer, consulta la documentazione di Terraform.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME con il nome dell'ambiente. Questa guida rapida utilizza example-environment.

  • LOCATION con una regione per l'ambiente Cloud Composer. Per informazioni su come selezionare una regione, consulta Regioni disponibili.

  • IMAGE_VERSION con il nome dell'immagine Cloud Composer. Questa guida utilizza composer-1.20.12-airflow-1.10.15 per creare un ambiente con l'immagine di Cloud Composer più recente.

Esempio:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.20.12-airflow-1.10.15"
    }
  }

}

Visualizza dettagli ambiente

Al termine della creazione dell'ambiente, puoi visualizzare le informazioni sull'ambiente, ad esempio la versione di Cloud Composer, l'URL dell'interfaccia web di Airflow e la cartella dei DAG in Cloud Storage.

Per visualizzare le informazioni sull'ambiente:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Per visualizzare la pagina Dettagli ambiente, fai clic sul nome del tuo ambiente, example-environment.

Crea un DAG

Un DAG Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti nei file Python standard.

Il codice Python in quickstart.py:

  1. Crea un DAG, composer_sample_dag. Il DAG viene eseguito una volta al giorno.
  2. Esegue un'attività: print_dag_run_conf. L'attività stampa la configurazione dell'esecuzione di DAG utilizzando l'operatore bash.

Per creare un DAG, crea una copia del file quickstart.py sulla tua macchina locale.

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with airflow.DAG(
    "composer_sample_dag",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

carica il DAG in Cloud Storage

Cloud Composer pianifica solo i DAG che si trovano nella cartella /dags del bucket Cloud Storage dell'ambiente.

Per pianificare il DAG, carica quickstart.py dalla macchina locale nella cartella /dags del tuo ambiente.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Per aprire la cartella /dags, segui il link alla cartella DAG per example-environment.

  3. Nella pagina dei dettagli del bucket, fai clic su Carica file e seleziona la copia locale di quickstart.py.

  4. Per caricare il file, fai clic su Apri.

    Dopo aver caricato il DAG, Cloud Composer aggiunge il DAG a Airflow e pianifica immediatamente l'esecuzione di un DAG. Potrebbero essere necessari alcuni minuti prima che il DAG venga visualizzato nell'interfaccia web di Airflow.

gcloud

Per caricare quickstart.py con gcloud, esegui questo comando:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Visualizza il DAG nell'interfaccia utente di Airflow

Ogni ambiente Cloud Composer dispone di un server web che esegue l'interfaccia web Airflow. Puoi gestire i DAG dall'interfaccia web di Airflow.

Per visualizzare il DAG nell'interfaccia web di Airflow:

Airflow 1

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Per aprire l'interfaccia web di Airflow, fai clic sul link Airflow per example-environment. L'interfaccia utente di Airflow si apre in una nuova finestra del browser.

  3. Nella barra degli strumenti di Airflow, vai alla pagina DAG.

  4. Per aprire la pagina dei dettagli dei DAG, fai clic su composer_sample_dag.

    Pagina dei DAG nell'interfaccia utente di Airflow
    Figura 1. Pagina dei DAG nell'interfaccia utente di Airflow (fai clic per ingrandire)

    La pagina del DAG mostra la visualizzazione ad albero, una rappresentazione grafica delle attività e delle dipendenze del flusso di lavoro.

    Visualizzazione ad albero del DAG Composer_sample_dags
    Figura 2. Visualizzazione ad albero per il DAG Composerr_sample_dags

Airflow 2

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  2. Per aprire l'interfaccia web di Airflow, fai clic sul link Airflow per example-environment. L'interfaccia utente di Airflow si apre in una nuova finestra del browser.

  3. Nella barra degli strumenti di Airflow, vai alla pagina DAG.

  4. Per aprire la pagina dei dettagli dei DAG, fai clic su composer_sample_dag.

    Pagina dei DAG nell'interfaccia utente di Airflow
    Figura 1. Pagina dei DAG nell'interfaccia utente di Airflow (fai clic per ingrandire)

    La pagina del DAG mostra la visualizzazione ad albero, una rappresentazione grafica delle attività e delle dipendenze del flusso di lavoro.

    Visualizzazione ad albero del DAG Composer_sample_dags
    Figura 2. Visualizzazione ad albero per il DAG Composerr_sample_dags

Visualizza i dettagli dell'istanza delle attività nei log di Airflow

Il DAG che hai pianificato include l'attività print_dag_run_conf. L'attività stampa la configurazione dell'esecuzione di DAG, che puoi vedere nei log di Airflow per l'istanza dell'attività.

Per visualizzare i dettagli dell'istanza dell'attività:

Airflow 1

  1. Nella visualizzazione ad albero del DAG nell'interfaccia web di Airflow, fai clic su Vista grafico.

    Se tieni il puntatore sull'attività print_dag_run_conf, viene visualizzato il relativo stato.

    Visualizzazione ad albero del DAG Composer_sample_dags
    Figura 3. Stato dell'attività print_dag_run_conf
  2. Fai clic sull'attività print_dag_run_conf.

    Nel menu contestuale Istanza attività puoi visualizzare i metadati ed eseguire alcune azioni.

    Menu contestuale dell'istanza di attività per l'attività compiler_sample_dags
    Figura 4. Menu contestuale dell'istanza di attività per l'attività Composerr_sample_dags
  3. Nel menu contestuale Istanza attività, fai clic su Visualizza log.

  4. Nel log, cerca Running: ['bash' per vedere l'output dell'operatore bash.

    Output log dell'operatore Bash
    Figura 5. Output log dell'operatore Bash

Airflow 2

  1. Nella visualizzazione ad albero del DAG nell'interfaccia web di Airflow, fai clic su Vista grafico.

    Se tieni il puntatore sull'attività print_dag_run_conf, viene visualizzato il relativo stato.

    Visualizzazione ad albero del DAG Composer_sample_dags
    Figura 3. Stato dell'attività print_dag_run_conf
  2. Fai clic sull'attività print_dag_run_conf.

    Nel menu contestuale Istanza attività puoi visualizzare i metadati ed eseguire alcune azioni.

    Menu contestuale dell'istanza di attività per l'attività compiler_sample_dags
    Figura 4. Menu contestuale dell'istanza di attività per l'attività Composerr_sample_dags
  3. Nel menu contestuale Istanza attività, fai clic su Log.

  4. Nel log, cerca Running command: ['bash' per vedere l'output dell'operatore bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi:

Elimina le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai a Ambienti

    2. Seleziona example-environment e fai clic su Elimina.

    3. Attendi che l'ambiente venga eliminato.

  2. Eliminare il bucket del tuo ambiente. L'eliminazione dell'ambiente Cloud Composer non elimina il relativo bucket.

    1. Nella console Google Cloud, vai alla pagina Storage > Browser.

      Vai a Spazio di archiviazione > Browser

    2. Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato us-central1-example-environ-c1616fe8-bucket.

  3. Elimina il disco permanente della coda Redis del tuo ambiente. L'eliminazione dell'ambiente Cloud Composer non elimina il relativo disco permanente.

    1. Nella console Google Cloud, vai a Compute Engine > Dischi.

      Vai a Dischi

    2. Seleziona il disco permanente della coda Redis dell'ambiente e fai clic su Elimina.

      Ad esempio, questo disco può essere denominato gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. I dischi per Cloud Composer 1 hanno sempre il tipo Standard persistent disk e la dimensione di 2 GB.

Passaggi successivi