Écrire des DAG Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Ce guide explique comment écrire un graphe orienté acyclique (DAG) Apache Airflow exécuté dans un environnement Cloud Composer.

Étant donné qu'Apache Airflow n'offre pas une forte isolation des DAG et des tâches, nous vous recommandons d'utiliser des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.

Structurer un DAG Airflow

Un DAG Airflow est défini dans un fichier Python et comprend les composants suivants:

  • Définition du DAG
  • Opérateurs Airflow
  • Relations avec les opérateurs

Les extraits de code suivants illustrent des exemples de chaque composant hors contexte.

Une définition de DAG

L'exemple suivant illustre une définition de DAG Airflow:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Opérateurs et tâches

Les opérateurs Airflow décrivent le travail à effectuer. Une tâche est une instance spécifique d'un opérateur.

Airflow 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Relations entre les tâches

Les relations entre les tâches décrivent l'ordre dans lequel le travail doit être effectué.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Exemple de workflow DAG complet en Python

Le workflow suivant est un modèle de DAG fonctionnel complet composé de deux tâches: une tâche hello_python et une tâche goodbye_bash:

Airflow 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Pour en savoir plus sur la définition des DAG Airflow, consultez le tutoriel Airflow et les concepts Airflow.

Opérateurs Airflow

Les exemples suivants présentent quelques opérateurs Airflow connus. Pour obtenir une documentation de référence faisant autorité sur les opérateurs Airflow, consultez la documentation de référence sur les opérateurs et les hooks et l'index des fournisseurs.

BashOperator

Utilisez BashOperator pour exécuter des programmes de ligne de commande.

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer exécute les commandes fournies dans un script Bash sur un nœud de calcul Airflow. Le nœud de calcul est un conteneur Docker basé sur Debian et comprend plusieurs packages.

PythonOperator

Utilisez PythonOperator pour exécuter du code Python arbitraire.

Cloud Composer exécute le code Python dans un conteneur qui inclut des packages pour la version d'image Cloud Composer utilisée dans votre environnement.

Pour installer des packages Python supplémentaires, reportez-vous à la section Installer des dépendances Python.

OpérateursGoogle Cloud

Pour exécuter des tâches qui utilisent des produits Google Cloud , utilisez les opérateurs AirflowGoogle Cloud . Par exemple, les opérateurs BigQuery interrogent et traitent les données dans BigQuery.

Il existe de nombreux autres opérateurs Airflow pour Google Cloud et les services individuels fournis par Google Cloud. Pour obtenir la liste complète, consultez la section OpérateursGoogle Cloud .

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Utilisez EmailOperator pour envoyer un e-mail à partir d'un DAG. Pour envoyer des e-mails à partir d'un environnement Cloud Composer, configurez votre environnement pour qu'il utilise SendGrid.

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notifications en cas d'échec de l'opérateur

