DAGs hinzufügen und aktualisieren

Cloud Composer 1 Cloud Composer 2

Auf dieser Seite wird beschrieben, wie Sie DAGs in Ihrer Cloud Composer-Umgebung verwalten.

Cloud Composer verwendet einen Cloud Storage-Bucket zum Speichern von DAGs Ihrer Cloud Composer-Umgebung. Ihre Umgebung synchronisiert DAGs aus diesem Bucket mit Airflow-Komponenten wie Airflow-Worker und Planer.

Hinweise

  • Da Apache Airflow keine strikte DAG-Isolation bietet, empfehlen wir, separate Produktions- und Testumgebungen einzurichten, um DAG-Interferenzen zu vermeiden. Weitere Informationen finden Sie unter DAGs testen.
  • Prüfen Sie, ob Ihr Konto ausreichende Berechtigungen zum Verwalten von DAGs hat.
  • Änderungen am DAG werden innerhalb von 3 bis 5 Minuten an Airflow übertragen. Sie können den Aufgabenstatus in der Airflow-Weboberfläche anzeigen.

Auf den Bucket Ihrer Umgebung zugreifen

So greifen Sie auf den Bucket zu, der Ihrer Umgebung zugeordnet ist:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Suchen Sie in der Liste der Umgebungen eine Zeile mit dem Namen Ihrer Umgebung und klicken Sie in der Spalte DAGs-Ordner auf den Link DAGs. Die Seite Bucket-Details wird geöffnet. Es wird der Inhalt des Ordners /dags im Bucket Ihrer Umgebung angezeigt.

gcloud

Die gcloud CLI enthält separate Befehle zum Hinzufügen und Löschen von DAGs im Bucket Ihrer Umgebung.

Wenn Sie mit dem Bucket Ihrer Umgebung interagieren möchten, können Sie auch das gsutil-Befehlszeilentool verwenden. Führen Sie den folgenden gcloud CLI-Befehl aus, um die Adresse des Buckets Ihrer Umgebung abzurufen:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Beispiel:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Erstellen Sie eine API-Anfrage environments.get. In der Ressource Environment, in der Ressource EnvironmentConfig, in der Ressource dagGcsPrefix die Adresse des Buckets Ihrer Umgebung.

Beispiel:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Verwenden Sie die Bibliothek google-auth, um Anmeldedaten zu erhalten, und die Bibliothek requests, um die REST API aufzurufen.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

DAG hinzufügen oder aktualisieren

Wenn Sie einen DAG hinzufügen oder aktualisieren möchten, verschieben Sie die Python-Datei .py für den DAG in den Ordner /dags im Bucket der Umgebung.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Suchen Sie in der Liste der Umgebungen eine Zeile mit dem Namen Ihrer Umgebung und klicken Sie in der Spalte DAGs-Ordner auf den Link DAGs. Die Seite Bucket-Details wird geöffnet. Es wird der Inhalt des Ordners /dags im Bucket Ihrer Umgebung angezeigt.

  3. Klicken Sie auf Dateien hochladen. Wählen Sie dann im Dialogfeld des Browsers die Python-Datei .py für den DAG aus und bestätigen Sie die Auswahl.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.
  • LOCAL_FILE_TO_UPLOAD ist die Python-Datei .py für den DAG.

Beispiel:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

DAG mit aktiven DAG-Ausführungen aktualisieren

Wenn Sie einen DAG aktualisieren, der aktive DAGs ausführt, gilt Folgendes:

  • Alle aktuell ausgeführten Aufgaben werden mit der ursprünglichen DAG-Datei abgeschlossen.
  • Alle Aufgaben, die geplant sind, aber derzeit nicht ausgeführt werden, verwenden die aktualisierte DAG-Datei.
  • Alle Aufgaben, die in der aktualisierten DAG-Datei nicht mehr vorhanden sind, werden als entfernt markiert.

DAGs aktualisieren, die einen engen Zeitplan haben

Nachdem Sie eine DAG-Datei hochgeladen haben, dauert es einige Zeit, bis Airflow diese Datei geladen und den DAG aktualisiert hat. Wenn Ihr DAG nach einem regelmäßigen Zeitplan ausgeführt wird, sollten Sie dafür sorgen, dass der DAG die aktualisierte Version der DAG-Datei verwendet. Anleitung:

  1. Pausieren Sie den DAG in der Airflow-UI.
  2. Laden Sie eine aktualisierte DAG-Datei hoch.
  3. Warten Sie, bis die Aktualisierungen in der Airflow-UI angezeigt werden. Das bedeutet, dass der DAG vom Planer korrekt geparst und in der Airflow-Datenbank aktualisiert wurde.

    Wenn die Airflow-UI die aktualisierten DAGs anzeigt, ist dadurch nicht garantiert, dass Airflow-Worker die aktualisierte Version der DAG-Datei haben. Das liegt daran, dass DAG-Dateien unabhängig für Planer und Worker synchronisiert werden.

  4. Sie können die Wartezeit verlängern, damit die DAG-Datei mit allen Workern in Ihrer Umgebung synchronisiert wird. Die Synchronisierung erfolgt mehrmals pro Minute. In einer fehlerfreien Umgebung reicht es, wenn 20 bis 30 Sekunden gewartet werden, bis alle Worker synchronisiert wurden.

  5. (Optional) Wenn Sie absolut sicher sein möchten, dass alle Worker die neue Version der DAG-Datei haben, prüfen Sie die Logs jedes einzelnen Workers. Anleitung:

    1. Öffnen Sie in der Google Cloud Console den Tab Logs für Ihre Umgebung.

    2. Wechseln Sie zu Composer-Logs > Infrastruktur > Cloud Storage-Synchronisierung und prüfen Sie die Logs jedes Workers in Ihrer Umgebung. Suchen Sie nach dem neuesten Syncing dags directory-Logelement, das nach dem Hochladen der neuen DAG-Datei einen Zeitstempel hat. Wenn Sie ein Finished syncing-Element entdecken, das diesem Element folgt, wurden die DAGs erfolgreich mit diesem Worker synchronisiert.

  6. Heben Sie die Pausierung des DAG auf.

DAG löschen

In diesem Abschnitt wird beschrieben, wie Sie einen DAG löschen.

DAG in Ihrer Umgebung löschen

Zum Löschen eines DAG entfernen Sie die Python-Datei .py für den DAG aus dem /dags-Ordner der Umgebung im Bucket Ihrer Umgebung.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Suchen Sie in der Liste der Umgebungen eine Zeile mit dem Namen Ihrer Umgebung und klicken Sie in der Spalte DAGs-Ordner auf den Link DAGs. Die Seite Bucket-Details wird geöffnet. Der Inhalt des Ordners /dags im Bucket Ihrer Umgebung wird angezeigt.

  3. Wählen Sie die DAG-Datei aus, klicken Sie auf Löschen und bestätigen Sie den Vorgang.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.
  • DAG_FILE durch die Python-Datei .py für den DAG.

Beispiel:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

DAG aus der Airflow-UI entfernen

So entfernen Sie die Metadaten für einen DAG aus der Airflow-Weboberfläche:

Airflow-UI

  1. Rufen Sie die Airflow-UI für Ihre Umgebung auf.
  2. Klicken Sie für den DAG auf DAG löschen.

gcloud

Führen Sie in der gcloud CLI den folgenden Befehl aus:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.
  • DAG_NAME ist der Name des zu löschenden DAG.

Nächste Schritte