Scrivere DAG Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa guida mostra come scrivere un grafo diretto aciclico (DAG) di Apache Airflow in esecuzione in un ambiente Cloud Composer.

Poiché Apache Airflow non fornisce un isolamento rigoroso di DAG e attività, consigliamo di utilizzare ambienti di produzione e di test separati per evitare interferenze con il DAG. Per ulteriori informazioni, consulta la sezione Testare i DAG.

Strutturare un DAG Airflow

Un DAG di Airflow è definito in un file Python ed è costituito dai seguenti componenti:

  • Definizione del DAG
  • Operatori Airflow
  • Relazioni con gli operatori

I seguenti snippet di codice mostrano esempi di ciascun componente fuori contesto.

Una definizione DAG

L'esempio seguente mostra una definizione di DAG Airflow:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operatori e attività

Gli operatori Airflow descrivono il lavoro da svolgere. Un'attività è un'istanza specifica di un operatore.

Airflow 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Relazioni tra attività

Le relazioni tra attività descrivono l'ordine in cui deve essere completato il lavoro.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Esempio di flusso di lavoro DAG completo in Python

Il seguente flusso di lavoro è un modello DAG funzionante completo composto da due attività: un'attività hello_python e un'attività goodbye_bash:

Airflow 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Per ulteriori informazioni sulla definizione dei DAG di Airflow, consulta il tutorial di Airflow e i concetti di Airflow.

Operatori Airflow

Gli esempi riportati di seguito mostrano alcuni operatori Airflow molto utilizzati. Per un riferimento autorevole sugli operatori di Airflow, consulta la Guida di riferimento su operatori e hook e l'Indice dei provider.

BashOperator

Utilizza BashOperator per eseguire programmi a riga di comando.

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer esegue i comandi forniti in uno script Bash su un worker Airflow. Il worker è un container Docker basato su Debian e include diversi pacchetti.

PythonOperator

Utilizza PythonOperator per eseguire codice Python arbitrario.

Cloud Composer esegue il codice Python in un contenitore che include i pacchetti per la versione dell'immagine Cloud Composer utilizzata nel tuo ambiente.

Per installare altri pacchetti Python, consulta Installazione delle dipendenze Python.

Google Cloud Operatori

Per eseguire attività che utilizzano Google Cloud prodotti, utilizza gli Google Cloud operatori Airflow. Ad esempio, gli operatori BigQuery eseguono query ed elaborano i dati in BigQuery.

Esistono molti altri operatori Airflow per Google Cloud e per i singoli servizi forniti da Google Cloud. Per l'elenco completo, consulta Google Cloud Operatori.

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Utilizza EmailOperator per inviare email da un DAG. Per inviare email da un ambiente Cloud Composer, configuralo per utilizzare SendGrid.

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notifiche in caso di errore dell'operatore