Définissez email_on_failure sur True pour envoyer une notification par e-mail lorsqu'un opérateur du DAG échoue. Pour envoyer des notifications par e-mail à partir d'un environnement Cloud Composer, vous devez configurer votre environnement pour qu'il utilise SendGrid.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Consignes concernant les workflows DAG

  • Placez toutes les bibliothèques Python personnalisées dans l'archive ZIP des DAG, dans un répertoire imbriqué. Ne placez pas de bibliothèques au niveau supérieur du répertoire des DAG.

    Lorsque Airflow analyse le dossier dags/, il ne recherche que les DAG des modules Python situés au niveau supérieur du dossier des DAG et au niveau supérieur d'une archive ZIP également située au niveau supérieur du dossier dags/. Si Airflow rencontre un module Python dans une archive ZIP ne contenant pas les deux sous-chaînes airflow et DAG, il cesse de traiter l'archive ZIP. Airflow ne renvoie que les DAG trouvés jusqu'à ce point.

  • Utilisez Airflow 2 au lieu d'Airflow 1.

    La communauté Airflow ne publie plus de nouvelles versions mineures ni de correctifs pour Airflow 1.

  • Pour la tolérance aux pannes, ne définissez pas plusieurs objets DAG dans le même module Python.

  • N'utilisez pas de subDAG. À la place, regroupez les tâches dans des DAG.

  • Placez les fichiers requis au moment de l'analyse du DAG dans le dossier dags/, et non dans le dossier data/.

  • Implémentez des tests unitaires pour vos DAG.

  • Testez les DAG développés ou modifiés comme recommandé dans les instructions de test des DAG.

  • Vérifiez que les DAG développés n'augmentent pas trop les durées d'analyse des DAG.

  • Les tâches de flux d'air peuvent échouer pour plusieurs raisons. Pour éviter les échecs de l'exécution complète d'un DAG, nous vous recommandons d'activer les nouvelles tentatives de tâches. Si vous définissez le nombre maximal de tentatives sur 0, aucune tentative n'est effectuée.

    Nous vous recommandons d'ignorer l'option default_task_retries en définissant une valeur pour les nouvelles tentatives de la tâche autre que 0. En outre, vous pouvez définir le paramètre retries au niveau de la tâche.

  • Si vous souhaitez utiliser des GPU dans vos tâches Airflow, créez un cluster GKE distinct basé sur des nœuds utilisant des machines avec des GPU. Utilisez GKEStartPodOperator pour exécuter vos tâches.

  • Évitez d'exécuter des tâches gourmandes en processeur et en mémoire dans le pool de nœuds du cluster où d'autres composants Airflow (programmeurs, nœuds de calcul, serveurs Web) s'exécutent. Utilisez plutôt KubernetesPodOperator ou GKEStartPodOperator.

  • Lorsque vous déployez des DAG dans un environnement, n'importez dans le dossier /dags que les fichiers absolument nécessaires à l'interprétation et à l'exécution des DAG.

  • Limitez le nombre de fichiers DAG dans le dossier /dags.

    Airflow analyse en continu les DAG du dossier /dags. L'analyse est un processus qui parcourt le dossier DAG et le nombre de fichiers à charger (avec leurs dépendances) a un impact sur les performances de l'analyse des DAG et de la planification des tâches. Il est beaucoup plus efficace d'utiliser 100 fichiers avec 100 DAG chacun que 10 000 fichiers avec un DAG chacun. Par conséquent, une telle optimisation est recommandée. Cette optimisation est un équilibre entre le temps d'analyse et l'efficacité de la création et de la gestion des DAG.

    Par exemple, si vous souhaitez déployer 10 000 fichiers DAG, vous pouvez créer 100 fichiers ZIP contenant chacun 100 fichiers DAG.

    En plus des conseils ci-dessus, si vous disposez de plus de 10 000 fichiers DAG, la génération de DAG de manière programmatique peut être une bonne option. Par exemple, vous pouvez implémenter un seul fichier DAG Python qui génère un certain nombre d'objets DAG (par exemple, 20 ou 100 objets DAG).

  • Évitez d'utiliser des opérateurs Airflow obsolètes. Utilisez plutôt leurs alternatives à jour.

Questions fréquentes sur l'écriture des DAG

Comment réduire la répétition du code lorsque j'exécute des tâches identiques ou semblables dans plusieurs DAG ?

Nous vous recommandons de définir des bibliothèques et des wrappers afin de réduire la répétition du code.

Comment réutiliser le code à travers plusieurs fichiers DAG ?

Stockez les fonctions utilitaires dans une bibliothèque Python locale, puis importez-les. Vous pouvez référencer les fonctions de n'importe quel DAG situé dans le dossier dags/ du bucket de votre environnement.

Comment minimiser le risque d'apparition de définitions différentes ?

Imaginons que deux équipes souhaitent agréger des données brutes en métriques de revenus. Les équipes rédigent deux tâches légèrement différentes pour effectuer la même opération. Définissez les bibliothèques pour qu'elles fonctionnent avec les données sur les revenus. Ainsi, les développeurs des DAG doivent clarifier la définition des revenus en cours d'agrégation.

Comment définir des dépendances entre les DAG ?

Tout dépend de la manière dont vous souhaitez définir la dépendance.

Si vous avez deux DAG (DAG A et DAG B) et que vous souhaitez que le DAG B se déclenche après le DAG A, vous pouvez placer un TriggerDagRunOperator à la fin du DAG A.

Si le DAG B ne dépend que d'un artefact généré par le DAG A, tel qu'un message Pub/Sub, il est possible qu'un capteur fonctionne mieux.

Si le DAG B est étroitement intégré au DAG A, vous pourrez peut-être fusionner les deux DAG en un seul.

Comment transférer des ID d'exécution uniques à un DAG et à ses tâches ?

Imaginons que vous souhaitiez transférer les noms de cluster Dataproc et les chemins des fichiers.

