Airflow-DAGs planen und auslösen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird erläutert, wie Planung und DAG-Trigger in Airflow funktionieren. einen Zeitplan für einen DAG definieren und wie ein DAG manuell ausgelöst oder pausiert wird.

Airflow-DAGs in Cloud Composer

Airflow-DAGs in Cloud Composer werden in einer oder mehreren Cloud Composer-Umgebungen in Ihrem Projekt ausgeführt. Sie laden Quelldateien Ihrer Airflow-DAGs hoch, Cloud Storage-Bucket, der einer Umgebung zugeordnet ist. Der Umweltschutz parst die Airflow-Instanz diese Dateien und plant DAG-Ausführungen wie definiert nach dem Zeitplan jedes DAG. Während einer DAG-Ausführung plant und führt Airflow die einzelnen Aufgaben, aus denen ein DAG besteht, in einer vom DAG definierten Reihenfolge aus.

Weitere Informationen zu den Kernkonzepten von Airflow wie Airflow-DAGs, DAG-Ausführungen, Aufgaben oder Operatoren finden Sie auf der Seite Core Concepts (Kernkonzepte) in der Airflow-Dokumentation

DAG-Planung in Airflow

Airflow bietet die folgenden Konzepte für den Planungsmechanismus:

Logisches Datum

Stellt ein Datum dar, für das eine bestimmte DAG-Ausführung ausgeführt wird.

Dies ist nicht das tatsächliche Datum, an dem Airflow einen DAG ausführt, sondern ein Zeitraum, in dem eine bestimmte DAG-Ausführung verarbeitet werden muss. Bei einem DAG, der beispielsweise täglich um 12:00 Uhr ausgeführt werden soll, wäre das logische Datum auch der 12:00 Uhr eines bestimmten Tages. Da er zweimal täglich ausgeführt wird, muss er die letzten 12 Stunden verarbeiten. Zugleich muss die im Der DAG selbst verwendet möglicherweise gar nicht das logische Datum oder das Zeitintervall. Ein DAG kann beispielsweise dasselbe Skript einmal täglich ausführen, ohne den Wert des logischen Datums.

In Airflow-Versionen vor 2.2 wird dieses Datum als Ausführungsdatum bezeichnet.

Ausführungsdatum

Gibt das Datum an, an dem eine bestimmte DAG-Ausführung ausgeführt wird.

Bei einem DAG, der beispielsweise täglich um 12:00 Uhr ausgeführt werden soll, erfolgt die tatsächliche Ausführung des DAG möglicherweise um 12:05 Uhr, also einige Zeit nach Ablauf des logischen Datums.

Zeitplanintervall

Gibt an, wann und wie oft ein DAG ausgeführt werden muss, in Form logischer Datumsangaben.

Ein täglicher Zeitplan bedeutet beispielsweise, dass ein DAG einmal pro Tag ausgeführt wird und die logischen Datumsangaben für die DAG-Ausführungen 24-Stunden-Intervalle haben.

Startdatum

Gibt an, wann Airflow Ihren DAG planen soll.

Aufgaben in Ihrem DAG können individuelle Startzeiten haben oder Sie können ein einziges Startdatum für alle Aufgaben festlegen. Basiert auf dem frühesten Startdatum für Aufgaben im DAG und im Zeitplanintervall plant Airflow DAG-Ausführungen.

Catchup, Backfill und Wiederholungsversuche

Mechanismen zur Ausführung von DAG-Ausführungen für vergangene Zeiträume.

Mit der Funktion „Aufholen“ werden DAG-Ausführungen ausgeführt, die noch nicht ausgeführt wurden, z. B. wenn der DAG für einen längeren Zeitraum pausiert und dann wieder fortgesetzt wurde. Sie können Backfill verwenden, um DAG-Ausführungen in einem bestimmten Zeitraum auszuführen. Mit „Wiederholungen“ wird angegeben, wie oft Airflow versuchen muss, Aufgaben aus einem DAG auszuführen.

Die Planung funktioniert so:

  1. Nach Ablauf des Startdatums wartet Airflow auf das nächste Auftreten des Zeitplanintervall.

  2. Airflow plant die erste DAG-Ausführung am Ende dieses Zeitplans Intervall.

    Beispiel: Ein DAG soll stündlich ausgeführt werden und das Startdatum ist die erste DAG-Ausführung um 13:00 Uhr.