Imposta email_on_failure su True per inviare una notifica via email quando un operatore nel DAG non va a buon fine. Per inviare notifiche via email da un ambiente Cloud Composer, devi configurare l'ambiente in modo che utilizzi SendGrid.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Linee guida per i flussi di lavoro DAG

  • Inserisci eventuali librerie Python personalizzate nell'archivio ZIP di un DAG in una directory nidificata. Non posizionare le librerie al livello superiore della directory dei DAG.

    Quando Airflow esegue la scansione della cartella dags/, controlla solo la presenza di DAG nei moduli Python che si trovano nel livello principale della cartella dei DAG e nel livello principale di un archivio ZIP che si trova anche nella cartella principale dags/. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene entrambe le sottostringhe airflow e DAG, interrompe l'elaborazione dell'archivio ZIP. Airflow restituisce solo i DAG trovati fino a quel momento.

  • Utilizza Airflow 2 anziché Airflow 1.

    La community di Airflow non pubblica più nuove release secondarie o patch per Airflow 1.

  • Per la tolleranza ai guasti, non definire più oggetti DAG nello stesso modulo Python.

  • Non utilizzare DAG secondari. Invece, raggruppa le attività all'interno dei DAG.

  • Inserisci i file richiesti al momento dell'analisi del DAG nella cartella dags/, non nella cartella data/.

  • Implementa i test delle unità per i tuoi DAG.

  • Testa i DAG sviluppati o modificati come consigliato nelle istruzioni per il test dei DAG.

  • Verifica che i DAG sviluppati non aumentino troppo i tempi di analisi dei DAG.

  • Le attività di Airflow possono non riuscire per diversi motivi. Per evitare errori di esecuzione di interi DAG, ti consigliamo di attivare i tentativi di esecuzione delle attività. Se imposti il numero massimo di tentativi su 0, non viene eseguito alcun tentativo.

    Ti consigliamo di sostituire l'opzione default_task_retries con un valore per le ripetizioni della tarefa diverso da 0. Inoltre, puoi impostare il parametro retries a livello di attività.

  • Se vuoi utilizzare la GPU nelle tue attività Airflow, crea un cluster GKE distinto in base ai nodi che utilizzano macchine con GPU. Utilizza GKEStartPodOperator per eseguire le attività.

  • Evita di eseguire attività che richiedono molta CPU e memoria nel pool di nodi del cluster in cui sono in esecuzione altri componenti di Airflow (scheduler, worker, server web). Utilizza invece KubernetesPodOperator o GKEStartPodOperator.

  • Quando esegui il deployment dei DAG in un ambiente, carica nella cartella /dags solo i file assolutamente necessari per interpretare ed eseguire i DAG.

  • Limita il numero di file DAG nella cartella /dags.

    Airflow analizza continuamente i DAG nella cartella /dags. L'analisi è un processo che esegue un ciclo nella cartella dei DAG e il numero di file che devono essere caricati (con le relative dipendenze) influisce sul rendimento dell'analisi dei DAG e della pianificazione delle attività. È molto più efficiente utilizzare 100 file con 100 DAG ciascuno rispetto a 10000 file con 1 DAG ciascuno, pertanto questa ottimizzazione è consigliata. Questa ottimizzazione è un equilibrio tra il tempo di analisi e l'efficienza della creazione e della gestione dei DAG.

    Puoi anche prendere in considerazione, ad esempio, di eseguire il deployment di 10000 file DAG creando 100 file ZIP contenenti ciascuno 100 file DAG.

    Oltre ai suggerimenti riportati sopra, se hai più di 10.000 file DAG, la generazione di DAG in modo programmatico potrebbe essere una buona opzione. Ad esempio, puoi implementare un singolo file DAG Python che genera un certo numero di oggetti DAG (ad esempio 20, 100 oggetti DAG).

  • Evita di utilizzare gli operatori Airflow deprecati. Utilizza invece le alternative aggiornate.

Domande frequenti sulla scrittura dei DAG

Come faccio a ridurre al minimo la ripetizione del codice se voglio eseguire le stesse attività o attività simili in più DAG?

Ti consigliamo di definire librerie e wrapper per minimizzare la ripetizione del codice.

Come faccio a riutilizzare il codice tra i file DAG?

Inserisci le funzioni di utilità in una libreria Python locale e importa le funzioni. Puoi fare riferimento alle funzioni in qualsiasi DAG situato nella cartella dags/ del bucket del tuo ambiente.

Come faccio a ridurre al minimo il rischio che si verifichino definizioni diverse?

Ad esempio, hai due team che vogliono aggregare i dati non elaborati in metriche sulle entrate. I team scrivono due attività leggermente diverse che svolgono la stessa cosa. Definisci le librerie per lavorare con i dati sulle entrate in modo che gli implementatori del DAG debbano chiarire la definizione delle entrate aggregate.

Come faccio a impostare le dipendenze tra i DAG?

Dipende da come vuoi definire la dipendenza.

Se hai due DAG (DAG A e DAG B) e vuoi che il DAG B venga attivato dopo il DAG A, puoi inserire un TriggerDagRunOperator alla fine del DAG A.

Se il DAG B dipende solo da un artefatto generato dal DAG A, ad esempio un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.

Se il DAG B è integrato strettamente con il DAG A, potresti essere in grado di unire i due DAG in un unico DAG.

Come faccio a passare ID esecuzione univoci a un DAG e alle relative attività?

Ad esempio, vuoi passare i nomi dei cluster Dataproc e i percorsi dei file.