Vous pouvez générer un ID aléatoire unique en renvoyant str(uuid.uuid4()) dans un opérateur PythonOperator. L'ID est alors placé dans XComs afin que vous puissiez y faire référence dans d'autres opérateurs via des champs modélisés.

Avant de générer un uuid, essayez de déterminer si un ID propre à DagRun serait plus utile. Vous pouvez également référencer ces ID dans les substitutions Jinja à l'aide de macros.

Comment séparer les tâches dans un DAG ?

Chaque tâche doit constituer une unité de travail idempotente. Par conséquent, évitez d'encapsuler un workflow en plusieurs étapes au sein d'une tâche unique, comme un programme complexe exécuté dans un opérateur PythonOperator.

Faut-il définir plusieurs tâches dans un même DAG pour agréger des données provenant de plusieurs sources ?

Par exemple, vous avez plusieurs tables avec des données brutes et vous souhaitez créer des agrégats quotidiens pour chaque table. Les tâches ne dépendent pas les unes des autres. Faut-il créer une tâche et un DAG pour chaque table ou créer un DAG général ?

Si vous acceptez que chaque tâche partage les mêmes propriétés au niveau du DAG, telles que schedule_interval, il est judicieux de définir plusieurs tâches dans un même DAG. Sinon, pour réduire la répétition du code, plusieurs DAG peuvent être générés à partir d'un seul module Python en les plaçant dans les globals() du module.

Comment limiter le nombre de tâches simultanées en cours d'exécution dans un DAG ?

Par exemple, vous souhaitez éviter de dépasser les limites/quotas d'utilisation de l'API ou éviter d'exécuter trop de processus simultanés.

Vous pouvez définir des pools Airflow dans l'interface utilisateur Web d'Airflow et associer des tâches à des pools existants dans vos DAG.

Questions fréquentes sur l'utilisation des opérateurs

Faut-il utiliser l'opérateur DockerOperator ?

Nous vous déconseillons d'utiliser l'opérateur DockerOperator, sauf s'il est utilisé pour lancer des conteneurs sur une installation Docker distante (et non dans un cluster d'environnement). Dans un environnement Cloud Composer, l'opérateur n'a pas accès aux daemons Docker.

Utilisez plutôt KubernetesPodOperator ou GKEStartPodOperator. Ces opérateurs lancent des pods Kubernetes dans des clusters Kubernetes ou GKE, respectivement. Notez qu'il est déconseillé de lancer des pods dans le cluster d'un environnement, car cela peut entraîner une concurrence des ressources.

Faut-il utiliser l'opérateur SubDagOperator ?

Nous vous déconseillons d'utiliser l'opérateur SubDagOperator.

Utilisez des alternatives, comme suggéré dans la section Regrouper des tâches.

Faut-il exécuter du code Python uniquement dans des opérateurs PythonOperators pour séparer complètement les opérateurs Python ?

En fonction de votre objectif, plusieurs options s'offrent à vous.

Si votre seule préoccupation est de conserver des dépendances Python distinctes, vous pouvez utiliser l'opérateur PythonVirtualenvOperator.

Envisagez d'utiliser KubernetesPodOperator. Cet opérateur vous permet de définir des pods Kubernetes et d'exécuter les pods dans d'autres clusters.

Comment ajouter des packages binaires personnalisés ou non PyPI ?

Vous pouvez installer des packages hébergés dans des dépôts de packages privés.

Comment transmettre uniformément des arguments à un DAG et à ses tâches ?

La compatibilité avec Airflow intégrée à la modélisation Jinja permet de transmettre des arguments pouvant être utilisés dans des champs modélisés.

Quand se produit la substitution de modèle ?

La substitution de modèle se produit sur les nœuds de calcul Airflow juste avant l'appel de la fonction pre_execute d'un opérateur. En pratique, cela signifie que les modèles ne sont remplacés que juste avant l'exécution d'une tâche.

Comment connaître les arguments d'opérateur compatibles avec la substitution de modèle ?

Les arguments d'opérateur compatibles avec la substitution de modèle Jinja2 sont explicitement marqués comme tels.

Recherchez le champ template_fields dans la définition de l'opérateur, qui contient la liste des noms d'arguments qui subiront une substitution de modèle.

Par exemple, consultez la page sur l'opérateur BashOperator, qui est compatible avec la modélisation pour les arguments bash_command et env.

