Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite erfahren Sie, wie Sie eine ereignisbasierte Push-Architektur erstellen, indem Sie Cloud Composer-DAGs als Reaktion auf Änderungen an Pub/Sub-Themen auslösen. Beispiele in dieser Anleitung veranschaulichen den gesamten Zyklus der Pub/Sub-Verwaltung, einschließlich der Aboverwaltung, als Teil des DAG-Prozesses. Sie eignet sich für einige der gängigen Anwendungsfälle, in denen Sie DAGs auslösen, aber keine zusätzlichen Zugriffsberechtigungen einrichten möchten.
Beispielsweise können über Pub/Sub gesendete Nachrichten als Lösung dienen. wenn Sie keinen direkten Zugriff auf aus Sicherheitsgründen. Sie können ein Cloud Run-Funktion, die Pub/Sub-Nachrichten erstellt veröffentlicht sie in einem Pub/Sub-Thema. Sie können dann einen DAG erstellen, Pub/Sub-Nachrichten abruft und diese Nachrichten dann verarbeitet.
In diesem konkreten Beispiel erstellen Sie eine Cloud Run-Funktion und stellen zwei DAGs. Der erste DAG ruft Pub/Sub-Nachrichten ab und löst den zweiter DAG entsprechend dem Pub/Sub-Nachrichteninhalt.
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und der Google Cloud Console vertraut sind.
Lernziele
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
- Cloud Composer (siehe auch zusätzliche Kosten)
- Pub/Sub
- Cloud Run-Funktionen
Nachdem Sie diese Anleitung abgeschlossen haben, können Sie weitere Kosten vermeiden, indem Sie die von Ihnen erstellten Ressourcen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
Für diese Anleitung benötigen Sie eine Google Cloud Projekt erstellen. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie ein Projekt.
Die Abrechnung für Ihr Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist
Der Nutzer Ihres Google Cloud-Projekts muss die folgenden Rollen haben, um die erforderlichen Ressourcen zu erstellen:
- Dienstkontonutzer (
roles/iam.serviceAccountUser
) - Pub/Sub-Bearbeiter (
roles/pubsub.editor
) - Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrator von Cloud Run-Funktionen (
roles/cloudfunctions.admin
) - Loganzeige (
roles/logging.viewer
)
- Dienstkontonutzer (
Achten Sie darauf, dass der Dienstkonto, das Ihre Cloud Run-Funktion ausführt über ausreichende Berechtigungen in Ihrem Projekt verfügt, um auf Pub/Sub zuzugreifen. Von verwenden Cloud Run-Funktionen App Engine-Standarddienstkonto Dieses Dienstkonto hat die Rolle Bearbeiter, die für diese Anleitung ausreichende Berechtigungen bietet.
Die APIs für Ihr Projekt aktivieren
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktivieren Sie die Cloud Composer API in Ihrem Projekt, indem Sie Folgendes hinzufügen: Ressourcendefinitionen in Ihr Terraform-Skript ein:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ersetzen Sie <PROJECT_ID>
durch Projekt-ID.
Ihres Projekts. Beispiel: example-project
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Im Rahmen dieses Verfahrens
Sie gewähren die Dienst-Agent-Erweiterung für die Cloud Composer v2 API.
Rolle (roles/composer.ServiceAgentV2Ext
) für den Composer-Dienst-Agent
Konto. Cloud Composer verwendet dieses Konto, um Vorgänge auszuführen
in Ihrem Google Cloud-Projekt.
Pub/Sub-Thema erstellen
In diesem Beispiel wird ein DAG als Antwort auf eine Nachricht ausgelöst, die an einen Pub/Sub-Thema. Erstellen Sie ein Pub/Sub-Thema, das in diesem Beispiel verwendet werden soll:
Console
Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.
Klicken Sie auf Thema erstellen.
Geben Sie im Feld Themen-ID
dag-topic-trigger
als ID für das Thema ein.Übernehmen Sie für alle anderen Optionen die Standardeinstellungen.
Klicken Sie auf Thema erstellen.
gcloud
Führen Sie den Befehl gcloud pubsub topics create in der Google Cloud CLI aus, um ein Thema zu erstellen:
gcloud pubsub topics create dag-topic-trigger
Terraform
Fügen Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzu:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
DAGs hochladen
Laden Sie DAGs in Ihre Umgebung hoch:
- Speichern Sie die folgende DAG-Datei auf Ihrem lokalen Computer.
- Ersetzen Sie
<PROJECT_ID>
durch Projekt-ID. Ihres Projekts. Beispiel:example-project
- Laden Sie die bearbeitete DAG-Datei in Ihre Umgebung hoch.
Der Beispielcode enthält zwei DAGs: trigger_dag
und target_dag
.
Der trigger_dag
-DAG abonniert ein Pub/Sub-Thema, ruft Pub/Sub-Nachrichten ab und löst einen anderen DAG aus, der in der DAG-ID der Pub/Sub-Nachrichtendaten angegeben ist. In diesem Beispiel löst trigger_dag
aus
Den DAG target_dag
, der Meldungen an die Aufgabenlogs ausgibt
Der DAG trigger_dag
enthält die folgenden Aufgaben:
subscribe_task
: Sie abonnieren ein Pub/Sub-Thema.pull_messages_operator
: Pub/Sub-Nachrichtendaten mitPubSubPullOperator
lesentrigger_target_dag
: Einen weiteren DAG auslösen (in diesem Beispieltarget_dag
) laut den Daten in den Nachrichten, die aus Pub/Sub abgerufen wurden, .
Der DAG target_dag
enthält nur eine Aufgabe: output_to_logs
. Mit dieser Aufgabe werden Nachrichten mit einer Verzögerung von einer Sekunde in das Aufgabenprotokoll gedruckt.
Cloud Run-Funktion bereitstellen, die Nachrichten in einem Pub/Sub-Thema veröffentlicht
In diesem Abschnitt stellen Sie eine Cloud Run-Funktion bereit, die Nachrichten in einem Pub/Sub-Thema veröffentlicht.
Cloud Run-Funktion erstellen und ihre Konfiguration angeben
Console
Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.
Klicken Sie auf Funktion erstellen.
Wählen Sie im Feld Umgebung die Option 1. Generation aus.
Geben Sie im Feld Funktionsname den Namen für die Funktion ein:
pubsub-publisher
.Wählen Sie im Feld Trigger die Option HTTP aus.
Wählen Sie im Abschnitt Authentication (Authentifizierung) die Option Nicht authentifizierte Aufrufe zulassen Diese Option gewährt nicht authentifizierte Nutzer die Möglichkeit, eine HTTP-Funktion aufzurufen.
Klicken Sie auf „Speichern“.
Klicken Sie auf Weiter, um zum Schritt Code zu gelangen.
Terraform
Verwenden Sie für diesen Schritt die Google Cloud Console, da es keine einfache Möglichkeit gibt, den Quellcode der Funktion über Terraform zu verwalten.
In diesem Beispiel wird gezeigt, wie Sie eine Cloud Run-Funktion hochladen. aus einer lokalen ZIP-Archivdatei, indem Sie einen Cloud Storage-Bucket erstellen, Speichern der Datei in diesem Bucket und Verwenden der Datei aus dem Bucket als Quelle für die Cloud Run-Funktion. Wenn Sie diesen Ansatz verwenden, Terraform aktualisiert den Quellcode der Funktion nicht automatisch. auch wenn Sie eine neue Archivdatei erstellen. Wenn Sie den Funktionscode noch einmal hochladen möchten, können Sie den Dateinamen des Archivs ändern.
- Laden Sie die
pubsub_publisher.py
und dierequirements.txt
Dateien. - Ersetzen Sie in der Datei
pubsub_publisher.py
die Variable<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
. - Erstellen Sie ein ZIP-Archiv namens
pubsub_function.zip
mit dempbusub_publisner.py
und die Dateirequirements.txt
. - Speichern Sie das ZIP-Archiv in einem Verzeichnis, in dem sich Ihr Terraform-Script befindet.
- Fügen Sie Ihrem Terraform-Skript die folgenden Ressourcendefinitionen hinzu und
Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Codeparameter für Cloud Run-Funktionen angeben
Console
Wählen Sie im Schritt Code im Feld Laufzeit die Sprache aus. die die Funktion verwendet. Wählen Sie in diesem Beispiel Python 3.10 aus.
Geben Sie im Feld Einstiegspunkt den Wert
pubsub_publisher
ein. Dies ist der Code der beim Ausführen der Cloud Run-Funktion ausgeführt wird. Der Wert von Dieses Flag muss ein Funktionsname oder ein voll qualifizierter Klassenname sein, in Ihrem Quellcode vorhanden ist.
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsparameter sind bereits in
Die Ressource google_cloudfunctions_function
.
Code der Cloud Run-Funktion hochladen
Console
Wählen Sie im Feld Quellcode die entsprechende Option für den den Quellcode der Funktion bereitstellen. In dieser Anleitung fügen Sie den Funktionscode mit dem Inline-Editor von Cloud Run Functions hinzu. Alternativ können Sie eine ZIP-Datei hochladen oder Cloud Source Repositories erstellen.
- Fügen Sie das folgende Codebeispiel in die Datei main.py ein.
- Ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
Terraform
Diesen Schritt überspringen. Die Parameter der Cloud Run-Funktion sind bereits in der Ressource google_cloudfunctions_function
definiert.
Abhängigkeiten von Cloud Run-Funktionen angeben
Console
Geben Sie die Abhängigkeiten der Funktion in der Metadatendatei requirements.txt an:
Wenn Sie Ihre Funktion bereitstellen, werden die Cloud Run-Funktionen heruntergeladen und installiert
Abhängigkeiten, die in der Datei requirements.txt deklariert sind (eine Zeile pro Paket).
Diese Datei muss sich im selben Verzeichnis befinden wie die Datei main.py, die
Ihren Funktionscode. Weitere Informationen finden Sie in der pip
-Dokumentation unter Anforderungen.
Terraform
Diesen Schritt überspringen. Die Abhängigkeiten von Cloud Run-Funktionen werden in der Datei requirements.txt
im Archiv pubsub_function.zip
definiert.
Cloud Run-Funktion bereitstellen
Console
Klicken Sie auf Bereitstellen. Wenn die Bereitstellung erfolgreich abgeschlossen wurde, wird die Funktion angezeigt. mit einem grünen Häkchen auf der Seite Cloud Run-Funktionen in der Google Cloud Console
Prüfen Sie, ob das Dienstkonto, das die Cloud Run-Funktion ausführt, die erforderlichen Berechtigungen in Ihrem Projekt haben, um auf Pub/Sub
Terraform
Initialisieren Sie Terraform:
terraform init
Prüfen Sie die Konfiguration und prüfen Sie, ob die Ressourcen, die Terraform erstellen oder aktualisieren wird, Ihren Erwartungen entsprechen:
terraform plan
Führen Sie folgenden Befehl aus, um zu prüfen, ob Ihre Konfiguration gültig ist: Befehl:
terraform validate
Wenden Sie die Terraform-Konfiguration an. Führen Sie dazu den folgenden Befehl aus und geben Sie „yes“ (Ja) an der Eingabeaufforderung ein:
terraform apply
Warten Sie, bis Terraform die Meldung „Apply complete“ anzeigt.
Rufen Sie in der Google Cloud Console in der UI Ihre Ressourcen auf, um ob Terraform sie erstellt oder aktualisiert hat.
Cloud Run-Funktion testen
Prüfen, ob Ihre Funktion eine Nachricht in einem Pub/Sub-Thema veröffentlicht und dass die Beispiel-DAGs wie vorgesehen funktionieren:
Prüfen Sie, ob die DAGs aktiv sind:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Überprüfen Sie die Werte in der Spalte State auf DAGs mit den Namen
trigger_dag
undtarget_dag
. Beide DAGs müssen den StatusActive
haben.
Senden Sie eine Pub/Sub-Testnachricht. So gehts in Cloud Shell:
Rufen Sie in der Google Cloud Console die Seite Functions auf.
Klicken Sie auf den Namen Ihrer Funktion,
pubsub-publisher
.Rufen Sie den Tab Testen auf.
Geben Sie im Bereich Auslösendes Ereignis konfigurieren Folgendes ein: JSON-Schlüssel/Wert-Paar:
{"message": "target_dag"}
. Ändern Sie das Schlüssel/Wert-Paar nicht, da diese Nachricht später den Test-DAG auslöst.Klicken Sie im Abschnitt Testbefehl auf In Cloud Shell testen.
Warten Sie im Cloud Shell-Terminal, bis ein Befehl angezeigt wird. automatisch. Führen Sie diesen Befehl aus, indem Sie
Enter
drücken.Wenn die Meldung Cloud Shell autorisieren angezeigt wird, gehen Sie so vor: Klicken Sie auf Autorisieren.
Prüfen, ob der Nachrichteninhalt dem Pub/Sub entspricht . In diesem Beispiel muss die Ausgabenachricht mit
Message b'target_dag' with message_length 10 published to
als Antwort von Ihrer Funktion.
Prüfen Sie, ob
target_dag
ausgelöst wurde:Warten Sie mindestens eine Minute, damit ein neuer DAG-Lauf von
trigger_dag
abgeschlossen wird.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf
trigger_dag
, um die Seite DAG-Details aufzurufen. Auf dem Tab Ausführungen wird eine Liste der DAG-Ausführungen für die DAGtrigger_dag
angezeigt.Dieser DAG wird minütlich ausgeführt und verarbeitet alle Pub/Sub- Nachrichten, die von der Funktion gesendet wurden. Wenn keine Nachrichten gesendet wurden, Die Aufgabe
trigger_target
ist in den DAG-Ausführungslogs alsSkipped
gekennzeichnet. Wenn DAGs ausgelöst wurden, wird die Aufgabetrigger_target
alsSuccess
markiert.Sehen Sie sich mehrere aktuelle DAG-Ausführungen an, um eine DAG-Ausführung zu finden, bei der alle drei Aufgaben (
subscribe_task
,pull_messages_operator
undtrigger_target
) haben den StatusSuccess
.Kehren Sie zum Tab DAGs zurück und prüfen Sie, ob die Meldung Erfolgreiche Ausführungen für den DAG
target_dag
eine erfolgreiche Ausführung.
Fazit
In dieser Anleitung haben Sie gelernt, wie Sie mit Cloud Run-Funktionen Nachrichten in einem Pub/Sub-Thema veröffentlichen und einen DAG bereitstellen, der ein Pub/Sub-Thema abonniert, Pub/Sub-Nachrichten abruft und einen anderen DAG auslöst, der in der DAG-ID der Nachrichtendaten angegeben ist.
Es gibt auch alternative Möglichkeiten, Pub/Sub-Abos zu erstellen und zu verwalten und DAGs auszulösen. Diese werden in dieser Anleitung jedoch nicht behandelt. So können Sie beispielsweise Airflow-DAGs mit Cloud Run-Funktionen auslösen, wenn ein bestimmtes Ereignis eintritt. Sehen Sie sich unsere Anleitungen an, um die anderen Google Cloud-Features testen.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie entweder das Projekt löschen, das die Ressourcen enthält, oder das Projekt beibehalten und die einzelnen Ressourcen löschen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Console
- Löschen Sie die Cloud Composer-Umgebung. Außerdem wird dabei der Bucket der Umgebung gelöscht.
- Löschen Sie das Pub/Sub-Thema
dag-topic-trigger
. Löschen Sie die Cloud Run-Funktion.
Rufen Sie in der Google Cloud Console Cloud Run-Funktionen auf.
Klicken Sie auf das Kästchen der Funktion, die Sie löschen möchten,
pubsub-publisher
Klicken Sie auf Löschen und folgen Sie der Anleitung.
Terraform
- Ihr Terraform-Skript darf keine Einträge für Ressourcen, die für Ihr Projekt noch benötigt werden. Beispielsweise können Sie einige APIs aktiviert und IAM-Berechtigungen weiterhin zuweisen lassen, wenn Sie Ihrem Terraform-Script entsprechende Definitionen hinzugefügt haben.
- Führen Sie
terraform destroy
aus. - Löschen Sie den Bucket der Umgebung manuell. Cloud Composer nicht automatisch gelöscht. Sie können dies über die Google Cloud Console oder die Google Cloud CLI tun.
Nächste Schritte
- DAGs testen
- HTTP-Funktionen testen
- Cloud Run-Funktion bereitstellen
- Probieren Sie andere Google Cloud-Funktionen selbst aus. Werfen Sie einen Blick finden Sie in unseren Anleitungen.