Scrivi i DAG Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa guida mostra come scrivere un grafo diretto aciclico di Apache Airflow (DAG) eseguito in un ambiente Cloud Composer.

Poiché Apache Airflow non fornisce un solido isolamento di DAG e attività, consigliamo di usare ambienti di produzione e di test separati per evitare interferenze DAG. Per ulteriori informazioni, consulta Test dei DAG.

Strutturazione di un DAG Airflow

Un DAG Airflow è definito in un file Python ed è composto da quanto segue componenti:

  • Definizione di DAG
  • Operatori Airflow
  • Relazioni con gli operatori

I seguenti snippet di codice mostrano esempi di ogni componente fuori contesto.

Una definizione di DAG

L'esempio seguente mostra un DAG Airflow definizione:

Flusso d'aria 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Flusso d'aria 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operatori e attività

Gli operatori Airflow descrivono il lavoro da svolgere. Un'attività task è un'istanza specifica di un operatore.

Flusso d'aria 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Flusso d'aria 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Relazioni delle attività

Le relazioni delle attività descrivono l'ordine in cui il lavoro deve essere completato.

Flusso d'aria 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Flusso d'aria 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Esempio di flusso di lavoro DAG completo in Python

Il seguente flusso di lavoro è un modello DAG completo e funzionante, composto da due attività: un'attività hello_python e un'attività goodbye_bash:

Flusso d'aria 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Flusso d'aria 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Per saperne di più sulla definizione dei DAG Airflow, consulta Tutorial di Airflow e Concetti di Airflow.

Operatori Airflow

Gli esempi seguenti mostrano alcuni operatori di Airflow più diffusi. Per un riferimento autorevole degli operatori Airflow, consulta Riferimento per operatori e hook e l'indice dei provider.

BashOperator

Utilizza BashOperator per eseguire programmi a riga di comando.

Flusso d'aria 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Flusso d'aria 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer esegue i comandi forniti in uno script Bash su un Worker Airflow. Il worker è un container Docker basato su Debian e include diversi pacchetti.

PythonOperator

Utilizza la PythonOperator per eseguire codice Python arbitrario.

Cloud Composer esegue il codice Python in un container che include per la versione immagine di Cloud Composer utilizzati in del tuo ambiente.

Per installare pacchetti Python aggiuntivi, consulta Installazione delle dipendenze Python.

Operatori Google Cloud

Per eseguire attività che utilizzano i prodotti Google Cloud, utilizza la Operatori di Google Cloud Airflow. Ad esempio: Operatori di BigQuery interrogare ed elaborare i dati in BigQuery.

Esistono molti altri operatori Airflow per Google Cloud e per i singoli servizi forniti da Google Cloud. Consulta Operatori Google Cloud per consultare l'elenco completo.

Flusso d'aria 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Flusso d'aria 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

Utilizza la EmailOperator per inviare email da un DAG. Per inviare le email da un ambiente Cloud Composer, configurare l'ambiente per l'utilizzo di SendGrid.

Flusso d'aria 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Flusso d'aria 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Notifiche in caso di errore dell'operatore

Imposta email_on_failure su True per inviare una notifica via email quando un operatore nel DAG non funziona. Per inviare notifiche email da Cloud Composer dell'ambiente di lavoro, configurare l'ambiente per l'utilizzo di SendGrid.

