Programar e acionar DAGs do Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, explicamos como a programação e o acionamento de DAG funcionam no Airflow, como definir uma programação para um DAG e como acioná-lo manualmente ou pausá-lo.

Sobre os DAGs do Airflow no Cloud Composer

Os DAGs do Airflow no Cloud Composer são executados um ou mais ambientes do Cloud Composer na sua projeto. Você faz upload de arquivos de origem dos DAGs do Airflow para um bucket do Cloud Storage associado a um ambiente. A instância do ambiente do Airflow analisa esses arquivos e programa execuções de DAG, conforme definido por cada programação do DAG. Durante uma execução de DAG, o Airflow programa e executa tarefas individuais que compõem um DAG em uma sequência definida pelo DAG.

Para saber mais sobre os conceitos básicos do Airflow, como DAGs, execuções de DAG, tarefas ou operadores, consulte a página Conceitos básicos na documentação do Airflow.

Sobre a programação de DAGs no Airflow

O Airflow fornece os seguintes conceitos para o mecanismo de programação:

Data lógica

Representa uma data em que uma determinada execução de DAG é executada.

Essa não é a data real em que o Airflow executa um DAG, mas um período de tempo que uma execução específica do DAG precisa processar. Por exemplo, para um DAG que é programado para execução todos os dias às 12h, a data lógica também seria 12h. em um dia específico. Como ele é executado duas vezes por dia, o período que ele precisa processar é as últimas 12 horas. Ao mesmo tempo, a lógica definida no DAG pode não usar a data lógica ou o intervalo de tempo. Por exemplo, um DAG pode executar o mesmo script uma vez por dia sem usar o valor da data lógica.

Nas versões anteriores do Airflow, essa data é chamada de data de execução.

Data de execução

Representa uma data em que uma execução de DAG específica é executada.

Por exemplo, para um DAG programado para ser executado todos os dias às 12h, o a execução real do DAG pode acontecer às 12h05, algum tempo após de datas lógicos.

Intervalo da programação

Representa quando e com que frequência um DAG precisa ser executado, em termos de datas lógicas.

Por exemplo, uma programação diária significa que um DAG é executado uma vez por dia, e as datas lógicas das execuções do DAG têm intervalos de 24 horas.

Data de início

Especifica quando você quer que o Airflow comece a programar o DAG.

As tarefas no seu DAG podem ter datas de início individuais ou é possível especificar uma data de início única para todas as tarefas. Com base na data de início mínima das tarefas no DAG e no intervalo da programação, o Airflow programa as execuções de DAGs.

Atualização, preenchimento e novas tentativas

Mecanismos para executar execuções de DAGs em datas passadas.

O ajuste defasagem executa execuções de DAG que ainda não foram realizadas, por exemplo, se o DAG foi pausado por um longo período e depois desativado. É possível usar o preenchimento para executar execuções de DAG em um determinado período. Novas tentativas especificar quantas tentativas o Airflow precisa fazer ao executar tarefas de um DAG.

A programação funciona da seguinte maneira:

  1. Após a data de início, o Airflow aguarda a próxima ocorrência do do intervalo de programação.

  2. O Airflow programa a primeira execução do DAG para acontecer no final desta programação intervalo.

    Por exemplo, se um DAG estiver programado para ser executado a cada hora e a data de início for às 12h de hoje, a primeira execução do DAG acontece às 13h de hoje.

A seção Programar um DAG do Airflow descreve como configurar a programação dos DAGs usando esses conceitos. Para mais informações sobre execuções e programação de DAGs, consulte Execuções DAG na documentação do Airflow.

Formas de acionar um DAG

O Airflow fornece as seguintes maneiras de acionar um DAG:

  • Acionar com base em uma programação. O Airflow aciona o DAG automaticamente com base na programação especificada para ela no arquivo DAG.

  • Acionar manualmente. É possível acionar um DAG manualmente Console do Google Cloud, interface do Airflow ou com a execução de um comando da CLI do Airflow usando a Google Cloud CLI.

  • Acionar em resposta a eventos. A maneira padrão de acionar um DAG em resposta a eventos é usar um sensor.

Outras maneiras de acionar DAGs:

Antes de começar

  • Verifique se sua conta tem um papel que pode gerenciar objetos no buckets do ambiente e visualizar e acionar DAGs. Para mais informações, consulte Controle de acesso.

Programar um DAG do Airflow

Você define uma programação para um DAG no arquivo DAG. Edite a definição do DAG em da seguinte forma:

  1. Localize e edite o arquivo DAG no computador. Se você não tiver o DAG faça o download da cópia dele do bucket do ambiente. Para um novo DAG, é preciso pode definir todos os parâmetros ao criar o arquivo DAG.

  2. No parâmetro schedule_interval, defina a programação. Você pode usar um Expressão cron, como 0 0 * * *, ou uma predefinição, como @daily. Para Para mais informações, consulte Cron e intervalos de tempo na documentação do Airflow.

    O Airflow determina datas lógicas para execuções de DAG com base na programação que você definiu.

  3. No parâmetro start_date, defina a data de início.

    O Airflow determina a data lógica da primeira execução do DAG usando esse parâmetro.

  4. (Opcional) No parâmetro catchup, defina se o Airflow precisa executar todas as execuções anteriores desse DAG da data de início até a data atual que ainda não foram executadas.

    As execuções do DAG executadas durante a atualização terão a data lógica no passado, e a data de execução deles refletirá o momento em que o DAG foi realmente executado executada.

  5. (Opcional) No parâmetro retries, defina quantas vezes o Airflow deve repetir as tarefas que falharam (cada DAG consiste em um ou mais tarefas). Por padrão, as tarefas no Cloud Composer são repetidas duas vezes.

  6. Faça upload da nova versão do DAG para o bucket do ambiente.

  7. Aguarde até que o Airflow analise o DAG. Por exemplo, é possível verificar a lista de DAGs no seu ambiente no Console do Google Cloud ou na interface do Airflow.

