Agende e acione DAGs do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página explica como a programação e o acionamento de DAGs funcionam no Airflow, como definir uma programação para um DAG e como acionar um DAG manualmente ou pausá-lo.

Acerca dos DAGs do Airflow no Cloud Composer

Os DAGs do Airflow no Cloud Composer são executados em um ou mais ambientes do Cloud Composer no seu projeto. Carrega ficheiros de origem dos seus DAGs do Airflow para um contentor do Cloud Storage associado a um ambiente. A instância do Airflow do ambiente analisa estes ficheiros e agenda execuções de DAGs, conforme definido pela agenda de cada DAG. Durante uma execução de DAG, o Airflow agenda e executa tarefas individuais que compõem um DAG numa sequência definida pelo DAG.

Para saber mais sobre os conceitos principais do Airflow, como DAGs do Airflow, execuções de DAGs, tarefas ou operadores, consulte a página Conceitos principais na documentação do Airflow.

Acerca da programação de DAGs no Airflow

O Airflow oferece os seguintes conceitos para o respetivo mecanismo de agendamento:

Data lógica

Representa uma data para a qual uma execução de DAG específica é executada.

Esta não é a data real em que o Airflow executa um DAG, mas sim um período durante o qual uma execução de DAG específica tem de ser processada. Por exemplo, para um DAG agendado para ser executado todos os dias às 12:00, a data lógica também seria às 12:00 num dia específico. Uma vez que é executado duas vezes por dia, o período de tempo que tem de processar é as últimas 12 horas. Ao mesmo tempo, a lógica definida no DAG propriamente dito pode não usar a data lógica nem o intervalo de tempo. Por exemplo, um DAG pode executar o mesmo script uma vez por dia sem usar o valor da data lógica.

Nas versões do Airflow anteriores à 2.2, esta data é denominada data de execução.

Data de execução

Representa uma data em que uma execução de DAG específica é executada.

Por exemplo, para um DAG agendado para ser executado todos os dias às 12:00, a execução real do DAG pode ocorrer às 12:05, algum tempo após a data lógica.

Intervalo de agendamento

Representa quando e com que frequência um DAG tem de ser executado, em termos de datas lógicas.

Por exemplo, um horário diário significa que um DAG é executado uma vez por dia e que as datas lógicas das respetivas execuções de DAG têm intervalos de 24 horas.

Data de início

Especifica quando quer que o Airflow comece a agendar o seu DAG.

As tarefas no seu DAG podem ter datas de início individuais ou pode especificar uma única data de início para todas as tarefas. Com base na data de início mínima das tarefas no seu DAG e no intervalo de agendamento, o Airflow agenda execuções de DAG.

Sincronização, preenchimento e novas tentativas

Mecanismos para executar execuções de DAG para datas anteriores.

O Catchup executa execuções de DAG que ainda não foram executadas, por exemplo, se o DAG tiver sido pausado durante um longo período e, em seguida, retomado. Pode usar o preenchimento para executar execuções de DAG para um determinado intervalo de datas. As repetições especificam quantas tentativas o Airflow tem de fazer ao executar tarefas de um DAG.

O agendamento funciona da seguinte forma:

  1. Após a data de início, o Airflow aguarda a próxima ocorrência do intervalo de agendamento.

  2. O Airflow agenda a primeira execução do DAG para ocorrer no final deste intervalo de agendamento.

    Por exemplo, se um DAG estiver agendado para ser executado a cada hora e a data de início for às 12:00 de hoje, a primeira execução do DAG ocorre às 13:00 de hoje.

A secção Agende um DAG do Airflow neste documento descreve como configurar o agendamento dos seus DAGs através destes conceitos. Para mais informações sobre execuções de DAGs e agendamento, consulte Execuções de DAGs na documentação do Airflow.

Acerca das formas de acionar um DAG

O Airflow oferece as seguintes formas de acionar um DAG:

  • Acionar com base num horário. O Airflow aciona o DAG automaticamente com base na programação especificada no ficheiro DAG.

  • Acionar manualmente. Pode acionar um DAG manualmente a partir da Google Cloud consola, da IU do Airflow ou executando um comando da CLI do Airflow a partir da CLI do Google Cloud.

  • Acionam em resposta a eventos. A forma padrão de acionar um DAG em resposta a eventos é usar um sensor.

Outras formas de acionar DAGs:

Antes de começar

  • Certifique-se de que a sua conta tem uma função que pode gerir objetos nos contentores do ambiente e ver e acionar DAGs. Para mais informações, consulte o artigo Controlo de acesso.

Agende um DAG do Airflow

Define um agendamento para um DAG no ficheiro DAG. Edite a definição do DAG da seguinte forma:

  1. Localize e edite o ficheiro DAG no computador. Se não tiver o ficheiro DAG, pode transferir uma cópia do contentor do ambiente. Para um novo DAG, pode definir todos os parâmetros quando cria o ficheiro DAG.

  2. No parâmetro schedule_interval, defina a programação. Pode usar uma expressão cron, como 0 0 * * *, ou uma predefinição, como @daily. Para mais informações, consulte o artigo Cron and Time Intervals na documentação do Airflow.

    O Airflow determina as datas lógicas para as execuções de DAG com base no horário que definir.

  3. No parâmetro start_date, defina a data de início.

    O Airflow determina a data lógica da primeira execução do DAG através deste parâmetro.

  4. (Opcional) No parâmetro catchup, defina se o Airflow tem de executar todas as execuções anteriores deste DAG desde a data de início até à data atual que ainda não foram executadas.

    As execuções de DAGs executadas durante a sincronização vão ter a respetiva data lógica no passado e a data de execução vai refletir a hora em que a execução de DAG foi efetivamente executada.

  5. (Opcional) No parâmetro retries, defina quantas vezes o Airflow tem de tentar novamente as tarefas com falhas (cada DAG consiste numa ou mais tarefas individuais). Por predefinição, as tarefas no Cloud Composer são repetidas duas vezes.

  6. Carregue a nova versão do DAG para o contentor do ambiente.

  7. Aguarde até que o Airflow analise com êxito o DAG. Por exemplo, pode verificar a lista de DAGs no seu ambiente naGoogle Cloud consola ou na IU do Airflow.

