Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie eine ereignisbasierte Push-Architektur erstellen, indem Sie Cloud Composer-DAGs als Reaktion auf Änderungen des Pub/Sub-Themas auslösen. Beispiele in dieser Anleitung zeigen, wie der gesamte Zyklus der Pub/Sub-Verwaltung, einschließlich der Aboverwaltung, als Teil des DAG-Prozesses ausgeführt wird. Sie eignet sich für einige der gängigen Anwendungsfälle, wenn Sie DAGs auslösen, aber keine zusätzlichen Zugriffsberechtigungen einrichten möchten.
Beispielsweise können über Pub/Sub gesendete Nachrichten als Lösung verwendet werden, wenn Sie aus Sicherheitsgründen keinen direkten Zugriff auf eine Cloud Composer-Umgebung gewähren möchten. Sie können eine Cloud Functions-Funktion konfigurieren, die Pub/Sub-Nachrichten erstellt und in einem Pub/Sub-Thema veröffentlicht. Sie können dann einen DAG erstellen, der Pub/Sub-Nachrichten abruft und diese Nachrichten dann verarbeitet.
In diesem konkreten Beispiel erstellen Sie eine Cloud Function und stellen zwei DAGs bereit. Der erste DAG ruft Pub/Sub-Nachrichten ab und löst den zweiten DAG entsprechend dem Pub/Sub-Nachrichteninhalt aus.
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und der Google Cloud Console vertraut sind.
Lernziele
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
Wenn Sie diese Anleitung abgeschlossen haben, können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie eines:
Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.
Sorgen Sie dafür, dass Ihr Google Cloud-Projektnutzer die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:
- Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Pub/Sub-Bearbeiter (
roles/pubsub.editor
) - Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Functions-Administrator (
roles/cloudfunctions.admin
) - Loganzeige (
roles/logging.viewer
)
- Dienstkontonutzer (
Prüfen Sie, ob das Dienstkonto, mit dem die Cloud Functions-Funktion ausgeführt wird, in Ihrem Projekt über ausreichende Berechtigungen für den Zugriff auf Pub/Sub verfügt. Cloud Functions verwendet standardmäßig das App Engine-Standarddienstkonto. Dieses Dienstkonto hat die Rolle Bearbeiter, die über ausreichende Berechtigungen für diese Anleitung verfügt.
Die APIs für Ihr Projekt aktivieren
Console
Cloud Composer, Cloud Functions, and Pub/Sub APIs aktivieren.
gcloud
Aktivieren Sie die Cloud Composer, Cloud Functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktivieren Sie die Cloud Composer API in Ihrem Projekt. Fügen Sie dazu dem Terraform-Skript die folgenden Ressourcendefinitionen hinzu:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Im Rahmen dieser Vorgehensweise gewähren Sie dem Composer-Dienst-Agent-Konto die Rolle Dienst-Agent-Erweiterung für Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext
). Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.
Pub/Sub-Thema erstellen
In diesem Beispiel wird ein DAG als Antwort auf eine Nachricht ausgelöst, die an ein Pub/Sub-Thema gesendet wurde. Erstellen Sie ein Pub/Sub-Thema für dieses Beispiel:
Console
Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.
Klicken Sie auf Thema erstellen.
Geben Sie im Feld Themen-ID
dag-topic-trigger
als ID für Ihr Thema ein.Übernehmen Sie für alle anderen Optionen die Standardeinstellungen.
Klicken Sie auf Thema erstellen.
gcloud
Führen Sie in der Google Cloud CLI den Befehl gcloud pubsub topics create aus, um ein Thema zu erstellen:
gcloud pubsub topics create dag-topic-trigger
Terraform
Fügen Sie dem Terraform-Skript die folgenden Ressourcendefinitionen hinzu:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
DAGs hochladen
Laden Sie DAGs in Ihre Umgebung hoch:
- Speichern Sie die folgende DAG-Datei auf Ihrem lokalen Computer.
- Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
- Laden Sie die bearbeitete DAG-Datei in Ihre Umgebung hoch.
Der Beispielcode enthält zwei DAGs: trigger_dag
und target_dag
.
Der DAG trigger_dag
abonniert ein Pub/Sub-Thema, ruft Pub/Sub-Nachrichten ab und löst einen weiteren DAG aus, der in der DAG-ID der Pub/Sub-Nachrichtendaten angegeben ist. In diesem Beispiel löst trigger_dag
den DAG target_dag
aus, der Meldungen an die Aufgabenlogs ausgibt.
Der DAG trigger_dag
enthält die folgenden Aufgaben:
subscribe_task
: Abonnieren Sie ein Pub/Sub-Thema.pull_messages_operator
: Liest Pub/Sub-Nachrichtendaten mitPubSubPullOperator
.trigger_target_dag
: Lösen Sie einen weiteren DAG (in diesem Beispieltarget_dag
) entsprechend den Daten in den Nachrichten aus, die aus dem Pub/Sub-Thema abgerufen wurden.
Der DAG target_dag
enthält nur eine Aufgabe: output_to_logs
. Diese Aufgabe gibt Nachrichten mit einer Verzögerung von einer Sekunde an das Aufgabenlog aus.
Cloud Functions-Funktion bereitstellen, die Nachrichten in einem Pub/Sub-Thema veröffentlicht
In diesem Abschnitt stellen Sie eine Cloud Function bereit, die Nachrichten in einem Pub/Sub-Thema veröffentlicht.
Cloud Functions-Funktion erstellen und Konfiguration angeben
Console
Wechseln Sie in der Google Cloud Console zur Seite Cloud Functions.
Klicken Sie auf Funktion erstellen.
Wählen Sie im Feld Umgebung die Option 1. Generation aus.
Geben Sie im Feld Funktionsname den Namen für die Funktion ein:
pubsub-publisher
.Wählen Sie im Feld Trigger die Option HTTP aus.
Wählen Sie im Abschnitt Authentifizierung die Option Nicht authentifizierte Aufrufe zulassen aus. Diese Option gewährt nicht authentifizierten Nutzern die Möglichkeit, eine HTTP-Funktion aufzurufen.
Klicken Sie auf „Speichern“.
Klicken Sie auf Weiter, um zum Schritt Code zu gelangen.
Terraform
Verwenden Sie für diesen Schritt gegebenenfalls die Google Cloud Console, da es keine einfache Möglichkeit gibt, den Quellcode der Funktion über Terraform zu verwalten.
In diesem Beispiel wird gezeigt, wie Sie eine Cloud Function aus einer lokalen ZIP-Archivdatei hochladen. Dazu erstellen Sie einen Cloud Storage-Bucket, speichern die Datei in diesem Bucket und verwenden die Datei aus dem Bucket als Quelle für Cloud Function. Bei diesem Ansatz aktualisiert Terraform den Quellcode Ihrer Funktion nicht automatisch, auch wenn Sie eine neue Archivdatei erstellen. Um den Funktionscode wieder hochzuladen, können Sie den Dateinamen des Archivs ändern.
- Laden Sie die Dateien
pubsub_publisher.py
undrequirements.txt
herunter. - Ersetzen Sie in der Datei
pubsub_publisher.py
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
. - Erstellen Sie ein ZIP-Archiv namens
pubsub_function.zip
mit der Dateipbusub_publisner.py
undrequirements.txt
. - Speichern Sie das ZIP-Archiv in einem Verzeichnis, in dem Ihr Terraform-Script gespeichert ist.
- Fügen Sie dem Terraform-Skript die folgenden Ressourcendefinitionen hinzu und ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Codeparameter für Cloud Functions-Funktion angeben
Console
Wählen Sie im Schritt Code im Feld Laufzeit die Sprachlaufzeit aus, die Ihre Funktion verwendet. Wählen Sie in diesem Beispiel Python 3.10 aus.
Geben Sie im Feld Einstiegspunkt den Wert
pubsub_publisher
ein. Dies ist der Code, der bei der Ausführung Ihrer Cloud Functions-Funktion ausgeführt wird. Der Wert dieses Flags muss ein Funktionsname oder ein vollständig qualifizierter Klassenname sein, der im Quellcode vorhanden ist.
Terraform
Diesen Schritt überspringen. Parameter von Cloud Function sind bereits in der Ressource google_cloudfunctions_function
definiert.
Cloud Function-Code hochladen
Console
Wählen Sie im Feld Quellcode die entsprechende Option für die Bereitstellung des Quellcodes der Funktion aus. In dieser Anleitung fügen Sie den Funktionscode mit dem Inline-Editor von Cloud Functions hinzu. Alternativ können Sie eine ZIP-Datei hochladen oder Cloud Source Repositories verwenden.
- Fügen Sie das folgende Codebeispiel in die Datei main.py ein.
- Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
Terraform
Diesen Schritt überspringen. Parameter von Cloud Function sind bereits in der Ressource google_cloudfunctions_function
definiert.
Cloud Function-Abhängigkeiten angeben
Console
Geben Sie die Funktionsabhängigkeiten in der Metadatendatei requirements.txt an:
Wenn Sie die Funktion bereitstellen, lädt und installiert Cloud Functions die in der Datei requirements.txt deklarierten Abhängigkeiten mit einer Zeile pro Paket.
Diese Datei muss sich im selben Verzeichnis wie die Datei main.py befinden, die den Funktionscode enthält. Weitere Informationen finden Sie unter Anforderungsdateien in der Dokumentation zu pip
.
Terraform
Diesen Schritt überspringen. Abhängigkeiten von Cloud Function werden in der Datei requirements.txt
im Archiv pubsub_function.zip
definiert.
Cloud Functions-Funktion bereitstellen
Console
Klicken Sie auf Bereitstellen. Wenn die Bereitstellung erfolgreich abgeschlossen wurde, wird die Funktion in der Google Cloud Console auf der Seite Cloud Functions mit einem grünen Häkchen angezeigt.
Prüfen Sie, ob das Dienstkonto, mit dem die Cloud Functions-Funktion ausgeführt wird, in Ihrem Projekt ausreichend Berechtigungen für den Zugriff auf Pub/Sub hat.
Terraform
Initialisieren Sie Terraform:
terraform init
Überprüfen Sie die Konfiguration und achten Sie darauf, dass die von Terraform erstellten oder aktualisierten Ressourcen Ihren Erwartungen entsprechen:
terraform plan
Führen Sie den folgenden Befehl aus, um zu prüfen, ob Ihre Konfiguration gültig ist:
terraform validate
Wenden Sie die Terraform-Konfiguration an, indem Sie den folgenden Befehl ausführen und in die Eingabeaufforderung "yes" eingeben:
terraform apply
Warten Sie, bis Terraform die Meldung „Apply complete“ anzeigt.
Rufen Sie in der Google Cloud Console in der UI Ihre Ressourcen auf, um zu prüfen, ob Terraform sie erstellt oder aktualisiert hat.
Cloud Functions-Funktion testen
So prüfen Sie, ob Ihre Funktion eine Nachricht zu einem Pub/Sub-Thema veröffentlicht und die Beispiel-DAGs wie beabsichtigt funktionieren:
Prüfen Sie, ob die DAGs aktiv sind:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Prüfen Sie die Werte in der Spalte State auf DAGs mit den Namen
trigger_dag
undtarget_dag
. Beide DAGs müssen den StatusActive
haben.
Senden Sie eine Pub/Sub-Testnachricht. Sie können dies in Cloud Shell tun:
Rufen Sie in der Google Cloud Console die Seite Funktionen auf.
Klicken Sie auf den Namen der Funktion:
pubsub-publisher
.Rufen Sie den Tab Test auf.
Geben Sie im Bereich Auslösendes Ereignis konfigurieren das folgende JSON-Schlüssel/Wert-Paar ein:
{"message": "target_dag"}
. Ändern Sie das Schlüssel/Wert-Paar nicht, da diese Nachricht den Test-DAG später auslöst.Klicken Sie im Abschnitt Testbefehl auf In Cloud Shell testen.
Warten Sie im Cloud Shell-Terminal, bis automatisch ein Befehl angezeigt wird. Führen Sie diesen Befehl aus, indem Sie
Enter
drücken.Wenn die Meldung Cloud Shell autorisieren angezeigt wird, klicken Sie auf Autorisieren.
Prüfen Sie, ob der Nachrichteninhalt der Pub/Sub-Nachricht entspricht. In diesem Beispiel muss die Ausgabenachricht mit
Message b'target_dag' with message_length 10 published to
als Antwort Ihrer Funktion beginnen.
Prüfen Sie, ob
target_dag
ausgelöst wurde:Warten Sie mindestens eine Minute, damit eine neue DAG-Ausführung von
trigger_dag
abgeschlossen ist.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf
trigger_dag
, um die Seite DAG-Details aufzurufen. Auf dem Tab Ausführungen wird eine Liste der DAG-Ausführungen für den DAGtrigger_dag
angezeigt.Dieser DAG wird jede Minute ausgeführt und verarbeitet alle von der Funktion gesendeten Pub/Sub-Nachrichten. Wenn keine Nachrichten gesendet wurden, wird die Aufgabe
trigger_target
in den DAG-Ausführungslogs alsSkipped
gekennzeichnet. Wenn DAGs ausgelöst wurden, wird die Aufgabetrigger_target
alsSuccess
markiert.Sehen Sie sich mehrere aktuelle DAG-Ausführungen an, um eine DAG-Ausführung zu finden, bei der alle drei Aufgaben (
subscribe_task
,pull_messages_operator
undtrigger_target
) den StatusSuccess
haben.Kehren Sie zum Tab DAGs zurück und prüfen Sie, ob in der Spalte Erfolgreiche Ausführungen für den DAG
target_dag
eine erfolgreiche Ausführung aufgeführt ist.
Zusammenfassung
In dieser Anleitung haben Sie erfahren, wie Sie mit Cloud Functions Nachrichten in einem Pub/Sub-Thema veröffentlichen und einen DAG bereitstellen, der ein Pub/Sub-Thema abonniert, Pub/Sub-Nachrichten abruft und einen anderen DAG auslöst, der in der DAG-ID der Nachrichtendaten angegeben ist.
Es gibt auch alternative Möglichkeiten zum Erstellen und Verwalten von Pub/Sub-Abos und zum Auslösen von DAGs, die in dieser Anleitung nicht behandelt werden. Sie können beispielsweise Cloud Functions verwenden, um Airflow-DAGs auszulösen, wenn ein bestimmtes Ereignis eintritt. Sehen Sie sich unsere Anleitungen an, um die anderen Google Cloud-Features selbst auszuprobieren.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder behalten Sie das Projekt bei und löschen Sie die einzelnen Ressourcen.
Projekt löschen
Google Cloud-Projekt löschen:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Console
- Löschen Sie die Cloud Composer-Umgebung. Während dieses Vorgangs löschen Sie auch den Bucket der Umgebung.
- Löschen Sie das Pub/Sub-Thema
dag-topic-trigger
. Löschen Sie die Cloud Functions-Funktion.
Rufen Sie in der Google Cloud Console Cloud Functions auf.
Klicken Sie auf das Kästchen für die Funktion, die Sie löschen möchten:
pubsub-publisher
.Klicken Sie auf Löschen und folgen Sie der Anleitung.
Terraform
- Achten Sie darauf, dass Ihr Terraform-Skript keine Einträge für Ressourcen enthält, die noch für Ihr Projekt benötigt werden. Beispielsweise können Sie einige APIs aktiviert lassen und IAM-Berechtigungen zugewiesen lassen (wenn Sie solche Definitionen zu Ihrem Terraform-Skript hinzugefügt haben).
- Führen Sie
terraform destroy
aus. - Löschen Sie den Bucket der Umgebung manuell. Cloud Composer löscht ihn nicht automatisch. Sie können dies über die Google Cloud Console oder die Google Cloud CLI tun.
Nächste Schritte
- DAGs testen
- HTTP-Funktionen testen
- Cloud Functions-Funktion bereitstellen
- Probieren Sie andere Google Cloud-Funktionen selbst aus. Sehen Sie sich unsere Anleitungen an.