Usa operadores diferibles en DAG de Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se explica cómo habilitar la compatibilidad con los operadores diferibles en tu entorno y cómo usar operadores diferibles Google Cloud en tus DAGs.

Acerca de los operadores aplazables en Cloud Composer

Si tienes al menos una instancia de activador (o al menos dos en entornos altamente resilientes), puedes usar operadores y activadores diferibles en tus DAG.

En el caso de los operadores aplazables, Airflow divide la ejecución de la tarea en las siguientes etapas:

  1. Inicia la operación. En esta etapa, la tarea ocupa una ranura de trabajador de Airflow. La tarea realiza una operación que delega el trabajo a otro servicio.

    Por ejemplo, ejecutar un trabajo de BigQuery puede tardar desde unos segundos hasta varias horas. Después de crear el trabajo, la operación pasa el identificador del trabajo (ID de trabajo de BigQuery) a un activador de Airflow.

  2. El activador supervisa el trabajo hasta que finaliza. En esta etapa, no se ocupa ninguna ranura de trabajador. El supervisor asíncrono de Airflow tiene una arquitectura asíncrona y es capaz de controlar cientos de trabajos de este tipo. Cuando el activador detecta que el trabajo finalizó, envía un evento que activa la última etapa.

  3. En la última etapa, un trabajador de Airflow ejecuta una devolución de llamada. Por ejemplo, esta devolución de llamada puede marcar la tarea como exitosa o ejecutar otra operación y configurar el trabajo para que el activador lo supervise nuevamente.

El activador no tiene estado y, por lo tanto, es resistente a las interrupciones o los reinicios. Por este motivo, los trabajos de larga duración son resistentes a los reinicios de Pods, a menos que el reinicio ocurra durante la última etapa, que se espera que sea breve.

Antes de comenzar

Habilita la compatibilidad con operadores aplazables

Un componente del entorno llamado supervisor asíncrono de Airflow supervisa de forma asíncrona todas las tareas diferidas en tu entorno. Una vez que se completa una operación diferida de una tarea de este tipo, el activador pasa la tarea a un trabajador de Airflow.

Necesitas al menos una instancia de activador en tu entorno (o al menos dos en entornos altamente resilientes) para usar el modo aplazable en tus DAGs. Puedes configurar los activadores cuando creas un entorno o ajustar la cantidad de activadores y los parámetros de rendimiento de un entorno existente.

OperadoresGoogle Cloud que admiten el modo aplazable

Solo algunos operadores de Airflow se extendieron para admitir el modelo aplazable. La siguiente lista es una referencia para los operadores del paquete apache-airflow-providers-google que admiten el modo aplazable. La columna con la versión mínima requerida del paquete apache-airflow-providers-google representa la versión más antigua del paquete en la que ese operador admite el modo aplazable.

Operadores de BigQuery

Nombre del operador Versión de apache-airflow-providers-google requerida
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operadores del Servicio de transferencia de datos de BigQuery

Nombre del operador Versión de apache-airflow-providers-google requerida
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Operadores de lotes

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudBatchSubmitJobOperator 10.7.0

Operadores de Cloud Build

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudBuildCreateBuildOperator 8.7.0

Operadores de Cloud Composer

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Operadores de Cloud Run

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudRunExecuteJobOperator 10.7.0

Operadores de Cloud SQL

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudSQLExportInstanceOperator 10.3.0

Operadores del Servicio de transferencia de almacenamiento

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Operadores de Dataflow

Nombre del operador Versión de apache-airflow-providers-google requerida
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Operadores de Cloud Data Fusion

Nombre del operador Versión de apache-airflow-providers-google requerida
CloudDataFusionStartPipelineOperator 8.9.0

Operadores de Dataplex Universal Catalog

Nombre del operador Versión de apache-airflow-providers-google requerida
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Operadores de Google Kubernetes Engine

Nombre del operador Versión de apache-airflow-providers-google requerida
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Operadores de Pub/Sub

Nombre del operador Versión de apache-airflow-providers-google requerida
PubSubPullOperator 14.0.0

Operadores de AI Platform

Nombre del operador Versión de apache-airflow-providers-google requerida
MLEngineStartTrainingJobOperator 8.9.0

Usa operadores diferibles en tus DAG

Una convención común para todos los operadores de Google Cloud es habilitar el modo aplazable con el parámetro booleano deferrable. Si un operador Google Cloudno tiene este parámetro, no puede ejecutarse en el modo aplazable. Otros operadores pueden tener una convención diferente. Por ejemplo, algunos operadores de la comunidad tienen una clase separada con el sufijo Async en el nombre.

En el siguiente DAG de ejemplo, se usa el operador DataprocSubmitJobOperator en el modo aplazable:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Cómo ver los registros del activador

El activador genera registros que están disponibles junto con los registros de otros componentes del entorno. Para obtener más información sobre cómo ver los registros de tu entorno, consulta Visualiza los registros.

Activador de supervisión

Para obtener más información sobre la supervisión del componente de activador, consulta Métricas de Airflow.

Además de supervisar el activador, puedes verificar la cantidad de tareas diferidas en las métricas de Unfinished Task del panel de supervisión de tu entorno.

¿Qué sigue?