Utiliser des opérateurs différables dans les DAG Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page explique comment activer la compatibilité avec les opérateurs différables dans votre environnement et comment utiliser les opérateurs différables dans vos DAG. Google Cloud

À propos des opérateurs différables dans Cloud Composer

Si vous disposez d'au moins une instance de déclencheur (ou au moins deux dans les environnements très résilients), vous pouvez utiliser les opérateurs et déclencheurs différables dans vos DAG.

Pour les opérateurs différables, Airflow divise l'exécution des tâches en plusieurs étapes :

  1. Démarrez l'opération. À ce stade, la tâche occupe un emplacement de nœud de calcul Airflow. La tâche effectue une opération qui délègue le job à un autre service.

    Par exemple, l'exécution d'un job BigQuery peut prendre de quelques secondes à plusieurs heures. Après avoir créé le job, l'opération transmet l'identifiant de travail (ID de job BigQuery) à un déclencheur Airflow.

  2. Le déclencheur surveille le job jusqu'à ce qu'il se termine. À cette étape, un emplacement de worker n'est pas occupé. Le déclencheur Airflow possède une architecture asynchrone et peut gérer des centaines de tâches de ce type. Lorsque le déclencheur détecte que le job est terminé, il envoie un événement qui déclenche la dernière étape.

  3. Lors de la dernière étape, un nœud de calcul Airflow exécute un rappel. Ce rappel peut, par exemple, marquer la tâche comme réussie, ou exécuter une autre opération et définir à nouveau le job comme devant être surveillé par le déclencheur.

Le déclencheur est sans état et résiste donc aux interruptions ou aux redémarrages. Par conséquent, les jobs de longue durée sont résilients aux redémarrages de pods, sauf si le redémarrage a lieu lors de la dernière étape, qui est censée être courte.

Avant de commencer

Activer la compatibilité avec les opérateurs différables

Un composant d'environnement appelé déclencheur Airflow surveille de manière asynchrone toutes les tâches différées de votre environnement. Une fois une opération différée de ce type de tâche terminée, le déclencheur transmet la tâche à un nœud de calcul Airflow.

Vous avez besoin d'au moins une instance de déclencheur dans votre environnement (ou au moins deux dans les environnements très résilients) pour utiliser le mode différable dans vos DAG. Vous pouvez configurer les déclencheurs lorsque vous créez un environnement ou ajuster le nombre de déclencheurs et les paramètres de performances pour un environnement existant.

OpérateursGoogle Cloud compatibles avec le mode différable

Seuls certains opérateurs Airflow ont été étendus pour prendre en charge le modèle différable. La liste suivante sert de référence pour les opérateurs du package apache-airflow-providers-google qui sont compatibles avec le mode différable. La colonne indiquant la version minimale requise du package apache-airflow-providers-google représente la version la plus ancienne du package dans laquelle cet opérateur est compatible avec le mode différable.

Opérateurs BigQuery

Nom de l'opérateur Version apache-airflow-providers-google requise
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Opérateurs du service de transfert de données BigQuery

Nom de l'opérateur Version apache-airflow-providers-google requise
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Opérateurs de lot

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudBatchSubmitJobOperator 10.7.0

Opérateurs Cloud Build

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudBuildCreateBuildOperator 8.7.0

Opérateurs Cloud Composer

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Opérateurs Cloud Run

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudRunExecuteJobOperator 10.7.0

Opérateurs Cloud SQL

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudSQLExportInstanceOperator 10.3.0

Opérateurs du service de transfert de stockage

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Opérateurs Dataflow

Nom de l'opérateur Version apache-airflow-providers-google requise
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Opérateurs Cloud Data Fusion

Nom de l'opérateur Version apache-airflow-providers-google requise
CloudDataFusionStartPipelineOperator 8.9.0

Opérateurs Dataplex Universal Catalog

Nom de l'opérateur Version apache-airflow-providers-google requise
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Opérateurs Google Kubernetes Engine

Nom de l'opérateur Version apache-airflow-providers-google requise
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Opérateurs Pub/Sub

Nom de l'opérateur Version apache-airflow-providers-google requise
PubSubPullOperator 14.0.0

Opérateurs AI Platform

Nom de l'opérateur Version apache-airflow-providers-google requise
MLEngineStartTrainingJobOperator 8.9.0

Utiliser des opérateurs différables dans vos DAG

Une convention courante pour tous les opérateurs Google Cloud consiste à activer le mode différable avec le paramètre booléendeferrable. Si un opérateur Google Cloudne dispose pas de ce paramètre, il ne peut pas s'exécuter en mode différable. D'autres opérateurs peuvent avoir une convention différente. Par exemple, certains opérateurs de la communauté ont une classe distincte avec le suffixe Async dans le nom.

L'exemple de DAG suivant utilise l'opérateur DataprocSubmitJobOperator en mode différable :

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Afficher les journaux du déclencheur

Le déclencheur génère des journaux qui sont disponibles avec les journaux des autres composants de l'environnement. Pour en savoir plus sur l'affichage des journaux de votre environnement, consultez Afficher les journaux.

Déclencheur de surveillance

Pour en savoir plus sur la surveillance du composant de déclencheur, consultez Métriques Airflow.

En plus de surveiller le déclencheur, vous pouvez vérifier le nombre de tâches différées dans les métriques Tâche non terminée du tableau de bord "Surveillance" de votre environnement.

Étapes suivantes