Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina spiega come funzionano la pianificazione e l'attivazione dei DAG in Airflow, come definire una pianificazione per un DAG e come attivare un DAG manualmente o metterlo in pausa.
Informazioni sui DAG di Airflow in Cloud Composer
I DAG di Airflow in Cloud Composer vengono eseguiti in uno o più ambienti Cloud Composer nel progetto. Carichi i file di origine dei DAG di Airflow in un bucket Cloud Storage associato a un ambiente. L'istanza di Airflow dell'ambiente analizza quindi questi file e pianifica le esecuzioni dei DAG, come definito dalla pianificazione di ciascun DAG. Durante l'esecuzione di un DAG, Airflow pianifica ed esegue le singole attività che compongono un DAG in una sequenza definita dal DAG.
Per saperne di più sui concetti principali di Airflow come i DAG e le esecuzioni dei DAG, o operatori, consulta la pagina Concetti principali nella documentazione di Airflow.
Informazioni sulla pianificazione dei DAG in Airflow
Airflow fornisce i seguenti concetti per il suo meccanismo di pianificazione:
- Data logica
Rappresenta una data per la quale viene eseguita una determinata esecuzione del DAG.
Questa non è la data effettiva in cui Airflow esegue un DAG, ma un periodo di tempo che una determinata esecuzione del DAG deve elaborare. Ad esempio, per un DAG pianificato per l'esecuzione ogni giorno alle 12:00, la data logica sarà anche 12:00 in un giorno specifico. Poiché viene eseguito due volte al giorno, il periodo di tempo da elaborare è costituito dalle ultime 12 ore. Allo stesso tempo, la logica definita nel DAG stesso potrebbe non utilizzare affatto la data logica o l'intervallo di tempo. Ad esempio, un DAG potrebbe eseguire lo stesso script una volta al giorno senza utilizzare il valore della data logica.
Nelle versioni di Airflow precedenti alla 2.2, questa data è chiamata data di esecuzione.
- Data esecuzione
Rappresenta una data in cui viene eseguita una determinata esecuzione del DAG.
Ad esempio, per un DAG pianificato per l'esecuzione ogni giorno alle 12:00 l'esecuzione effettiva del DAG potrebbe avvenire alle 12:05, qualche tempo dopo passa una data logica.
- Intervallo pianificazione
Rappresenta quando e con quale frequenza deve essere eseguito un DAG, in termini di date logiche.
Ad esempio, una pianificazione giornaliera indica che un DAG viene eseguito una volta al giorno e le date logiche per le relative esecuzioni hanno intervalli di 24 ore.
- Data di inizio
Specifica quando vuoi che Airflow inizi a pianificare il DAG.
Le attività nel DAG possono avere date di inizio individuali oppure puoi specificare un'unica data di inizio per tutte le attività. In base alla data di inizio minima delle attività nel DAG e nell'intervallo pianificato, Airflow pianifica le esecuzioni dei DAG.
- Recupero, backfill e nuovi tentativi
Meccanismi per l'esecuzione di esecuzioni di DAG per date passate.
La funzionalità di aggiornamento esegue le esecuzioni del DAG che non sono state ancora eseguite, ad esempio se il DAG è stato messo in pausa per un lungo periodo di tempo e poi è stato riattivato. Puoi utilizzare il backfill per eseguire le esecuzioni DAG per un determinato intervallo di date. I tentativi specificano il numero di tentativi che Airflow deve eseguire durante l'esecuzione delle attività da un DAG.
La pianificazione funziona nel seguente modo:
Una volta trascorsa la data di inizio, Airflow attende la successiva occorrenza dell'intervallo di pianificazione.
Airflow pianifica la prima esecuzione del DAG alla fine di questo intervallo di pianificazione.
Ad esempio, se è pianificata l'esecuzione di un DAG ogni ora e la data di inizio è alle 12:00 di oggi, la prima esecuzione del DAG avviene alle 13:00 di oggi.
La sezione Pianifica un DAG Airflow in questo documento descrive come configurare la pianificazione dei DAG utilizzando questi concetti. Per maggiori informazioni informazioni sulle esecuzioni e sulla pianificazione dei DAG, vedi Esecuzioni di DAG nella documentazione di Airflow.
Informazioni sui modi per attivare un DAG
Airflow fornisce i seguenti modi per attivare un DAG:
Attiva in base a una pianificazione. Airflow attiva il DAG automaticamente in base alla pianificazione specificata nel file DAG.
Attiva manualmente. Puoi attivare manualmente un DAG da Console Google Cloud, UI di Airflow o esecuzione di un comando dell'interfaccia a riga di comando di Airflow da Google Cloud CLI.
Si attiva in risposta agli eventi. Il metodo standard per attivare un DAG in la risposta agli eventi consiste nell'usare un sensore.
Altri modi per attivare i DAG:
Si attiva in modo programmatico. Puoi attivare un DAG utilizzando l'API REST Airflow. Ad esempio da uno script Python.
Attivare ed eseguire il codice in modo programmatico in risposta agli eventi. Puoi attivare i DAG in la risposta agli eventi utilizzando Funzioni Cloud Run e l'API REST Airflow.
Prima di iniziare
- Assicurati che il tuo account abbia un ruolo che possa gestire gli oggetti nel di ambiente e visualizzare e attivare i DAG. Per ulteriori informazioni, vedi Controllo dell'accesso.
Pianifica un DAG Airflow
Definisci una pianificazione per un DAG nel file DAG. Modifica la definizione del DAG nel seguente modo:
Individua e modifica il file DAG sul tuo computer. Se non hai il DAG puoi scaricare la relativa copia dal bucket dell'ambiente. Per un nuovo DAG, puoi definire tutti i parametri quando crei il file DAG.
Nel parametro
schedule_interval
, definisci la pianificazione. Puoi utilizzare un'espressione CRON, ad esempio0 0 * * *
, o un valore preimpostato, ad esempio@daily
. Per maggiori informazioni, consulta la sezione Cron e intervalli di tempo nella documentazione di Airflow.Airflow determina le date logiche per le esecuzioni dei DAG in base alla pianificazione impostata.
Nel parametro
start_date
, definisci la data di inizio.Airflow determina la data logica della prima esecuzione del DAG utilizzando questo parametro.
(Facoltativo) Nel parametro
catchup
, definisci se Airflow deve eseguire tutte le esecuzioni precedenti di questo DAG dalla data di inizio alla data corrente che non sono state ancora eseguite.Le esecuzioni dei DAG eseguite durante il recupero avranno la data logica nei e la relativa data di esecuzione rifletterà l'ora in cui l'esecuzione del DAG è stata effettivamente eseguito.
(Facoltativo) Nel parametro
retries
, definisci quante volte Airflow deve riprovare le attività non riuscite (ogni DAG è costituito da una o più attività individuali). Per impostazione predefinita, le attività in Cloud Composer vengono tentate di nuovo in due volte.Carica la nuova versione del DAG nel bucket dell'ambiente.
Attendi che Airflow analizzi correttamente il DAG. Ad esempio, puoi controllare l'elenco dei DAG nel tuo ambiente nella console Google Cloud o nell'interfaccia utente di Airflow.
La definizione di DAG di esempio seguente viene eseguita due volte al giorno alle 00:00 e alle 12:00. La data di inizio è impostata su 1° gennaio 2024, ma Airflow non la esegue per le date passate dopo il caricamento o la messa in pausa perché il recupero è disattivato.
Il DAG contiene un'attività denominata insert_query_job
, che inserisce una riga in una tabella con l'operatore BigQueryInsertJobOperator
. Questo operatore è uno degli
operatori BigQuery di Google Cloud,
che puoi utilizzare per gestire set di dati e tabelle, eseguire query e convalidare i dati.
Se una determinata esecuzione di questa attività non va a buon fine, Airflow la riprova altre quattro volte con l'intervallo di nuovo tentativo predefinito. La data logica per questi tentativi rimane invariata.
La query SQL per questa riga utilizza Modelli Airflow per scrivere la data logica del DAG e il nome della riga.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Per testare questo DAG, puoi attivarlo manualmente e e poi visualizza i log di esecuzione delle attività.
Altri esempi di parametri di pianificazione
I seguenti esempi di parametri di pianificazione illustrano il funzionamento della pianificazione con diverse combinazioni di parametri:
Se
start_date
èdatetime(2024, 4, 4, 16, 25)
eschedule_interval
è30 16 * * *
, la prima esecuzione del DAG avviene alle 16:30 del 5 aprile 2024.Se
start_date
èdatetime(2024, 4, 4, 16, 35)
eschedule_interval
è30 16 * * *
, la prima esecuzione del DAG verrà eseguita alle 16:30 del 6 aprile 2024. Poiché la data di inizio è successiva all'intervallo di pianificazione del 4 aprile 2024, l'esecuzione dei DAG non viene eseguita il 5 aprile 2024. Tuttavia, l'intervallo di pianificazione termina alle 16:35 del 5 aprile 2024, pertanto l'esecuzione successiva del DAG è pianificata per le 16:30 del giorno successivo.Se
start_date
èdatetime(2024, 4, 4)
eschedule_interval
è@daily
, la prima esecuzione del DAG è pianificata per le 00:00 del 5 aprile 2024.Se
start_date
èdatetime(2024, 4, 4, 16, 30)
eschedule_interval
è0 * * * *
, quindi la prima esecuzione del DAG programmato per le 18:00 del 4 aprile 2024. Una volta trascorsi la data e l'ora specificati, Airflow pianifica un'esecuzione del DAG al minuto 0 di ogni ora. Il momento più vicino in cui si verifica è alle 17:00. Al momento, Airflow pianifica l'esecuzione di un DAG alla fine dell'intervallo di pianificazione, cioè alle 18:00.
Attivare manualmente un DAG
Quando attivi manualmente un DAG Airflow, Airflow esegue il DAG una volta, indipendentemente dalla pianificazione specificata nel file DAG.
Console
La UI dei DAG è supportata in Cloud Composer 1.17.8 e versioni successive.
Per attivare un DAG dalla console Google Cloud:
Nella console Google Cloud, vai alla pagina Ambienti.
Seleziona un ambiente per visualizzarne i dettagli.
Nella pagina Dettagli ambiente, vai alla scheda DAG.
Fai clic sul nome di un DAG.
Nella pagina Dettagli DAG, fai clic su Attiva DAG. Una nuova esecuzione di DAG è stato creato.
UI di Airflow
Per attivare un DAG dalla UI di Airflow:
Nella console Google Cloud, vai alla pagina Ambienti.
Nella colonna Server web Airflow, segui il link Airflow per il tuo completamente gestito di Google Cloud.
Accedi con l'Account Google che dispone delle autorizzazioni appropriate.
Nell'interfaccia web di Airflow, nella pagina DAG, nella colonna Collegamenti per il DAG, fai clic sul pulsante Attiva DAG.
(Facoltativo) Specifica la configurazione dell'esecuzione del DAG.
Fai clic su Attiva.
gcloud
In Airflow 1.10.12 o versioni precedenti, esegui il comando trigger_dag
dell'interfaccia a riga di comando di Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
In Airflow 1.10.14 o versioni successive, inclusa Airflow 2, esegui il dags trigger
comando Airflow CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Sostituisci quanto segue:
ENVIRONMENT_NAME
: il nome dell'ambiente.LOCATION
: la regione in cui si trova l'ambiente.DAG_ID
: il nome del DAG.
Per ulteriori informazioni sull'esecuzione dei comandi dell'interfaccia a riga di comando di Airflow negli ambienti Cloud Composer, consulta Eseguire i comandi dell'interfaccia a riga di comando di Airflow.
Per ulteriori informazioni sui comandi dell'interfaccia a riga di comando di Airflow disponibili, consulta il riferimento al comando gcloud composer environments run
.
Visualizzare i log e i dettagli delle esecuzioni di DAG
Nella console Google Cloud puoi:
- Visualizza gli stati delle esecuzioni precedenti di DAG e i dettagli dei DAG.
- Esplora i log dettagliati di tutte le esecuzioni dei DAG e di tutte le attività da questi DAG.
- Visualizza le statistiche DAG.
Inoltre, Cloud Composer fornisce l'accesso all'interfaccia utente di Airflow, ovvero l'interfaccia web di Airflow.
Metti in pausa un DAG
Console
La UI del DAG è supportata in Cloud Composer 1.17.8 e versioni successive.
Per mettere in pausa un DAG dalla console Google Cloud:
Nella console Google Cloud, vai alla pagina Ambienti.
Seleziona un ambiente per visualizzarne i dettagli.
Nella pagina Dettagli ambiente, vai alla scheda DAG.
Fai clic sul nome di un DAG.
Nella pagina Dettagli DAG, fai clic su Metti in pausa DAG.
UI di Airflow
Per mettere in pausa un DAG dalla UI di Airflow:
- Nella console Google Cloud, vai alla pagina Ambienti.
Nella colonna Server web Airflow, segui il link Airflow per il tuo ambiente.
Accedi con l'Account Google che dispone delle autorizzazioni appropriate.
Nell'interfaccia web di Airflow, nella pagina DAG, fai clic su il pulsante di attivazione/disattivazione accanto al nome del DAG.
gcloud
In Airflow 1.10.12 o versioni precedenti, esegui il comando pause
dell'interfaccia a riga di comando Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
pause -- DAG_ID
In Airflow 1.10.14 o versioni successive, incluso Airflow 2, esegui dags pause
Comando dell'interfaccia a riga di comando di Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Sostituisci quanto segue:
ENVIRONMENT_NAME
: il nome dell'ambiente.LOCATION
: la regione in cui si trova l'ambiente.DAG_ID
: il nome del DAG.
Per ulteriori informazioni sull'esecuzione dei comandi dell'interfaccia a riga di comando di Airflow negli ambienti Cloud Composer, consulta Eseguire i comandi dell'interfaccia a riga di comando di Airflow.
Per ulteriori informazioni sui comandi dell'interfaccia a riga di comando di Airflow disponibili, consulta il riferimento al comando gcloud composer environments run
.