Airflow Summit 2023
Unisciti alla community di Airflow dal 19 al 21 settembre durante la conferenza Airflow Summit 2023 per scoprire di più su Airflow e condividere la tua esperienza. La chiamata per i documenti è aperta

Scrittura di DAG di Airflow

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questa guida mostra come scrivere un grafo diretto aciclico (DAG) con Apache Airflow in esecuzione in un ambiente Cloud Composer.

Strutturare un DAG Airflow

Un DAG Airflow è definito in un file Python ed è composto dai seguenti componenti: una definizione DAG, un operatore e le relazioni dell'operatore. I seguenti snippet di codice mostrano esempi di ciascun componente fuori contesto:

  1. Una definizione di DAG.

    Flusso d'aria 2

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

    Flusso d'aria 1

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. Operatori per descrivere il lavoro da svolgere. L'istanza di un operatore è chiamata attività.

    Flusso d'aria 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

    Flusso d'aria 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. Relazioni alle attività per descrivere l'ordine in cui completare il lavoro.

    Flusso d'aria 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Flusso d'aria 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Esempio di flusso di lavoro completo di DAG in Python

Il seguente flusso di lavoro è un modello DAG funzionante completo composto da due attività: un'attività hello_python e un'attività goodbye_bash:

Flusso d'aria 2

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Flusso d'aria 1

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Tutorial e concetti di Airflow

Per ulteriori informazioni sulla definizione dei DAG Airflow, consulta il tutorial di Airflow e i concetti di Airflow.

Operatori Airflow

I seguenti esempi mostrano alcuni degli operatori Airflow più diffusi. Per un riferimento autorevole degli operatori Airflow, consulta la sezione Riferimento API AirApache o sfoglia il codice sorgente degli operatori core, contrib e provider.

Operatore bash

Utilizza BashOperator per eseguire programmi a riga di comando.

Flusso d'aria 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Flusso d'aria 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command="bq ls {} || bq mk {}".format(bq_dataset_name, bq_dataset_name),
    )

Cloud Composer esegue i comandi forniti in uno script Bash su un worker. Il worker è un container Docker basato su Debian e include diversi pacchetti.

Operatore Python

Utilizza PythonOperator per eseguire codice Python arbitrario.

Cloud Composer esegue il codice Python in un container che include i pacchetti per la versione immagine di Cloud Composer utilizzata nel tuo ambiente.

Per installare altri pacchetti Python, consulta Installazione di dipendenze Python.

Operatori Google Cloud

Utilizza gli operatori Google Cloud Airflow per eseguire attività che utilizzano i prodotti Google Cloud. Cloud Composer configura automaticamente una connessione Airflow al progetto dell'ambiente.

Operatore email

Utilizza EmailOperator per inviare email da un DAG. Per inviare email da un ambiente Cloud Composer, devi configurare il tuo ambiente per l'utilizzo di SendGrid.

Flusso d'aria 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Flusso d'aria 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notifiche

Imposta email_on_failure su True per inviare una notifica via email quando un operatore del DAG non riesce. Per inviare notifiche email da un ambiente Cloud Composer, devi configurare il tuo ambiente per l'utilizzo di SendGrid.