Flusso d'aria 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Flusso d'aria 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Linee guida per i flussi di lavoro DAG

  • Inserisci eventuali librerie Python personalizzate nell'archivio ZIP di un DAG in una . Non inserire le librerie al livello superiore della directory dei DAG.

    Quando Airflow analizza la cartella dags/, verifica solo la presenza di DAG in I moduli Python che si trovano al primo livello della cartella dei DAG e in alto livello di un archivio ZIP situato anche nella cartella di primo livello dags/. Se Airflow rileva un modulo Python in un archivio ZIP che non contiene entrambe le sottostringhe airflow e DAG, Airflow smette di elaborare il file ZIP . Airflow restituisce solo i DAG trovati fino a quel punto.

  • Utilizza Airflow 2 anziché Airflow 1.

    La community di Airflow non pubblica nuove release secondarie o patch per Airflow 1.

  • Per la tolleranza di errore, non definire più oggetti DAG nello stesso codice in maggior dettaglio più avanti in questo modulo.

  • Non utilizzare i DAG secondari. Invece, raggruppare le attività all'interno dei DAG.

  • Inserisci i file richiesti al momento dell'analisi dei DAG nella cartella dags/, non nella cartella data/.

  • Implementa i test delle unità per i DAG.

  • Testa i DAG sviluppati o modificati come consigliato in istruzioni per testare i DAG.

  • Verifica che i DAG sviluppati non aumentino Tempi di analisi DAG eccessivi.

  • Le attività Airflow possono non riuscire per diversi motivi. Per evitare errori delle intere esecuzioni di DAG, consigliamo di abilitare i nuovi tentativi delle attività. Se imposti il numero massimo di nuovi tentativi su 0, non verranno eseguiti nuovi tentativi.

    Ti consigliamo di sostituire il valore default_task_retries con un valore per il parametro con tentativi diversi da quelli di 0. Inoltre, puoi impostare Parametro retries a livello di attività.

  • Se vuoi utilizzare GPU nelle attività Airflow, crea Cluster GKE basato su nodi che utilizzano macchine con GPU. Utilizza le funzionalità di GKEStartPodOperator per eseguire le tue attività.

  • Evita di eseguire attività che usano molta CPU e memoria nel pool di nodi del cluster, dove sono in esecuzione altri componenti di Airflow (scheduler, worker, server web). Utilizza invece KubernetesPodOperator o GKEStartPodOperator.

  • Quando esegui il deployment dei DAG in un ambiente, carica solo i file che sono assolutamente necessarie per interpretare ed eseguire i DAG nella cartella /dags.

  • Limita il numero di file DAG nella cartella /dags.

    Airflow esegue l'analisi continua dei DAG nella cartella /dags. L'analisi è un che esegue il loop attraverso la cartella dei DAG e il numero di file che che devono essere caricati (con le loro dipendenze) influisce sulle prestazioni dell'analisi dei DAG e della pianificazione delle attività. È molto più efficiente usare 100 file con 100 DAG ciascuno di 10.000 file con 1 DAG ciascuno e quindi tali è consigliabile eseguire l'ottimizzazione. Questa ottimizzazione è un equilibrio tra tempi di analisi ed efficienza della creazione e della gestione dei DAG.

    Puoi anche prendere in considerazione, ad esempio, il deployment di 10.000 file DAG, creare 100 file ZIP ciascuno contenente 100 file DAG.

    Oltre ai suggerimenti precedenti, se hai più di 10.000 file DAG, generare i DAG in modo programmatico potrebbe essere una buona opzione. Ad esempio: puoi implementare un singolo file DAG Python che genera un certo numero di Oggetti DAG (ad esempio, 20, 100 oggetti DAG).

Evita di utilizzare operatori Airflow deprecati

Gli operatori elencati nella seguente tabella sono deprecati. Alcuni di questi operatori erano supportati nelle prime versioni di Cloud Composer 1. Evita di utilizzarli in dei tuoi DAG. Utilizza invece le alternative fornite e aggiornate.

Operatore obsoleto Operatore da utilizzare
BigQueryExecuteQueryOperator BigQueryInsertJobOperator
BigQueryPatchDatasetOperator BigQueryUpdateTableOperator
DataflowCreateJavaJobOperator BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator BeamRunPythonPipelineOperator
DataprocScaleClusterOperator DataprocUpdateClusterOperator
DataprocSubmitPigJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkSqlJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkJobOperator DataprocSubmitJobOperator
DataprocSubmitHadoopJobOperator DataprocSubmitJobOperator
DataprocSubmitPySparkJobOperator DataprocSubmitJobOperator
MLEngineManageModelOperator MLEngineCreateModelOperator, MLEngineGetModelOperator
MLEngineManageVersionOperator MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion
GCSObjectsWtihPrefixExistenceSensor GCSObjectsWithPrefixExistenceSensor

Domande frequenti sulla scrittura dei DAG

Come faccio a ridurre al minimo le ripetizioni del codice se voglio eseguire attività uguali o simili in più DAG?

Ti consigliamo di definire librerie e wrapper per al minimo le ripetizioni del codice.

Come posso riutilizzare il codice tra i file DAG?

Inserisci le funzioni di utilità in un libreria Python locale e importare le funzioni. Puoi fare riferimento alle funzioni in qualsiasi DAG localizzato nella cartella dags/ del bucket dell'ambiente.

Come faccio a ridurre al minimo il rischio che possano insorgere definizioni diverse?

Ad esempio, supponiamo che tu abbia due team che desiderano aggregare i dati non elaborati metrics. I team scrivono due attività leggermente diverse che svolgono lo stesso cosa. Definisci le librerie lavorare con i dati sulle entrate in modo che gli implementatori DAG debbano chiarire la definizione di entrate aggregate.

Come posso impostare le dipendenze tra i DAG?

