Esegui un DAG Apache Airflow in Cloud Composer 3

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa guida rapida mostra come creare un ambiente Cloud Composer e eseguire un DAG Apache Airflow in Cloud Composer 3.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. Per ottenere le autorizzazioni necessarie per completare questa guida introduttiva, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:

    Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

    Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Creare l'account di servizio di un ambiente

Quando crei un ambiente, specifichi un account di servizio. Questo account di servizio è chiamato account di servizio dell'ambiente. Il tuo ambiente utilizza questo account di servizio per eseguire la maggior parte delle operazioni.

L'account di servizio per il tuo ambiente non è un account utente. Un account di servizio è un particolare tipo di account utilizzato da un'applicazione o da un'istanza di macchina virtuale (VM), non da una persona fisica.

Per creare un account di servizio per il tuo ambiente:

  1. Crea un nuovo account di servizio, come descritto nella documentazione di Identity and Access Management.

  2. Concedere un ruolo, come descritto nella documentazione di Identity and Access Management. Il ruolo richiesto è Worker Composer (composer.worker).

Creazione di un ambiente

  1. Nella console Google Cloud, vai alla pagina Crea ambiente.

    Vai a Crea ambiente

  2. Nel campo Nome, inserisci example-environment.

  3. Nell'elenco a discesa Località, seleziona una regione per l'ambiente Cloud Composer. Questa guida utilizza la regione us-central1.

  4. Per altre opzioni di configurazione dell'ambiente, utilizza i valori predefiniti forniti.

  5. Fai clic su Crea e attendi che l'ambiente venga creato.

  6. Al termine, accanto al nome dell'ambiente viene visualizzato un segno di spunta verde.

Creare un file DAG

Un DAG Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti in file Python standard.

Questa guida utilizza un DAG Airflow di esempio definito nel file quickstart.py. Il codice Python in questo file esegue le seguenti operazioni:

  1. Crea un DAG, composer_sample_dag. Questo DAG viene eseguito ogni giorno.
  2. Esegue un'attività, print_dag_run_conf. L'attività stampa la configurazione della esecuzione del DAG utilizzando l'operatore bash.

Salva una copia del file quickstart.py sulla tua macchina locale:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Carica il file DAG nel bucket dell'ambiente

A ogni ambiente Cloud Composer è associato un bucket Cloud Storage. Airflow in Cloud Composer pianifica solo i DAG che si trovano nella cartella /dags di questo bucket.

Per pianificare il DAG, carica quickstart.py dalla tua macchina locale nella cartella /dags dell'ambiente:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente, example-environment. Viene visualizzata la pagina Dettagli dell'ambiente.

  3. Fai clic su Apri cartella DAG. Viene visualizzata la pagina Dettagli bucket.

  4. Fai clic su Carica file e seleziona la tua copia di quickstart.py.

  5. Per caricare il file, fai clic su Apri.

Visualizza il DAG

Dopo aver caricato il file DAG, Airflow esegue le seguenti operazioni:

  1. Analizza il file DAG che hai caricato. Potrebbero essere necessari alcuni minuti prima che il DAG sia disponibile per Airflow.
  2. Aggiunge il DAG all'elenco dei DAG disponibili.
  3. Esegue il DAG in base alla pianificazione specificata nel file DAG.

Controlla che il DAG venga elaborato senza errori e che sia disponibile in Airflow visualizzandolo nell'interfaccia utente del DAG. L'interfaccia utente DAG è l'interfaccia di Cloud Composer per visualizzare le informazioni sul DAG nella console Google Cloud. Cloud Composer fornisce inoltre accesso all'interfaccia utente di Airflow, un'interfaccia web nativa di Airflow.

  1. Attendi circa cinque minuti per dare ad Airflow il tempo di elaborare il file DAG caricato in precedenza e di completare la prima esecuzione del DAG (spiegato di seguito).

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  3. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente, example-environment. Viene visualizzata la pagina Dettagli dell'ambiente.

  4. Vai alla scheda DAG.

  5. Verifica che il DAG composer_quickstart sia presente nell'elenco dei DAG.

    L'elenco dei DAG mostra il DAG composer_quickstart con informazioni aggiuntive come stato e pianificazione
    Figura 1. Nell'elenco dei DAG viene visualizzato il DAG composer_quickstart (fai clic per ingrandire)

Visualizzare i dettagli dell'esecuzione del DAG

Una singola esecuzione di un DAG è chiamata esecuzione del DAG. Airflow esegue immediatamente un'esecuzione del DAG di esempio perché la data di inizio nel file DAG è impostata su ieri. In questo modo, Airflow si allinea alla pianificazione del DAG specificato.

Il DAG di esempio contiene un'attività, print_dag_run_conf, che esegue il comando echo nella console. Questo comando genera metadati sul DAG (identificatore numerico dell'esecuzione del DAG).

  1. Nella scheda DAG, fai clic su composer_quickstart. Si apre la scheda Esecuzioni per il DAG.

  2. Nell'elenco delle esecuzioni del DAG, fai clic sulla prima voce.

    L'elenco delle esecuzioni di DAG mostra l'esecuzione di DAG recente (data di esecuzione e stato).
    Figura 2. L'elenco delle esecuzioni di DAG per il DAG composer_quickstart (fai clic per ingrandire)
  3. Vengono visualizzati i dettagli dell'esecuzione del DAG, con informazioni dettagliate sulle singole attività del DAG di esempio.

    L'elenco di attività con una voce print_dag_run_conf, la relativa ora di inizio, di fine e la durata
    Figura 3. L'elenco delle attività eseguite nell'esecuzione del DAG (fai clic per ingrandire)
  4. La sezione Log per l'esecuzione del DAG elenca i log di tutte le attività nell'esecuzione del DAG. Puoi visualizzare l'output del comando echo nei log.

    Le voci di log dell'attività, una delle quali è Output e l'altra elenca un identificatore
    Figura 4. Log dell'attività print_dag_run_conf (fai clic per ingrandire)

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate su questa pagina, segui questi passaggi.

Elimina le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Seleziona example-environment e fai clic su Elimina.

    3. Attendi che l'ambiente venga eliminato.

  2. Elimina il bucket dell'ambiente. L'eliminazione dell'ambiente Cloud Composer non comporta l'eliminazione del relativo bucket.

    1. Nella console Google Cloud, vai alla pagina Storage > Browser.

      Vai ad Archiviazione > Browser

    2. Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato us-central1-example-environ-c1616fe8-bucket.

Passaggi successivi