Im Abschnitt Airflow-DAG planen in diesem Dokument wird Folgendes beschrieben: wie Sie die Planung für Ihre DAGs mit diesen Konzepten einrichten können. Weitere Informationen Informationen zu DAG-Ausführungen und zur Planung finden Sie unter DAG-Ausführungen finden Sie in der Airflow-Dokumentation.

Möglichkeiten zum Auslösen eines DAG

Airflow bietet die folgenden Möglichkeiten zum Auslösen eines DAG:

  • Trigger nach Zeitplan starten. Airflow löst den DAG aus basierend auf dem in der DAG-Datei festgelegten Zeitplan.

  • Manuell auslösen. Sie können einen DAG manuell über Google Cloud Console, Airflow-UI oder durch Ausführen eines Airflow-Befehlszeilenbefehls über die Google Cloud CLI.

  • Als Reaktion auf Ereignisse auslösen. Die Standardmethode zum Auslösen eines DAG in ist die Verwendung eines Sensors.

Weitere Möglichkeiten zum Auslösen von DAGs:

Hinweise

  • Achten Sie darauf, dass Ihr Konto eine Rolle hat, die Objekte im Umgebungs-Buckets aufrufen und DAGs ansehen und auslösen. Weitere Informationen finden Sie unter Zugriffssteuerung.

Airflow-DAG planen

In der DAG-Datei definieren Sie einen Zeitplan für einen DAG. Bearbeiten Sie die Definition des DAG in auf folgende Weise:

  1. Suchen und bearbeiten Sie die DAG-Datei auf Ihrem Computer. Wenn Sie die DAG-Datei nicht haben, können Sie eine Kopie aus dem Bucket der Umgebung herunterladen. Bei einem neuen DAG können Sie alle Parameter beim Erstellen der DAG-Datei definieren.

  2. Definieren Sie im Parameter schedule_interval den Zeitplan. Sie können einen Cron-Ausdruck wie 0 0 * * * oder eine Voreinstellung wie @daily. Für Weitere Informationen finden Sie unter Cron- und Zeitintervalle. finden Sie in der Airflow-Dokumentation.

    Airflow bestimmt logische Datumsangaben für DAG-Ausführungen anhand des von Ihnen festgelegten Zeitplans.

  3. Definieren Sie im Parameter start_date das Startdatum.

    Anhand dieses Parameters ermittelt Airflow das logische Datum der ersten DAG-Ausführung.

  4. (Optional) Legen Sie im Parameter catchup fest, ob Airflow ausgeführt werden soll. alle vorherigen Ausführungen dieses DAG vom Startdatum bis zum aktuellen Datum, noch nicht ausgeführt.

    Das logische Datum von DAG-Ausführungen, die während des Nachholvorgangs ausgeführt werden, liegt in der Vergangenheit. Das Ausführungsdatum entspricht dem Zeitpunkt, zu dem die DAG-Ausführung tatsächlich stattgefunden hat.

  5. (Optional) Legen Sie im Parameter retries fest, wie oft Airflow verwendet wird. muss Aufgaben wiederholen, die fehlgeschlagen sind. Jeder DAG besteht aus einem oder mehreren Aufgaben). Standardmäßig wird für Aufgaben in Cloud Composer zwei Wiederholungen ausgeführt Mal.

  6. Laden Sie die neue Version des DAG in den Bucket.

  7. Warten Sie, bis Airflow den DAG erfolgreich analysiert hat. Sie können beispielsweise die Liste der DAGs in Ihrer Umgebung in der Google Cloud Console oder in der Airflow-Benutzeroberfläche aufrufen.

Die folgende Beispiel-DAG-Definition wird zweimal täglich um 00:00 und 12:00 Uhr ausgeführt. Das Startdatum ist auf den 1. Januar 2024 festgelegt. Airflow führt den Workflow nach dem Upload oder Pausieren jedoch nicht für vergangene Zeiträume aus, da die Funktion „Aufholen“ deaktiviert ist.

Der DAG enthält eine Aufgabe namens insert_query_job, die mit dem Operator BigQueryInsertJobOperator eine Zeile in eine Tabelle einfügt. Dieser Operator ist einer der BigQuery-Operatoren von Google Cloud, mit denen Sie Datensätze und Tabellen verwalten, Abfragen ausführen und Daten validieren können. Wenn eine bestimmte Ausführung dieser Aufgabe fehlschlägt, wiederholt Airflow sie um vier weitere mit dem standardmäßigen Wiederholungsintervall. Das logische Datum für diese Wiederholungen bleibt gleich.

