Usar operadores adiáveis nos DAGs

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, explicamos como ativar o suporte para operadores adiáveis no ambiente e usar operadores adiáveis do Google Cloud nos DAGs.

Sobre os operadores adiáveis no Cloud Composer

Se você tiver pelo menos uma instância de engatilhador (ou pelo menos duas em ambientes altamente resilientes), será possível usar operadores e gatilhos deferidos nos DAGs.

Para operadores adiáveis, o Airflow divide a execução da tarefa nos seguintes estágios:

  1. Inicie a operação. Neste estágio, a tarefa ocupa um slot de worker do Airflow. A tarefa executa uma operação que delega o job para um serviço diferente.

    Por exemplo, a execução de um job do BigQuery pode levar de alguns segundos a várias horas. Depois de criar o job, a operação transmite o identificador de trabalho (ID do job do BigQuery) para um gatilho do Airflow.

  2. O gatilho monitora o job até que ele seja concluído. Neste estágio, um slot de worker não está ocupado. O acionador do Airflow tem arquitetura assíncrona e é capaz de lidar com centenas desses jobs. Quando o acionador detecta que o job foi concluído, ele envia um evento que aciona o último estágio.

  3. Na última etapa, um worker do Airflow executa um callback. Esse callback, por exemplo, pode marcar a tarefa como bem-sucedida ou executar outra operação e definir o job para ser monitorado novamente pelo acionador.

O engatilhador não tem estado e, portanto, é resiliente a interrupções ou reinicializações. Por isso, os jobs de longa duração são resilientes a reinicializações de pods, a menos que a reinicialização aconteça durante o último estágio, que é esperado que seja curto.

Antes de começar

  • Os operadores e sensores adiáveis estão disponíveis nos ambientes do Cloud Composer 2 e exigem o seguinte:
    • Cloud Composer 2.0.31 e versões mais recentes
    • Airflow 2.2.5, 2.3.3 e versões mais recentes

Ativar o suporte a operadores adiáveis

Um componente de ambiente chamado acionador do Airflow monitora de maneira assíncrona todas as tarefas adiadas no seu ambiente. Depois que uma operação adiada dessa tarefa é concluída, o acionador passa a tarefa para um worker do Airflow.

Você precisa de pelo menos uma instância de engatilhador no seu ambiente (ou pelo menos duas em ambientes altamente resilientes) para usar o modo adiável nos DAGs. É possível configurar os acionadores ao criar um ambiente ou ajustar o número de acionadores e parâmetros de desempenho de um ambiente atual.

Operadores do Google Cloud com suporte para o modo adiável

Apenas alguns operadores do Airflow foram estendidos para oferecer suporte ao modelo adiável. A lista a seguir é uma referência para os operadores no pacote airflow.providers.google.operators.cloud que oferecem suporte ao modo adiável. A coluna com a versão de pacote airflow.providers.google.operators.cloud mínima necessária representa a versão do pacote mais antiga em que esse operador oferece suporte ao modo deferível.

Operadores do Cloud Composer

Nome do operadorVersão apache-airflow-providers-google necessária
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0

Operadores do BigQuery

Nome do operadorVersão apache-airflow-providers-google necessária
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operadores do serviço de transferência de dados do BigQuery

Nome do operadorVersão apache-airflow-providers-google necessária
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Operadores do Cloud Build

Nome do operadorVersão apache-airflow-providers-google necessária
CloudBuildCreateBuildOperator 8.7.0

Operadores do Cloud SQL

Nome do operadorVersão apache-airflow-providers-google necessária
CloudSQLExportInstanceOperator 10.3.0

Operadores do Dataflow

Nome do operadorVersão apache-airflow-providers-google necessária
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0

Operadores do Cloud Data Fusion

Nome do operadorVersão apache-airflow-providers-google necessária
CloudDataFusionStartPipelineOperator 8.9.0

Operadores do Google Kubernetes Engine

Nome do operadorVersão apache-airflow-providers-google necessária
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0

Operadores da AI Platform

Nome do operadorVersão apache-airflow-providers-google necessária
MLEngineStartTrainingJobOperator 8.9.0

Usar operadores adiáveis nos DAGs

Uma convenção comum para todos os operadores do Google Cloud é ativar o modo adiável com o parâmetro booleano deferrable. Se um operador do Google Cloud não tiver esse parâmetro, não poderá ser executado no modo adiável. Outros operadores podem ter uma convenção diferente. Por exemplo, alguns operadores da comunidade têm uma classe separada com o sufixo Async no nome.

O DAG de exemplo a seguir usa o operador DataprocSubmitJobOperator no modo adiável:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Ver registros do engatilhador

O engatilhador gera registros que estão disponíveis com registros de outros componentes do ambiente. Para mais informações sobre como visualizar os registros do ambiente, consulte Ver registros.

Monitorar acionador

Para mais informações sobre como monitorar o componente do acionador, consulte Métricas do Airflow.

Além de monitorar o engatilhador, é possível verificar o número de tarefas adiadas nas métricas Unfinished Task no painel do Monitoring do seu ambiente.

A seguir