Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite erfahren Sie, wie Sie eine ereignisbasierte Push-Architektur erstellen, indem Sie Cloud Composer-DAGs als Reaktion auf Änderungen an Pub/Sub-Themen auslösen. Beispiele in dieser Anleitung veranschaulichen den gesamten Zyklus der Pub/Sub-Verwaltung, einschließlich der Aboverwaltung, als Teil des DAG-Prozesses. Sie eignet sich für einige der gängigen Anwendungsfälle, in denen Sie DAGs auslösen, aber keine zusätzlichen Zugriffsberechtigungen einrichten möchten.
Beispielsweise können über Pub/Sub gesendete Nachrichten als Lösung dienen. wenn Sie keinen direkten Zugriff auf aus Sicherheitsgründen. Sie können eine Cloud Run-Funktion konfigurieren, die Pub/Sub-Nachrichten erstellt und in einem Pub/Sub-Thema veröffentlicht. Sie können dann einen DAG erstellen, Pub/Sub-Nachrichten abruft und diese Nachrichten dann verarbeitet.
In diesem konkreten Beispiel erstellen Sie eine Cloud Run-Funktion und stellen zwei DAGs. Der erste DAG ruft Pub/Sub-Nachrichten ab und löst den zweiten DAG entsprechend dem Inhalt der Pub/Sub-Nachricht aus.
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und der Google Cloud Console vertraut sind.
Lernziele
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
- Cloud Composer (siehe auch zusätzliche Kosten)
- Pub/Sub
- Cloud Run-Funktionen
Nachdem Sie diese Anleitung abgeschlossen haben, können Sie weitere Kosten vermeiden, indem Sie die von Ihnen erstellten Ressourcen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie ein Projekt.
Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.
Der Nutzer Ihres Google Cloud-Projekts muss die folgenden Rollen haben, um die erforderlichen Ressourcen zu erstellen:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub-Bearbeiter (
roles/pubsub.editor
) - Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run-Administrator (
roles/cloudfunctions.admin
) - Loganzeige (
roles/logging.viewer
)
- Service Account User (
Achten Sie darauf, dass der Dienstkonto, das Ihre Cloud Run-Funktion ausführt über ausreichende Berechtigungen in Ihrem Projekt verfügt, um auf Pub/Sub zuzugreifen. Cloud Run-Funktionen verwenden standardmäßig das App Engine-Standarddienstkonto. Dieses Dienstkonto hat die Rolle Bearbeiter mit ausreichender Berechtigungen für diese Anleitung.
Die APIs für Ihr Projekt aktivieren
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktivieren Sie die Cloud Composer API in Ihrem Projekt, indem Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzufügen:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Im Rahmen dieses Verfahrens
Sie gewähren die Dienst-Agent-Erweiterung für die Cloud Composer v2 API.
Rolle (roles/composer.ServiceAgentV2Ext
) für den Composer-Dienst-Agent
Konto. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.
Pub/Sub-Thema erstellen
In diesem Beispiel wird ein DAG als Reaktion auf eine Nachricht ausgelöst, die an ein Pub/Sub-Thema gesendet wurde. Erstellen Sie ein Pub/Sub-Thema, das Sie in diesem Beispiel verwenden möchten:
Console
Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.
Klicken Sie auf Thema erstellen.
Geben Sie im Feld Themen-ID
dag-topic-trigger
als ID für Ihr .Übernehmen Sie für alle anderen Optionen die Standardeinstellungen.
Klicken Sie auf Thema erstellen.
gcloud
Um ein Thema zu erstellen, führen Sie den gcloud pubsub topics create -Befehl in der Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Fügen Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzu:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
DAGs hochladen
Laden Sie DAGs in Ihre Umgebung hoch:
- Speichern Sie die folgende DAG-Datei auf Ihrem lokalen Computer.
- Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
- Laden Sie die bearbeitete DAG-Datei in Ihre Umgebung hoch.
Der Beispielcode enthält zwei DAGs: trigger_dag
und target_dag
.
Der trigger_dag
-DAG abonniert ein Pub/Sub-Thema, ruft Pub/Sub-Nachrichten ab und löst einen anderen DAG aus, der in der DAG-ID der Pub/Sub-Nachrichtendaten angegeben ist. In diesem Beispiel löst trigger_dag
den DAG target_dag
aus, der Nachrichten in die Aufgabenprotokolle ausgibt.
Der trigger_dag
-DAG enthält die folgenden Aufgaben:
subscribe_task
: Sie abonnieren ein Pub/Sub-Thema.pull_messages_operator
: Pub/Sub-Nachrichtendaten lesen mitPubSubPullOperator
.trigger_target_dag
: Einen weiteren DAG auslösen (in diesem Beispieltarget_dag
) laut den Daten in den Nachrichten, die aus Pub/Sub abgerufen wurden, .
Der DAG target_dag
enthält nur eine Aufgabe: output_to_logs
. Diese Aufgabe
gibt Nachrichten mit einer Verzögerung von einer Sekunde im Aufgabenlog aus.
Cloud Run-Funktion bereitstellen, die Nachrichten in einem Pub/Sub-Thema veröffentlicht
In diesem Abschnitt stellen Sie eine Cloud Run-Funktion bereit, die Nachrichten in einem Pub/Sub-Thema veröffentlicht.
Cloud Run-Funktion erstellen und konfigurieren
Console
Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.
Klicken Sie auf Funktion erstellen.
Wählen Sie im Feld Umgebung die Option 1. Generation aus.
Geben Sie im Feld Funktionsname den Namen der Funktion ein:
pubsub-publisher
Wählen Sie im Feld Trigger die Option HTTP aus.
Wählen Sie im Bereich Authentifizierung die Option Nicht authentifizierte Aufrufe zulassen aus. Mit dieser Option können nicht authentifizierte Nutzer eine HTTP-Funktion aufrufen.
Klicken Sie auf „Speichern“.
Klicken Sie auf Weiter, um zum Schritt Code zu gelangen.
Terraform
Verwenden Sie für diesen Schritt die Google Cloud Console, da es keine einfache Möglichkeit gibt, den Quellcode der Funktion über Terraform zu verwalten.
In diesem Beispiel wird gezeigt, wie Sie eine Cloud Run-Funktion hochladen. aus einer lokalen ZIP-Archivdatei, indem Sie einen Cloud Storage-Bucket erstellen, Speichern der Datei in diesem Bucket und Verwenden der Datei aus dem Bucket als Quelle für die Cloud Run-Funktion. Wenn Sie diesen Ansatz verwenden, Terraform aktualisiert den Quellcode der Funktion nicht automatisch. auch wenn Sie eine neue Archivdatei erstellen. Um den Funktionscode noch einmal hochzuladen, den Dateinamen des Archivs ändern kann.
- Laden Sie die
pubsub_publisher.py
und dierequirements.txt
-Dateien. - Ersetzen Sie in der Datei
pubsub_publisher.py
die Variable<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
. - Erstellen Sie ein ZIP-Archiv mit dem Namen
pubsub_function.zip
mit der Dateipbusub_publisner.py
und der Dateirequirements.txt
. - Speichern Sie das ZIP-Archiv in einem Verzeichnis, in dem sich Ihr Terraform-Script befindet.
- Fügen Sie Ihrem Terraform-Skript die folgenden Ressourcendefinitionen hinzu:
Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Parameter für den Cloud Run-Funktionscode angeben
Console
Wählen Sie im Schritt Code im Feld Laufzeit die Sprache aus. die die Funktion verwendet. Wählen Sie in diesem Beispiel Python 3.10 aus.
Geben Sie im Feld Einstiegspunkt den Wert
pubsub_publisher
ein. Dies ist der Code, der beim Ausführen der Cloud Run-Funktion ausgeführt wird. Der Wert dieses Flags muss ein Funktionsname oder ein voll qualifizierter Klassenname sein, der in Ihrem Quellcode vorhanden ist.
Terraform
Diesen Schritt überspringen. Die Parameter der Cloud Run-Funktion sind bereits in der Ressource google_cloudfunctions_function
definiert.
Cloud Run-Funktionscode hochladen
Console
Wählen Sie im Feld Quellcode die entsprechende Option für die Bereitstellung des Funktionsquellcodes aus. In dieser Anleitung fügen Sie den Funktionscode mit dem Inline-Editor von Cloud Run Functions hinzu. Alternativ können Sie eine ZIP-Datei hochladen oder Cloud Source Repositories erstellen.
- Fügen Sie das folgende Codebeispiel in die Datei main.py ein.
- Ersetzen Sie
<PROJECT_ID>
durch Projekt-ID. Ihres Projekts. Beispiel:example-project
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsparameter sind bereits in
Die Ressource google_cloudfunctions_function
.
Abhängigkeiten von Cloud Run-Funktionen angeben
Console
Geben Sie die Abhängigkeiten der Funktion in der Metadatendatei requirements.txt an:
Wenn Sie Ihre Funktion bereitstellen, werden die Cloud Run-Funktionen heruntergeladen und installiert
Abhängigkeiten, die in der Datei requirements.txt deklariert sind (eine Zeile pro Paket).
Diese Datei muss sich im selben Verzeichnis befinden wie die Datei main.py, die den Funktionscode enthält. Weitere Informationen finden Sie in der pip
-Dokumentation unter Anforderungen.
Terraform
Diesen Schritt überspringen. Die Abhängigkeiten von Cloud Run-Funktionen werden in der Datei requirements.txt
im Archiv pubsub_function.zip
definiert.
Cloud Run-Funktion bereitstellen
Console
Klicken Sie auf Bereitstellen. Wenn die Bereitstellung erfolgreich abgeschlossen wurde, wird die Funktion angezeigt. mit einem grünen Häkchen auf der Seite Cloud Run-Funktionen in der Google Cloud Console
Prüfen Sie, ob das Dienstkonto, mit dem die Cloud Run-Funktion ausgeführt wird, korrekt ausgeführt wird die erforderlichen Berechtigungen in Ihrem Projekt haben, um auf Pub/Sub
Terraform
Initialisieren Sie Terraform:
terraform init
Prüfen Sie die Konfiguration und prüfen Sie, ob die Ressourcen, die Terraform erstellen oder aktualisieren wird, Ihren Erwartungen entsprechen:
terraform plan
Führen Sie folgenden Befehl aus, um zu prüfen, ob Ihre Konfiguration gültig ist: Befehl:
terraform validate
Wenden Sie die Terraform-Konfiguration an, indem Sie den folgenden Befehl ausführen und mit „Ja“ eingeben:
terraform apply
Warten Sie, bis Terraform die Meldung „Apply complete“ anzeigt.
Rufen Sie in der Google Cloud Console Ihre Ressourcen in der Benutzeroberfläche auf, um sicherzustellen, dass Terraform sie erstellt oder aktualisiert hat.
Cloud Run-Funktion testen
So prüfen Sie, ob Ihre Funktion eine Nachricht in einem Pub/Sub-Thema veröffentlicht und die Beispiel-DAGs wie vorgesehen funktionieren:
Prüfen Sie, ob die DAGs aktiv sind:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Prüfen Sie die Werte in der Spalte Status für DAGs mit den Namen
trigger_dag
undtarget_dag
. Beide DAGs müssen den StatusActive
haben.
Senden Sie eine Pub/Sub-Testnachricht. So gehts in Cloud Shell:
Rufen Sie in der Google Cloud Console die Seite Funktionen auf.
Klicken Sie auf den Namen der Funktion:
pubsub-publisher
.Rufen Sie den Tab Testen auf.
Geben Sie im Bereich Auslösendes Ereignis konfigurieren Folgendes ein: JSON-Schlüssel/Wert-Paar:
{"message": "target_dag"}
. Ändern Sie das Schlüssel/Wert-Paar nicht, da diese Nachricht später den Test-DAG auslöst.Klicken Sie im Abschnitt Testbefehl auf In Cloud Shell testen.
Warten Sie im Cloud Shell-Terminal, bis ein Befehl angezeigt wird. automatisch. Führen Sie diesen Befehl aus, indem Sie
Enter
drücken.Wenn die Meldung Cloud Shell autorisieren angezeigt wird, klicken Sie auf Autorisieren.
Prüfen Sie, ob der Inhalt der Nachricht mit der Pub/Sub-Nachricht übereinstimmt. In diesem Beispiel muss die Ausgabenachricht mit
Message b'target_dag' with message_length 10 published to
als Antwort von Ihrer Funktion.
Prüfen Sie, ob
target_dag
ausgelöst wurde:Warten Sie mindestens eine Minute, bis ein neuer DAG-Lauf von
trigger_dag
abgeschlossen ist.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf
trigger_dag
, um die Seite DAG-Details aufzurufen. Auf dem Tab Ausführungen wird eine Liste der DAG-Ausführungen für die DAGtrigger_dag
angezeigt.Dieser DAG wird minütlich ausgeführt und verarbeitet alle Pub/Sub- Nachrichten, die von der Funktion gesendet wurden. Wenn keine Nachrichten gesendet wurden, wird die Aufgabe
trigger_target
in den DAG-Ausführungsprotokollen alsSkipped
markiert. Wenn DAGs ausgelöst wurden, ist die Aufgabetrigger_target
markiert alsSuccess
.Sehen Sie sich mehrere aktuelle DAG-Ausführungen an, um eine DAG-Ausführung zu finden, bei der alle drei Aufgaben (
subscribe_task
,pull_messages_operator
undtrigger_target
) den StatusSuccess
haben.Kehren Sie zum Tab DAGs zurück und prüfen Sie, ob die Meldung Erfolgreiche Ausführungen für den DAG
target_dag
eine erfolgreiche Ausführung.
Fazit
In dieser Anleitung haben Sie gelernt, wie Sie mit Cloud Run-Funktionen Nachrichten in einem Pub/Sub-Thema veröffentlichen und einen DAG bereitstellen, der ein Pub/Sub-Thema abonniert, Pub/Sub-Nachrichten abruft und einen anderen DAG auslöst, der in der DAG-ID der Nachrichtendaten angegeben ist.
Es gibt auch alternative Möglichkeiten, Pub/Sub-Abos erstellen und verwalten und auslösen von DAGs, die sich außerhalb in dieser Anleitung behandelt. So können Sie beispielsweise mit Cloud Run-Funktionen Airflow-DAGs auslösen, wenn ein bestimmtes Ereignis eintritt. Sehen Sie sich unsere Anleitungen an, um die anderen Google Cloud-Features testen.
Bereinigen
Um zu vermeiden, dass Ihrem Google Cloud-Konto die Ressourcen in Rechnung gestellt werden Löschen Sie entweder das Projekt, das die Ressourcen enthält, oder das Projekt behalten und die einzelnen Ressourcen löschen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Console
- Löschen Sie die Cloud Composer-Umgebung. Außerdem wird dabei der Bucket der Umgebung gelöscht.
- Löschen Sie das Pub/Sub-Thema
dag-topic-trigger
. Löschen Sie die Cloud Run-Funktion.
Rufen Sie in der Google Cloud Console Cloud Run-Funktionen auf.
Klicken Sie auf das Kästchen neben der Funktion, die Sie löschen möchten,
pubsub-publisher
.Klicken Sie auf Löschen und folgen Sie der Anleitung.
Terraform
- Ihr Terraform-Skript darf keine Einträge für Ressourcen, die für Ihr Projekt noch benötigt werden. Beispielsweise können Sie einige APIs aktiviert und IAM-Berechtigungen weiterhin zuweisen lassen, wenn Sie Ihrem Terraform-Script entsprechende Definitionen hinzugefügt haben.
- Führen Sie
terraform destroy
aus. - Löschen Sie den Bucket der Umgebung manuell. Cloud Composer löscht sie nicht automatisch. Sie können es von mit der Google Cloud Console oder der Google Cloud CLI.
Nächste Schritte
- DAGs testen
- HTTP-Funktionen testen
- Cloud Run-Funktion bereitstellen
- Probieren Sie andere Google Cloud-Funktionen selbst aus. Anleitungen