Scrittura DAG Airflow

Cloud Composer 1 | Cloud Composer 2

Questa guida mostra come scrivere un grafo diretto aciclico (DAG) di Apache Airflow eseguito in un ambiente Cloud Composer.

Poiché Apache Airflow non fornisce un isolamento elevato di DAG e attività, ti consigliamo di utilizzare ambienti di produzione e di test separati per evitare interferenze dei DAG. Per maggiori informazioni, consulta la pagina Test dei DAG.

Struttura di un DAG Airflow

Un DAG Airflow è definito in un file Python ed è composto dai seguenti componenti:

  • Definizione di DAG
  • Operatori Airflow
  • Relazioni con gli operatori

I seguenti snippet di codice mostrano esempi di ogni componente fuori contesto.

Una definizione di DAG

L'esempio seguente illustra una definizione di DAG:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operatori e attività

Gli operatori descrivono il lavoro da svolgere. Un'attività è un'istanza specifica di un operatore.

Airflow 2

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Relazioni attività

Le relazioni delle attività descrivono l'ordine in cui il lavoro deve essere completato.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Esempio di flusso di lavoro DAG completo in Python

Il seguente flusso di lavoro è un modello DAG funzionante composto da due attività: un'attività hello_python e un'attività goodbye_bash:

Airflow 2


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Per ulteriori informazioni sulla definizione dei DAG Airflow, consulta il tutorial di Airflow e i concetti di Airflow.

Operatori Airflow

I seguenti esempi mostrano alcuni comuni operatori di Airflow. Per un riferimento autorevole degli operatori Airflow, consulta la pagina relativa a Riferimento per operatori e hook e Indice provider.

BashOperator

Utilizza BashOperator per eseguire programmi a riga di comando.

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer esegue i comandi forniti in uno script Bash su un worker Airflow. Il worker è un container Docker basato su Debian e include diversi pacchetti.

PythonOperator

Utilizza PythonOperator per eseguire codice Python arbitrario.

Cloud Composer esegue il codice Python in un container che include pacchetti per la versione dell'immagine di Cloud Composer utilizzata nel tuo ambiente.

Per installare altri pacchetti Python, consulta la pagina relativa all'installazione delle dipendenze Python.

Operatori Google Cloud

Per eseguire attività che utilizzano i prodotti Google Cloud, utilizza gli operatori di Google Cloud Airflow. Ad esempio, gli operatori BigQuery eseguono query ed elaborano dati in BigQuery.

Esistono molti altri operatori Airflow per Google Cloud e per i singoli servizi forniti da Google Cloud. Per l'elenco completo, consulta la pagina Operatori Google Cloud.

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Utilizza EmailOperator per inviare email da un DAG. Per inviare email da un ambiente Cloud Composer, devi configurare l'ambiente per l'utilizzo di SendGrid.

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notifiche in caso di errore dell'operatore

