Airflow-DAGs schreiben

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

In diesem Leitfaden erfahren Sie, wie Sie einen gerichteten azyklischen Graphen in Apache Airflow schreiben (DAG), der in einer Cloud Composer-Umgebung ausgeführt wird.

Da Apache Airflow keine starke DAG- und Aufgabenisolierung bietet, empfehlen wir, separate Produktions- und Testumgebungen um DAG-Interferenzen zu vermeiden. Weitere Informationen finden Sie unter DAGs testen.

Airflow-DAG strukturieren

Ein Airflow-DAG wird in einer Python-Datei definiert und besteht aus folgenden Komponenten: Komponenten:

  • DAG-Definition
  • Airflow-Operatoren
  • Operatorbeziehungen

Die folgenden Code-Snippets zeigen Beispiele für die einzelnen Komponenten außerhalb des Kontexts.

Eine DAG-Definition

Im folgenden Beispiel wird ein Airflow-DAG veranschaulicht: Definition:

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operatoren und Aufgaben

Airflow Operator beschreiben die auszuführende Arbeit. Eine Aufgabe task ist eine spezifische Instanz eines Operators.

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Aufgabenbeziehungen

Aufgabenbeziehungen beschreiben die Reihenfolge in die die Arbeit erledigen muss.

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Beispiel für einen vollständigen DAG-Workflow in Python

Der folgende Workflow ist eine vollständige, funktionierende DAG-Vorlage, die aus zwei Aufgaben: die Aufgabe hello_python und die Aufgabe goodbye_bash:


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Weitere Informationen zum Definieren von Airflow-DAGs finden Sie in der Airflow-Anleitung und Airflow-Konzepte

Airflow-Operatoren

Die folgenden Beispiele enthalten einige beliebte Airflow-Operatoren. Für eine Referenz zu Airflow-Operatoren finden Sie in der Operator- und Hook-Referenz und Anbieterindex.

BashOperator

Mit dem BashOperator können Sie Befehlszeilenprogramme ausführen.

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer führt die bereitgestellten Befehle in einem Bash-Skript auf einem Airflow-Worker Der Worker ist ein Debian-basierter Docker-Container und enthält für verschiedene Pakete.

PythonOperator

Verwenden Sie die Methode PythonOperator verwendet, um beliebigen Python-Code auszuführen.

Cloud Composer führt den Python-Code in einem Container aus, der Folgendes enthält: Pakete für die Cloud Composer-Image-Version, die in für Ihre Umgebung.

Informationen zum Installieren weiterer Python-Pakete finden Sie unter Python-Abhängigkeiten installieren.

Google Cloud-Operatoren

Verwenden Sie zum Ausführen von Aufgaben, die Google Cloud-Produkte verwenden, den Google Cloud-Airflow-Operatoren. Beispiel: BigQuery-Operatoren Daten in BigQuery abfragen und verarbeiten können.

Es gibt viele weitere Airflow-Operatoren für Google Cloud und einzelne Dienste von Google Cloud. Weitere Informationen finden Sie unter Eine vollständige Liste der Google Cloud-Operatoren finden Sie hier.

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

EmailOperator

Verwenden Sie die Methode EmailOperator verwendet, um E-Mails von einem DAG zu senden. Senden E-Mails aus einer Cloud Composer-Umgebung zu senden, Konfigurieren Sie Ihre Umgebung für die Verwendung von SendGrid.

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Benachrichtigungen bei Ausfall des Betreibers

