Test dei DAG di Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Prima di eseguire il deployment dei DAG in produzione, puoi eseguire i sottocomandi della CLI Airflow per analizzare il codice DAG nello stesso contesto in cui viene eseguito il DAG.

Test durante la creazione del DAG

Puoi eseguire una singola istanza di attività in locale e visualizzare l'output del log. La visualizzazione dell'output ti consente di verificare la presenza di errori di sintassi e attività. I test in locale non controllano le dipendenze né comunicano lo stato al database.

Ti consigliamo di inserire i DAG in una cartella data/test nel tuo ambiente di test.

Crea una directory di test

Nel bucket del tuo ambiente, crea una directory di test in cui copia i DAG.

gcloud storage cp BUCKET_NAME/dags \
  BUCKET_NAME/data/test --recursive

Sostituisci quanto segue:

  • BUCKET_NAME: il nome del bucket associato al tuo nell'ambiente Cloud Composer.

Esempio:

gcloud storage cp gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test --recursive

Per ulteriori informazioni sul caricamento dei DAG, consulta Aggiungere e aggiornare i DAG.

Verificare la presenza di errori di sintassi

Per verificare la presenza di errori di sintassi nei DAG caricati nella cartella /data/test , inserisci il seguente comando gcloud:

Flusso d'aria 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

Flusso d'aria 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -sd /home/airflow/gcs/data/test

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome dell'ambiente.
  • ENVIRONMENT_LOCATION: la regione in cui si trova l'ambiente.

Verificare la presenza di errori nelle attività

Per verificare la presenza di errori specifici delle attività nei DAG che hai caricato nell'/data/test esegui questo comando gcloud:

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

Flusso d'aria 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome dell'ambiente.
  • ENVIRONMENT_LOCATION: la regione in cui si trova l'ambiente.
  • DAG_ID: l'ID del DAG.
  • TASK_ID: l'ID dell'attività.
  • DAG_EXECUTION_DATE: la data di esecuzione del DAG. Questa data viene utilizzata per la creazione di modelli. Indipendentemente dalla data specificata qui, il DAG viene eseguita immediatamente.

Esempio:

Flusso d'aria 2

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Airflow 1

gcloud composer environments run example-environment \
  --location us-central1 \
  test -- -sd /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Aggiornamento e test di un DAG di cui è stato eseguito il deployment

Per testare gli aggiornamenti dei DAG nell'ambiente di test:

  1. Copia in data/test il DAG di cui hai eseguito il deployment che vuoi aggiornare.
  2. Aggiorna il DAG.
  3. Testa il DAG.
    1. Controlla che non siano presenti errori di sintassi.
    2. Verifica la presenza di errori specifici delle attività.
  4. Assicurati che il DAG venga eseguito correttamente.
  5. Disattiva il DAG nel tuo ambiente di test.
    1. Vai alla pagina UI di Airflow > DAG.
    2. Se il DAG che stai modificando viene eseguito costantemente, disattivalo.
    3. Per velocizzare le attività in sospeso, fai clic sull'attività e su Contrassegna come completata.
  6. Esegui il deployment del DAG nel tuo ambiente di produzione.
    1. Disattiva il DAG nell'ambiente di produzione.
    2. Carica il DAG aggiornato nella cartella dags/ del tuo ambiente di produzione.

Domande frequenti per i test dei DAG

Come posso isolare le esecuzioni dei DAG nei miei ambienti di produzione e test?

Ad esempio, Airflow dispone di un repository globale di codice sorgente in dags/ condivisa condivisa da tutte le esecuzioni dei DAG. Vuoi aggiornare il codice sorgente in produzione o in test senza interferire con i DAG in esecuzione.

Airflow non fornisce un forte isolamento dei DAG. I nostri suggerimenti di mantenere una produzione separata e testare Cloud Composer per impedire ai DAG di test di interferire con la tua produzione e i DAG.

Come faccio a evitare interferenze con il DAG quando eseguo test di integrazione da diversi rami di GitHub

Usa nomi univoci per le attività per evitare interferenze. Ad esempio, puoi aggiungere un prefisso gli ID attività con il nome del ramo.

Qual è una best practice per i test di integrazione con Airflow?

Ti consigliamo di utilizzare un ambiente dedicato per i test di integrazione Airflow. Un modo per segnalare l'esito positivo dell'esecuzione del DAG è scrivere in un file in una cartella Cloud Storage e poi controllare i contenuti nei tuoi scenari di test di integrazione.

Come faccio a collaborare in modo efficiente con altri collaboratori del DAG?

Ogni collaboratore può avere una sottodirectory nella cartella data/ per lo sviluppo.

I DAG aggiunti alla cartella data/ non vengono raccolti automaticamente Scheduler o server web Airflow

I collaboratori del DAG possono creare esecuzioni manuali del DAG utilizzando il comando gcloud composer environments run e il sottocomando test con il flag --subdir per specificare la directory di sviluppo del collaboratore.

Ad esempio:

Flusso d'aria 2

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Airflow 1

gcloud composer environments run test-environment-name \
  test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Come faccio a mantenere sincronizzati gli ambienti di deployment e di produzione?

Per gestire l'accesso:

Per eseguire il deployment dallo sviluppo alla produzione:

  • Assicurati una configurazione coerente, ad esempio variabili di ambiente e PyPI .

  • Assicurati che gli argomenti DAG siano coerenti. Per evitare la programmazione a livello di codice, ti consigliamo di utilizzare le macro e le variabili Airflow.

    Ad esempio:

    Flusso d'aria 2

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

    Airflow 1

    gcloud composer environments run test-environment-name \
      variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

Passaggi successivi