Imposta email_on_failure su True per inviare una notifica via email in caso di errore di un operatore nel DAG. Per inviare notifiche email da un ambiente Cloud Composer, devi configurare l'ambiente per l'utilizzo di SendGrid.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Linee guida per il flusso di lavoro dei DAG

  1. Inserisci eventuali librerie Python personalizzate nell'archivio ZIP di un DAG in una directory nidificata. Non posizionare le librerie al livello superiore della directory dei DAG.

    Quando Airflow esegue la scansione della cartella dags/, Airflow controlla solo i DAG nei moduli Python che si trovano nella cartella di primo livello della cartella DAG e nel livello superiore di un archivio ZIP anch'esso nella cartella dags/ di primo livello. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene entrambe le sottostringhe airflow e DAG, Airflow smette di elaborare l'archivio ZIP. Airflow restituisce solo i DAG trovati fino a quel punto.

  2. Utilizza Airflow 2 anziché Airflow 1.

    La community di Airflow non pubblica più nuove release secondarie o patch per Airflow 1.

  3. Per la tolleranza di errore, non definire più oggetti DAG nello stesso modulo Python.

  4. Non utilizzare i SubDAG. Puoi invece raggruppare le attività all'interno dei DAG.

  5. Inserisci i file richiesti al momento dell'analisi dei DAG nella cartella dags/, non nella cartella data/.

  6. Implementare i test delle unità per i DAG

  7. Testa i DAG sviluppati o modificati come consigliato nelle istruzioni per il test dei DAG.

  8. Verifica che i DAG sviluppati non aumentino troppo i tempi di analisi DAG.

  9. Le attività Airflow possono non riuscire per diversi motivi. Per evitare errori delle intera esecuzione di DAG, consigliamo di abilitare i nuovi tentativi delle attività. Se imposti il numero massimo di nuovi tentativi su 0, non vengono eseguiti nuovi tentativi.

    Ti consigliamo di eseguire l'override dell'opzione default_task_retries con un valore diverso da 0 per l'attività ritirata. Inoltre, puoi impostare il parametro retries a livello di attività.

  10. Se vuoi utilizzare la GPU nelle attività Airflow, crea un cluster GKE separato basato sui nodi che utilizzano macchine con GPU. Utilizza GKEStartPodOperator per eseguire le attività.

  11. Evita di eseguire attività che consumano molta CPU e memoria nel pool di nodi del cluster dove sono in esecuzione altri componenti Airflow (pianificatori, worker, server web). Usa invece KubernetesPodOperator o GKEStartPodOperator.

  12. Quando esegui il deployment di DAG in un ambiente, carica solo i file assolutamente necessari per l'interpretazione e l'esecuzione dei DAG nella cartella /dags.

  13. Limita il numero di file DAG nella cartella /dags.

    Airflow esegue l'analisi continua dei DAG nella cartella /dags. L'analisi è un processo che esegue un loop nella cartella dei DAG e il numero di file da caricare (con le relative dipendenze) influisce sulle prestazioni dell'analisi dei DAG e della pianificazione delle attività. È molto più efficiente utilizzare 100 file con 100 DAG ciascuno rispetto a 10.000 file con 1 DAG ciascuno, pertanto è consigliabile eseguire questa ottimizzazione. Questa ottimizzazione è un equilibrio tra tempo di analisi ed efficienza della creazione e della gestione dei DAG.

    Ad esempio, per eseguire il deployment di 10.000 file DAG, puoi creare 100 file ZIP, ciascuno contenente 100 file DAG.

    Oltre ai suggerimenti precedenti, se hai più di 10.000 file DAG, potrebbe essere una buona soluzione generare i DAG in modo programmatico. Ad esempio, puoi implementare un singolo file DAG Python che genera un certo numero di oggetti DAG, ad esempio 20.100 oggetti DAG.

Evita di utilizzare operatori Airflow deprecati

Gli operatori elencati nella seguente tabella sono deprecati. Evita di usarle nei DAG. Utilizza invece alternative aggiornate fornite.

Operatore deprecato Operatore da utilizzare
BigQueryExecuteQueryOperator BigQueryInsertJobOperator
BigQueryPatchDatasetOperator BigQueryUpdateTableOperator
DataflowCreateJavaJobOperator BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator BeamRunPythonPipelineOperator
DataprocScaleClusterOperator DataprocUpdateClusterOperator
DataprocSubmitPigJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkSqlJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkJobOperator DataprocSubmitJobOperator
DataprocSubmitHadoopJobOperator DataprocSubmitJobOperator
DataprocSubmitPySparkJobOperator DataprocSubmitJobOperator
MLEngineManageModelOperator MLEngineCreateModelOperator, MLEngineGetModelOperator
MLEngineManageVersionOperator MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion
GCSObjectsWtihPrefixExistenceSensor GCSObjectsWithPrefixExistenceSensor

Domande frequenti per la scrittura di DAG

Come faccio a ridurre al minimo la ripetizione del codice se voglio eseguire le stesse attività o attività simili in più DAG?

Ti suggeriamo di definire le librerie e i wrapper per ridurre al minimo la ripetizione del codice.

Come posso riutilizzare il codice tra file DAG?

Inserisci le funzioni di utilità in una libreria Python locale e importa le funzioni. Puoi fare riferimento alle funzioni in qualsiasi DAG situato nella cartella dags/ del bucket del tuo ambiente.

Come faccio a ridurre al minimo il rischio che esistano definizioni diverse?

Ad esempio, hai due team che vogliono aggregare i dati non elaborati in metriche sulle entrate. I team scrivono due attività leggermente diverse che svolgono la stessa funzione. Definire le librerie per usare i dati sulle entrate in modo che gli implementatori dei DAG debbano chiarire la definizione di entrate aggregate.

Come faccio a impostare le dipendenze tra i DAG?

Ciò dipende da come vuoi definire la dipendenza.

Se hai due DAG (DAG A e DAG B) e vuoi che il DAG B si attivi dopo il DAG A, puoi inserire un TriggerDagRunOperator alla fine del Dag A.

Se il DAG B dipende solo da un artefatto generato dal DAG A, ad esempio un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.