Flusso d'aria 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Flusso d'aria 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Linee guida per il flusso di lavoro di DAG

  1. Utilizza Airflow 2 anziché Airflow 1.

    La community di Airflow non pubblica più nuove release secondarie o patch per Airflow 1.

  2. Inserisci le librerie Python personalizzate in un archivio ZIP di DAG in una directory nidificata. Non inserire le librerie al livello superiore della directory DAG.

    Quando Airflow analizza la cartella dags/, verifica solo i DAG nei moduli Python che si trovano nel livello più alto della cartella DAG e nel livello superiore di un archivio ZIP che si trova anche nella cartella dags/ di primo livello. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene sia sottostringhe airflow sia DAG, Airflow smette di elaborare l'archivio ZIP. Airflow restituisce solo i DAG trovati fino a quel momento.

  3. Per la tolleranza di errore, non definire più oggetti DAG nello stesso modulo Python.

  4. Non utilizzare SubDAG. Utilizza invece alternative.

  5. Inserisci i file obbligatori per il tempo di analisi di DAG nella cartella dags/, non nella directory data/.

  6. Implementare i test delle unità per i DAG

  7. Testa i DAG sviluppati o modificati, come consigliato nelle istruzioni per testare i DAG

  8. Verifica che i DAG sviluppati non aumentino troppo i tempi di analisi DAG.

  9. Le attività di Airflow possono non riuscire per diversi motivi. Per evitare errori nell'esecuzione completa di DAG, consigliamo di abilitare i nuovi tentativi di attività. Se imposti il numero massimo di nuovi tentativi su 0, non viene eseguito alcun nuovo tentativo.

    Ti consigliamo di sostituire l'opzione default_task_retries con un valore per l'attività che ritira diverso da 0. Inoltre, puoi impostare il parametro dei nuovi tentativi a livello di attività (se necessario).

  10. Se vuoi utilizzare la GPU nelle attività di Airflow, crea un cluster GKE separato basato sui nodi che usano macchine con GPU. Utilizza GKEStartPodOperator per eseguire le tue attività.

  11. Evita di eseguire attività ad alta intensità di CPU e memoria nel pool di nodi del cluster in cui sono in esecuzione altri componenti di Airflow (scheduler, worker, server web). Utilizza KubernetesPodOperator o GKEStartPodOperator.

  12. Quando esegui il deployment di DAG in un ambiente, carica solo i file che sono assolutamente necessari per interpretare ed eseguire i DAG nella cartella /dags.

  13. Limita il numero di file DAG nella cartella /dags

    Airflow analizza continuamente i DAG nella cartella /dags. L'analisi è un processo che viene eseguito in loop nella cartella DAG e il numero di file che devono essere caricati (con le loro dipendenze) influisce sulle prestazioni dell'analisi di DAG e della pianificazione delle attività. È molto più efficiente utilizzare 100 file con 100 DAG rispetto ai 10.000 file con 1 DAG ciascuno; pertanto, si consiglia tale ottimizzazione. Questa ottimizzazione è un equilibrio tra l'analisi dei tempi e l'efficienza della creazione e della gestione di DAG.

    Puoi anche valutare, ad esempio, che per eseguire il deployment di 10.000 file DAG potresti creare 100 file ZIP contenenti ciascuno 100 file DAG.

    Oltre ai suggerimenti sopra riportati, se hai più di 10.000 file DAG, la generazione di DAG in modo programmatico potrebbe essere una buona opzione. Ad esempio, puoi implementare un singolo file DAG Python che genera un certo numero di oggetti DAG (ad esempio 20, 100 oggetti DAG).

Domande frequenti sulla scrittura di DAG

Come faccio a ridurre al minimo la ripetizione del codice se voglio eseguire attività uguali o simili in più DAG?

Consigliamo di definire le librerie e i wrapper per ridurre al minimo la ripetizione del codice.

Come faccio a riutilizzare il codice tra i file DAG?

Inserisci le funzioni della tua utilità in una libreria Python locale e importale. Puoi fare riferimento alle funzioni in qualsiasi DAG che si trova nella cartella dags/ nel bucket del tuo ambiente.

Come faccio a ridurre al minimo il rischio che si verifichino definizioni diverse?

Ad esempio, hai due team che vogliono aggregare dati non elaborati in metriche relative alle entrate. I team scrivono due attività leggermente diverse che svolgono la stessa cosa. Definire le librerie per utilizzare i dati sulle entrate in modo che gli strumenti per l'implementazione di DAG debbano chiarire la definizione di entrate che vengono aggregate.

Come posso impostare le dipendenze tra i DAG?

Dipende dal modo in cui vuoi definire la dipendenza.

Se hai due DAG (DAG A e DAG B) e vuoi che DAG B si attivi dopo DAG A, puoi inserire un TriggerDagRunOperator alla fine del DAG A.

Se DAG B dipende solo da un artefatto generato da DAG A, ad esempio un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.

Se DAG B è integrato strettamente con DAG A, potresti essere in grado di unire i due DAG in un DAG.

Come posso trasmettere gli ID esecuzione univoci a un DAG e alle sue attività?

Ad esempio, vuoi passare i nomi e i percorsi dei file di Dataproc.

Puoi generare un ID univoco casuale restituendo str(uuid.uuid4()) in PythonOperator. In questo modo, l'ID viene inserito in XComs in modo che tu possa fare riferimento all'ID in altri operatori tramite campi basati su modelli.

Prima di generare un uuid, valuta se un ID specifico di DagRun avrebbe più valore. Puoi fare riferimento a questi ID anche nelle sostituzioni in Jinja utilizzando le macro.

Come faccio a separare le attività in un DAG?

Ogni attività deve essere un'unità di lavoro idempotente. Di conseguenza, dovresti evitare di incapsulare un flusso di lavoro in più passaggi all'interno di una singola attività, ad esempio un programma complesso in esecuzione in PythonOperator.

Devo definire più attività in un singolo DAG per aggregare i dati da più origini?