A seguinte definição de DAG de exemplo é executada duas vezes por dia, às 00:00 e às 12:00. A data de início está definida como 1 de janeiro de 2024, mas o Airflow não o executa para datas anteriores depois de o carregar ou pausar porque a sincronização está desativada.

O DAG contém uma tarefa denominada insert_query_job, que insere uma linha numa tabela com o operador BigQueryInsertJobOperator. Este operador é um dos Google Cloud operadores do BigQuery, que pode usar para gerir conjuntos de dados e tabelas, executar consultas e validar dados. Se uma execução específica desta tarefa falhar, o Airflow tenta novamente mais quatro vezes com o intervalo de repetição predefinido. A data lógica destas novas tentativas permanece a mesma.

A consulta SQL para esta linha usa modelos do Airflow para escrever a data lógica e o nome do DAG na linha.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Para testar este DAG, pode acioná-lo manualmente e, em seguida, ver os registos de execução de tarefas.

Mais exemplos de parâmetros de agendamento

Os exemplos de parâmetros de agendamento seguintes ilustram o funcionamento do agendamento com diferentes combinações de parâmetros:

  • Se start_date for datetime(2024, 4, 4, 16, 25) e schedule_interval for 30 16 * * *, a primeira execução do DAG ocorre às 16:30 a 5 de abril de 2024.

  • Se start_date for datetime(2024, 4, 4, 16, 35) e schedule_interval for 30 16 * * *, a primeira execução do DAG ocorre às 16:30 a 6 de abril de 2024. Uma vez que a data de início é posterior ao intervalo da programação de 4 de abril de 2024, a execução do DAG não ocorre a 5 de abril de 2024. Em vez disso, o intervalo de programação termina às 16:35 a 5 de abril de 2024, pelo que a próxima execução do DAG está agendada para as 16:30 do dia seguinte.

  • Se start_date for datetime(2024, 4, 4) e schedule_interval for @daily, a primeira execução do DAG está agendada para as 00:00 de 5 de abril de 2024.

  • Se start_date for datetime(2024, 4, 4, 16, 30) e o schedule_interval for 0 * * * *, a primeira execução do DAG está agendada para as 18:00 de 4 de abril de 2024. Após a data e a hora especificadas, o Airflow agenda uma execução de DAG para ocorrer no minuto 0 de cada hora. O ponto mais próximo no tempo em que isto acontece é às 17:00. Nesta altura, o Airflow agenda uma execução de DAG para ocorrer no final do intervalo do agendamento, ou seja, às 18:00.

Acione um DAG manualmente

Quando aciona manualmente um DAG do Airflow, o Airflow executa o DAG uma vez, independentemente do horário especificado no ficheiro DAG.

Consola

Para acionar um DAG a partir da Google Cloud consola:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Selecione um ambiente para ver os respetivos detalhes.

  3. Na página Detalhes do ambiente, aceda ao separador DAGs.

  4. Clique no nome de um DAG.

  5. Na página Detalhes do DAG, clique em Acionar DAG. É criado um novo ciclo de execução do DAG.

IU do Airflow

Para acionar um DAG a partir da IU do Airflow:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.

  3. Inicie sessão com a Conta Google que tem as autorizações adequadas.

  4. Na interface Web do Airflow, na página DAGs, na coluna Ações do seu DAG, clique no botão Acionar DAG.

gcloud

Execute o dags triggercomando da CLI do Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • DAG_ID: o nome do DAG.

Para mais informações sobre a execução de comandos da CLI do Airflow em ambientes do Cloud Composer, consulte o artigo Executar comandos da CLI do Airflow.

Para mais informações sobre os comandos da CLI do Airflow disponíveis, consulte a gcloud composer environments run referência de comandos.

Veja os registos e os detalhes da execução do DAG

Na Google Cloud consola, pode:

Além disso, o Cloud Composer oferece acesso à IU do Airflow, que é a interface Web do Airflow.

Pause um DAG

Consola

Para pausar um DAG a partir da Google Cloud consola:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Selecione um ambiente para ver os respetivos detalhes.

  3. Na página Detalhes do ambiente, aceda ao separador DAGs.

  4. Clique no nome de um DAG.

  5. Na página Detalhes do DAG, clique em Pausar DAG.

IU do Airflow

Para pausar um DAG a partir da IU do Airflow:

  1. Na Google Cloud consola, aceda à página Ambientes.

Aceder a Ambientes

  1. Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.

  2. Inicie sessão com a Conta Google que tem as autorizações adequadas.

  3. Na interface Web do Airflow, na página DAGs, clique no botão para ativar/desativar junto ao nome do DAG.

gcloud

Execute o dags pausecomando da CLI do Airflow:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • DAG_ID: o nome do DAG.

Para mais informações sobre a execução de comandos da CLI do Airflow em ambientes do Cloud Composer, consulte o artigo Executar comandos da CLI do Airflow.

Para mais informações sobre os comandos da CLI do Airflow disponíveis, consulte a gcloud composer environments run referência de comandos.

O que se segue?