Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird erläutert, wie Planung und DAG-Trigger in Airflow funktionieren. einen Zeitplan für einen DAG definieren und wie ein DAG manuell ausgelöst oder pausiert wird.
Airflow-DAGs in Cloud Composer
Airflow-DAGs in Cloud Composer werden ausgeführt in eine oder mehrere Cloud Composer-Umgebungen in Ihrem Projekt arbeiten. Sie laden Quelldateien Ihrer Airflow-DAGs hoch, Cloud Storage-Bucket, der einer Umgebung zugeordnet ist. Die Airflow-Instanz der Umgebung parset dann diese Dateien und plant DAG-Ausführungen gemäß dem Zeitplan der einzelnen DAGs. Während einer DAG-Ausführung plant und führt Airflow die einzelnen Aufgaben, aus denen ein DAG besteht, in einer vom DAG definierten Reihenfolge aus.
Weitere Informationen zu den Hauptkonzepten von Airflow wie Airflow-DAGs, DAG-Ausführungen, Aufgaben oder Operatoren finden Sie auf der Seite Grundlegende Konzepte in der Airflow-Dokumentation.
DAG-Planung in Airflow
Airflow bietet die folgenden Konzepte für seinen Planungsmechanismus:
- Logisches Datum
Ein Datum, an dem ein bestimmter DAG ausgeführt wird.
Dies ist nicht das tatsächliche Datum, an dem Airflow einen DAG ausführt, sondern ein Zeitraum, in dem eine bestimmte DAG-Ausführung verarbeitet werden muss. Für einen DAG, der jeden Tag um 12:00 Uhr ausgeführt werden soll, wäre das logische Datum ebenfalls 12:00 Uhr, an einem bestimmten Tag. Da die Ausführung zweimal täglich erfolgt, der letzten 12 Stunden. Zugleich muss die im Der DAG selbst verwendet möglicherweise gar nicht das logische Datum oder das Zeitintervall. So kann ein DAG beispielsweise dasselbe Script einmal pro Tag ausführen, ohne den Wert des logischen Datums zu verwenden.
In Airflow-Versionen vor 2.2 wird dieses Datum als Ausführungsdatum bezeichnet.
- Ausführungsdatum
Gibt das Datum an, an dem eine bestimmte DAG-Ausführung ausgeführt wird.
Bei einem DAG, der beispielsweise täglich um 12:00 Uhr ausgeführt werden soll, erfolgt die tatsächliche Ausführung des DAG möglicherweise um 12:05 Uhr, also einige Zeit nach Ablauf des logischen Datums.
- Zeitplanintervall
Stellt dar, wann und wie oft ein DAG in logischer Reihenfolge ausgeführt werden muss Daten.
Ein täglicher Zeitplan bedeutet beispielsweise, dass ein DAG einmal pro Tag ausgeführt wird, Die logischen Datumsangaben für die DAG-Ausführungen haben 24-Stunden-Intervalle.
- Startdatum
Gibt an, wann Airflow Ihren DAG planen soll.
Aufgaben in Ihrem DAG können individuelle Startzeiten haben oder Sie können ein einziges Startdatum für alle Aufgaben festlegen. Basiert auf dem frühesten Startdatum für Aufgaben im DAG und im Zeitplanintervall plant Airflow DAG-Ausführungen.
- Nachhol-, Backfill- und Wiederholungsvorgänge
Mechanismen zum Ausführen von DAG-Ausführungen für vergangene Zeiträume.
Catchup führt DAG-Ausführungen aus, die noch nicht ausgeführt wurden. Beispiel: wenn der DAG für einen längeren Zeitraum pausiert und dann wieder aktiviert wurde. Mit Backfill können Sie DAG-Ausführungen für einen bestimmten Zeitraum ausführen. Wiederholungsversuche gibt an, wie viele Versuche Airflow beim Ausführen von Aufgaben über einen DAG ausführen muss.
Die Planung funktioniert so:
Nach Ablauf des Startdatums wartet Airflow auf das nächste Auftreten des Zeitplanintervalls.
Airflow plant die erste DAG-Ausführung am Ende dieses Zeitplans Intervall.
Wenn ein DAG beispielsweise jede Stunde ausgeführt werden soll und das Startdatum um 12:00 Uhr ist, erfolgt die erste DAG-Ausführung heute um 13:00 Uhr.
Im Abschnitt Airflow-DAG planen dieses Dokuments wird beschrieben, wie Sie die Planung für Ihre DAGs mithilfe dieser Konzepte einrichten. Weitere Informationen zu DAG-Ausführungen und Planung finden Sie in der Airflow-Dokumentation unter DAG-Ausführungen.
Möglichkeiten zum Auslösen eines DAG
Airflow bietet die folgenden Möglichkeiten zum Auslösen eines DAG:
Trigger nach Zeitplan starten. Airflow löst den DAG aus basierend auf dem in der DAG-Datei festgelegten Zeitplan.
Manuell auslösen. Sie können einen DAG manuell auslösen über Google Cloud Console, Airflow-UI oder durch Ausführen eines Airflow-Befehlszeilenbefehls über die Google Cloud CLI.
Trigger als Reaktion auf Ereignisse. Als Standardmethode zum Auslösen eines DAG als Reaktion auf Ereignisse wird die Verwendung eines Sensors verwendet.
Weitere Möglichkeiten zum Auslösen von DAGs:
Programmatische Auslösung Sie können einen DAG mit der Airflow REST API auslösen. Zum Beispiel aus einem Python-Script.
Programmatisch als Reaktion auf Ereignisse auslösen. Sie können DAGs in auf Ereignisse reagieren, indem Sie Cloud Run-Funktionen und die Airflow REST API
Hinweise
- Achten Sie darauf, dass Ihr Konto eine Rolle hat, die Objekte im Umgebungs-Buckets aufrufen und DAGs ansehen und auslösen. Weitere Informationen finden Sie unter Zugriffssteuerung.
Airflow-DAG planen
In der DAG-Datei definieren Sie einen Zeitplan für einen DAG. Bearbeiten Sie die DAG-Definition so:
Suchen und bearbeiten Sie die DAG-Datei auf Ihrem Computer. Wenn Sie die DAG-Datei nicht haben, können Sie eine Kopie aus dem Bucket der Umgebung herunterladen. Für einen neuen DAG alle Parameter definieren können, wenn Sie die DAG-Datei erstellen.
Definieren Sie im Parameter
schedule_interval
den Zeitplan. Sie können einen Cron-Ausdruck wie0 0 * * *
oder eine Voreinstellung wie@daily
. Für Weitere Informationen finden Sie unter Cron- und Zeitintervalle. finden Sie in der Airflow-Dokumentation.Airflow bestimmt logische Datumsangaben für DAG-Ausführungen anhand des von Ihnen festgelegten Zeitplans.
Definieren Sie im Parameter
start_date
das Startdatum.Anhand dieses Parameters ermittelt Airflow das logische Datum der ersten DAG-Ausführung.
Optional: Legen Sie mit dem Parameter
catchup
fest, ob Airflow alle vorherigen Ausführungen dieses DAGs vom Startdatum bis zum aktuellen Datum ausführen muss, die noch nicht ausgeführt wurden.DAG-Ausführungen, die während des Nachholens ausgeführt werden, haben ihr logisches Datum in der vergangen sind. Das Ausführungsdatum gibt den Zeitpunkt an, zu dem die DAG-Ausführung tatsächlich ausgeführt wurde. ausgeführt haben.
Optional: Legen Sie mit dem Parameter
retries
fest, wie oft Airflow fehlgeschlagene Aufgaben wiederholen muss. Jeder DAG besteht aus einer oder mehreren einzelnen Aufgaben. Standardmäßig werden Aufgaben in Cloud Composer zweimal wiederholt.Laden Sie die neue Version des DAG in den Bucket der Umgebung hoch.
Warten Sie, bis Airflow den DAG erfolgreich analysiert hat. Sie können beispielsweise prüfen, der Liste der DAGs in Ihrer Umgebung in der In der Google Cloud Console oder in der Airflow-UI
Die folgende Beispiel-DAG-Definition wird zweimal täglich um 00:00 und 12:00 Uhr ausgeführt. Das Startdatum ist auf den 1. Januar 2024 festgelegt. Airflow führt den Workflow nach dem Upload oder Pausieren jedoch nicht für vergangene Zeiträume aus, da die Funktion „Aufholen“ deaktiviert ist.
Der DAG enthält eine Aufgabe namens insert_query_job
, die mit dem Operator BigQueryInsertJobOperator
eine Zeile in eine Tabelle einfügt. Dieser Operator ist einer der BigQuery-Operatoren von Google Cloud, mit denen Sie Datensätze und Tabellen verwalten, Abfragen ausführen und Daten validieren können.
Wenn eine bestimmte Ausführung dieser Aufgabe fehlschlägt, wird sie von Airflow noch viermal mit dem Standardwiederholungsintervall wiederholt. Das logische Datum für diese Wiederholungen bleibt gleich.
In der SQL-Abfrage für diese Zeile werden Airflow-Vorlagen verwendet, um das logische Datum und den Namen des DAG in die Zeile zu schreiben.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Sie können diesen DAG manuell auslösen und sich dann die Logs zur Aufgabenausführung ansehen.
Weitere Beispiele für Planungsparameter
Die folgenden Beispiele für Planungsparameter zeigen, wie die Planung mit verschiedenen Parameterkombinationen funktioniert:
Wenn
start_date
gleichdatetime(2024, 4, 4, 16, 25)
undschedule_interval
gleich30 16 * * *
beginnt, erfolgt die erste DAG-Ausführung am 5. April 2024 um 16:30 Uhr.Wenn
start_date
gleichdatetime(2024, 4, 4, 16, 35)
undschedule_interval
gleich30 16 * * *
beginnt, erfolgt die erste DAG-Ausführung am 6. April 2024 um 16:30 Uhr. Da das Startdatum nach dem Zeitplanintervall am 4. April 2024 liegt, erfolgt die DAG-Ausführung nicht am 5. April 2024. Stattdessen enthält der Zeitplan Das Intervall endet am 5. April 2024 um 16:35 Uhr. Die nächste DAG-Ausführung ist also geplant. am nächsten Tag um 16:30 Uhr.Wenn
start_date
den Wertdatetime(2024, 4, 4)
hat und derschedule_interval
@daily
, dann ist die erste DAG-Ausführung für den 5. April 2024 um 00:00 Uhr geplant.Wenn
start_date
datetime(2024, 4, 4, 16, 30)
ist undschedule_interval
ist0 * * * *
, dann ist die erste DAG-Ausführung geplant für den 4. April 2024 um 18:00 Uhr. Nach dem angegebenen Datum und der angegebenen Uhrzeit plant Airflow eine DAG-Ausführung zur Minute 0 jeder Stunde. Der nächste Zeitpunkt, zu dem dies der Fall ist, ist 17:00 Uhr. Zu diesem Zeitpunkt plant Airflow eine DAG-Ausführung am Ende des Zeitplanintervalls, also um 18:00 Uhr.
DAG manuell auslösen
Wenn Sie einen Airflow-DAG manuell auslösen, wird Airflow ausgeführt einmal, unabhängig von dem in der DAG-Datei angegebenen Zeitplan.
Console
Die DAG-UI wird in Cloud Composer 1.17.8 und höher unterstützt.
So lösen Sie einen DAG über die Google Cloud Console aus:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie eine Umgebung aus, um die zugehörigen Details anzusehen.
Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.
Klicken Sie auf den Namen eines DAG.
Klicken Sie auf der Seite DAG-Details auf DAG auslösen. Eine neue DAG-Ausführung erstellt.
Airflow-UI
So lösen Sie einen DAG über die Airflow-Benutzeroberfläche aus:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Melden Sie sich mit dem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.
Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.
Optional: Geben Sie die DAG-Ausführungskonfiguration an.
Klickbasierter Trigger
gcloud
Führen Sie in Airflow 1.10.12 oder niedriger den Befehl trigger_dag
der Airflow-Befehlszeile aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
Führen Sie in Airflow 1.10.14 oder höher, einschließlich Airflow 2, den Befehl dags trigger
der Airflow-Befehlszeile aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.DAG_ID
: der Name des DAG.
Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Cloud Composer-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.
Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie in der Befehlsreferenz zu gcloud composer environments run
.
DAG-Ausführungslogs und -details ansehen
In der Google Cloud Console haben Sie folgende Möglichkeiten:
- Status früherer DAG-Ausführungen und DAG-Details ansehen
- Detaillierte Protokolle aller DAG-Ausführungen und aller Aufgaben aus diesen DAGs ansehen.
- DAG-Statistiken ansehen
Darüber hinaus bietet Cloud Composer Zugriff auf die Airflow-UI, die eigene Weboberfläche von Airflow.
DAG pausieren
Console
Die DAG-Benutzeroberfläche wird in Cloud Composer 1.17.8 und höher unterstützt.
So pausieren Sie einen DAG über die Google Cloud Console:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Wählen Sie eine Umgebung aus, um die zugehörigen Details anzusehen.
Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.
Klicken Sie auf den Namen eines DAG.
Klicken Sie auf der Seite DAG-Details auf DAG pausieren.
Airflow-UI
So pausieren Sie einen DAG über die Airflow-UI:
- Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Melden Sie sich mit dem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.
Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs auf die Ein/Aus-Schaltfläche neben dem Namen des DAG.
gcloud
Führen Sie in Airflow 1.10.12 oder niedriger den Befehl pause
der Airflow-Befehlszeile aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
pause -- DAG_ID
Führen Sie in Airflow 1.10.14 oder höher, einschließlich Airflow 2, den Befehl dags pause
der Airflow-Befehlszeile aus:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.DAG_ID
: der Name des DAG.
Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Cloud Composer-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.
Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie in der Befehlsreferenz zu gcloud composer environments run
.