Attivazione dei DAG

Cloud Composer 1 | Cloud Composer 2

Questa pagina descrive i diversi modi per attivare i DAG negli ambienti Cloud Composer.

Airflow offre i seguenti modi per attivare un DAG:

  • Attiva il trigger in base a una pianificazione. Quando crei un DAG, ne specifichi una pianificazione. Airflow attiva automaticamente il DAG in base ai parametri di pianificazione specificati.

  • Attiva manualmente. Puoi attivare un DAG manualmente dall'interfaccia utente di Airflow o eseguendo un comando dell'interfaccia a riga di comando di Airflow da gcloud.

  • Attiva l'attivazione in risposta agli eventi. Il modo standard per attivare un DAG in risposta agli eventi consiste nell'utilizzare un sensore.

Altri modi per attivare i DAG:

Attivare un DAG in base a una pianificazione

Per attivare un DAG in base a una pianificazione:

  1. Specifica i parametri start_date e schedule_interval nel file DAG, come descritto più avanti in questa sezione.
  2. Carica il file DAG nel tuo ambiente.

Specifica i parametri di pianificazione

Quando definisci un DAG, nel parametro schedule_interval devi specificare la frequenza di esecuzione del DAG. Nel parametro start_date, specifichi quando vuoi che Airflow inizi a pianificare il DAG. Le attività nel DAG possono avere date di inizio individuali oppure puoi specificare una singola data di inizio per tutte le attività. In base alla data di inizio minima per le attività nel tuo DAG e nell'intervallo di pianificazione, Airflow pianifica le esecuzioni dei DAG.

La programmazione funziona nel seguente modo. Una volta superato start_date, Airflow attende la seguente occorrenza di schedule_interval. Quindi pianifica la prima esecuzione di DAG al termine di questo intervallo di pianificazione. Ad esempio, se un DAG è pianificato per essere eseguito ogni ora (schedule_interval corrisponde a un'ora) e la data di inizio è oggi alle 12:00, la prima esecuzione di DAG avverrà alle 13:00 di oggi.

L'esempio seguente mostra un DAG eseguito ogni ora a partire dalle 15:00 del 5 aprile 2021. Con i parametri utilizzati nell'esempio, Airflow pianifica la prima esecuzione di DAG alle 16:00 del 5 aprile 2021.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

Per ulteriori informazioni sui parametri di pianificazione, consulta Esecuzioni DAG nella documentazione di Airflow.

Altri esempi di parametri di pianificazione

I seguenti esempi di parametri di pianificazione illustrano come funziona la pianificazione con diverse combinazioni di parametri:

  • Se start_date è datetime(2021, 4, 4, 16, 25) e schedule_interval è 30 16 * * *, la prima esecuzione di DAG avviene alle 16:30 del 5 aprile 2021.
  • Se start_date è datetime(2021, 4, 4, 16, 35) e schedule_interval è 30 16 * * *, la prima esecuzione di DAG avviene alle 16:30 del 6 aprile 2021. Poiché la data di inizio è successiva all'intervallo pianificato il 4 aprile 2021, l'esecuzione di DAG non verrà eseguita il 5 aprile 2021. L'intervallo programmato termina alle 16:35 del 5 aprile 2021, quindi la prossima esecuzione di DAG è programmata per le 16:30 del giorno successivo.
  • Se start_date è datetime(2021, 4, 4) e schedule_interval è @daily, la prima esecuzione di DAG è programmata per le ore 00:00 del 5 aprile 2021.
  • Se start_date è datetime(2021, 4, 4, 16, 30) e schedule_interval è 0 * * * *, la prima esecuzione di DAG è programmata per le 18:00 del 4 aprile 2021. Una volta trascorse la data e l'ora specificate, Airflow pianifica un'esecuzione di DAG al minuto 0 di ogni ora. Il momento più vicino in cui si verifica questa situazione è le 17:00. In questo momento, Airflow pianifica un'esecuzione DAG al termine dell'intervallo di pianificazione, ovvero alle 18:00.

Attivare manualmente un DAG

Quando attivi un DAG manualmente, Airflow esegue un'esecuzione di DAG. Ad esempio, se hai un DAG già eseguito in base a una pianificazione e lo attivi manualmente, Airflow esegue il DAG una volta, indipendentemente dalla pianificazione effettiva specificata per il DAG.

Console

La UI DAG è supportata in Cloud Composer 2.0.1 e versioni successive.

Per attivare un DAG dalla console Google Cloud:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Seleziona un ambiente per visualizzarne i dettagli.

  3. Nella pagina Dettagli ambiente, vai alla scheda DAG.

  4. Fai clic sul nome di un DAG.

  5. Nella pagina Dettagli DAG, fai clic su Attiva DAG. Viene creata una nuova esecuzione DAG.

UI di Airflow

Per attivare un DAG dall'interfaccia web di Airflow:

  1. Nella console Google Cloud, vai alla pagina Ambienti.

Vai ad Ambienti

  1. Nella colonna Airflow webserver, segui il link Airflow per il tuo ambiente.

  2. Accedi con l'Account Google che dispone delle autorizzazioni appropriate.

  3. Nell'interfaccia web di Airflow, nella colonna Link del DAG della pagina DAG, fai clic sul pulsante Evento di attivazione.

  4. (Facoltativo) Specifica la configurazione dell'esecuzione di DAG.

  5. Fai clic su Attivatore.

gcloud

Esegui il comando dell'interfaccia a riga di comando di Airflow dags trigger:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • DAG_ID con il nome del DAG.

Per ulteriori informazioni sull'esecuzione dei comandi dell'interfaccia a riga di comando di Airflow negli ambienti Cloud Composer, consulta Esecuzione dei comandi dell'interfaccia a riga di comando di Airflow.

Per ulteriori informazioni sui comandi dell'interfaccia a riga di comando di Airflow disponibili, consulta la documentazione di riferimento per i comandi gcloud composer environments run.

Passaggi successivi