Se il DAG B è integrato strettamente con il DAG A, potresti essere in grado di unire i due DAG in un DAG.

Come posso passare ID esecuzione univoci a un DAG e alle relative attività?

Ad esempio, vuoi passare i nomi dei cluster e i percorsi dei file di Dataproc.

Puoi generare un ID univoco casuale restituendo str(uuid.uuid4()) in un PythonOperator. L'ID viene inserito in XComs in modo da poter fare riferimento all'ID in altri operatori tramite campi basati su modelli.

Prima di generare un uuid, valuta se un ID specifico per DagRun potrebbe essere più utile. Puoi anche fare riferimento a questi ID nelle sostituzioni Jinja utilizzando le macro.

Come posso separare le attività in un DAG?

Ogni attività deve essere un'unità idempotente di lavoro. Di conseguenza, dovresti evitare di incapsulare un flusso di lavoro a più passaggi all'interno di una singola attività, ad esempio un programma complesso eseguito in un PythonOperator.

Devo definire più attività in un singolo DAG per aggregare i dati di più origini?

Ad esempio, se disponi di più tabelle con dati non elaborati e vuoi creare aggregati giornalieri per ogni tabella. Le attività non dipendono l'una dall'altra. Devi creare un'attività e un DAG per ogni tabella o un DAG generale?

Se ti va bene che ogni attività condivida le stesse proprietà a livello di DAG, come schedule_interval, ha senso definire più attività in un singolo DAG. In caso contrario, per ridurre al minimo la ripetizione del codice, è possibile generare più DAG da un singolo modulo Python inserendoli nell'elemento globals() del modulo.

Come posso limitare il numero di attività simultanee in esecuzione in un DAG?

Ad esempio, vuoi evitare di superare i limiti/quote di utilizzo delle API o evitare di eseguire troppi processi simultanei.

Puoi definire pool Airflow nell'interfaccia utente web di Airflow e associare le attività ai pool esistenti nei tuoi DAG.

Domande frequenti sull'utilizzo degli operatori

Devo usare l'DockerOperator?

Sconsigliamo di utilizzare DockerOperator, a meno che non venga utilizzato per avviare i container su un'installazione Docker remota (non all'interno del cluster di un ambiente). In un ambiente Cloud Composer, l'operatore non ha accesso ai daemon Docker.

Utilizza invece KubernetesPodOperator o GKEStartPodOperator. Questi operatori avviano i pod Kubernetes rispettivamente in cluster Kubernetes o GKE. Tieni presente che sconsigliamo di avviare i pod nel cluster di un ambiente, perché questo può portare alla concorrenza delle risorse.

Devo usare l'SubDagOperator?

Non è consigliabile utilizzare SubDagOperator.

Utilizza le alternative suggerite nelle istruzioni per raggruppare le attività.

Devo eseguire il codice Python solo in PythonOperators per separare completamente gli operatori Python?

A seconda del tuo obiettivo, hai alcune opzioni a disposizione.

Se il tuo unico problema è mantenere dipendenze Python separate, puoi utilizzare PythonVirtualenvOperator.

Valuta la possibilità di utilizzare la KubernetesPodOperator. Questo operatore consente di definire i pod di Kubernetes e di eseguirli in altri cluster.

Come posso aggiungere pacchetti binari personalizzati o non PyPI?

Puoi installare pacchetti ospitati in repository di pacchetti privati.

Puoi anche utilizzare la KubernetesPodOperator per eseguire un pod Kubernetes con la tua immagine creata con pacchetti personalizzati.

Come posso trasferire in modo uniforme gli argomenti a un DAG e alle sue attività?

Puoi utilizzare il supporto integrato di Airflow per i modelli Jinja al fine di passare argomenti che possono essere utilizzati nei campi basati su modelli.

Quando avviene la sostituzione dei modelli?

La sostituzione del modello avviene sui worker Airflow appena prima della chiamata della funzione pre_execute di un operatore. Questo significa che i modelli vengono sostituiti solo prima dell'esecuzione.

Come faccio a sapere quali argomenti dell'operatore supportano la sostituzione dei modelli?

Gli argomenti dell'operatore che supportano la sostituzione dei modelli Jinja2 sono contrassegnati esplicitamente come tali.

Cerca il campo template_fields nella definizione dell'operatore, che contiene un elenco di nomi di argomenti sottoposti alla sostituzione del modello.

Ad esempio, consulta BashOperator, che supporta la creazione di modelli per gli argomenti bash_command e env.

Passaggi successivi