Use operadores adiáveis em DAGs do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página explica como ativar o suporte para operadores adiáveis no seu ambiente e usar operadores adiáveis nos seus DAGs. Google Cloud

Acerca dos operadores adiáveis no Cloud Composer

Se tiver, pelo menos, uma instância de acionador (ou, pelo menos, duas em ambientes altamente resilientes), pode usar operadores e acionadores adiáveis nos seus DAGs.

Para os operadores adiáveis, o Airflow divide a execução de tarefas nas seguintes fases:

  1. Inicie a operação. Nesta fase, a tarefa ocupa um espaço de trabalho do Airflow. A tarefa executa uma operação que delega o trabalho a um serviço diferente.

    Por exemplo, a execução de uma tarefa do BigQuery pode demorar alguns segundos a várias horas. Após a criação da tarefa, a operação transmite o identificador da tarefa (ID da tarefa do BigQuery) a um acionador do Airflow.

  2. O acionador monitoriza a tarefa até esta terminar. Nesta fase, não é ocupado nenhum espaço de trabalho. O acionador do Airflow tem uma arquitetura assíncrona e é capaz de processar centenas de tarefas deste tipo. Quando o acionador deteta que a tarefa está concluída, envia um evento que aciona a última fase.

  3. Na última fase, um trabalhador do Airflow executa um callback. Este callback, por exemplo, pode marcar a tarefa como bem-sucedida ou executar outra operação e definir a tarefa para ser monitorizada novamente pelo acionador.

O acionador não tem estado e, por isso, é resiliente a interrupções ou reinícios. Por este motivo, as tarefas de longa duração são resilientes aos reinícios de pods, a menos que o reinício ocorra durante a última fase, que se espera que seja curta.

Antes de começar

  • No Cloud Composer 2, os operadores e os sensores adiáveis requerem o seguinte:

    • Versão 2.0.31 e posteriores do Cloud Composer
    • Airflow 2.2.5, 2.3.3 e versões posteriores

Ative o suporte para operadores diferíveis

Um componente do ambiente denominado Airflow triggerer monitoriza de forma assíncrona todas as tarefas adiadas no seu ambiente. Após a conclusão de uma operação adiada de uma tarefa deste tipo, o acionador passa a tarefa a um trabalhador do Airflow.

Precisa de, pelo menos, uma instância de acionador no seu ambiente (ou, pelo menos, duas em ambientes altamente resilientes) para usar o modo adiável nos seus DAGs. Pode configurar os acionadores quando cria um ambiente ou ajustar o número de acionadores e os parâmetros de desempenho de um ambiente existente.

Google Cloud operadores que suportam o modo adiável

Apenas alguns operadores do Airflow foram expandidos para suportar o modelo adiável. A lista seguinte é uma referência para os operadores no pacote apache-airflow-providers-google que suportam o modo adiável. A coluna com a versão mínima do pacote apache-airflow-providers-google necessária representa a versão mais antiga do pacote em que esse operador suporta o modo adiável.

Operadores do BigQuery

Nome do operador Versão apache-airflow-providers-google obrigatória
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operadores do Serviço de transferência de dados do BigQuery

Nome do operador Versão apache-airflow-providers-google obrigatória
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Operadores de lote

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudBatchSubmitJobOperator 10.7.0

Operadores do Cloud Build

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudBuildCreateBuildOperator 8.7.0

Operadores do Cloud Composer

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Operadores do Cloud Run

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudRunExecuteJobOperator 10.7.0

Operadores do Cloud SQL

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudSQLExportInstanceOperator 10.3.0

Operadores do Serviço de transferência de armazenamento

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Operadores do Dataflow

Nome do operador Versão apache-airflow-providers-google obrigatória
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Operadores do Cloud Data Fusion

Nome do operador Versão apache-airflow-providers-google obrigatória
CloudDataFusionStartPipelineOperator 8.9.0

Operadores do Dataplex Universal Catalog

Nome do operador Versão apache-airflow-providers-google obrigatória
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Operadores do Google Kubernetes Engine

Nome do operador Versão apache-airflow-providers-google obrigatória
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Operadores do Pub/Sub

Nome do operador Versão apache-airflow-providers-google obrigatória
PubSubPullOperator 14.0.0

Operadores da AI Platform

Nome do operador Versão apache-airflow-providers-google obrigatória
MLEngineStartTrainingJobOperator 8.9.0

Use operadores adiáveis nos seus DAGs

Uma convenção comum para todos os operadores Google Cloud é ativar o modo adiável com o parâmetro booleano deferrable. Se um operador não tiver este parâmetro, não pode ser executado no modo adiável. Google CloudOutros operadores podem ter uma convenção diferente. Por exemplo, alguns operadores da comunidade têm uma classe separada com o sufixo Async no nome.

O DAG de exemplo seguinte usa o operador DataprocSubmitJobOperator no modo diferível:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Veja os registos do acionador

O acionador gera registos que estão disponíveis juntamente com os registos de outros componentes do ambiente. Para mais informações sobre como ver os registos do seu ambiente, consulte o artigo Ver registos.

Monitorize o acionador

Para mais informações sobre a monitorização do componente de acionamento, consulte o artigo Métricas do Airflow.

Além de monitorizar o acionador, pode verificar o número de tarefas adiadas nas métricas de tarefas inacabadas no painel de controlo de monitorização do seu ambiente.

O que se segue?