Cómo escribir DAG de Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta guía, se muestra cómo escribir un grafo acíclico dirigido (DAG) por Apache Airflow que se ejecuta en un entorno de Cloud Composer.

Debido a que Apache Airflow no proporciona un aislamiento eficaz del DAG y de las tareas, te recomendamos que uses entornos de prueba y producción separados para evitar la interferencia del DAG. Para obtener más información, consulta Probar DAG.

Estructura un DAG de Airflow

Un DAG de Airflow se define en un archivo de Python y está compuesto por los siguientes componentes:

  • Definición de DAG
  • Operadores de Airflow
  • Relaciones con operadores

En los siguientes fragmentos de código, se muestran ejemplos de cada componente fuera de contexto.

Una definición de DAG

En el siguiente ejemplo, se muestra una definición de DAG de Airflow:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operadores y tareas

Los operadores de Airflow describen el trabajo que se debe realizar. Una tarea es una instancia específica de un operador.

Airflow 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Relaciones de tareas

Las relaciones entre tareas describen el orden en el que se debe completar el trabajo.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Ejemplo completo de flujo de trabajo de DAG en Python

El siguiente flujo de trabajo es una plantilla de DAG de trabajo completa que consta de dos tareas: una tarea hello_python y una goodbye_bash:

Airflow 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Para obtener más información sobre cómo definir los DAG de Airflow, consulta el instructivo de Airflow y los conceptos de Airflow.

Operadores de Airflow

En los siguientes ejemplos, se muestran algunos operadores populares de Airflow. Para obtener una referencia fidedigna de los operadores de Airflow, consulta la Referencia de operadores y hooks y el índice de proveedores.

BashOperator

Usa BashOperator para ejecutar programas de línea de comandos.

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer ejecuta los comandos proporcionados en una secuencia de comandos de Bash en un trabajador de Airflow. El trabajador es un contenedor de Docker basado en Debian que incluye varios paquetes.

PythonOperator

Usa PythonOperator para ejecutar código de Python arbitrario.

Cloud Composer ejecuta el código de Python en un contenedor que incluye paquetes para la versión de la imagen de Cloud Composer que se usa en tu entorno.

Para instalar paquetes adicionales de Python, consulta cómo instalar dependencias de Python.

OperadoresGoogle Cloud

Para ejecutar tareas que usen productos de Google Cloud , usa los Google Cloud operadores de Airflow. Por ejemplo, los operadores de BigQuery consultan y procesan datos en BigQuery.

Hay muchos más operadores de Airflow para Google Cloud y servicios individuales que proporciona Google Cloud. Consulta OperadoresGoogle Cloud para obtener la lista completa.

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Usa el EmailOperator para enviar correos electrónicos desde un DAG. Para enviar correos electrónicos desde un entorno de Cloud Composer, configura tu entorno para usar SendGrid.

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notificaciones sobre fallas del operador

