DAGs schreiben (Workflows)

In dieser Anleitung erfahren Sie, wie Sie einen gerichteten azyklischen Graphen (Directed Acyclic Graph; DAG) in Apache Airflow für die Ausführung in einer Cloud Composer-Umgebung schreiben.

DAG strukturieren

Ein DAG wird in einer Python-Datei definiert und besteht aus einer DAG-Definition, Operatoren und Operatorbeziehungen. Die folgenden Code-Snippets zeigen Beispiele der einzelnen Komponenten außerhalb des Kontexts:

  1. Eine DAG-Definition.

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. Operatoren, mit denen die auszuführende Arbeit beschrieben wird. Eine Instanziierung eines Operators wird als Aufgabe bezeichnet.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. Operator-Beziehungen, die beschreiben, in welcher Reihenfolge die Arbeit ausgeführt werden soll.

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Der folgende Workflow ist ein vollständiges Beispiel mit den Aufgaben hello_python und goodbye_bash:

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Weitere Informationen zum Definieren von Airflow-DAGs finden Sie in der Airflow-Anleitung und in den Airflow-Konzepten.

Operatoren

Die folgenden Beispiele enthalten einige beliebte Airflow-Operatoren. Eine verbindliche Referenz zu den Airflow-Operatoren finden Sie in der Apache Airflow API-Referenz oder im Quellcode der Operatoren core und contrib.

BashOperator

Mit dem BashOperator können Sie Befehlszeilenprogramme ausführen.

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer führt die bereitgestellten Befehle in einem Bash-Skript auf einem Worker aus. Der Worker ist ein Docker-Container auf Basis von Debian und enthält mehrere Pakete.

PythonOperator

Verwenden Sie den PythonOperator, um beliebigen Python-Code auszuführen. Cloud Composer führt den Python-Code in einem Container mit den folgenden Paketen aus:

Informationen zum Installieren weiterer Python-Pakete finden Sie unter Python-Abhängigkeiten installieren.

Google Cloud-Operatoren

Mit den Google Cloud-Airflow-Operatoren können Sie Aufgaben ausführen, die Google Cloud-Produkte nutzen. Cloud Composer konfiguriert automatisch eine Airflow-Verbindung zum Projekt der Umgebung.

EmailOperator

Verwenden Sie den EmailOperator, um E-Mails von einem DAG zu senden. Wenn Sie E-Mails aus einer Cloud Composer-Umgebung senden möchten, müssen Sie die Verwendung von SendGrid in der Umgebung konfigurieren.

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Mitteilungen

Zum Senden einer E-Mail-Benachrichtigung, wenn ein Operator im DAG fehlerhaft ist, legen Sie für email_on_failure den Wert True fest. Zum Senden von E-Mails aus einer Cloud Composer-Umgebung müssen Sie Ihre Umgebung für die Verwendung von SendGrid konfigurieren.

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Leitlinien

  1. Platzieren Sie benutzerdefinierte Python-Bibliotheken im ZIP-Archiv eines DAG in einem verschachtelten Verzeichnis. Platzieren Sie Bibliotheken nicht auf der obersten Ebene des DAG-Verzeichnisses.

    Airflow prüft den Ordner dags/ nur auf DAGs in Python-Modulen, die sich auf der obersten Ebene des Ordners "DAGs" und auf der obersten Ebene eines ZIP-Archivs befinden, das ebenfalls im Ordner dags/ auf oberster Ebene enthalten ist. Wenn Airflow in einem ZIP-Archiv ein Python-Modul ermittelt, das weder airflow- noch DAG-Teilstrings enthält, beendet Airflow die Verarbeitung des ZIP-Archivs. Airflow liefert nur diejenigen DAGs, die bis zum jeweiligen Zeitpunkt ermittelt wurden.

  2. Achten Sie aus Gründen der Fehlertoleranz darauf, nicht mehrere DAG-Objekte im gleichen Python-Modul zu definieren.

  3. Definieren Sie subDAGs nicht als Objekte der obersten Ebene.

    Im Allgemeinen erfasst Airflow DAG-Objekte im globalen Namespace eines Moduls im Verzeichnis dags/ als DAGs der obersten Ebene. Als Objekte der obersten Ebene definierte SubDags werden zusätzlich zu den Zeitplänen anderer DAGs, in denen die SubDags eingebettet sind, nach ihrem eigenen Zeitplan ausgeführt.

  4. Platzieren Sie Dateien, die zum Zeitpunkt des DAG-Parsens erforderlich sind, im Verzeichnis dags/ und nicht im Verzeichnis data/. Das Verzeichnis data/ wird nicht im Webserver bereitgestellt.

FAQ

DAGs erstellen

Operatoren verwenden

Nächste Schritte