Puoi generare un ID univoco casuale restituendo str(uuid.uuid4()) in PythonOperator. L'ID viene inserito in XComs in modo da poter fare riferimento all'ID in altri operatori tramite i campi basati su modelli.

Prima di generare un uuid, valuta se un ID specifico per DagRun sarebbe più utile. Puoi anche fare riferimento a questi ID nelle sostituzioni Jinja utilizzando le macro.

Come faccio a separare le attività in un DAG?

Ogni attività deve essere un'unità di lavoro idempotente. Di conseguenza, dovresti evitare di eseguire il wrapping di un flusso di lavoro a più passaggi all'interno di un'unica attività, ad esempio un programma complesso in esecuzione in un PythonOperator.

Devo definire più attività in un unico DAG per aggregare i dati di più origini?

Ad esempio, hai più tabelle con dati non elaborati e vuoi creare aggregati giornalieri per ogni tabella. Le attività non sono dipendenti l'una dall'altra. Dovresti creare un'attività e un DAG per ogni tabella o un DAG generale?

Se non ti dispiace che ogni attività condivida le stesse proprietà a livello di DAG, ad esempioschedule_interval, ha senso definire più attività in un singolo DAG. In caso contrario, per ridurre al minimo la ripetizione del codice, è possibile generare più DAG da un singolo modulo Python inserendoli nel globals() del modulo.

Come faccio a limitare il numero di attività simultanee in esecuzione in un DAG?

Ad esempio, vuoi evitare di superare i limiti/le quote di utilizzo dell'API o di eseguire troppi processi contemporaneamente.

Puoi definire pool Airflow nell'interfaccia utente web di Airflow e associare le attività ai pool esistenti nei tuoi DAG.

Domande frequenti sull'utilizzo degli operatori

Devo utilizzare DockerOperator?

Sconsigliamo di utilizzare DockerOperator, a meno che non venga utilizzato per avviare contenuti su un'installazione Docker remota (non all'interno del cluster di un ambiente). In un ambiente Cloud Composer, l'operatore non ha accesso ai daemon Docker.

Utilizza invece KubernetesPodOperator o GKEStartPodOperator. Questi operatori lanciano i pod Kubernetes rispettivamente nei cluster Kubernetes o GKE. Tieni presente che non consigliamo di avviare pod nel cluster di un ambiente, perché ciò può portare alla concorrenza per le risorse.

Devo utilizzare SubDagOperator?

Non è consigliabile utilizzare SubDagOperator.

Utilizza le alternative suggerite in Raggruppare le attività.

Devo eseguire il codice Python solo in PythonOperators per separare completamente gli operatori Python?

A seconda del tuo obiettivo, hai a disposizione alcune opzioni.

Se il tuo unico problema è mantenere dipendenze Python separate, puoi utilizzare PythonVirtualenvOperator.

Valuta la possibilità di utilizzare KubernetesPodOperator. Questo operatore ti consente di definire i pod Kubernetes ed eseguirli in altri cluster.

Come faccio ad aggiungere pacchetti non PyPI o binari personalizzati?

Puoi installare pacchetti ospitati in repository di pacchetti privati.

Come faccio a passare gli argomenti in modo uniforme a un DAG e alle relative attività?

Puoi utilizzare il supporto integrato di Airflow per il modello Jinja per passare gli argomenti che possono essere utilizzati nei campi basati su modelli.

Quando avviene la sostituzione del modello?

La sostituzione del modello avviene sui worker di Airflow appena prima della chiamata della funzione pre_execute di un operatore. In pratica, ciò significa che i modelli non vengono sostituiti fino a poco prima dell'esecuzione di un'attività.

Come faccio a sapere quali argomenti dell'operatore supportano la sostituzione del modello?

Gli argomenti dell'operatore che supportano la sostituzione del modello Jinja2 sono contrassegnati esplicitamente come tali.

Cerca il campo template_fields nella definizione dell'operatore, che contiene un elenco di nomi di argomenti sottoposti alla sostituzione del modello.

Ad esempio, consulta BashOperator, che supporta i modelli per gli argomenti bash_command e env.

Operatori Airflow ritirati e rimossi

