Test dei DAG Airflow

Cloud Composer 1 | Cloud Composer 2

Prima di eseguire il deployment dei DAG in produzione, puoi eseguire i comandi secondari dell'interfaccia a riga di comando di Airflow per analizzare il codice DAG nello stesso contesto in cui viene eseguito il DAG.

Test durante la creazione dei DAG

Puoi eseguire una singola istanza di attività localmente e visualizzare l'output del log. La visualizzazione dell'output consente di verificare la presenza di errori di sintassi e delle attività. Il test localmente non controlla le dipendenze né comunica lo stato al database.

Ti consigliamo di inserire i DAG in una cartella data/test nel tuo ambiente di test.

Crea una directory di test

Nel bucket del tuo ambiente, crea una directory di test in cui copiare i DAG.

gsutil cp -r BUCKET_NAME/dags \
  BUCKET_NAME/data/test

Sostituisci quanto segue:

  • BUCKET_NAME: il nome del bucket associato al tuo ambiente Cloud Composer.

Esempio:

gsutil cp -r gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test

Per maggiori informazioni sul caricamento dei DAG, consulta Aggiungere e aggiornare i DAG.

Verificare la presenza di errori di sintassi

Per verificare la presenza di errori di sintassi nei DAG caricati nella cartella /data/test, inserisci il seguente comando gcloud:

Flusso d'aria 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

Flusso d'aria 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -sd /home/airflow/gcs/data/test

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome dell'ambiente.
  • ENVIRONMENT_LOCATION: la regione in cui si trova l'ambiente.

Verifica la presenza di errori delle attività

Per verificare la presenza di errori specifici per le attività nei DAG caricati nella cartella /data/test, esegui il comando gcloud seguente:

Flusso d'aria 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

Flusso d'aria 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome dell'ambiente.
  • ENVIRONMENT_LOCATION: la regione in cui si trova l'ambiente.
  • DAG_ID: l'ID del DAG.
  • TASK_ID: l'ID dell'attività.
  • DAG_EXECUTION_DATE: la data di esecuzione del DAG. Questa data viene utilizzata per la creazione di modelli. Indipendentemente dalla data specificata qui, il DAG viene eseguito immediatamente.

Esempio:

Flusso d'aria 2

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Flusso d'aria 1

gcloud composer environments run example-environment \
  --location us-central1 \
  test -- -sd /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Aggiornamento e test di un DAG di cui è stato eseguito il deployment

Per testare gli aggiornamenti ai DAG nel tuo ambiente di test:

  1. Copia in data/test il DAG di cui vuoi aggiornare il deployment.
  2. Aggiorna il DAG.
  3. Testa il DAG.
    1. Verifica l'eventuale presenza di errori di sintassi.
    2. Verifica la presenza di errori specifici per le attività.
  4. Assicurati che il DAG venga eseguito correttamente.
  5. Disattiva il DAG nell'ambiente di test.
    1. Vai alla pagina UI di Airflow > DAG.
    2. Se il DAG che stai modificando è in esecuzione continua, disattivalo.
    3. Per velocizzare le attività in sospeso, fai clic sull'attività e su Contrassegna come completata.
  6. Eseguire il deployment del DAG nell'ambiente di produzione.
    1. Disattiva il DAG nell'ambiente di produzione.
    2. Carica il DAG aggiornato nella cartella dags/ nel tuo ambiente di produzione.

Domande frequenti per il test dei DAG

Come posso isolare le esecuzioni di DAG nei miei ambienti di produzione e di test?

Ad esempio, Airflow ha un repository globale di codice sorgente nella cartella dags/ condiviso da tutti i DAG eseguiti. e vuoi aggiornare il codice sorgente in fase di produzione o test senza interferire con i DAG in esecuzione.

Airflow non fornisce un isolamento DAG elevato. Ti consigliamo di mantenere ambienti di produzione e di test separati di Cloud Composer per evitare che i DAG di test interferiscano con i DAG di produzione.

Come posso evitare interferenze DAG quando eseguo test di integrazione da diversi rami GitHub

Usa nomi univoci delle attività per evitare interferenze. Ad esempio, puoi anteporre il nome del ramo agli ID attività.

Qual è una best practice per i test di integrazione con Airflow?

Ti consigliamo di utilizzare un ambiente dedicato per i test di integrazione con Airflow. Un modo per segnalare il successo dell'esecuzione di DAG è scrivere in un file in una cartella Cloud Storage e quindi controllare il contenuto nei tuoi scenari di test di integrazione.

Come faccio a collaborare in modo efficiente con altri collaboratori DAG?

Ogni collaboratore può avere una sottodirectory nella cartella data/ per lo sviluppo.

I DAG aggiunti alla cartella data/ non vengono selezionati automaticamente dallo scheduler di Airflow o dal server web

I collaboratori DAG possono creare esecuzioni manuali di DAG utilizzando il comando gcloud composer environments run e il sottocomando test con il flag --subdir per specificare la directory di sviluppo del collaboratore.

Ad esempio:

Flusso d'aria 2

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Flusso d'aria 1

gcloud composer environments run test-environment-name \
  test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Come posso mantenere sincronizzati gli ambienti di deployment e produzione?

Per gestire l'accesso:

Per eseguire il deployment dallo sviluppo alla produzione:

  • Garantisci una configurazione coerente, ad esempio variabili di ambiente e pacchetti PyPI.

  • Assicurati che argomenti DAG coerenti. Per evitare l'hardcoding, ti consigliamo di utilizzare le macro e le variabili Airflow.

    Ad esempio:

    Flusso d'aria 2

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

    Flusso d'aria 1

    gcloud composer environments run test-environment-name \
      variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

Passaggi successivi