Test dei DAG Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Prima di eseguire il deployment dei DAG in produzione, puoi eseguire i sottocomandi dell'interfaccia a riga di comando di Airflow per analizzare il codice DAG nello stesso contesto in cui viene eseguito il DAG.

Test durante la creazione del DAG

Puoi eseguire una singola istanza di attività in locale e visualizzare l'output del log. La visualizzazione dell'output consente di verificare la presenza di errori di sintassi e delle attività. Il test in locale non controlla le dipendenze né comunica lo stato al database.

Ti consigliamo di inserire i DAG in una cartella data/test del tuo ambiente di test.

Crea una directory di prova

Nel bucket del tuo ambiente, crea una directory di test in cui copia i DAG.

gsutil cp -r BUCKET_NAME/dags \
  BUCKET_NAME/data/test

Sostituisci quanto segue:

  • BUCKET_NAME: il nome del bucket associato al tuo ambiente Cloud Composer.

Esempio:

gsutil cp -r gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test

Per saperne di più sul caricamento dei DAG, vedi Aggiungere e aggiornare i DAG.

Verificare la presenza di errori di sintassi

Per verificare la presenza di errori di sintassi nei DAG che hai caricato nella cartella /data/test, inserisci il seguente comando gcloud:

Flusso d'aria 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

Flusso d'aria 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -sd /home/airflow/gcs/data/test

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome dell'ambiente.
  • ENVIRONMENT_LOCATION: la regione in cui si trova l'ambiente.

Verificare la presenza di errori nelle attività

Per verificare la presenza di errori specifici delle attività nei DAG che hai caricato nella cartella /data/test, esegui questo comando gcloud:

Flusso d'aria 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

Flusso d'aria 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome dell'ambiente.
  • ENVIRONMENT_LOCATION: la regione in cui si trova l'ambiente.
  • DAG_ID: l'ID del DAG.
  • TASK_ID: l'ID dell'attività.
  • DAG_EXECUTION_DATE: la data di esecuzione del DAG. Questa data viene utilizzata per la creazione di modelli. Indipendentemente dalla data specificata qui, il DAG viene eseguito immediatamente.

Esempio:

Flusso d'aria 2

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Flusso d'aria 1

gcloud composer environments run example-environment \
  --location us-central1 \
  test -- -sd /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Aggiornamento e test di un DAG di cui è stato eseguito il deployment

Per testare gli aggiornamenti dei DAG nel tuo ambiente di test:

  1. Copia in data/test il DAG di cui hai eseguito il deployment che vuoi aggiornare.
  2. Aggiorna il DAG.
  3. Testa il DAG.
    1. Verifica la presenza di errori di sintassi.
    2. Verifica la presenza di errori specifici delle attività.
  4. Assicurati che il DAG venga eseguito correttamente.
  5. Disattiva il DAG nel tuo ambiente di test.
    1. Vai alla pagina UI di Airflow > DAG.
    2. Se il DAG che stai modificando viene eseguito costantemente, disattiva il DAG.
    3. Per velocizzare le attività in sospeso, fai clic sull'attività e su Contrassegna come completata.
  6. Esegui il deployment del DAG nel tuo ambiente di produzione.
    1. Disattiva il DAG nel tuo ambiente di produzione.
    2. Carica il DAG aggiornato nella cartella dags/ del tuo ambiente di produzione.

Domande frequenti per i test dei DAG

Come posso isolare le esecuzioni dei DAG nei miei ambienti di produzione e test?

Ad esempio, Airflow ha un repository globale di codice sorgente nella cartella dags/ condiviso da tutte le esecuzioni dei DAG. Vuoi aggiornare il codice sorgente in produzione o test senza interferire con i DAG in esecuzione.

Airflow non fornisce un forte isolamento dei DAG. Ti consigliamo di mantenere ambienti Cloud Composer di produzione e di test separati per evitare che i DAG di test interferiscano con i DAG di produzione.

Come evitare le interferenze dei DAG quando eseguo test di integrazione da diversi rami GitHub

Usa nomi univoci per le attività per evitare interferenze. Ad esempio, puoi aggiungere il nome del ramo agli ID attività.

Qual è una best practice per i test di integrazione con Airflow?

Ti consigliamo di utilizzare un ambiente dedicato per i test di integrazione con Airflow. Un modo per segnalare che l'esecuzione del DAG è riuscita è scrivere in un file in una cartella di Cloud Storage e quindi verificare il contenuto nei tuoi scenari di test di integrazione.

Come posso collaborare in modo efficiente con altri collaboratori DAG?

Ogni collaboratore può avere una sottodirectory nella cartella data/ per lo sviluppo.

I DAG aggiunti alla cartella data/ non vengono raccolti automaticamente dallo scheduler o dal server web di Airflow

I collaboratori DAG possono creare esecuzioni manuali dei DAG utilizzando il comando gcloud composer environments run e il sottocomando test con il flag --subdir per specificare la directory di sviluppo del collaboratore.

Ad esempio:

Flusso d'aria 2

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Flusso d'aria 1

gcloud composer environments run test-environment-name \
  test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Come posso mantenere sincronizzati il mio ambiente di deployment e produzione?

Per gestire l'accesso:

Per eseguire il deployment dallo sviluppo alla produzione:

  • Garantisci una configurazione coerente, ad esempio variabili di ambiente e pacchetti PyPI.

  • Assicurati che gli argomenti DAG siano coerenti. Per evitare l'hardcoded, ti consigliamo di utilizzare le macro e le variabili Airflow.

    Ad esempio:

    Flusso d'aria 2

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

    Flusso d'aria 1

    gcloud composer environments run test-environment-name \
      variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

Passaggi successivi