Zum Senden einer E-Mail-Benachrichtigung, wenn ein Operator im DAG fehlerhaft ist, legen Sie für email_on_failure den Wert True fest. Zum Senden von E-Mails aus einer Cloud Composer-Umgebung müssen Sie Ihre Umgebung für die Verwendung von SendGrid konfigurieren.

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Richtlinien für DAG-Workflows

  • Platzieren Sie benutzerdefinierte Python-Bibliotheken im ZIP-Archiv eines DAG in einem verschachtelten Verzeichnis. Platzieren Sie Bibliotheken nicht auf der obersten Ebene des DAG-Verzeichnisses.

    Wenn Airflow den Ordner dags/ scannt, sucht Airflow nur nach DAGs in Python-Module, die sich auf der obersten Ebene des DAGs-Ordners und in der obersten Ebene des Ebene eines ZIP-Archivs, das sich auch im obersten Ordner dags/ befindet. Wenn Airflow findet in einem ZIP-Archiv ein Python-Modul, das keine sowohl den airflow- als auch den DAG-Teilstrings verwenden, beendet Airflow die Verarbeitung der ZIP-Datei -Archiv. Airflow gibt nur die DAGs zurück, die bis zu diesem Zeitpunkt gefunden wurden.

  • Achten Sie aus Gründen der Fehlertoleranz darauf, nicht mehrere DAG-Objekte im gleichen Python-Modul zu definieren.

  • Verwenden Sie SubDAGs nicht. Stattdessen Aufgaben in DAGs gruppieren.

  • Platzieren Sie Dateien, die zum Zeitpunkt der DAG-Analyse erforderlich sind, im Ordner dags/, nicht im Ordner data/.

  • Einheitentests für Ihre DAGs implementieren

  • Testen Sie entwickelte oder geänderte DAGs, wie in den Anleitung zum Testen von DAGs.

  • Achten Sie darauf, dass sich entwickelte DAGs nicht erhöhen DAG-Analysezeiten zu oft.

  • Airflow-Aufgaben können aus verschiedenen Gründen fehlschlagen. Um Ausfälle der ganze DAG-Ausführungen ausführen möchten, sollten Sie Wiederholungsversuche für Aufgaben aktivieren. Wenn Sie die maximale Anzahl von Wiederholungsversuchen auf 0 festlegen, werden keine Wiederholungen ausgeführt.

    Wir empfehlen, das Attribut default_task_retries mit einem Wert für die andere Aufgabenwiederholungen als 0. Darüber hinaus können Sie die Parameter retries auf Aufgabenebene.

  • Wenn Sie GPU in Ihren Airflow-Aufgaben verwenden möchten, erstellen Sie eine separate GKE-Cluster, der auf Knoten basiert und Maschinen mit GPUs verwendet. Verwenden Sie GKEStartPodOperator, um Ihre Aufgaben auszuführen.

  • Führen Sie keine CPU- und arbeitsspeicherintensiven Aufgaben im Knotenpool des Clusters aus, andere Airflow-Komponenten (Planer, Worker, Webserver) ausgeführt werden. Verwenden Sie stattdessen KubernetesPodOperator oder GKEStartPodOperator.

  • Wenn Sie DAGs in einer Umgebung bereitstellen, laden Sie nur die Dateien hoch, für die Interpretation und Ausführung von DAGs in den Ordner /dags verschieben.

  • Begrenzen Sie die Anzahl der DAG-Dateien im Ordner „/dags“.

    Airflow parst kontinuierlich DAGs im Ordner /dags. Das Parsing ist ein Prozess, der den DAGs-Ordner und die Anzahl der Dateien durchsucht, (mit ihren Abhängigkeiten) die Leistung beeinträchtigt. von DAG-Parsing und Aufgabenplanung. Es ist viel effizienter, 100 Dateien mit jeweils 100 DAGs über 10.000 Dateien mit jeweils einem DAG. wird empfohlen. Diese Optimierung ist ein Gleichgewicht zwischen Parsing-Zeit und Effizienz von DAG-Erstellung und -Verwaltung

    Sie können auch 10.000 DAG-Dateien bereitstellen, 100 ZIP-Dateien mit jeweils 100 DAG-Dateien erstellen

    Wenn Sie mehr als 10.000 DAG-Dateien haben, könnte eine gute Option sein, DAGs programmatisch zu generieren. Beispiel: können Sie eine einzelne Python-DAG-Datei implementieren, DAG-Objekte (z. B. 20 oder 100 DAG-Objekte)

FAQs zum Schreiben von DAGs

Wie minimiere ich Codewiederholungen, wenn ich die gleichen oder ähnliche Aufgaben in mehreren DAGs ausführen möchte?

Wir empfehlen, Bibliotheken und Wrapper für die Codewiederholung zu minimieren.

Wie kann ich Code in mehreren DAG-Dateien wiederverwenden?

Platzieren Sie Ihre Dienstprogrammfunktionen in einem lokale Python-Bibliothek und importieren die Funktionen. Sie können in allen DAGs, die sich im dags/-Ordner Ihres Buckets befinden, auf die Funktionen verweisen.

Wie minimiere ich das Risiko unterschiedlicher Definitionen?

Angenommen, es gibt zwei Teams, die Rohdaten zu Umsatzkennzahlen zusammenfassen möchten. Die Teams schreiben zwei geringfügig unterschiedliche Aufgaben für den gleichen Sachverhalt. Definieren Sie Bibliotheken für die Arbeit mit den Umsatzdaten, sodass diejenigen, die DAGs implementieren, die Definition des zusammengefassten Umsatzes eindeutig festlegen müssen.

Wie lege ich Abhängigkeiten zwischen DAGs fest?

Das hängt davon ab, wie Sie die Abhängigkeit definieren möchten.

Wenn Sie zwei DAGs haben (DAG A und DAG B) und DAG B nach dem DAG ausgelöst werden soll A: Sie können eine TriggerDagRunOperator am Ende von DAG A.

Wenn DAG B nur von einem von DAG A generierten Artefakt abhängt (z. B. eine Pub/Sub-Meldung), ist ein Sensor möglicherweise besser geeignet.

Wenn DAG B eng mit DAG A integriert ist, können Sie die beiden DAGs möglicherweise in einen DAG zusammenführen.

Wie übergebe ich eindeutige Ausführungs-IDs an einen DAG und die zugehörigen Aufgaben?

Angenommen, es sollen Dataproc-Clusternamen und -Dateipfade übergeben werden.

