Como testar DAGs (fluxos de trabalho)

Antes de implantar os DAGs na produção, é possível executar subcomandos CLI do Airflow para analisar o código DAG no mesmo contexto em que o DAG é executado.

Teste durante a criação do DAG

É possível executar uma única instância de tarefa localmente e ver a saída do registro. A visualização da saída permite verificar se há erros de sintaxe e de tarefa. O teste local não verifica as dependências nem informa o status para o banco de dados.

Recomendamos que você coloque os DAGs em uma pasta data/test no ambiente de teste.

Verificação de erros de pacote PyPI

Como as dependências PyPI podem causar conflitos com dependências exigidas pelo Airflow, recomendamos que você instale os pacotes Python que quiser em um contêiner local de worker do Airflow e teste o pacote.

  1. Determine o cluster do GKE do ambiente do Cloud Composer.

  2. Conecte-se ao cluster do GKE.

  3. Visualize e escolha um pod de worker do Airflow.

    kubectl get pods --all-namespaces

    Procure um pod com um nome como airflow-worker-1a2b3c-x0yz.

  4. Conecte-se a um shell remoto em um contêiner de worker do Airflow.

    kubectl -n composer-1-6-0-airflow-example-namespace \
      exec -it airflow-worker-1a2b3c-x0yz -c airflow-worker -- /bin/bash

    Enquanto estiver conectado ao shell remoto, o prompt de comando mostrará o nome do pod de worker do Airflow, como airflow-worker-1a2b3c-x0yz:.

  5. No caso da versão do Python em execução no ambiente, instale o pacote Python no contêiner de worker do Airflow, como:

    sudo python2 -m pip install "[PACKAGE]"

  6. Teste a compatibilidade no contêiner de worker do Airflow.

    • Verifique se há erros de sintaxe.
      airflow list_dags
    • Renderizar o modelo.
      airflow test --dry_run [DAG_ID] [TASK_ID] [EXECUTION_DATE]
    • Verifique se há erros de tarefa.

      airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE]

  7. Desinstale o pacote Python do contêiner de worker do Airflow, como:

    sudo python2 -m pip uninstall "[PACKAGE]"

Como verificar erros de sintaxe

  1. No bucket do Cloud Storage do ambiente, crie um diretório de teste.
  2. Para verificar se há erros de sintaxe, insira este comando gcloud:

    gcloud composer environments run ENVIRONMENT_NAME \
     --location LOCATION \
     list_dags -- -sd /home/airflow/gcs/data/test

    em que:

    • ENVIRONMENT_NAME é o nome do ambiente;
    • LOCATION é a região do Compute Engine em que o ambiente está localizado.

    Exemplo:

    gcloud composer environments run \
     test-environment --location us-central1 \
     list_dags -- -sd /home/airflow/gcs/data/test

Como verificar erros de tarefas

Para verificar se há erros específicos da tarefa, insira este comando gcloud:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_ID é o ID do DAG;
  • TASK_ID é o ID da tarefa;
  • DAG_EXECUTION_DATE é a data de execução do DAG. Essa data é usada para propósitos de modelos. Independentemente da data especificada aqui, o DAG é executado imediatamente.

Exemplo:

gcloud composer environments run test-environment --location us-central1 \
        -- -sd /home/airflow/gcs/data/test-dags hello_world print_date 2018-09-03

Como atualizar e testar um DAG implantado

Para testar atualizações dos DAGs no ambiente de teste:

  1. Copie o DAG implantado que você quer atualizar para data/test.
  2. Atualize o DAG.
  3. Teste o DAG.
    1. Verifique se há erros de sintaxe.
    2. Verifique se há erros específicos da tarefa.
  4. Verifique se o DAG é executado com êxito.
  5. Desative o DAG no ambiente de teste.
    1. Vá para a página IU do Airflow > DAGs.
    2. Se o DAG que você está modificando está sendo executado constantemente, desative o DAG.
    3. Para acelerar as tarefas pendentes, clique na tarefa e em Marcar com êxito.
  6. Implante o DAG no ambiente de produção.
    1. Desative o DAG no ambiente de produção.
    2. Faça o upload do DAG atualizado para a pasta dags/ no ambiente de produção.

Perguntas frequentes sobre testes de fluxos de trabalho

A seguir