Attiva i DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina descrive i diversi modi per attivare i DAG negli ambienti Cloud Composer.

Airflow fornisce i seguenti modi per attivare un DAG:

  • Attiva in base a una pianificazione. Quando crei un DAG, devi specificare una pianificazione per il DAG. Airflow attiva il DAG automaticamente in base ai parametri di pianificazione specificati.

  • Attivazione manuale. Puoi attivare un DAG manualmente dalla console Google Cloud, dalla UI di Airflow o eseguendo un comando dell'interfaccia a riga di comando di Airflow da gcloud.

  • Si attiva in risposta agli eventi. Il modo standard per attivare un DAG in risposta agli eventi è usare un sensore.

Altri modi per attivare i DAG:

Attiva un DAG in base a una pianificazione

Per attivare un DAG in una pianificazione:

  1. Specifica i parametri start_date e schedule_interval nel file DAG, come descritto più avanti in questa sezione.
  2. Carica il file DAG nel tuo ambiente.

Specifica i parametri di pianificazione

Quando definisci un DAG, nel parametro schedule_interval specifichi quanto spesso vuoi eseguire il DAG. Nel parametro start_date, specifichi quando vuoi che Airflow inizi a pianificare il tuo DAG. Le attività nel DAG possono avere date di inizio individuali oppure puoi specificare un'unica data di inizio per tutte le attività. In base alla data di inizio minima delle attività nel DAG e all'intervallo di pianificazione, Airflow pianifica le esecuzioni dei DAG.

La pianificazione funziona nel seguente modo. Dopo il passaggio start_date, Airflow attende la seguente occorrenza di schedule_interval. Quindi pianifica la prima esecuzione del DAG al termine di questo intervallo di pianificazione. Ad esempio, se un DAG è pianificato per essere eseguito ogni ora (schedule_interval corrisponde a un'ora) e la data di inizio è alle 12:00 di oggi, la prima esecuzione del DAG avviene alle 13:00 di oggi.

L'esempio seguente mostra un DAG che viene eseguito ogni ora a partire dalle 15:00 del 5 aprile 2024. Con i parametri utilizzati nell'esempio, Airflow pianifica la prima esecuzione di DAG alle 16:00 del 5 aprile 2024.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2024
    start_date=datetime(2024, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

Per ulteriori informazioni sui parametri di pianificazione, consulta Esecuzioni di DAG nella documentazione di Airflow.

Altri esempi di parametri di pianificazione

I seguenti esempi di parametri di pianificazione mostrano come funziona la pianificazione con diverse combinazioni di parametri:

  • Se start_date è datetime(2024, 4, 4, 16, 25) e schedule_interval è 30 16 * * *, la prima esecuzione del DAG avviene alle 16:30 del 5 aprile 2024.

  • Se start_date è datetime(2024, 4, 4, 16, 35) e schedule_interval è 30 16 * * *, la prima esecuzione del DAG avviene alle 16:30 del 6 aprile 2024. Poiché la data di inizio è successiva all'intervallo di pianificazione del 4 aprile 2024, l'esecuzione del DAG non viene eseguita il 5 aprile 2024. L'intervallo di pianificazione termina invece alle 16:35 del 5 aprile 2024, quindi la successiva esecuzione del DAG è pianificata alle 16:30 del giorno successivo.

  • Se start_date è datetime(2024, 4, 4) e schedule_interval è @daily, la prima esecuzione del DAG è pianificata per le 00:00 del 5 aprile 2024.

  • Se start_date è datetime(2024, 4, 4, 16, 30) e schedule_interval è 0 * * * *, la prima esecuzione del DAG è pianificata per le 18:00 del 4 aprile 2024. Una volta trascorsa la data e l'ora specificate, Airflow pianifica l'esecuzione di un DAG al minuto 0 di ogni ora. Il momento più vicino in cui questo accade sono le 17:00. In questo momento, Airflow pianifica l'esecuzione di un DAG alla fine dell'intervallo di pianificazione, ovvero alle 18:00.

Attiva manualmente un DAG

Quando attivi un DAG manualmente, Airflow esegue un'esecuzione del DAG. Ad esempio, se disponi di un DAG che viene già eseguito in base a una pianificazione e attivi questo DAG manualmente, Airflow esegue il DAG una volta, indipendentemente dalla pianificazione effettiva specificata per il DAG.

Console

La UI del DAG è supportata in Cloud Composer 2.0.1 e versioni successive.

Per attivare un DAG dalla console Google Cloud:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Seleziona un ambiente per visualizzarne i dettagli.

  3. Nella pagina Dettagli ambiente, vai alla scheda DAG.

  4. Fai clic sul nome di un DAG.

  5. Nella pagina Dettagli DAG, fai clic su Attiva DAG. Viene creata una nuova esecuzione del DAG.

UI di Airflow

Per attivare un DAG dall'interfaccia web di Airflow:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

Vai ad Ambienti

  1. Nella colonna Server web Airflow, segui il link Airflow per il tuo ambiente.

  2. Accedi con l'Account Google che dispone delle autorizzazioni appropriate.

  3. Nell'interfaccia web di Airflow, nella pagina DAG, nella colonna Link per il tuo DAG, fai clic sul pulsante Trigger DAG.

  4. (Facoltativo) Specifica la configurazione dell'esecuzione del DAG.

  5. Fai clic su Attiva.

gcloud

Esegui il comando dell'interfaccia a riga di comando di Airflow dags trigger:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • DAG_ID con il nome del DAG.

Per ulteriori informazioni sull'esecuzione dei comandi dell'interfaccia a riga di comando di Airflow negli ambienti Cloud Composer, consulta Esecuzione dei comandi dell'interfaccia a riga di comando di Airflow.

Per maggiori informazioni sui comandi dell'interfaccia a riga di comando di Airflow disponibili, consulta il riferimento per i comandi gcloud composer environments run.

Passaggi successivi