Die SQL-Abfrage für diese Zeile verwendet Airflow-Vorlagen zum Schreiben des logischen Datums eines DAG und Name in die Zeile ein.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Sie können diesen DAG manuell auslösen und sich dann die Logs zur Aufgabenausführung ansehen.

Weitere Beispiele für Planungsparameter

Die folgenden Beispiele für Planungsparameter zeigen, wie die Planung mit verschiedenen Parameterkombinationen funktioniert:

  • Wenn start_date den Wert datetime(2024, 4, 4, 16, 25) und schedule_interval den Wert 30 16 * * * hat, erfolgt die erste DAG-Ausführung um 16:30 Uhr am 5. April 2024.

  • Wenn start_date gleich datetime(2024, 4, 4, 16, 35) und schedule_interval gleich 30 16 * * * beginnt, erfolgt die erste DAG-Ausführung am 6. April 2024 um 16:30 Uhr. Da das Startdatum nach dem Zeitplanintervall am 4. April 2024 liegt, die DAG-Ausführung nicht am 5. April 2024. Stattdessen enthält der Zeitplan Das Intervall endet am 5. April 2024 um 16:35 Uhr. Die nächste DAG-Ausführung ist also geplant. am nächsten Tag um 16:30 Uhr.

  • Wenn start_date datetime(2024, 4, 4) und der schedule_interval @daily ist, wird die erste DAG-Ausführung am 5. April 2024 um 00:00 Uhr geplant.

  • Wenn start_date datetime(2024, 4, 4, 16, 30) ist und schedule_interval ist 0 * * * *, dann ist die erste DAG-Ausführung geplant für den 4. April 2024 um 18:00 Uhr. Nach dem angegebenen Datum und der angegebenen Uhrzeit plant Airflow eine DAG-Ausführung zur Minute 0 jeder Stunde. Der nächste Zeitpunkt, zu dem dies der Fall ist, ist 17:00 Uhr. Zu diesem Zeitpunkt plant Airflow eine DAG-Ausführung am Ende des Zeitplanintervalls, also um 18:00 Uhr.

DAG manuell auslösen

Wenn Sie einen Airflow-DAG manuell auslösen, führt Airflow ihn unabhängig vom in der DAG-Datei angegebenen Zeitplan einmal aus.

Console

So lösen Sie einen DAG über die Google Cloud Console aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite „Umgebungen“

  2. Wählen Sie eine Umgebung aus, um die zugehörigen Details aufzurufen.

  3. Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.

  4. Klicken Sie auf den Namen eines DAG.

  5. Klicken Sie auf der Seite DAG-Details auf DAG auslösen. Eine neue DAG-Ausführung erstellt.

Airflow-UI

So lösen Sie einen DAG über die Airflow-UI aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Melden Sie sich mit dem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.

  4. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  5. Optional: Geben Sie die DAG-Ausführungskonfiguration an.

  6. Klickbasierter Trigger

gcloud

Führen Sie den Befehl dags trigger der Airflow-Befehlszeile aus:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.
  • DAG_ID: der Name des DAG.

Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Cloud Composer-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.

Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie in der Befehlsreferenz zu gcloud composer environments run.

DAG-Ausführungslogs und -details ansehen

In der Google Cloud Console haben Sie folgende Möglichkeiten:

Außerdem bietet Cloud Composer Zugriff auf die Airflow-Benutzeroberfläche, die Weboberfläche von Airflow.

DAG pausieren

Console

So halten Sie einen DAG in der Google Cloud Console an:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite „Umgebungen“

  2. Wählen Sie eine Umgebung aus, um die zugehörigen Details aufzurufen.

  3. Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.

  4. Klicken Sie auf den Namen eines DAG.

  5. Klicken Sie auf der Seite DAG-Details auf DAG pausieren.

Airflow-UI

So pausieren Sie einen DAG über die Airflow-Benutzeroberfläche:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

Zur Seite Umgebungen

  1. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  2. Melden Sie sich mit dem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.

  3. Klicken Sie auf der Airflow-Weboberfläche auf der Seite DAGs auf auf die Ein/Aus-Schaltfläche neben dem Namen des DAG.

gcloud

Führen Sie den Befehl dags pause der Airflow-Befehlszeile aus:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.
  • DAG_ID: der Name des DAG.

Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Cloud Composer-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.

Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie in der Befehlsreferenz zu gcloud composer environments run.

Nächste Schritte