Configura email_on_failure como True para enviar una notificación por correo electrónico cuando un operador del DAG falle. Para enviar notificaciones por correo electrónico desde un entorno de Cloud Composer, debes configurar tu entorno para usar SendGrid.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Lineamientos del flujo de trabajo de DAG

  • Coloca cualquier biblioteca de Python personalizada en el archivo ZIP de un DAG en un directorio anidado. No coloques bibliotecas en el nivel superior del directorio de DAG.

    Cuando Airflow escanea la carpeta dags/, Airflow solo busca DAG en los módulos de Python que se encuentran en el nivel superior de la carpeta de DAG y en el nivel superior de un archivo ZIP ubicado también en la carpeta dags/ de nivel superior. Si Airflow encuentra un módulo de Python en un archivo ZIP que no contiene las substrings airflow y DAG, Airflow deja de procesar el archivo ZIP. Airflow solo muestra los DAG encontrados hasta ese momento.

  • Usa Airflow 2 en lugar de Airflow 1.

    La comunidad de Airflow ya no publica nuevas versiones secundarias ni de parches para Airflow 1.

  • Para la tolerancia a errores, no definas varios objetos DAG en el mismo módulo de Python.

  • No uses subDAG. En su lugar, agrupa las tareas dentro de los DAG.

  • Coloca los archivos que se requieren en el tiempo de análisis de DAG en la carpeta dags/, no en la carpeta data/.

  • Implementa pruebas de unidades para tus DAG.

  • Prueba los DAG desarrollados o modificados como se recomienda en las instrucciones para probar DAG.

  • Verifica que los DAG desarrollados no aumenten demasiado los tiempos de análisis de DAG.

  • Las tareas de Airflow pueden fallar por varios motivos. Para evitar fallas en las ejecuciones de DAG completas, te recomendamos que habilites los reintentos de tareas. Si estableces la cantidad máxima de reintentos en 0, no se realizará ningún reintento.

    Te recomendamos que anules la opción default_task_retries con un valor para las reintentos de tareas que no sea 0. Además, puedes establecer el parámetro retries a nivel de la tarea.

  • Si quieres usar GPU en tus tareas de Airflow, crea un clúster de GKE independiente basado en nodos que usen máquinas con GPUs. Usa GKEStartPodOperator para ejecutar tus tareas.

  • Evita ejecutar tareas que requieran mucha CPU y memoria en el grupo de nodos del clúster donde se ejecutan otros componentes de Airflow (programadores, trabajadores y servidores web). En su lugar, usa KubernetesPodOperator o GKEStartPodOperator.

  • Cuando implementes DAG en un entorno, sube solo los archivos que sean absolutamente necesarios para interpretar y ejecutar DAG en la carpeta /dags.

  • Limita la cantidad de archivos DAG en la carpeta /dags.

    Airflow analiza continuamente los DAG en la carpeta /dags. El análisis es un proceso que recorre la carpeta de DAG y la cantidad de archivos que se deben cargar (con sus dependencias) afecta el rendimiento del análisis de DAG y la programación de tareas. Es mucho más eficiente usar 100 archivos con 100 DAG cada uno que 10,000 archivos con 1 DAG cada uno, por lo que se recomienda esa optimización. Esta optimización es un equilibrio entre el tiempo de análisis y la eficiencia de la creación y administración de DAG.

    También puedes considerar, por ejemplo, que, para implementar 10,000 archivos DAG, podrías crear 100 archivos ZIP, cada uno con 100 archivos DAG.

    Además de las sugerencias anteriores, si tienes más de 10,000 archivos de DAG, generar DAGs de forma programática puede ser una buena opción. Por ejemplo, puedes implementar un solo archivo DAG de Python que genere una cantidad determinada de objetos DAG (por ejemplo, 20, 100 objetos DAG).

  • Evita usar operadores de Airflow obsoletos. En su lugar, usa sus alternativas actualizadas.

Preguntas frecuentes para escribir DAG

¿Cómo minimizo la repetición de código si quiero ejecutar las mismas tareas o tareas similares en varios DAG?

Sugerimos definir bibliotecas y wrappers para minimizar la repetición de código.

¿Cómo vuelvo a usar el código entre los archivos DAG?

Coloca tus funciones de utilidad en una biblioteca local de Python y, luego, importa las funciones. Puedes hacer referencia a las funciones en cualquier DAG que se encuentre en la carpeta dags/ del bucket de tu entorno.

¿Cómo minimizo el riesgo de que surjan definiciones diferentes?

Por ejemplo, tienes dos equipos que desean agregar datos sin procesar a las métricas de ingresos. Los equipos escriben dos tareas con pequeñas diferencias entre sí que logran lo mismo. Se deben definir bibliotecas para trabajar con los datos de ingresos, de modo que los implementadores de DAG aclaren la definición de ingresos que se agrega.

¿Cómo configuro las dependencias entre los DAG?

Esto dependerá de cómo deseas definir la dependencia.

Si tienes dos DAG (DAG A y DAG B) y quieres que el DAG B se active después del DAG A, puedes poner un TriggerDagRunOperator al final del DAG A.

Si el DAG B solo depende de un artefacto generado por el DAG A, como un mensaje de Pub/Sub, es posible que un sensor funcione mejor.

Si el DAG B se integra perfectamente al DAG A, es posible que puedas combinar los dos DAG en uno.

¿Cómo transfiero los ID de ejecución únicos a un DAG y sus tareas?

Por ejemplo, deseas transmitir nombres de clústeres de Dataproc y rutas de archivos.

