Cloud Composer 1 Cloud Composer 2
Auf dieser Seite wird beschrieben, wie Sie mit Cloud Functions DAGs als Reaktion auf Ereignisse auslösen.
Airflow wurde mit einem regulären Zeitplan ausgeführt. Sie können DAGs jedoch als Reaktion auf Ereignisse auslösen. Eine Möglichkeit, dies zu erreichen, ist die Verwendung von Cloud Functions, um Cloud Composers-Instanzen auszulösen, wenn ein bestimmtes Ereignis eintritt. Sie können beispielsweise eine Funktion erstellen, die einen DAG auslöst, wenn sich ein Objekt in einem Cloud Storage-Bucket ändert oder wenn eine Nachricht an ein Pub/Sub-Thema gesendet wird.
Im vorliegenden Beispiel wird bei jeder Änderung an einem Cloud Storage-Bucket ein DAG ausgeführt. Änderungen an einem Objekt in einem Bucket lösen eine Funktion aus. Diese Funktion sendet eine Anfrage an die Airflow REST API Ihrer Cloud Composer-Umgebung. Airflow verarbeitet diese Anfrage und führt einen DAG aus. Der DAG gibt Informationen zur Änderung aus.
Hinweis
Die APIs für Ihr Projekt aktivieren
Console
Cloud Composer and Cloud Functions APIs aktivieren.
gcloud
Cloud Composer and Cloud Functions APIs aktivieren.
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Airflow REST API aktivieren
Je nach Airflow-Version:
- Für Airflow 2 ist die stabile REST API bereits aktiviert. Wenn die stabile API in Ihrer Umgebung deaktiviert ist, aktivieren Sie die stabile REST API.
- Aktivieren Sie für Airflow 1 die experimentelle REST API.
Cloud Storage-Bucket erstellen
In diesem Beispiel wird ein DAG als Reaktion auf Änderungen in einem Cloud Storage-Bucket ausgelöst. Erstellen Sie einen neuen Bucket, der in diesem Beispiel verwendet werden kann.
URL des Airflow-Webservers abrufen
In diesem Beispiel werden REST API-Anfragen an den Airflow-Webserverendpunkt gesendet.
Sie verwenden den Teil der Airflow-Weboberflächen-URL vor .appspot.com
in Ihrem Cloud Functions-Funktionscode.
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie auf den Namen Ihrer Umgebung.
Rufen Sie auf der Seite Umgebungsdetails den Tab Umgebungsdetails auf.
Die URL des Airflow-Webservers ist in der Airflow-Web-UI aufgeführt.
gcloud
Führen Sie dazu diesen Befehl aus:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
client_id des IAM-Proxys abrufen
Zum Senden einer Anfrage an den Airflow REST API-Endpunkt ist die Client-ID des IAM-Proxys erforderlich, der den Airflow-Webserver schützt.
Cloud Composer stellt diese Informationen nicht direkt zur Verfügung. Senden Sie stattdessen eine nicht authentifizierte Anfrage an den Airflow-Webserver und erfassen Sie die Client-ID aus der Weiterleitungs-URL:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
Ersetzen Sie AIRFLOW_URL
durch die URL der Airflow-Weboberfläche.
Suchen Sie in der Ausgabe nach dem String nach client_id
. Beispiel:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
Speichern Sie den folgenden Code in einer Datei namens get_client_id.py
. Geben Sie Ihre Werte für project_id
, location
und composer_environment
ein und führen Sie den Code dann in Cloud Shell oder Ihrer lokalen Umgebung aus.
DAG über Cloud Functions auslösen
DAG in Ihre Umgebung hochladen
Laden Sie einen DAG in Ihre Umgebung hoch. Der folgende Beispiel-DAG gibt die empfangene DAG-Ausführungskonfiguration aus. Sie lösen diesen DAG über eine Funktion aus, die Sie später in diesem Leitfaden erstellen.
Cloud Functions-Funktion bereitstellen, die den DAG auslöst
Cloud Functions-Funktion-Konfigurationsparameter angeben
Trigger Wählen Sie für dieses Beispiel einen Trigger aus, der arbeitet, wenn ein neues Objekt in einem Bucket erstellt oder ein vorhandenes Objekt überschrieben wird.
Triggertyp Cloud Storage
Ereignistyp Abschließen/Erstellen
Bucket Wählen Sie einen Bucket aus, der diese Funktion auslösen muss.
Bei Fehler noch einmal versuchen: Wir empfehlen, diese Option für dieses Beispiel zu deaktivieren. Wenn Sie eine eigene Funktion in einer Produktionsumgebung verwenden, aktivieren Sie diese Option zur Umgang mit vorübergehenden Fehlern.
Laufzeitdienstkonto im Bereich Laufzeit, Build, Verbindungen und Sicherheitseinstellungen. Wählen Sie abhängig von Ihren Einstellungen eine der folgenden Optionen aus:
Wählen Sie Standardmäßiges Compute Engine-Dienstkonto aus. Mit Standard-IAM-Berechtigungen können in diesem Konto Funktionen ausgeführt werden, die auf Cloud Composer-Umgebungen zugreifen.
Erstellen Sie ein benutzerdefiniertes Dienstkonto mit der Rolle Composer-Nutzer und geben Sie es als Laufzeitdienstkonto für diese Funktion an. Diese Option folgt dem Prinzip der Mindestberechtigung.
Laufzeit und Einstiegspunkt im Schritt Code. Wählen Sie beim Hinzufügen von Code für dieses Beispiel die Python 3.7- oder höhere Laufzeit aus und geben Sie
trigger_dag
als Einstiegspunkt an.
Anforderungen hinzufügen
Geben Sie die Abhängigkeiten in der Datei requirements.txt
an:
Cloud Functions-Funktionscode hinzufügen
Fügen Sie der Datei main.py
den folgenden Code hinzu und ersetzen Sie den folgenden Code:
Ersetze den Wert der Variable
client_id
durch denclient_id
-Wert, den du zuvor abgerufen hast.Ersetzen Sie den Wert der Variable
webserver_id
durch die Mandantenprojekt-ID, die Teil der URL der Airflow-Weboberfläche vor.appspot.com
ist. Sie haben die URL der Airflow-Weboberfläche bereits erhalten.Geben Sie die von Ihnen verwendete Airflow REST API-Version an:
- Wenn Sie die stabile Airflow API verwenden, legen Sie die Variable
USE_EXPERIMENTAL_API
aufFalse
fest. - Wenn Sie die experimentelle Airflow REST API verwenden, sind keine Änderungen erforderlich. Die Variable
USE_EXPERIMENTAL_API
ist bereits aufTrue
festgelegt.
- Wenn Sie die stabile Airflow API verwenden, legen Sie die Variable
Funktion testen
So prüfen Sie, ob die Funktion und der DAG wie gewünscht funktionieren:
- Warten Sie, bis die Funktion bereitgestellt ist.
- Laden Sie eine Datei in Ihren Cloud Storage-Bucket hoch. Sie können die Funktion auch manuell auslösen, indem Sie in der Google Cloud Console die Aktion Funktion testen auswählen.
- Rufen Sie die DAG-Seite auf der Airflow-Weboberfläche auf. Der DAG sollte einen aktiven oder bereits abgeschlossenen DAG-Ausführung enthalten.
- Prüfen Sie in der Airflow-UI die Aufgabenlogs für diese Ausführung. Sie sollten sehen, dass die Aufgabe
print_gcs_info
die von der Funktion empfangenen Daten an die Logs ausgibt:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
Weitere Informationen
- Auf die Airflow-UI zugreifen
- Auf die Airflow REST API zugreifen
- DAGs schreiben
- Cloud Functions schreiben
- Cloud Storage-Trigger