Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite erfahren Sie, wie Sie eine ereignisbasierte Push-Architektur erstellen, Cloud Composer-DAGs als Reaktion auf Pub/Sub auslösen Themenänderungen. In den Beispielen in dieser Anleitung wird der vollständige Pub/Sub-Verwaltungszyklus, einschließlich der Aboverwaltung, als Teil des DAG-Prozesses veranschaulicht. Sie eignet sich für einige der gängigen Anwendungsfälle, in denen Sie DAGs auslösen, aber keine zusätzlichen Zugriffsberechtigungen einrichten möchten.
Nachrichten, die über Pub/Sub gesendet werden, können beispielsweise als Lösung verwendet werden, wenn Sie aus Sicherheitsgründen keinen direkten Zugriff auf eine Cloud Composer-Umgebung gewähren möchten. Sie können eine Cloud Run-Funktion konfigurieren, die Pub/Sub-Nachrichten erstellt und in einem Pub/Sub-Thema veröffentlicht. Sie können dann einen DAG erstellen, der Pub/Sub-Nachrichten abholt und verarbeitet.
In diesem konkreten Beispiel erstellen Sie eine Cloud Run-Funktion und stellen zwei DAGs. Der erste DAG ruft Pub/Sub-Nachrichten ab und löst den zweiter DAG entsprechend dem Pub/Sub-Nachrichteninhalt.
In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und der Google Cloud Console vertraut sind.
Lernziele
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
- Cloud Composer (siehe auch zusätzliche Kosten)
- Pub/Sub
- Cloud Run-Funktionen
Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
Für diese Anleitung benötigen Sie eine Google Cloud Projekt erstellen. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie ein Projekt.
Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.
Der Nutzer Ihres Google Cloud-Projekts muss die folgenden Rollen haben, um die erforderlichen Ressourcen zu erstellen:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub-Bearbeiter (
roles/pubsub.editor
) - Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run-Administrator (
roles/cloudfunctions.admin
) - Loganzeige (
roles/logging.viewer
)
- Service Account User (
Achten Sie darauf, dass der Dienstkonto, das Ihre Cloud Run-Funktion ausführt über ausreichende Berechtigungen in Ihrem Projekt verfügt, um auf Pub/Sub zuzugreifen. Von verwenden Cloud Run-Funktionen App Engine-Standarddienstkonto Dieses Dienstkonto hat die Rolle Bearbeiter, die für diese Anleitung ausreichende Berechtigungen bietet.
Die APIs für Ihr Projekt aktivieren
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktivieren Sie die Cloud Composer API in Ihrem Projekt, indem Sie Folgendes hinzufügen: Ressourcendefinitionen in Ihr Terraform-Skript ein:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Im Rahmen dieses Verfahrens
Sie gewähren die Dienst-Agent-Erweiterung für die Cloud Composer v2 API.
Rolle (roles/composer.ServiceAgentV2Ext
) für den Composer-Dienst-Agent
Konto. Cloud Composer verwendet dieses Konto, um Vorgänge auszuführen
in Ihrem Google Cloud-Projekt.
Pub/Sub-Thema erstellen
In diesem Beispiel wird ein DAG als Antwort auf eine Nachricht ausgelöst, die an einen Pub/Sub-Thema. Erstellen Sie ein Pub/Sub-Thema, das in diesem Beispiel verwendet werden soll:
Console
Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.
Klicken Sie auf Thema erstellen.
Geben Sie im Feld Themen-ID
dag-topic-trigger
als ID für das Thema ein.Übernehmen Sie für alle anderen Optionen die Standardeinstellungen.
Klicken Sie auf Thema erstellen.
gcloud
Führen Sie den Befehl gcloud pubsub topics create in der Google Cloud CLI aus, um ein Thema zu erstellen:
gcloud pubsub topics create dag-topic-trigger
Terraform
Fügen Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzu:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ersetzen Sie <PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel: example-project
DAGs hochladen
Laden Sie DAGs in Ihre Umgebung hoch:
- Speichern Sie die folgende DAG-Datei auf Ihrem lokalen Computer.
- Ersetzen Sie
<PROJECT_ID>
durch Projekt-ID. Ihres Projekts. Beispiel:example-project
- Laden Sie die bearbeitete DAG-Datei in Ihre Umgebung hoch.
Der Beispielcode enthält zwei DAGs: trigger_dag
und target_dag
.
Der DAG trigger_dag
abonniert ein Pub/Sub-Thema, ruft
Pub/Sub-Nachrichten und löst einen anderen DAG aus, der in der DAG-ID angegeben ist
der Pub/Sub-Nachrichtendaten. In diesem Beispiel löst trigger_dag
den DAG target_dag
aus, der Nachrichten in die Aufgabenprotokolle ausgibt.
Der trigger_dag
-DAG enthält die folgenden Aufgaben:
subscribe_task
: Sie abonnieren ein Pub/Sub-Thema.pull_messages_operator
: Pub/Sub-Nachrichtendaten mitPubSubPullOperator
lesentrigger_target_dag
: Auslösen eines anderen DAG (in diesem Beispieltarget_dag
) gemäß den Daten in den Nachrichten, die aus dem Pub/Sub-Thema abgerufen wurden.
Der DAG target_dag
enthält nur eine Aufgabe: output_to_logs
. Mit dieser Aufgabe werden Nachrichten mit einer Verzögerung von einer Sekunde in das Aufgabenprotokoll gedruckt.
Cloud Run-Funktion bereitstellen, die Nachrichten in einem Pub/Sub-Thema veröffentlicht
In diesem Abschnitt stellen Sie eine Cloud Run-Funktion bereit, die Nachrichten in einem Pub/Sub-Thema veröffentlicht.
Cloud Run-Funktion erstellen und konfigurieren
Console
Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.
Klicken Sie auf Funktion erstellen.
Wählen Sie im Feld Umgebung die Option 1. Generation aus.
Geben Sie im Feld Funktionsname den Namen der Funktion ein:
pubsub-publisher
Wählen Sie im Feld Trigger die Option HTTP aus.
Wählen Sie im Bereich Authentifizierung die Option Nicht authentifizierte Aufrufe zulassen aus. Mit dieser Option können nicht authentifizierte Nutzer eine HTTP-Funktion aufrufen.
Klicken Sie auf „Speichern“.
Klicken Sie auf Weiter, um zum Schritt Code zu gelangen.
Terraform
Verwenden Sie für diesen Schritt gegebenenfalls die Google Cloud Console, da es keine einfache Möglichkeit, den Quellcode der Funktion über Terraform zu verwalten.
In diesem Beispiel wird gezeigt, wie Sie eine Cloud Run-Funktion aus einer lokalen ZIP-Archivdatei hochladen. Dazu erstellen Sie einen Cloud Storage-Bucket, speichern die Datei in diesem Bucket und verwenden die Datei aus dem Bucket dann als Quelle für die Cloud Run-Funktion. Bei diesem Ansatz aktualisiert Terraform den Quellcode Ihrer Funktion nicht automatisch, auch wenn Sie eine neue Archivdatei erstellen. Wenn Sie den Funktionscode noch einmal hochladen möchten, können Sie den Dateinamen des Archivs ändern.
- Laden Sie die
pubsub_publisher.py
und dierequirements.txt
Dateien. - Ersetzen Sie in der Datei
pubsub_publisher.py
die Variable<PROJECT_ID>
durch die Projekt-ID Ihres Projekts. Beispiel:example-project
. - Erstellen Sie ein ZIP-Archiv namens
pubsub_function.zip
mit dempbusub_publisner.py
und die Dateirequirements.txt
. - Speichern Sie das ZIP-Archiv in einem Verzeichnis, in dem Ihr Terraform-Script gespeichert ist.
- Fügen Sie Ihrem Terraform-Script die folgenden Ressourcendefinitionen hinzu und ersetzen Sie
<PROJECT_ID>
durch die Projekt-ID Ihres Projekts.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Codeparameter für Cloud Run-Funktionen angeben
Console
Wählen Sie im Schritt Code im Feld Laufzeit die Sprache aus. die die Funktion verwendet. Wählen Sie in diesem Beispiel Python 3.10 aus.
Geben Sie im Feld Einstiegspunkt den Wert
pubsub_publisher
ein. Dies ist der Code, der beim Ausführen der Cloud Run-Funktion ausgeführt wird. Der Wert dieses Flags muss ein Funktionsname oder ein voll qualifizierter Klassenname sein, der in Ihrem Quellcode vorhanden ist.
Terraform
Diesen Schritt überspringen. Die Parameter der Cloud Run-Funktion sind bereits in der Ressource google_cloudfunctions_function
definiert.
Cloud Run-Funktionscode hochladen
Console
Wählen Sie im Feld Quellcode die entsprechende Option für die Bereitstellung des Funktionsquellcodes aus. In dieser Anleitung fügen Sie den Funktionscode mit dem Inline-Editor von Cloud Run Functions hinzu. Alternativ können Sie eine ZIP-Datei hochladen oder Cloud Source Repositories erstellen.
- Fügen Sie das folgende Codebeispiel in die Datei main.py ein.
- Ersetzen Sie
<PROJECT_ID>
durch Projekt-ID. Ihres Projekts. Beispiel:example-project
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsparameter sind bereits in
Die Ressource google_cloudfunctions_function
.
Abhängigkeiten von Cloud Run-Funktionen angeben
Console
Geben Sie die Abhängigkeiten der Funktion in der Metadatendatei requirements.txt an:
Wenn Sie die Funktion bereitstellen, lädt und installiert Cloud Run Functions die in der Datei requirements.txt deklarierten Abhängigkeiten, eine Zeile pro Paket.
Diese Datei muss sich im selben Verzeichnis befinden wie die Datei main.py, die
Ihren Funktionscode. Weitere Informationen finden Sie in der pip
-Dokumentation unter Anforderungen.
Terraform
Diesen Schritt überspringen. Cloud Run-Funktionsabhängigkeiten werden definiert in
Die Datei requirements.txt
im Archiv pubsub_function.zip
.
Cloud Run-Funktion bereitstellen
Console
Klicken Sie auf Bereitstellen. Wenn die Bereitstellung erfolgreich abgeschlossen wurde, wird die Funktion angezeigt. mit einem grünen Häkchen auf der Seite Cloud Run-Funktionen in der Google Cloud Console
Prüfen Sie, ob das Dienstkonto, das die Cloud Run-Funktion ausführt, die erforderlichen Berechtigungen in Ihrem Projekt haben, um auf Pub/Sub
Terraform
Initialisieren Sie Terraform:
terraform init
Prüfen Sie die Konfiguration und prüfen Sie, ob die Ressourcen, die Terraform erstellen oder aktualisieren wird, Ihren Erwartungen entsprechen:
terraform plan
Führen Sie folgenden Befehl aus, um zu prüfen, ob Ihre Konfiguration gültig ist: Befehl:
terraform validate
Wenden Sie die Terraform-Konfiguration an. Führen Sie dazu den folgenden Befehl aus und geben Sie „yes“ (Ja) an der Eingabeaufforderung ein:
terraform apply
Warten Sie, bis Terraform die Meldung „Apply complete“ anzeigt.
Rufen Sie in der Google Cloud Console Ihre Ressourcen in der Benutzeroberfläche auf, um sicherzustellen, dass Terraform sie erstellt oder aktualisiert hat.
Cloud Run-Funktion testen
Prüfen, ob Ihre Funktion eine Nachricht in einem Pub/Sub-Thema veröffentlicht und dass die Beispiel-DAGs wie vorgesehen funktionieren:
Prüfen Sie, ob die DAGs aktiv sind:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Überprüfen Sie die Werte in der Spalte State auf DAGs mit den Namen
trigger_dag
undtarget_dag
. Beide DAGs müssen den StatusActive
haben.
Senden Sie eine Pub/Sub-Testnachricht. So gehts in Cloud Shell:
Rufen Sie in der Google Cloud Console die Seite Functions auf.
Klicken Sie auf den Namen Ihrer Funktion,
pubsub-publisher
.Rufen Sie den Tab Testen auf.
Geben Sie im Abschnitt Auslösendes Ereignis konfigurieren den folgenden JSON-Schlüssel/Wert ein:
{"message": "target_dag"}
. Ändern Sie nicht das Schlüssel/Wert-Paar da diese Nachricht später den Test-DAG auslöst.Klicken Sie im Abschnitt Testbefehl auf In Cloud Shell testen.
Warten Sie im Cloud Shell-Terminal, bis ein Befehl automatisch angezeigt wird. Führen Sie diesen Befehl aus, indem Sie
Enter
drücken.Wenn die Meldung Cloud Shell autorisieren angezeigt wird, klicken Sie auf Autorisieren.
Prüfen, ob der Nachrichteninhalt dem Pub/Sub entspricht . In diesem Beispiel muss die Ausgabenachricht mit
Message b'target_dag' with message_length 10 published to
als Antwort von Ihrer Funktion.
Prüfen Sie, ob
target_dag
ausgelöst wurde:Warten Sie mindestens eine Minute, damit eine neue DAG-Ausführung von
trigger_dag
ausgeführt wird abgeschlossen wird.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf
trigger_dag
, um die Seite DAG-Details aufzurufen. Wählen Sie auf der Seite Runs (Läufe) wird eine Liste der DAG-Ausführungen für den DAGtrigger_dag
angezeigt.Dieser DAG wird jede Minute ausgeführt und verarbeitet alle Pub/Sub-Nachrichten, die von der Funktion gesendet werden. Wenn keine Nachrichten gesendet wurden, Die Aufgabe
trigger_target
ist in den DAG-Ausführungslogs alsSkipped
gekennzeichnet. Wenn DAGs ausgelöst wurden, wird die Aufgabetrigger_target
alsSuccess
markiert.Sehen Sie sich mehrere aktuelle DAG-Ausführungen an, um eine DAG-Ausführung zu finden, bei der alle drei Aufgaben (
subscribe_task
,pull_messages_operator
undtrigger_target
) haben den StatusSuccess
.Kehren Sie zum Tab DAGs zurück und prüfen Sie, ob die Meldung Erfolgreiche Ausführungen für den DAG
target_dag
eine erfolgreiche Ausführung.
Fazit
In dieser Anleitung haben Sie erfahren, wie Sie mit Cloud Run-Funktionen Nachrichten in einem Pub/Sub-Thema an und stellen einen DAG bereit, der einen Pub/Sub-Thema, ruft Pub/Sub-Nachrichten ab und Trigger ein anderer DAG, der in der DAG-ID der Nachrichtendaten angegeben ist.
Es gibt auch alternative Möglichkeiten, Pub/Sub-Abos erstellen und verwalten und auslösen von DAGs, die sich außerhalb in dieser Anleitung behandelt. So können Sie zum Beispiel Cloud Run-Funktionen zum Auslösen von Airflow-DAGs verwenden Ein bestimmtes Ereignis eintritt. In unseren Anleitungen können Sie die anderen Google Cloud-Funktionen selbst ausprobieren.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie entweder das Projekt löschen, das die Ressourcen enthält, oder das Projekt beibehalten und die einzelnen Ressourcen löschen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Console
- Löschen Sie die Cloud Composer-Umgebung. Außerdem löschen Sie den Bucket der Umgebung.
- Löschen Sie das Pub/Sub-Thema
dag-topic-trigger
. Löschen Sie die Cloud Run-Funktion.
Rufen Sie in der Google Cloud Console Cloud Run-Funktionen auf.
Klicken Sie auf das Kästchen der Funktion, die Sie löschen möchten,
pubsub-publisher
Klicken Sie auf Löschen und folgen Sie der Anleitung.
Terraform
- Achten Sie darauf, dass Ihr Terraform-Script keine Einträge für Ressourcen enthält, die für Ihr Projekt noch erforderlich sind. Beispielsweise können Sie einige APIs aktiviert und IAM-Berechtigungen weiterhin zuweisen lassen, wenn Sie Ihrem Terraform-Script entsprechende Definitionen hinzugefügt haben.
- Führen Sie
terraform destroy
aus. - Löschen Sie den Bucket der Umgebung manuell. Cloud Composer löscht sie nicht automatisch. Sie können es von mit der Google Cloud Console oder der Google Cloud CLI.
Nächste Schritte
- DAGs testen
- HTTP-Funktionen testen
- Cloud Run-Funktion bereitstellen
- Probieren Sie andere Google Cloud-Funktionen selbst aus. Werfen Sie einen Blick finden Sie in unseren Anleitungen.