Puedes generar un ID único aleatorio si muestras str(uuid.uuid4()) en un PythonOperator. Esto coloca el ID en XComs para que puedas consultar el ID en otros operadores a través de campos de plantillas.

Antes de generar un uuid, considera si un ID específico de DagRun sería más valioso. También puedes hacer referencia a estos IDs en sustituciones de Jinja con macros.

¿Cómo separo las tareas en un DAG?

Cada tarea debe ser una unidad de trabajo idempotente. Por lo tanto, debes evitar encapsular un flujo de trabajo de varios pasos en una sola tarea, como un programa complejo que se ejecute en un PythonOperator.

¿Debo definir varias tareas en un solo DAG para agregar datos de varias fuentes?

Por ejemplo, tienes varias tablas con datos sin procesar y quieres crear agregados diarios para cada una. Las tareas no dependen una de otra. ¿Debes crear una tarea y un DAG para cada tabla o crear un DAG general?

Si estás de acuerdo con que cada tarea comparta las mismas propiedades a nivel del DAG, como schedule_interval, entonces tiene sentido definir varias tareas en un solo DAG. De lo contrario, para minimizar la repetición de código, se pueden generar varios DAG a partir de un único módulo de Python colocándolos en globals() del módulo.

¿Cómo puedo limitar la cantidad de tareas simultáneas que se ejecutan en un DAG?

Por ejemplo, quieres evitar superar las cuotas o los límites de uso de la API o evitar ejecutar demasiados procesos simultáneos.

Puedes definir grupos de Airflow en la IU web de Airflow y asociar tareas con grupos existentes en tus DAG.

Preguntas frecuentes sobre el uso de operadores

¿Debo usar DockerOperator?

No recomendamos usar DockerOperator, a menos que se use para iniciar contenedores en una instalación remota de Docker (no dentro del clúster de un entorno). En un entorno de Cloud Composer, el operador no tiene acceso a los daemons de Docker.

En su lugar, usa KubernetesPodOperator o GKEStartPodOperator. Estos operadores inician pods de Kubernetes en clústeres de Kubernetes o de GKE, respectivamente. Ten en cuenta que no recomendamos el lanzamiento de pods en el clúster de un entorno, ya que esto puede generar competencia en los recursos.

¿Debo usar SubDagOperator?

No recomendamos usar SubDagOperator.

Usa las alternativas que se sugieren en Cómo agrupar tareas.

¿Debería ejecutar código de Python solo en PythonOperators para separar completamente los operadores de Python?

Según tu objetivo, tienes algunas opciones.

Si tu única preocupación es mantener dependencias independientes de Python, puedes usar PythonVirtualenvOperator.

Considera usar KubernetesPodOperator. Este operador te permite definir pods de Kubernetes y ejecutarlos en otros clústeres.

¿Cómo agrego paquetes binarios personalizados o que no son de PyPI?

Puedes instalar paquetes alojados en repositorios de paquetes privados.

¿Cómo transmito uniformemente los argumentos a un DAG y sus tareas?

Puedes usar la compatibilidad integrada de Airflow con las plantillas de Jinja para pasar argumentos que se pueden usar en campos con plantillas.

¿Cuándo ocurre la sustitución de plantilla?

La sustitución de plantilla ocurre en los trabajadores de Airflow justo antes de que se llame a la función pre_execute de un operador. En la práctica, esto significa que las plantillas no se sustituyen hasta justo antes de que se ejecute una tarea.

¿Cómo puedo saber qué argumentos del operador admiten la sustitución de plantillas?

Los argumentos del operador que admiten la sustitución de plantillas de Jinja2 se marcan explícitamente como tales.

Busca el campo template_fields en la definición de operador, que contiene una lista de nombres de argumentos que se someterán a la sustitución de plantillas.

Por ejemplo, consulta BashOperator, que admite plantillas para los argumentos bash_command y env.

Operadores de Airflow obsoletos y quitados

Los operadores de Airflow que se indican en la siguiente tabla dejaron de estar disponibles:

  • Evita usar estos operadores en tus DAG. En su lugar, usa los operadores de reemplazo actualizados proporcionados.

  • Si un operador aparece como disponible, significa que la versión de mantenimiento más reciente de Cloud Composer (1.20.12) aún tiene este operador disponible.

  • Algunos de los operadores de reemplazo no son compatibles con ninguna versión de Cloud Composer 1. Para usarlos, considera actualizar a Cloud Composer 3 o Cloud Composer 2.