O exemplo de definição do DAG a seguir é executado duas vezes por dia às 0h e 12h. Seu a data de início está definida como 1o de janeiro de 2024, mas o Airflow não a executa para datas passadas depois de fazer o upload ou pausar porque a atualização foi desativada.

O DAG contém uma tarefa chamada insert_query_job, que insere uma linha em uma tabela com o operador BigQueryInsertJobOperator. Esse operador é um dos Operadores do BigQuery no Google Cloud, que você pode usar para gerenciar conjuntos de dados e tabelas, executar consultas e validar dados. Se uma determinada execução dessa tarefa falhar, o Airflow tentará novamente mais quatro vezes vezes com o intervalo de repetição padrão. A data lógica dessas novas tentativas permanece a mesma.

A consulta SQL para essa linha usa modelos do Airflow para gravar a data lógica e o nome da DAG na linha.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Para testar esse DAG, é possível acioná-lo manualmente e Em seguida, confira os registros de execução da tarefa.

Mais exemplos de parâmetros de programação

Os exemplos de parâmetros de programação a seguir ilustram como a programação funciona com combinações diferentes de parâmetros:

  • Se start_date for datetime(2024, 4, 4, 16, 25) e schedule_interval for 30 16 * * *, a primeira execução do DAG vai acontecer às 16h30 do dia 5 de abril de 2024.

  • Se start_date for datetime(2024, 4, 4, 16, 35) e schedule_interval for 30 16 * * *, a primeira execução do DAG vai acontecer às 16h30 em 6 de abril de 2024. Como a data de início é posterior ao intervalo de programação em 4 de abril de 2024, a execução do DAG não vai acontecer em 5 de abril de 2024. Em vez disso, o cronograma termina às 16h35 de 5 de abril de 2024, então a próxima execução do DAG é agendada para as 16h30 do dia seguinte.

  • Se start_date for datetime(2024, 4, 4) e schedule_interval for @daily, a primeira execução do DAG será programada para a meia-noite do dia 5 de abril de 2024.

  • Se start_date for datetime(2024, 4, 4, 16, 30) e schedule_interval for 0 * * * *, a primeira execução do DAG será programada para 18h do dia 4 de abril de 2024. Após a data e hora especificadas, o Airflow programa uma execução do DAG para ocorrer no minuto 0 de cada hora. O momento mais próximo em que isso acontece é 17h. Nesse momento, o Airflow programa uma execução do DAG para acontecer no final do intervalo da programação, ou seja, às 18h.

Acionar um DAG manualmente

Quando você aciona manualmente um DAG do Airflow, ele é executado uma vez, independentemente da programação especificada no arquivo DAG.

Console

A interface do DAG é compatível com o Cloud Composer 1.17.8 e versões mais recentes.

Para acionar um DAG no console do Google Cloud:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Selecione um ambiente para ver os detalhes.

  3. Na página Detalhes do ambiente, acesse a guia DAGs.

  4. Clique no nome de um DAG.

  5. Na página Detalhes do DAG, clique em Acionar DAG. Uma nova execução do DAG criados.

IU do Airflow

Para acionar um DAG na interface do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Faça login com a Conta do Google que tem as permissões apropriadas.

  4. Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.

  5. (Opcional) Especifique a configuração de execução do DAG.

  6. Clique em Gatilho.

gcloud

No Airflow 1.10.12 ou versões anteriores, execute o comando trigger_dag da CLI do Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

No Airflow 1.10.14 ou posterior, incluindo o Airflow 2, execute o dags trigger Comando da CLI do Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.
  • DAG_ID: o nome do DAG.

Para mais informações sobre como executar comandos da CLI do Airflow em ambientes do Cloud Composer, consulte Como executar comandos da CLI do Airflow.

Para mais informações sobre os comandos da CLI do Airflow disponíveis, consulte a referência do comando gcloud composer environments run.

Visualizar detalhes e registros de execução do DAG

No console do Google Cloud, é possível:

Além disso, o Cloud Composer fornece acesso ao A IU do Airflow, que é a interface da Web do próprio Airflow.

Pausar um DAG

Console

A interface do DAG é compatível com o Cloud Composer 1.17.8 e versões mais recentes.

Para pausar um DAG no console do Google Cloud:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Selecione um ambiente para ver os detalhes.

  3. Na página Detalhes do ambiente, acesse a guia DAGs.

  4. Clique no nome de um DAG.

  5. Na página Detalhes do DAG, clique em Pausar DAG.

IU do Airflow

Para pausar um DAG na interface do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

Acessar "Ambientes"

  1. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  2. Faça login com a Conta do Google que tem as permissões apropriadas.

  3. Na interface da Web do Airflow, na página DAGs, clique no botão de alternância ao lado do nome do DAG.

gcloud

No Airflow 1.10.12 ou anterior, execute o comando pause da CLI do Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

No Airflow 1.10.14 ou mais recente, incluindo o Airflow 2, execute o comando dags pause da CLI do Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.
  • DAG_ID: o nome do DAG.

Para mais informações sobre como executar comandos da CLI do Airflow em ambientes do Cloud Composer, consulte Como executar comandos da CLI do Airflow.

Para mais informações sobre os comandos da CLI do Airflow disponíveis, consulte a referência do comando gcloud composer environments run.

A seguir