Gli operatori Airflow elencati nella tabella seguente sono deprecati:

  • Evita di utilizzare questi operatori nei tuoi DAG. Utilizza invece gli operatori di sostituzione aggiornati forniti.

  • Se un operatore è indicato come disponibile, significa che l'operatore è ancora disponibile nell'ultima release di manutenzione di Cloud Composer (1.20.12).

  • Alcuni degli operatori di sostituzione non sono supportati in nessuna versione di Cloud Composer 1. Per utilizzarli, valuta la possibilità di eseguire l'upgrade a Cloud Composer 3 o Cloud Composer 2.

Operatore obsoleto Stato Operatore sostitutivo Sostituzione disponibile da
CreateAutoMLTextTrainingJobOperator Disponibile in 1.20.12 SupervisedFineTuningTrainOperator Operatore sostitutivo non disponibile
GKEDeploymentHook Disponibile in 1.20.12 GKEKubernetesHook Operatore sostitutivo non disponibile
GKECustomResourceHook Disponibile in 1.20.12 GKEKubernetesHook Operatore sostitutivo non disponibile
GKEPodHook Disponibile in 1.20.12 GKEKubernetesHook Operatore sostitutivo non disponibile
GKEJobHook Disponibile in 1.20.12 GKEKubernetesHook Operatore sostitutivo non disponibile
GKEPodAsyncHook Disponibile in 1.20.12 GKEKubernetesAsyncHook Operatore sostitutivo non disponibile
SecretsManagerHook Disponibile in 1.20.12 GoogleCloudSecretManagerHook Operatore sostitutivo non disponibile
BigQueryExecuteQueryOperator Disponibile in 1.20.12 BigQueryInsertJobOperator Disponibile in 1.20.12
BigQueryPatchDatasetOperator Disponibile in 1.20.12 BigQueryUpdateDatasetOperator Disponibile in 1.20.12
DataflowCreateJavaJobOperator Disponibile in 1.20.12 beam.BeamRunJavaPipelineOperator Disponibile in 1.20.12
DataflowCreatePythonJobOperator Disponibile in 1.20.12 beam.BeamRunPythonPipelineOperator Disponibile in 1.20.12
DataprocSubmitPigJobOperator Disponibile in 1.20.12 DataprocSubmitJobOperator Disponibile in 1.20.12
DataprocSubmitHiveJobOperator Disponibile in 1.20.12 DataprocSubmitJobOperator Disponibile in 1.20.12
DataprocSubmitSparkSqlJobOperator Disponibile in 1.20.12 DataprocSubmitJobOperator Disponibile in 1.20.12
DataprocSubmitSparkJobOperator Disponibile in 1.20.12 DataprocSubmitJobOperator Disponibile in 1.20.12
DataprocSubmitHadoopJobOperator Disponibile in 1.20.12 DataprocSubmitJobOperator Disponibile in 1.20.12
DataprocSubmitPySparkJobOperator Disponibile in 1.20.12 DataprocSubmitJobOperator Disponibile in 1.20.12
BigQueryTableExistenceAsyncSensor Disponibile in 1.20.12 BigQueryTableExistenceSensor Operatore sostitutivo non disponibile
BigQueryTableExistencePartitionAsyncSensor Disponibile in 1.20.12 BigQueryTablePartitionExistenceSensor Operatore sostitutivo non disponibile
CloudComposerEnvironmentSensor Disponibile in 1.20.12 CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator Operatore sostitutivo non disponibile
GCSObjectExistenceAsyncSensor Disponibile in 1.20.12 GCSObjectExistenceSensor Operatore sostitutivo non disponibile
GoogleAnalyticsHook Disponibile in 1.20.12 GoogleAnalyticsAdminHook Operatore sostitutivo non disponibile
GoogleAnalyticsListAccountsOperator Disponibile in 1.20.12 GoogleAnalyticsAdminListAccountsOperator Operatore sostitutivo non disponibile
GoogleAnalyticsGetAdsLinkOperator Disponibile in 1.20.12 GoogleAnalyticsAdminGetGoogleAdsLinkOperator Operatore sostitutivo non disponibile
GoogleAnalyticsRetrieveAdsLinksListOperator Disponibile in 1.20.12 GoogleAnalyticsAdminListGoogleAdsLinksOperator Operatore sostitutivo non disponibile
GoogleAnalyticsDataImportUploadOperator Disponibile in 1.20.12 GoogleAnalyticsAdminCreateDataStreamOperator Operatore sostitutivo non disponibile
GoogleAnalyticsDeletePreviousDataUploadsOperator Disponibile in 1.20.12 GoogleAnalyticsAdminDeleteDataStreamOperator Operatore sostitutivo non disponibile
DataPipelineHook Disponibile in 1.20.12 DataflowHook Operatore sostitutivo non disponibile
CreateDataPipelineOperator Disponibile in 1.20.12 DataflowCreatePipelineOperator Operatore sostitutivo non disponibile
RunDataPipelineOperator Disponibile in 1.20.12 DataflowRunPipelineOperator Operatore sostitutivo non disponibile
AutoMLDatasetLink Disponibile in 1.20.12 TranslationLegacyDatasetLink Operatore sostitutivo non disponibile
AutoMLDatasetListLink Disponibile in 1.20.12 TranslationDatasetListLink Operatore sostitutivo non disponibile
AutoMLModelLink Disponibile in 1.20.12 TranslationLegacyModelLink Operatore sostitutivo non disponibile
AutoMLModelTrainLink Disponibile in 1.20.12 TranslationLegacyModelTrainLink Operatore sostitutivo non disponibile
AutoMLModelPredictLink Disponibile in 1.20.12 TranslationLegacyModelPredictLink Operatore sostitutivo non disponibile
AutoMLBatchPredictOperator Disponibile in 1.20.12 vertex_ai.batch_prediction_job Operatore sostitutivo non disponibile
AutoMLPredictOperator Disponibile in 1.20.12 vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator Operatore sostitutivo non disponibile
PromptLanguageModelOperator Disponibile in 1.20.12 TextGenerationModelPredictOperator Operatore sostitutivo non disponibile
GenerateTextEmbeddingsOperator Disponibile in 1.20.12 TextEmbeddingModelGetEmbeddingsOperator Operatore sostitutivo non disponibile
PromptMultimodalModelOperator Disponibile in 1.20.12 GenerativeModelGenerateContentOperator Operatore sostitutivo non disponibile
PromptMultimodalModelWithMediaOperator Disponibile in 1.20.12 GenerativeModelGenerateContentOperator Operatore sostitutivo non disponibile
DataflowStartSqlJobOperator Disponibile in 1.20.12 DataflowStartYamlJobOperator Operatore sostitutivo non disponibile
LifeSciencesHook Disponibile in 1.20.12 Hook degli operatori batch di Google Cloud Da definire
DataprocScaleClusterOperator Disponibile in 1.20.12 DataprocUpdateClusterOperator Da definire
MLEngineStartBatchPredictionJobOperator Disponibile in 1.20.12 CreateBatchPredictionJobOperator Da definire
MLEngineManageModelOperator Disponibile in 1.20.12 MLEngineCreateModelOperator, MLEngineGetModelOperator Da definire
MLEngineGetModelOperator Disponibile in 1.20.12 GetModelOperator Da definire
MLEngineDeleteModelOperator Disponibile in 1.20.12 DeleteModelOperator Da definire
MLEngineManageVersionOperator Disponibile in 1.20.12 MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion Da definire
MLEngineCreateVersionOperator Disponibile in 1.20.12 Parametro parent_model per gli operatori VertexAI Da definire
MLEngineSetDefaultVersionOperator Disponibile in 1.20.12 SetDefaultVersionOnModelOperator Da definire
MLEngineListVersionsOperator Disponibile in 1.20.12 ListModelVersionsOperator Da definire
MLEngineDeleteVersionOperator Disponibile in 1.20.12 DeleteModelVersionOperator Da definire
MLEngineStartTrainingJobOperator Disponibile in 1.20.12 CreateCustomPythonPackageTrainingJobOperator Da definire
MLEngineTrainingCancelJobOperator Disponibile in 1.20.12 CancelCustomTrainingJobOperator Da definire
LifeSciencesRunPipelineOperator Disponibile in 1.20.12 Operatori di Google Cloud Batch Da definire
MLEngineCreateModelOperator Disponibile in 1.20.12 Operatore VertexAI corrispondente Da definire

Passaggi successivi