Operador obsoleto Estado Operador de reemplazo Fecha de reemplazo disponible
CreateAutoMLTextTrainingJobOperator Disponible en la versión 1.20.12 SupervisedFineTuningTrainOperator El operador de reemplazo no está disponible
GKEDeploymentHook Disponible en la versión 1.20.12 GKEKubernetesHook El operador de reemplazo no está disponible
GKECustomResourceHook Disponible en la versión 1.20.12 GKEKubernetesHook El operador de reemplazo no está disponible
GKEPodHook Disponible en la versión 1.20.12 GKEKubernetesHook El operador de reemplazo no está disponible
GKEJobHook Disponible en la versión 1.20.12 GKEKubernetesHook El operador de reemplazo no está disponible
GKEPodAsyncHook Disponible en la versión 1.20.12 GKEKubernetesAsyncHook El operador de reemplazo no está disponible
SecretsManagerHook Disponible en la versión 1.20.12 GoogleCloudSecretManagerHook El operador de reemplazo no está disponible
BigQueryExecuteQueryOperator Disponible en la versión 1.20.12 BigQueryInsertJobOperator Disponible en la versión 1.20.12
BigQueryPatchDatasetOperator Disponible en la versión 1.20.12 BigQueryUpdateDatasetOperator Disponible en la versión 1.20.12
DataflowCreateJavaJobOperator Disponible en la versión 1.20.12 beam.BeamRunJavaPipelineOperator Disponible en la versión 1.20.12
DataflowCreatePythonJobOperator Disponible en la versión 1.20.12 beam.BeamRunPythonPipelineOperator Disponible en la versión 1.20.12
DataprocSubmitPigJobOperator Disponible en la versión 1.20.12 DataprocSubmitJobOperator Disponible en la versión 1.20.12
DataprocSubmitHiveJobOperator Disponible en la versión 1.20.12 DataprocSubmitJobOperator Disponible en la versión 1.20.12
DataprocSubmitSparkSqlJobOperator Disponible en la versión 1.20.12 DataprocSubmitJobOperator Disponible en la versión 1.20.12
DataprocSubmitSparkJobOperator Disponible en la versión 1.20.12 DataprocSubmitJobOperator Disponible en la versión 1.20.12
DataprocSubmitHadoopJobOperator Disponible en la versión 1.20.12 DataprocSubmitJobOperator Disponible en la versión 1.20.12
DataprocSubmitPySparkJobOperator Disponible en la versión 1.20.12 DataprocSubmitJobOperator Disponible en la versión 1.20.12
BigQueryTableExistenceAsyncSensor Disponible en la versión 1.20.12 BigQueryTableExistenceSensor El operador de reemplazo no está disponible
BigQueryTableExistencePartitionAsyncSensor Disponible en la versión 1.20.12 BigQueryTablePartitionExistenceSensor El operador de reemplazo no está disponible
CloudComposerEnvironmentSensor Disponible en la versión 1.20.12 CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator El operador de reemplazo no está disponible
GCSObjectExistenceAsyncSensor Disponible en la versión 1.20.12 GCSObjectExistenceSensor El operador de reemplazo no está disponible
GoogleAnalyticsHook Disponible en la versión 1.20.12 GoogleAnalyticsAdminHook El operador de reemplazo no está disponible
GoogleAnalyticsListAccountsOperator Disponible en la versión 1.20.12 GoogleAnalyticsAdminListAccountsOperator El operador de reemplazo no está disponible
GoogleAnalyticsGetAdsLinkOperator Disponible en la versión 1.20.12 GoogleAnalyticsAdminGetGoogleAdsLinkOperator El operador de reemplazo no está disponible
GoogleAnalyticsRetrieveAdsLinksListOperator Disponible en la versión 1.20.12 GoogleAnalyticsAdminListGoogleAdsLinksOperator El operador de reemplazo no está disponible
GoogleAnalyticsDataImportUploadOperator Disponible en la versión 1.20.12 GoogleAnalyticsAdminCreateDataStreamOperator El operador de reemplazo no está disponible
GoogleAnalyticsDeletePreviousDataUploadsOperator Disponible en la versión 1.20.12 GoogleAnalyticsAdminDeleteDataStreamOperator El operador de reemplazo no está disponible
DataPipelineHook Disponible en la versión 1.20.12 DataflowHook El operador de reemplazo no está disponible
CreateDataPipelineOperator Disponible en la versión 1.20.12 DataflowCreatePipelineOperator El operador de reemplazo no está disponible
RunDataPipelineOperator Disponible en la versión 1.20.12 DataflowRunPipelineOperator El operador de reemplazo no está disponible
AutoMLDatasetLink Disponible en la versión 1.20.12 TranslationLegacyDatasetLink El operador de reemplazo no está disponible
AutoMLDatasetListLink Disponible en la versión 1.20.12 TranslationDatasetListLink El operador de reemplazo no está disponible
AutoMLModelLink Disponible en la versión 1.20.12 TranslationLegacyModelLink El operador de reemplazo no está disponible
AutoMLModelTrainLink Disponible en la versión 1.20.12 TranslationLegacyModelTrainLink El operador de reemplazo no está disponible
AutoMLModelPredictLink Disponible en la versión 1.20.12 TranslationLegacyModelPredictLink El operador de reemplazo no está disponible
AutoMLBatchPredictOperator Disponible en la versión 1.20.12 vertex_ai.batch_prediction_job El operador de reemplazo no está disponible
AutoMLPredictOperator Disponible en la versión 1.20.12 vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator El operador de reemplazo no está disponible
PromptLanguageModelOperator Disponible en la versión 1.20.12 TextGenerationModelPredictOperator El operador de reemplazo no está disponible
GenerateTextEmbeddingsOperator Disponible en la versión 1.20.12 TextEmbeddingModelGetEmbeddingsOperator El operador de reemplazo no está disponible
PromptMultimodalModelOperator Disponible en la versión 1.20.12 GenerativeModelGenerateContentOperator El operador de reemplazo no está disponible
PromptMultimodalModelWithMediaOperator Disponible en la versión 1.20.12 GenerativeModelGenerateContentOperator El operador de reemplazo no está disponible
DataflowStartSqlJobOperator Disponible en la versión 1.20.12 DataflowStartYamlJobOperator El operador de reemplazo no está disponible
LifeSciencesHook Disponible en la versión 1.20.12 Hook de los operadores de lotes de Google Cloud Por confirmar
DataprocScaleClusterOperator Disponible en la versión 1.20.12 DataprocUpdateClusterOperator Por confirmar
MLEngineStartBatchPredictionJobOperator Disponible en la versión 1.20.12 CreateBatchPredictionJobOperator Por confirmar
MLEngineManageModelOperator Disponible en la versión 1.20.12 MLEngineCreateModelOperator y MLEngineGetModelOperator Por confirmar
MLEngineGetModelOperator Disponible en la versión 1.20.12 GetModelOperator Por confirmar
MLEngineDeleteModelOperator Disponible en la versión 1.20.12 DeleteModelOperator Por confirmar
MLEngineManageVersionOperator Disponible en la versión 1.20.12 MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion Por confirmar
MLEngineCreateVersionOperator Disponible en la versión 1.20.12 Parámetro parent_model para operadores de VertexAI Por confirmar
MLEngineSetDefaultVersionOperator Disponible en la versión 1.20.12 SetDefaultVersionOnModelOperator Por confirmar
MLEngineListVersionsOperator Disponible en la versión 1.20.12 ListModelVersionsOperator Por confirmar
MLEngineDeleteVersionOperator Disponible en la versión 1.20.12 DeleteModelVersionOperator Por confirmar
MLEngineStartTrainingJobOperator Disponible en la versión 1.20.12 CreateCustomPythonPackageTrainingJobOperator Por confirmar
MLEngineTrainingCancelJobOperator Disponible en la versión 1.20.12 CancelCustomTrainingJobOperator Por confirmar
LifeSciencesRunPipelineOperator Disponible en la versión 1.20.12 Operadores de Cloud Batch de Google Cloud Por confirmar
MLEngineCreateModelOperator Disponible en la versión 1.20.12 operador de VertexAI correspondiente Por confirmar

¿Qué sigue?