Questo dipende da come vuoi definire la dipendenza.

Se hai due DAG (DAG A e DAG B) e vuoi che il DAG B si attivi dopo il DAG R, puoi inserire un TriggerDagRunOperator alla fine del DAG A.

Se il DAG B dipende solo da un artefatto generato dal DAG A, come un un messaggio Pub/Sub, un sensore potrebbe funzionare meglio.

Se il DAG B è integrato strettamente con il DAG A, potresti riuscire a unire i due i DAG in un DAG.

Come posso passare ID esecuzione univoci a un DAG e alle sue attività?

Ad esempio, vuoi passare i nomi dei cluster Dataproc e i percorsi dei file.

Puoi generare un ID univoco casuale restituendo str(uuid.uuid4()) in un PythonOperator. L'ID viene inserito XComs per poter fare riferimento all'ID in altri operatori tramite campi basati su modelli.

Prima di generare un uuid, valuta se un ID specifico per DagRun deve essere più prezioso. Puoi anche fare riferimento a questi ID nelle sostituzioni Jinja utilizzando le macro.

Come si separano le attività in un DAG?

Ogni attività deve essere un'unità di lavoro idempotente. Di conseguenza, dovresti evitare incapsulare un flusso di lavoro in più fasi all'interno di una singola attività, ad esempio una programma in esecuzione su PythonOperator.

Devo definire più attività in un singolo DAG per aggregare dati da più origini?

Ad esempio, se hai più tabelle con dati non elaborati e vuoi creare dati aggregati per ogni tabella. Le attività non dipendono l'una dall'altra. Dovrei crei un'attività e un DAG per ogni tabella o crei un DAG generale?

Se ti va bene che ogni attività condivida le stesse proprietà a livello di DAG, come schedule_interval, allora ha senso definire più attività in un singolo con il DAG. Altrimenti, per ridurre al minimo le ripetizioni del codice, è possibile generare più DAG da un singolo modulo Python inserendoli nell'oggetto globals() del modulo.

Come posso limitare il numero di attività simultanee in esecuzione in un DAG?

Ad esempio, vuoi evitare di superare i limiti/quote di utilizzo delle API eseguendo troppi processi simultanei.

Puoi definire Pool di Airflow nella UI web di Airflow e attività associate con i pool esistenti nei DAG.

Domande frequenti sull'utilizzo degli operatori

Dovrei usare DockerOperator?

Sconsigliamo di utilizzare DockerOperator, a meno che non venga utilizzato per avviare di container su un'installazione Docker remota (non all'interno ). In un ambiente Cloud Composer l'operatore non ha ai daemon Docker.

Usa invece KubernetesPodOperator o GKEStartPodOperator. Questi operatori avviano i pod Kubernetes Kubernetes o GKE. Tieni presente che non consiglia di avviare pod nel cluster di un ambiente, perché alla concorrenza tra le risorse.

Dovrei usare SubDagOperator?

Ti consigliamo di non utilizzare SubDagOperator.

Utilizza le alternative come suggerito in Raggruppamento delle attività.

Devo eseguire il codice Python solo in PythonOperators per separare completamente gli operatori Python?

A seconda del tuo obiettivo, hai a disposizione alcune opzioni.

Se la tua unica preoccupazione è mantenere dipendenze Python separate, puoi usare PythonVirtualenvOperator.

Valuta la possibilità di utilizzare il KubernetesPodOperator. Questo operatore ti consente per definire i pod Kubernetes ed eseguirli in altri cluster.

Come faccio ad aggiungere pacchetti binari personalizzati o non PyPI?

Puoi installare pacchetti ospitati in repository di pacchetti privati.

Come posso passare in modo uniforme gli argomenti a un DAG e alle sue attività?

Puoi utilizzare il supporto integrato di Airflow per Modelli Jinja per trasmettere argomenti che possono essere utilizzati nei campi basati su modelli.

Quando avviene la sostituzione del modello?

La sostituzione del modello viene eseguita sui worker Airflow appena prima di pre_execute di un operatore. In pratica, ciò significa che i modelli non viene sostituito fino a poco prima dell'esecuzione di un'attività.

Come faccio a sapere quali argomenti dell'operatore supportano la sostituzione del modello?

Gli argomenti dell'operatore che supportano la sostituzione del modello Jinja2 sono esplicitamente contrassegnati come tali.

Cerca il campo template_fields nella definizione dell'operatore. che contiene un elenco di nomi di argomenti sottoposti a sostituzione del modello.

Ad esempio, vedi BashOperator, che supporta la creazione di modelli per gli argomenti bash_command e env.

Passaggi successivi