Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite erfahren Sie, wie Sie eine ereignisbasierte Push-Architektur erstellen, Cloud Composer-DAGs als Reaktion auf Pub/Sub auslösen Themenänderungen. Beispiele in dieser Anleitung veranschaulichen den gesamten Zyklus der Pub/Sub-Verwaltung, einschließlich der Aboverwaltung, als Teil des DAG-Prozesses. Es eignet sich für einige der häufigsten Anwendungsfälle, in denen DAGs auslösen, aber keine zusätzlichen Zugriffsberechtigungen einrichten möchten.
Beispielsweise können über Pub/Sub gesendete Nachrichten als Lösung dienen. wenn Sie keinen direkten Zugriff auf aus Sicherheitsgründen. Sie können eine Cloud Run-Funktion konfigurieren, die Pub/Sub-Nachrichten erstellt und in einem Pub/Sub-Thema veröffentlicht. Sie können dann einen DAG erstellen, Pub/Sub-Nachrichten abruft und diese Nachrichten dann verarbeitet.
In diesem konkreten Beispiel erstellen Sie eine Cloud Run-Funktion und stellen zwei DAGs. Der erste DAG ruft Pub/Sub-Nachrichten ab und löst den zweiten DAG entsprechend dem Inhalt der Pub/Sub-Nachricht aus.
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und der Google Cloud Console vertraut sind.
Lernziele
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
- Cloud Composer (siehe auch zusätzliche Kosten)
- Pub/Sub
- Cloud Run-Funktionen
Nachdem Sie diese Anleitung abgeschlossen haben, können Sie weitere Kosten vermeiden, indem Sie die von Ihnen erstellten Ressourcen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
Für diese Anleitung benötigen Sie eine Google Cloud Projekt erstellen. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie eines:
Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.
Sorgen Sie dafür, dass Ihr Google Cloud-Projektnutzer die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:
- Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Pub/Sub-Bearbeiter (
roles/pubsub.editor
) - Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run-Administrator (
roles/cloudfunctions.admin
) - Loganzeige (
roles/logging.viewer
)
- Dienstkontonutzer (
Achten Sie darauf, dass der Dienstkonto, das Ihre Cloud Run-Funktion ausführt über ausreichende Berechtigungen in Ihrem Projekt verfügt, um auf Pub/Sub zuzugreifen. Cloud Run-Funktionen verwenden standardmäßig das App Engine-Standarddienstkonto. Dieses Dienstkonto hat die Rolle Bearbeiter mit ausreichender Berechtigungen für diese Anleitung.
Die APIs für Ihr Projekt aktivieren
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktivieren Sie die Cloud Composer API in Ihrem Projekt, indem Sie Folgendes hinzufügen: Ressourcendefinitionen in Ihr Terraform-Skript ein:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Im Rahmen dieses Verfahrens
Sie gewähren die Dienst-Agent-Erweiterung für die Cloud Composer v2 API.
Rolle (roles/composer.ServiceAgentV2Ext
) für den Composer-Dienst-Agent
Konto. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.
Pub/Sub-Thema erstellen
In diesem Beispiel wird ein DAG als Reaktion auf eine Nachricht ausgelöst, die an ein Pub/Sub-Thema gesendet wurde. Erstellen Sie ein Pub/Sub-Thema, das in diesem Beispiel verwendet werden soll:
Console
Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.
Klicken Sie auf Thema erstellen.
Geben Sie im Feld Themen-ID
dag-topic-trigger
als ID für Ihr .Übernehmen Sie für alle anderen Optionen die Standardeinstellungen.
Klicken Sie auf Thema erstellen.
gcloud
Führen Sie den Befehl gcloud pubsub topics create -Befehl in der Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Fügen Sie dem Terraform-Skript die folgenden Ressourcendefinitionen hinzu:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ersetzen Sie <PROJECT_ID>
durch Projekt-ID.
Ihres Projekts. Beispiel: example-project
DAGs hochladen
Laden Sie DAGs in Ihre Umgebung hoch:
- Speichern Sie die folgende DAG-Datei auf Ihrem lokalen Computer.
- Ersetzen Sie
<PROJECT_ID>
durch Projekt-ID. Ihres Projekts. Beispiel:example-project
- Laden Sie die bearbeitete DAG-Datei in Ihre Umgebung hoch.
Der Beispielcode enthält zwei DAGs: trigger_dag
und target_dag
.
Der DAG trigger_dag
abonniert ein Pub/Sub-Thema, ruft
Pub/Sub-Nachrichten und löst einen anderen DAG aus, der in der DAG-ID angegeben ist
der Pub/Sub-Nachrichtendaten. In diesem Beispiel löst trigger_dag
aus
Den DAG target_dag
, der Meldungen an die Aufgabenlogs ausgibt
Der trigger_dag
-DAG enthält die folgenden Aufgaben:
subscribe_task
: Abonnieren Sie ein Pub/Sub-Thema.pull_messages_operator
: Pub/Sub-Nachrichtendaten mitPubSubPullOperator
lesentrigger_target_dag
: Einen weiteren DAG auslösen (in diesem Beispieltarget_dag
) laut den Daten in den Nachrichten, die aus Pub/Sub abgerufen wurden, .
Der DAG target_dag
enthält nur eine Aufgabe: output_to_logs
. Diese Aufgabe
gibt Nachrichten mit einer Verzögerung von einer Sekunde im Aufgabenlog aus.
Cloud Run-Funktion bereitstellen, die Nachrichten in einem Pub/Sub-Thema veröffentlicht
In diesem Abschnitt stellen Sie eine Cloud Run-Funktion bereit, die Nachrichten in einem Pub/Sub-Thema veröffentlicht.
Cloud Run-Funktion erstellen und konfigurieren
Console
Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.
Klicken Sie auf Funktion erstellen.
Wählen Sie im Feld Umgebung die Option 1. Generation aus.
Geben Sie im Feld Funktionsname den Namen der Funktion ein:
pubsub-publisher
Wählen Sie im Feld Trigger die Option HTTP aus.
Wählen Sie im Abschnitt Authentication (Authentifizierung) die Option Nicht authentifizierte Aufrufe zulassen Mit dieser Option können nicht authentifizierte Nutzer eine HTTP-Funktion aufrufen.
Klicken Sie auf „Speichern“.
Klicken Sie auf Weiter, um zum Schritt Code zu gelangen.
Terraform
Verwenden Sie für diesen Schritt gegebenenfalls die Google Cloud Console, da es keine einfache Möglichkeit, den Quellcode der Funktion über Terraform zu verwalten.
In diesem Beispiel wird gezeigt, wie Sie eine Cloud Run-Funktion hochladen. aus einer lokalen ZIP-Archivdatei erstellen, indem Sie einen Cloud Storage-Bucket erstellen, Speichern der Datei in diesem Bucket und Verwenden der Datei aus dem Bucket als Quelle für die Cloud Run-Funktion. Wenn Sie diesen Ansatz verwenden, Terraform aktualisiert den Quellcode der Funktion nicht automatisch. auch wenn Sie eine neue Archivdatei erstellen. Wenn Sie den Funktionscode noch einmal hochladen möchten, können Sie den Dateinamen des Archivs ändern.
- Laden Sie die
pubsub_publisher.py
und dierequirements.txt
-Dateien. - Ersetzen Sie in der Datei
pubsub_publisher.py
<PROJECT_ID>
durch den Projekt-ID Ihres Projekts. Beispiel:example-project
. - Erstellen Sie ein ZIP-Archiv namens
pubsub_function.zip
mit dempbusub_publisner.py
und die Dateirequirements.txt
. - Speichern Sie das ZIP-Archiv in einem Verzeichnis, in dem Ihr Terraform-Script gespeichert ist.
- Fügen Sie Ihrem Terraform-Skript die folgenden Ressourcendefinitionen hinzu und
Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Parameter für den Cloud Run-Funktionscode angeben
Console
Wählen Sie im Schritt Code im Feld Laufzeit die Sprache aus. die die Funktion verwendet. Wählen Sie in diesem Beispiel Python 3.10 aus.
Geben Sie im Feld Einstiegspunkt den Wert
pubsub_publisher
ein. Dies ist der Code der beim Ausführen der Cloud Run-Funktion ausgeführt wird. Der Wert von Dieses Flag muss ein Funktionsname oder ein voll qualifizierter Klassenname sein, in Ihrem Quellcode vorhanden ist.
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsparameter sind bereits in
Die Ressource google_cloudfunctions_function
.
Cloud Run-Funktionscode hochladen
Console
Wählen Sie im Feld Quellcode die entsprechende Option für den den Quellcode der Funktion bereitstellen. In dieser Anleitung fügen Sie den Funktionscode mit dem Inline-Editor von Cloud Run Functions hinzu. Alternativ können Sie eine ZIP-Datei hochladen oder Cloud Source Repositories erstellen.
- Fügen Sie das folgende Codebeispiel in die Datei main.py ein.
- Ersetzen Sie
<PROJECT_ID>
durch Projekt-ID. Ihres Projekts. Beispiel:example-project
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsparameter sind bereits in
Die Ressource google_cloudfunctions_function
.
Abhängigkeiten von Cloud Run-Funktionen angeben
Console
Geben Sie die Abhängigkeiten der Funktion in der Metadatendatei requirements.txt an:
Wenn Sie Ihre Funktion bereitstellen, werden die Cloud Run-Funktionen heruntergeladen und installiert
Abhängigkeiten, die in der Datei requirements.txt deklariert sind (eine Zeile pro Paket).
Diese Datei muss sich im selben Verzeichnis befinden wie die Datei main.py, die den Funktionscode enthält. Weitere Informationen finden Sie unter
Anforderungsdateien
in der pip
-Dokumentation.
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsabhängigkeiten werden definiert in
Die Datei requirements.txt
im Archiv pubsub_function.zip
.
Cloud Run-Funktion bereitstellen
Console
Klicken Sie auf Bereitstellen. Wenn die Bereitstellung erfolgreich abgeschlossen wurde, wird die Funktion angezeigt. mit einem grünen Häkchen auf der Seite Cloud Run-Funktionen in der Google Cloud Console
Prüfen Sie, ob das Dienstkonto, mit dem die Cloud Run-Funktion ausgeführt wird, korrekt ausgeführt wird die erforderlichen Berechtigungen in Ihrem Projekt haben, um auf Pub/Sub
Terraform
Initialisieren Sie Terraform:
terraform init
Überprüfen Sie die Konfiguration und stellen Sie sicher, dass die von Terraform verwendeten Ressourcen die Sie erstellen oder aktualisieren werden:
terraform plan
Führen Sie folgenden Befehl aus, um zu prüfen, ob Ihre Konfiguration gültig ist: Befehl:
terraform validate
Wenden Sie die Terraform-Konfiguration an, indem Sie den folgenden Befehl ausführen und mit „Ja“ eingeben:
terraform apply
Warten Sie, bis Terraform die Meldung „Apply complete“ anzeigt.
Rufen Sie in der Google Cloud Console in der UI Ihre Ressourcen auf, um ob Terraform sie erstellt oder aktualisiert hat.
Cloud Run-Funktion testen
So prüfen Sie, ob Ihre Funktion eine Nachricht in einem Pub/Sub-Thema veröffentlicht und die Beispiel-DAGs wie vorgesehen funktionieren:
Prüfen Sie, ob die DAGs aktiv sind:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Überprüfen Sie die Werte in der Spalte State auf DAGs mit den Namen
trigger_dag
undtarget_dag
. Beide DAGs müssen den StatusActive
haben.
Senden Sie eine Pub/Sub-Testnachricht. Sie können es in Cloud Shell:
Rufen Sie in der Google Cloud Console die Seite Funktionen auf.
Klicken Sie auf den Namen der Funktion:
pubsub-publisher
.Rufen Sie den Tab Testen auf.
Geben Sie im Bereich Auslösendes Ereignis konfigurieren Folgendes ein: JSON-Schlüssel/Wert-Paar:
{"message": "target_dag"}
. Ändern Sie nicht das Schlüssel/Wert-Paar da diese Nachricht den Test-DAG später auslöst.Klicken Sie im Abschnitt Testbefehl auf In Cloud Shell testen.
Warten Sie im Cloud Shell-Terminal, bis ein Befehl angezeigt wird. automatisch. Führen Sie diesen Befehl aus, indem Sie
Enter
drücken.Wenn die Meldung Cloud Shell autorisieren angezeigt wird, gehen Sie so vor: Klicken Sie auf Autorisieren.
Prüfen Sie, ob der Inhalt der Nachricht mit der Pub/Sub-Nachricht übereinstimmt. In diesem Beispiel muss die Ausgabenachricht mit
Message b'target_dag' with message_length 10 published to
als Antwort von Ihrer Funktion.
Prüfen Sie, ob
target_dag
ausgelöst wurde:Warten Sie mindestens eine Minute, damit ein neuer DAG-Lauf von
trigger_dag
abgeschlossen wird.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf
trigger_dag
, um die Seite DAG-Details aufzurufen. Wählen Sie auf der Seite Runs (Läufe) wird eine Liste der DAG-Ausführungen für den DAGtrigger_dag
angezeigt.Dieser DAG wird minütlich ausgeführt und verarbeitet alle Pub/Sub- Nachrichten, die von der Funktion gesendet wurden. Wenn keine Nachrichten gesendet wurden, wird die Aufgabe
trigger_target
in den DAG-Ausführungsprotokollen alsSkipped
markiert. Wenn DAGs ausgelöst wurden, wird die Aufgabetrigger_target
alsSuccess
markiert.Suchen Sie in den letzten DAG-Ausführungen nach einer, bei der alle drei Aufgaben (
subscribe_task
,pull_messages_operator
undtrigger_target
) den StatusSuccess
haben.Kehren Sie zum Tab DAGs zurück und prüfen Sie, ob in der Spalte Erfolgreiche Ausführungen für den DAG
target_dag
eine erfolgreiche Ausführung aufgeführt ist.
Fazit
In dieser Anleitung haben Sie erfahren, wie Sie mit Cloud Run-Funktionen Nachrichten in einem Pub/Sub-Thema an und stellen einen DAG bereit, der einen Pub/Sub-Thema, ruft Pub/Sub-Nachrichten ab und Trigger ein anderer DAG, der in der DAG-ID der Nachrichtendaten angegeben ist.
Es gibt auch alternative Möglichkeiten, Pub/Sub-Abos erstellen und verwalten und auslösen von DAGs, die sich außerhalb in dieser Anleitung behandelt. So können Sie zum Beispiel Cloud Run-Funktionen zum Auslösen von Airflow-DAGs verwenden Ein bestimmtes Ereignis eintritt. Sehen Sie sich unsere Anleitungen an, um die anderen Google Cloud-Features testen.
Bereinigen
Um zu vermeiden, dass Ihrem Google Cloud-Konto die Ressourcen in Rechnung gestellt werden Löschen Sie entweder das Projekt, das die Ressourcen enthält, oder das Projekt behalten und die einzelnen Ressourcen löschen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Console
- Löschen Sie die Cloud Composer-Umgebung. Außerdem löschen Sie den Bucket der Umgebung.
- Löschen Sie das Pub/Sub-Thema
dag-topic-trigger
. Löschen Sie die Cloud Run-Funktion.
Rufen Sie in der Google Cloud Console Cloud Run-Funktionen auf.
Klicken Sie das Kästchen für die Funktion an, die Sie löschen möchten,
pubsub-publisher
Klicken Sie auf Löschen und folgen Sie der Anleitung.
Terraform
- Ihr Terraform-Skript darf keine Einträge für Ressourcen, die für Ihr Projekt noch benötigt werden. Zum Beispiel haben Sie möchten vielleicht einige APIs aktiviert lassen und Berechtigungen zugewiesen sind (sofern Sie diese Definitionen Terraform-Script.
- Führen Sie
terraform destroy
aus. - Löschen Sie den Bucket der Umgebung manuell. Cloud Composer löscht sie nicht automatisch. Sie können es von mit der Google Cloud Console oder der Google Cloud CLI.
Nächste Schritte
- DAGs testen
- HTTP-Funktionen testen
- Cloud Run-Funktion bereitstellen
- Probieren Sie andere Google Cloud-Funktionen selbst aus. Werfen Sie einen Blick finden Sie in unseren Anleitungen.