In diesem Fall können Sie eine zufällige eindeutige ID generieren und dafür str(uuid.uuid4()) in einem PythonOperator zurückgeben. Dadurch wird die ID XComs, damit Sie in anderen Operatoren auf die ID verweisen können mithilfe von Vorlagenfeldern.

Prüfen Sie vor dem Generieren einer uuid, ob eine DagRun-spezifische ID sinnvoller wäre. Sie können auf diese IDs in Jinja-Substitutionen auch mit Makros verweisen.

Wie trenne ich Aufgaben in einem DAG?

Eine Aufgabe sollte eine idempotente Arbeitseinheit sein. Vermeiden Sie es deshalb, einen aus mehreren Schritten bestehenden Workflow in eine einzelne Aufgabe aufzunehmen, z. B. in ein komplexes Programm, das in einem PythonOperator ausgeführt wird.

Soll ich mehrere Aufgaben in einem einzelnen DAG definieren, um Daten aus mehreren Quellen zusammenzufassen?

Angenommen, ich habe mehrere Tabellen mit Rohdaten und möchte tägliche Zusammenfassungen für jede einzelne Tabelle erstellen. Die Aufgaben sind nicht voneinander abhängig. Soll ich eine Aufgabe und einen DAG für jede Tabelle oder einen allgemeinen DAG erstellen?

Wenn es für Sie kein Problem ist, dass jede Aufgabe die gleichen Attribute auf DAG-Ebene verwendet (z. B. schedule_interval), ist es sinnvoll, mehrere Aufgaben in einem einzigen DAG zu definieren. Andernfalls können zur Minimierung der Codewiederholung mehrere DAGs aus einem einzigen Python-Modul generiert werden. Dazu platzieren Sie diese in den globalen globals() des Moduls.

Wie beschränke ich die Anzahl gleichzeitiger Aufgaben, die in einem DAG ausgeführt werden?

Ich möchte z. B. vermeiden, dass API-Nutzungslimits und -kontingente überschritten oder zu viele Prozesse gleichzeitig ausgeführt werden.

Sie können Airflow-Pools in der Airflow-Web-UI und verknüpfte Aufgaben mit vorhandenen Pools in Ihren DAGs.

FAQs zur Verwendung von Operatoren

Soll ich den DockerOperator verwenden?

Wir raten davon ab, Das DockerOperator, es sei denn, es wird zum Starten von Container in einer Docker-Remote-Installation (nicht im Cluster). In einer Cloud Composer-Umgebung hat der Operator keine Zugriff auf Docker-Daemons.

Verwenden Sie stattdessen KubernetesPodOperator oder GKEStartPodOperator. Diese Operatoren starten Kubernetes-Pods in Kubernetes- bzw. GKE-Cluster. Beachten Sie, dass wir empfehlen, Pods im Cluster einer Umgebung zu starten, da dies und Ressourcenwettbewerbe bieten.

Soll ich den SubDagOperator verwenden?

Die Verwendung von SubDagOperator wird nicht empfohlen.

Verwenden Sie Alternativen, wie unter Aufgaben gruppieren beschrieben.

Soll ich Python-Code nur in PythonOperators ausführen, um Python-Operatoren vollständig zu trennen?

Abhängig von Ihrem Ziel haben Sie mehrere Optionen.

Falls Ihr einziges Ziel ist, separate Python-Abhängigkeiten beizubehalten, können Sie PythonVirtualenvOperator verwenden.

Erwägen Sie die Verwendung der KubernetesPodOperator. Mit diesem Operator können Sie um Kubernetes-Pods zu definieren und die Pods in anderen Clustern auszuführen.

Wie füge ich benutzerdefinierte binäre oder Nicht-PyPI-Pakete hinzu?

Sie können dazu Pakete installieren, die in privaten Paket-Repositories gehostet werden,

Wie übergebe ich Argumente einheitlich an einen DAG und die zugehörigen Aufgaben?

Sie können die integrierte Unterstützung von Airflow für Jinja-Vorlagen zum Übergeben von Argumenten, die verwendet werden können in Vorlagenfeldern.

Wann findet die Vorlagenersetzung statt?

Die Vorlagen werden auf den Airflow-Workern unmittelbar vor dem Aufruf der pre_execute-Funktion eines Operators ersetzt. In der Praxis bedeutet das, dass Vorlagen erst kurz vor der Ausführung einer Aufgabe ersetzt.

Wie kann ich erkennen, welche Operatorargumente die Vorlagenersetzung unterstützen?

Operatorargumente, die die Jinja2-Vorlagenersetzung unterstützen, sind explizit gekennzeichnet sind.

Suchen Sie in der Operatordefinition nach dem Feld template_fields. Sie enthält eine Liste von Argumentnamen, für die die Vorlagenersetzung durchgeführt wird.

Siehe zum Beispiel den BashOperator, der die Vorlagenerstellung für die Argumente bash_command und env.

Nächste Schritte