Opérateurs Airflow obsolètes et supprimés

Les opérateurs Airflow listés dans le tableau suivant sont obsolètes:

  • Évitez d'utiliser ces opérateurs dans vos DAG. Utilisez plutôt les opérateurs de remplacement à jour fournis.

  • Si un opérateur est listé comme disponible, cela signifie que la dernière version de maintenance de Cloud Composer (1.20.12) est toujours disponible.

  • Certains opérateurs de remplacement ne sont compatibles avec aucune version de Cloud Composer 1. Pour les utiliser, envisagez de passer à Cloud Composer 3 ou à Cloud Composer 2.

Opérateur obsolète État Opérateur de remplacement Remplacement disponible chez
CreateAutoMLTextTrainingJobOperator Disponible dans la version 1.20.12 SupervisedFineTuningTrainOperator Opérateur de remplacement non disponible
GKEDeploymentHook Disponible dans la version 1.20.12 GKEKubernetesHook Opérateur de remplacement non disponible
GKECustomResourceHook Disponible dans la version 1.20.12 GKEKubernetesHook Opérateur de remplacement non disponible
GKEPodHook Disponible dans la version 1.20.12 GKEKubernetesHook Opérateur de remplacement non disponible
GKEJobHook Disponible dans la version 1.20.12 GKEKubernetesHook Opérateur de remplacement non disponible
GKEPodAsyncHook Disponible dans la version 1.20.12 GKEKubernetesAsyncHook Opérateur de remplacement non disponible
SecretsManagerHook Disponible dans la version 1.20.12 GoogleCloudSecretManagerHook Opérateur de remplacement non disponible
BigQueryExecuteQueryOperator Disponible dans la version 1.20.12 BigQueryInsertJobOperator Disponible dans la version 1.20.12
BigQueryPatchDatasetOperator Disponible dans la version 1.20.12 BigQueryUpdateDatasetOperator Disponible dans la version 1.20.12
DataflowCreateJavaJobOperator Disponible dans la version 1.20.12 beam.BeamRunJavaPipelineOperator Disponible dans la version 1.20.12
DataflowCreatePythonJobOperator Disponible dans la version 1.20.12 beam.BeamRunPythonPipelineOperator Disponible dans la version 1.20.12
DataprocSubmitPigJobOperator Disponible dans la version 1.20.12 DataprocSubmitJobOperator Disponible dans la version 1.20.12
DataprocSubmitHiveJobOperator Disponible dans la version 1.20.12 DataprocSubmitJobOperator Disponible dans la version 1.20.12
DataprocSubmitSparkSqlJobOperator Disponible dans la version 1.20.12 DataprocSubmitJobOperator Disponible dans la version 1.20.12
DataprocSubmitSparkJobOperator Disponible dans la version 1.20.12 DataprocSubmitJobOperator Disponible dans la version 1.20.12
DataprocSubmitHadoopJobOperator Disponible dans la version 1.20.12 DataprocSubmitJobOperator Disponible dans la version 1.20.12
DataprocSubmitPySparkJobOperator Disponible dans la version 1.20.12 DataprocSubmitJobOperator Disponible dans la version 1.20.12
BigQueryTableExistenceAsyncSensor Disponible dans la version 1.20.12 BigQueryTableExistenceSensor Opérateur de remplacement non disponible
BigQueryTableExistencePartitionAsyncSensor Disponible dans la version 1.20.12 BigQueryTablePartitionExistenceSensor Opérateur de remplacement non disponible
CloudComposerEnvironmentSensor Disponible dans la version 1.20.12 CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator Opérateur de remplacement non disponible
GCSObjectExistenceAsyncSensor Disponible dans la version 1.20.12 GCSObjectExistenceSensor Opérateur de remplacement non disponible
GoogleAnalyticsHook Disponible dans la version 1.20.12 GoogleAnalyticsAdminHook Opérateur de remplacement non disponible
GoogleAnalyticsListAccountsOperator Disponible dans la version 1.20.12 GoogleAnalyticsAdminListAccountsOperator Opérateur de remplacement non disponible
GoogleAnalyticsGetAdsLinkOperator Disponible dans la version 1.20.12 GoogleAnalyticsAdminGetGoogleAdsLinkOperator Opérateur de remplacement non disponible
GoogleAnalyticsRetrieveAdsLinksListOperator Disponible dans la version 1.20.12 GoogleAnalyticsAdminListGoogleAdsLinksOperator Opérateur de remplacement non disponible
GoogleAnalyticsDataImportUploadOperator Disponible dans la version 1.20.12 GoogleAnalyticsAdminCreateDataStreamOperator Opérateur de remplacement non disponible
GoogleAnalyticsDeletePreviousDataUploadsOperator Disponible dans la version 1.20.12 GoogleAnalyticsAdminDeleteDataStreamOperator Opérateur de remplacement non disponible
DataPipelineHook Disponible dans la version 1.20.12 DataflowHook Opérateur de remplacement non disponible
CreateDataPipelineOperator Disponible dans la version 1.20.12 DataflowCreatePipelineOperator Opérateur de remplacement non disponible
RunDataPipelineOperator Disponible dans la version 1.20.12 DataflowRunPipelineOperator Opérateur de remplacement non disponible
AutoMLDatasetLink Disponible dans la version 1.20.12 TranslationLegacyDatasetLink Opérateur de remplacement non disponible
AutoMLDatasetListLink Disponible dans la version 1.20.12 TranslationDatasetListLink Opérateur de remplacement non disponible
AutoMLModelLink Disponible dans la version 1.20.12 TranslationLegacyModelLink Opérateur de remplacement non disponible
AutoMLModelTrainLink Disponible dans la version 1.20.12 TranslationLegacyModelTrainLink Opérateur de remplacement non disponible
AutoMLModelPredictLink Disponible dans la version 1.20.12 TranslationLegacyModelPredictLink Opérateur de remplacement non disponible
AutoMLBatchPredictOperator Disponible dans la version 1.20.12 vertex_ai.batch_prediction_job Opérateur de remplacement non disponible
AutoMLPredictOperator Disponible dans la version 1.20.12 vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator Opérateur de remplacement non disponible
PromptLanguageModelOperator Disponible dans la version 1.20.12 TextGenerationModelPredictOperator Opérateur de remplacement non disponible
GenerateTextEmbeddingsOperator Disponible dans la version 1.20.12 TextEmbeddingModelGetEmbeddingsOperator Opérateur de remplacement non disponible
PromptMultimodalModelOperator Disponible dans la version 1.20.12 GenerativeModelGenerateContentOperator Opérateur de remplacement non disponible
PromptMultimodalModelWithMediaOperator Disponible dans la version 1.20.12 GenerativeModelGenerateContentOperator Opérateur de remplacement non disponible
DataflowStartSqlJobOperator Disponible dans la version 1.20.12 DataflowStartYamlJobOperator Opérateur de remplacement non disponible
LifeSciencesHook Disponible dans la version 1.20.12 Crochet des opérateurs Google Cloud Batch À venir
DataprocScaleClusterOperator Disponible dans la version 1.20.12 DataprocUpdateClusterOperator À venir
MLEngineStartBatchPredictionJobOperator Disponible dans la version 1.20.12 CreateBatchPredictionJobOperator À venir
MLEngineManageModelOperator Disponible dans la version 1.20.12 MLEngineCreateModelOperator, MLEngineGetModelOperator À venir
MLEngineGetModelOperator Disponible dans la version 1.20.12 GetModelOperator À venir
MLEngineDeleteModelOperator Disponible dans la version 1.20.12 DeleteModelOperator À venir
MLEngineManageVersionOperator Disponible dans la version 1.20.12 MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion À venir
MLEngineCreateVersionOperator Disponible dans la version 1.20.12 Paramètre parent_model pour les opérateurs VertexAI À venir
MLEngineSetDefaultVersionOperator Disponible dans la version 1.20.12 SetDefaultVersionOnModelOperator À venir
MLEngineListVersionsOperator Disponible dans la version 1.20.12 ListModelVersionsOperator À venir
MLEngineDeleteVersionOperator Disponible dans la version 1.20.12 DeleteModelVersionOperator À venir
MLEngineStartTrainingJobOperator Disponible dans la version 1.20.12 CreateCustomPythonPackageTrainingJobOperator À venir
MLEngineTrainingCancelJobOperator Disponible dans la version 1.20.12 CancelCustomTrainingJobOperator À venir
LifeSciencesRunPipelineOperator Disponible dans la version 1.20.12 Opérateurs Google Cloud Batch À venir
MLEngineCreateModelOperator Disponible dans la version 1.20.12 Opérateur Vertex AI correspondant À venir

Étape suivante