Ad esempio, hai più tabelle con dati non elaborati e vuoi creare aggregati giornalieri per ogni tabella. Le attività non dipendono l'una dall'altra. Dovresti creare un'attività e un DAG per ogni tabella oppure un DAG generale?

Se accetti che ogni attività condivida le stesse proprietà a livello di DAG, come schedule_interval, è opportuno definire più attività in un singolo DAG. Altrimenti, per ridurre al minimo la ripetizione del codice, è possibile generare più DAG da un singolo modulo Python inserendoli nel globals() del modulo.

Come faccio a limitare il numero di attività simultanee in esecuzione in un DAG?

Ad esempio, vuoi evitare di superare i limiti/le quote di utilizzo dell'API oppure evitare di eseguire troppi processi simultanei.

Puoi definire i pool di Airflow nella UI web di Airflow e associare le attività ai pool esistenti nei tuoi DAG.

Domande frequenti sull'utilizzo degli operatori

Devo utilizzare DockerOperator?

Non è consigliabile utilizzare DockerOperator, a meno che non venga utilizzato per avviare i container in un'installazione Docker remota (non all'interno di un cluster dell'ambiente). In un ambiente Cloud Composer l'operatore non ha accesso ai daemon di Docker.

Utilizza invece KubernetesPodOperator o GKEStartPodOperator. Questi operatori avviano rispettivamente i pod Kubernetes nei cluster Kubernetes o GKE. Nota che non è consigliabile avviare i pod nel cluster di un ambiente perché ciò potrebbe causare concorrenza nelle risorse.

Devo utilizzare SubDagOperator?

Non è consigliabile utilizzare SubDagOperator.

Utilizza alternative come suggerito nelle istruzioni per le attività di raggruppamento

Devo eseguire il codice Python solo in PythonOperators per separare completamente gli operatori Python?

A seconda dell'obiettivo che hai scelto, hai a disposizione alcune opzioni.

Se il tuo unico problema è mantenere separate le dipendenze Python, puoi utilizzare PythonVirtualenvOperator.

Prendi in considerazione l'utilizzo di KubernetesPodOperator. Questo operatore consente di definire i pod Kubernetes ed eseguire i pod in altri cluster.

Come faccio ad aggiungere pacchetti binari personalizzati o non PyPI?

Puoi installare pacchetti ospitati in repository privati.

Puoi anche utilizzare KubernetesPodOperator per eseguire un pod Kubernetes con la tua immagine creata con pacchetti personalizzati.

Come posso trasmettere in modo uniforme gli argomenti a un DAG e alle sue attività?

Puoi utilizzare il supporto integrato di Airflow per il modello Jinja per trasmettere argomenti che possono essere utilizzati nei campi basati su modelli.

Quando viene eseguita la sostituzione del modello?

La sostituzione dei modelli avviene sui worker di Airflow poco prima che venga richiamata la funzione pre_execute di un operatore. In pratica, ciò significa che i modelli vengono sostituiti solo prima dell'esecuzione di un'attività.

Come posso sapere quali argomenti dell'operatore supportano la sostituzione dei modelli?

Gli argomenti dell'operatore che supportano la sostituzione del modello Jinja2 sono contrassegnati esplicitamente come tali.

Cerca il campo template_fields nella definizione dell'operatore, che contiene un elenco di nomi di argomenti che verranno sottoposti alla sostituzione del modello.

Ad esempio, vedi BashOperator, che supporta i modelli per gli argomenti bash_command e env.

Evita di utilizzare operatori Airflow deprecati

Gli operatori elencati nella seguente tabella sono obsoleti. Evita di utilizzarli nei DAG. Utilizza invece alternative aggiornate.

Operatore deprecato Operatore da utilizzare
Esecuzione query BigQuery BigQuery BigQueryInsertJobOperator
Operatore BigQueryPatchDataset Operazione BigQueryUpdateTable
DataflowCreateJavaJobOperator BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator Operatore BeamRunPythonPipeline
DataprocScaleClusterOperator DataprocUpdateClusterOperator
DataprocInvioPigJobOperator DataprocInviaJobOperator
DataprocInviaSparkSqlJobOperator DataprocInviaJobOperator
DataprocInviaSparkJobOperator DataprocInviaJobOperator
DataprocInviaHadoopJobOperator DataprocInviaJobOperator
DataprocInviaPySparkJobOperator DataprocInviaJobOperator
MLEngineManageModelOperator MLEngineCreateModelOperator, MLEngineGetModelOperator
MLEngineManageVersionOperator MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion
GCSObjectsWtihPrefixExistenceSensor GCSObjectsWithPrefixExistenceSensor

Passaggi successivi