DAGs auslösen

Auf dieser Seite wird gezeigt, wie DAGs in Cloud Composer-Umgebungen ausgelöst werden.

Airflow bietet zwei Möglichkeiten zum Auslösen eines DAG:

  • Trigger nach Zeitplan starten. Wenn Sie einen DAG erstellen, geben Sie einen Plan dafür an. Airflow löst den DAG auf Basis der angegebenen Planungsparameter aus.
  • Manuell auslösen. Sie können einen DAG mit einem Befehl der Airflow-Befehlszeile oder über die Airflow-Weboberfläche auslösen. In Cloud Composer ist der Befehl der Airflow-Befehlszeile, der DAGs auslöst, über den Befehl gcloud composer verfügbar.

Außerdem können Sie mithilfe von Cloud Functions und der Airflow REST API DAGs als Reaktion auf Ereignisse auslösen. Weitere Informationen finden Sie unter DAGs mit Cloud Functions auslösen.

DAG nach einem Zeitplan auslösen

So lösen Sie einen DAG nach einem Zeitplan aus:

  1. Geben Sie die Parameter start_date und schedule_interval in der DAG-Datei an, wie weiter unten in diesem Abschnitt beschrieben.
  2. Laden Sie die DAG-Datei in Ihre Umgebung hoch.

Parameter für die Planung

Wenn Sie einen DAG definieren, geben Sie im Parameter schedule_interval an, wie oft Sie den DAG ausführen möchten. Im Parameter start_date geben Sie an, wann Airflow mit der Planung Ihres DAG beginnen soll. Für Aufgaben in Ihrem DAG kann entweder ein individuelles Startdatum festgelegt werden oder Sie können für alle Aufgaben ein einzelnes Startdatum angeben. Basierend auf dem minimalen Startdatum für Aufgaben in Ihrem DAG und im Zeitplanintervall plant Airflow die DAG-Ausführungen.

So funktioniert die Terminplanung: Nachdem start_date bestanden wurde, wartet Airflow auf das folgende Vorkommen von schedule_interval. Anschließend wird geplant, dass die erste DAG-Ausführung am Ende dieses Zeitplanintervalls ausgeführt wird. Wenn beispielsweise ein DAG planmäßig jede Stunde ausgeführt wird und das Startdatum auf 12:00 Uhr liegt, wird die erste DAG-Ausführung heute um 13:00 Uhr ausgeführt.

Das folgende Beispiel zeigt einen DAG, der jede Stunde ab 17:00 Uhr am 5. April 2021 ausgeführt wird. Mit den im Beispiel verwendeten Parametern plant Airflow für die erste DAG-Ausführung am 5. April 2021 um 16:00 Uhr.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

Weitere Informationen zu den Planungparametern finden Sie in der Airflow-Dokumentation unter DAG-Ausführungen.

Weitere Beispiele für Planungsparameter

Die folgenden Beispiele für Planungsparameter zeigen, wie die Planung mit verschiedenen Parameterkombinationen funktioniert:

  • Wenn start_date den Wert datetime(2021, 4, 4, 16, 25) hat und schedule_interval der 30 16 * * * ist, findet die erste DAG-Ausführung am 5. April 2021 um 16:30 statt.
  • Wenn start_date datetime(2021, 4, 4, 16, 35) und schedule_interval den Wert 30 16 * * * hat, dann wird die erste DAG-Ausführung am 6. April 2021 um 16:30 ausgeführt. Da das Startdatum nach dem Zeitplanintervall am 4. April 2021 liegt, läuft der DAG nicht am 5. April 2021. Stattdessen endet das Zeitplanintervall am 5. April 2021 um 16:35 Uhr. Der nächste DAG wird am nächsten Tag für 16:30 Uhr geplant.
  • Wenn start_date datetime(2021, 4, 4) und der schedule_interval @daily ist, wird die erste DAG-Ausführung am 5. April 2021 für 00:00 geplant.
  • Wenn start_date datetime(2021, 4, 4, 16, 30) und der schedule_interval 0 * * * * ist, wird die erste DAG-Ausführung am 4. April 2021 um 18:00 Uhr geplant. Nach Ablauf des angegebenen Datums und der Uhrzeit plant Airflow eine DAG-Ausführung um die Minute 0 jeder Stunde. Der nächste Zeitpunkt, zu dem dies geschieht, ist 17:00 Uhr. Derzeit plant Airflow eine DAG-Ausführung am Ende des Zeitplanintervalls, d. h. um 18:00 Uhr.

DAG mit gcloud auslösen

Zum Ausführen eines DAG mit gcloud führen Sie den Befehl trigger_dag der Airflow-Befehlszeile aus:

Airflow 1.10-Befehlszeile

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Airflow 2.0-Befehlszeile

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Compute Engine-Region, in der sich die Umgebung befindet.
  • DAG_ID mit dem Namen des DAG.

Weitere Informationen zum Ausführen von Airflow-Befehlszeilenbefehlen in Cloud Composer-Umgebungen finden Sie unter Airflow-Befehlszeilenbefehle ausführen.

Weitere Informationen zu den verfügbaren Befehlen der Airflow-Befehlszeile finden Sie in der Referenz zu gcloud composer environments run.

DAG mit der Airflow-Weboberfläche auslösen

So lösen Sie einen DAG über die Airflow-Weboberfläche aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite "Umgebungen"

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Melden Sie sich mit einem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.

  4. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche DAG auslösen.

  5. (Optional) Geben Sie die DAG-Ausführungskonfiguration an.

  6. Klicken Sie auf Trigger.

Nächste Schritte