Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite erfahren Sie, wie Sie eine ereignisbasierte Push-Architektur erstellen, indem Sie Cloud Composer-DAGs als Reaktion auf Änderungen an Pub/Sub-Themen auslösen. In den Beispielen in dieser Anleitung wird der vollständige Pub/Sub-Verwaltungszyklus, einschließlich der Aboverwaltung, als Teil des DAG-Prozesses veranschaulicht. Sie eignet sich für einige der gängigen Anwendungsfälle, in denen Sie DAGs auslösen, aber keine zusätzlichen Zugriffsberechtigungen einrichten möchten.
Nachrichten, die über Pub/Sub gesendet werden, können beispielsweise als Lösung verwendet werden, wenn Sie aus Sicherheitsgründen keinen direkten Zugriff auf eine Cloud Composer-Umgebung gewähren möchten. Sie können eine Cloud Run-Funktion konfigurieren, die Pub/Sub-Nachrichten erstellt und in einem Pub/Sub-Thema veröffentlicht. Sie können dann einen DAG erstellen, der Pub/Sub-Nachrichten abholt und verarbeitet.
In diesem Beispiel erstellen Sie eine Cloud Run-Funktion und stellen zwei DAGs bereit. Der erste DAG ruft Pub/Sub-Nachrichten ab und löst den zweiten DAG entsprechend dem Inhalt der Pub/Sub-Nachricht aus.
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und der Google Cloud Console vertraut sind.
Lernziele
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloudverwendet:
- Cloud Composer (siehe auch zusätzliche Kosten)
- Pub/Sub
- Cloud Run Functions
Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweis
Für diese Anleitung benötigen Sie ein Google Cloud Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie ein Projekt.
Die Abrechnung für Ihr Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist
Der Google Cloud Nutzer Ihres Projekts muss die folgenden Rollen haben, um die erforderlichen Ressourcen zu erstellen:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub-Bearbeiter (
roles/pubsub.editor
) - Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run-Administrator (
roles/cloudfunctions.admin
) - Loganzeige (
roles/logging.viewer
)
- Service Account User (
Das Dienstkonto, über das Ihre Cloud Run-Funktion ausgeführt wird, muss in Ihrem Projekt ausreichende Berechtigungen für den Zugriff auf Pub/Sub haben. Cloud Run-Funktionen verwenden standardmäßig das App Engine-Standarddienstkonto. Dieses Dienstkonto hat die Rolle Bearbeiter, die für diese Anleitung ausreichende Berechtigungen bietet.
Die APIs für Ihr Projekt aktivieren
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktivieren Sie die Cloud Composer API in Ihrem Projekt, indem Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzufügen:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Im Rahmen dieses Verfahrens weisen Sie dem Konto des Composer-Dienst-Agents die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext
) zu. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud Projekt auszuführen.
Pub/Sub-Thema erstellen
In diesem Beispiel wird ein DAG als Reaktion auf eine Nachricht ausgelöst, die an ein Pub/Sub-Thema gesendet wurde. Erstellen Sie ein Pub/Sub-Thema, das Sie in diesem Beispiel verwenden möchten:
Console
Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.
Klicken Sie auf Thema erstellen.
Geben Sie im Feld Themen-ID
dag-topic-trigger
als ID für das Thema ein.Behalten Sie für die anderen Optionen die Standardeinstellungen bei.
Klicken Sie auf Thema erstellen.
gcloud
Führen Sie den Befehl gcloud pubsub topics create in der Google Cloud CLI aus, um ein Thema zu erstellen:
gcloud pubsub topics create dag-topic-trigger
Terraform
Fügen Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzu:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
DAGs hochladen
Laden Sie DAGs in Ihre Umgebung hoch:
- Speichern Sie die folgende DAG-Datei auf Ihrem lokalen Computer.
- Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
- Laden Sie die bearbeitete DAG-Datei in Ihre Umgebung hoch.
Der Beispielcode enthält zwei DAGs: trigger_dag
und target_dag
.
Der trigger_dag
-DAG abonniert ein Pub/Sub-Thema, ruft Pub/Sub-Nachrichten ab und löst einen anderen DAG aus, der in der DAG-ID der Pub/Sub-Nachrichtendaten angegeben ist. In diesem Beispiel löst trigger_dag
den DAG target_dag
aus, der Nachrichten in die Aufgabenprotokolle ausgibt.
Der trigger_dag
-DAG enthält die folgenden Aufgaben:
subscribe_task
: Sie abonnieren ein Pub/Sub-Thema.pull_messages_operator
: Pub/Sub-Nachrichtendaten mitPubSubPullOperator
lesentrigger_target_dag
: Auslösen eines anderen DAG (in diesem Beispieltarget_dag
) gemäß den Daten in den Nachrichten, die aus dem Pub/Sub-Thema abgerufen wurden.
Der target_dag
-DAG enthält nur eine Aufgabe: output_to_logs
. Mit dieser Aufgabe werden Nachrichten mit einer Verzögerung von einer Sekunde in das Aufgabenprotokoll gedruckt.
Cloud Run-Funktion bereitstellen, die Nachrichten in einem Pub/Sub-Thema veröffentlicht
In diesem Abschnitt stellen Sie eine Cloud Run-Funktion bereit, die Nachrichten in einem Pub/Sub-Thema veröffentlicht.
Cloud Run-Funktion erstellen und konfigurieren
Console
Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.
Klicken Sie auf Funktion erstellen.
Wählen Sie im Feld Umgebung 1. Generation aus.
Geben Sie im Feld Funktionsname den Namen für die Funktion ein:
pubsub-publisher
.Wählen Sie im Feld Trigger die Option HTTP aus.
Wählen Sie im Bereich Authentifizierung die Option Nicht authentifizierte Aufrufe zulassen aus. Mit dieser Option können nicht authentifizierte Nutzer eine HTTP-Funktion aufrufen.
Klicken Sie auf „Speichern“.
Klicken Sie auf Weiter, um zum Schritt Code zu gelangen.
Terraform
Verwenden Sie für diesen Schritt die Google Cloud Console, da es keine einfache Möglichkeit gibt, den Quellcode der Funktion über Terraform zu verwalten.
In diesem Beispiel wird gezeigt, wie Sie eine Cloud Run-Funktion aus einer lokalen ZIP-Archivdatei hochladen. Dazu erstellen Sie einen Cloud Storage-Bucket, speichern die Datei in diesem Bucket und verwenden die Datei aus dem Bucket dann als Quelle für die Cloud Run-Funktion. Bei diesem Ansatz aktualisiert Terraform den Quellcode Ihrer Funktion nicht automatisch, auch wenn Sie eine neue Archivdatei erstellen. Wenn Sie den Funktionscode noch einmal hochladen möchten, können Sie den Dateinamen des Archivs ändern.
- Laden Sie die Dateien
pubsub_publisher.py
undrequirements.txt
herunter. - Ersetzen Sie in der Datei
pubsub_publisher.py
die Variable<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
. - Erstellen Sie ein ZIP-Archiv mit dem Namen
pubsub_function.zip
mit der Dateipbusub_publisner.py
und der Dateirequirements.txt
. - Speichern Sie das ZIP-Archiv in einem Verzeichnis, in dem sich Ihr Terraform-Script befindet.
- Fügen Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzu und ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Codeparameter für Cloud Run-Funktionen angeben
Console
Wählen Sie im Schritt Code im Feld Laufzeit die Sprachlaufzeit aus, die Ihre Funktion verwendet. Wählen Sie in diesem Beispiel Python 3.10 aus.
Geben Sie im Feld Einstiegspunkt den Wert
pubsub_publisher
ein. Dies ist der Code, der beim Ausführen der Cloud Run-Funktion ausgeführt wird. Der Wert dieses Flags muss ein Funktionsname oder ein voll qualifizierter Klassenname sein, der in Ihrem Quellcode vorhanden ist.
Terraform
Diesen Schritt überspringen. Die Parameter der Cloud Run-Funktion sind bereits in der Ressource google_cloudfunctions_function
definiert.
Code der Cloud Run-Funktion hochladen
Console
Wählen Sie im Feld Quellcode die entsprechende Option für die Bereitstellung des Funktionsquellcodes aus. In dieser Anleitung fügen Sie den Funktionscode mit dem Inline-Editor von Cloud Run Functions hinzu. Alternativ können Sie eine ZIP-Datei hochladen oder Cloud Source Repositories verwenden.
- Fügen Sie das folgende Codebeispiel in die Datei main.py ein.
- Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
Terraform
Diesen Schritt überspringen. Die Parameter der Cloud Run-Funktion sind bereits in der Ressource google_cloudfunctions_function
definiert.
Abhängigkeiten der Cloud Run-Funktion angeben
Console
Geben Sie die Abhängigkeiten der Funktion in der Metadatendatei requirements.txt an:
Wenn Sie die Funktion bereitstellen, lädt und installiert Cloud Run Functions die in der Datei requirements.txt deklarierten Abhängigkeiten, eine Zeile pro Paket.
Diese Datei muss sich im selben Verzeichnis befinden wie die Datei main.py, die den Funktionscode enthält. Weitere Informationen finden Sie in der pip
-Dokumentation unter Anforderungen.
Terraform
Diesen Schritt überspringen. Abhängigkeiten von Cloud Run-Funktionen werden in der Datei requirements.txt
im Archiv pubsub_function.zip
definiert.
Cloud Run-Funktion bereitstellen
Console
Klicken Sie auf Bereitstellen. Wenn die Bereitstellung erfolgreich abgeschlossen wurde, wird die Funktion in der Google Cloud Console auf der Seite Cloud Run Functions mit einem grünen Häkchen angezeigt.
Das Dienstkonto, unter dem Ihre Cloud Run-Funktion ausgeführt wird, muss ausreichende Berechtigungen für den Zugriff auf Pub/Sub in Ihrem Projekt haben.
Terraform
Initialisieren Sie Terraform:
terraform init
Prüfen Sie die Konfiguration und prüfen Sie, ob die Ressourcen, die Terraform erstellen oder aktualisieren wird, Ihren Erwartungen entsprechen:
terraform plan
Führen Sie den folgenden Befehl aus, um zu prüfen, ob Ihre Konfiguration gültig ist:
terraform validate
Wenden Sie die Terraform-Konfiguration an. Führen Sie dazu den folgenden Befehl aus und geben Sie an der Eingabeaufforderung „yes“ ein:
terraform apply
Warten Sie, bis Terraform die Meldung „Apply complete“ anzeigt.
Rufen Sie in der Google Cloud Console Ihre Ressourcen in der Benutzeroberfläche auf, um sicherzustellen, dass Terraform sie erstellt oder aktualisiert hat.
Cloud Run-Funktion testen
So prüfen Sie, ob Ihre Funktion eine Nachricht in einem Pub/Sub-Thema veröffentlicht und die Beispiel-DAGs wie vorgesehen funktionieren:
Prüfen Sie, ob die DAGs aktiv sind:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Prüfen Sie die Werte in der Spalte Status für DAGs mit den Namen
trigger_dag
undtarget_dag
. Beide DAGs müssen den StatusActive
haben.
Senden Sie eine Pub/Sub-Testnachricht. So gehts in Cloud Shell:
Rufen Sie in der Google Cloud Console die Seite Functions auf.
Klicken Sie auf den Namen Ihrer Funktion,
pubsub-publisher
.Rufen Sie den Tab Testen auf.
Geben Sie im Abschnitt Auslösendes Ereignis konfigurieren den folgenden JSON-Schlüssel/Wert ein:
{"message": "target_dag"}
. Ändern Sie das Schlüssel/Wert-Paar nicht, da diese Nachricht später den Test-DAG auslöst.Klicken Sie im Bereich Testbefehl auf In Cloud Shell testen.
Warten Sie im Cloud Shell-Terminal, bis ein Befehl automatisch angezeigt wird. Führen Sie diesen Befehl aus, indem Sie
Enter
drücken.Wenn die Meldung Cloud Shell autorisieren angezeigt wird, klicken Sie auf Autorisieren.
Prüfen Sie, ob der Nachrichteninhalt mit der Pub/Sub-Nachricht übereinstimmt. In diesem Beispiel muss die Ausgabenachricht mit
Message b'target_dag' with message_length 10 published to
beginnen, als Antwort Ihrer Funktion.
Prüfen Sie, ob
target_dag
ausgelöst wurde:Warten Sie mindestens eine Minute, bis ein neuer DAG-Lauf von
trigger_dag
abgeschlossen ist.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf
trigger_dag
, um die Seite DAG-Details aufzurufen. Auf dem Tab Ausführungen wird eine Liste der DAG-Ausführungen für den DAGtrigger_dag
angezeigt.Dieser DAG wird jede Minute ausgeführt und verarbeitet alle Pub/Sub-Nachrichten, die von der Funktion gesendet werden. Wenn keine Nachrichten gesendet wurden, wird die Aufgabe
trigger_target
in den DAG-Ausführungsprotokollen alsSkipped
markiert. Wenn DAGs ausgelöst wurden, wird die Aufgabetrigger_target
alsSuccess
markiert.Suchen Sie in den letzten DAG-Ausführungen nach einer, bei der alle drei Aufgaben (
subscribe_task
,pull_messages_operator
undtrigger_target
) den StatusSuccess
haben.Kehren Sie zum Tab DAGs zurück und prüfen Sie, ob in der Spalte Erfolgreiche Ausführungen für den DAG
target_dag
eine erfolgreiche Ausführung aufgeführt ist.
Zusammenfassung
In dieser Anleitung haben Sie gelernt, wie Sie mit Cloud Run-Funktionen Nachrichten in einem Pub/Sub-Thema veröffentlichen und einen DAG bereitstellen, der ein Pub/Sub-Thema abonniert, Pub/Sub-Nachrichten abruft und einen anderen DAG auslöst, der in der DAG-ID der Nachrichtendaten angegeben ist.
Es gibt auch alternative Möglichkeiten zum Erstellen und Verwalten von Pub/Sub-Abos und zum Triggern von DAGs, die in dieser Anleitung nicht behandelt werden. So können Sie beispielsweise Airflow-DAGs mit Cloud Run-Funktionen auslösen, wenn ein bestimmtes Ereignis eintritt. In unseren Anleitungen können Sie die anderenGoogle Cloud -Funktionen selbst ausprobieren.
Bereinigen
Damit Ihrem Google Cloud -Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie entweder das Projekt löschen, das die Ressourcen enthält, oder das Projekt beibehalten und die einzelnen Ressourcen löschen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Console
- Löschen Sie die Cloud Composer-Umgebung. Außerdem wird dabei der Bucket der Umgebung gelöscht.
- Löschen Sie das Pub/Sub-Thema,
dag-topic-trigger
. Löschen Sie die Cloud Run-Funktion.
Rufen Sie in der Google Cloud Console die Cloud Run-Funktionen auf.
Klicken Sie auf das Kästchen neben der Funktion, die Sie löschen möchten,
pubsub-publisher
.Klicken Sie auf Löschen und folgen Sie der Anleitung.
Terraform
- Achten Sie darauf, dass Ihr Terraform-Script keine Einträge für Ressourcen enthält, die für Ihr Projekt noch erforderlich sind. Beispielsweise können Sie einige APIs aktiviert und IAM-Berechtigungen weiterhin zuweisen lassen, wenn Sie Ihrem Terraform-Script entsprechende Definitionen hinzugefügt haben.
- Führen Sie
terraform destroy
aus. - Löschen Sie den Bucket der Umgebung manuell. Cloud Composer löscht sie nicht automatisch. Sie können dies über die Google Cloud Console oder die Google Cloud CLI tun.
Nächste Schritte
- DAGs testen
- HTTP-Funktionen testen
- Cloud Run-Funktion bereitstellen
- Weitere Google Cloud Features testen Anleitungen