Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page explique comment créer une architecture push basée sur des événements Déclencher des DAG Cloud Composer en réponse à Pub/Sub les changements de sujet. Les exemples de ce tutoriel montrent comment gérer le cycle complet de gestion de Pub/Sub, y compris la gestion des abonnements, dans le cadre du processus DAG. Il convient à certains cas d'utilisation courants lorsque vous devez déclencher des DAG, mais que vous ne souhaitez pas configurer d'autorisations d'accès supplémentaires.
Par exemple, les messages envoyés via Pub/Sub peuvent être utilisés comme solution si vous ne souhaitez pas fournir un accès direct à une instance Cloud Composer pour des raisons de sécurité. Vous pouvez configurer un Fonction Cloud Run qui crée des messages Pub/Sub et les publie sur un sujet Pub/Sub. Vous pouvez ensuite créer un DAG qui extrait les messages Pub/Sub, puis les traite.
Dans cet exemple spécifique, vous créez une fonction Cloud Run et déployez deux DAG. Le premier DAG extrait les messages Pub/Sub et déclenche un second DAG en fonction du contenu du message Pub/Sub.
Dans ce tutoriel, nous partons du principe que vous connaissez bien Python et la console Google Cloud.
Objectifs
Coûts
Ce tutoriel utilise les composants facturables suivants de Google Cloud :
- Cloud Composer (consultez également frais supplémentaires).
- Pub/Sub
- Fonctions Cloud Run
Une fois que vous avez terminé ce tutoriel, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir, plus, consultez la section Effectuer un nettoyage.
Avant de commencer
Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit:
Dans la console Google Cloud, sélectionnez ou créez un projet :
Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.
Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires :
- Utilisateur du compte de service (
roles/iam.serviceAccountUser
) - Éditeur Pub/Sub (
roles/pubsub.editor
) - Administrateur de l'environnement et des objets Storage (
roles/composer.environmentAndStorageObjectAdmin
) - Administrateur des fonctions Cloud Run (
roles/cloudfunctions.admin
) - Visionneuse de journaux (
roles/logging.viewer
)
- Utilisateur du compte de service (
Assurez-vous que la propriété Compte de service qui exécute votre fonction Cloud Run dispose des autorisations suffisantes dans votre projet pour accéder à Pub/Sub. Par défaut, les fonctions Cloud Run utilisent le compte de service App Engine par défaut. Ce compte de service dispose du rôle Éditeur, qui dispose de suffisamment de pour ce tutoriel.
Activer les API pour votre projet.
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Activez l'API Cloud Composer dans votre projet en ajoutant les définitions de ressources suivantes à votre script Terraform :
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Remplacez <PROJECT_ID>
par l'ID de projet de votre projet. Par exemple, example-project
.
Créer votre environnement Cloud Composer
Créez un environnement Cloud Composer 2.
Dans le cadre de cette procédure, vous attribuez le rôle Extension de l'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.
Créer un sujet Pub/Sub
Cet exemple déclenche un DAG en réponse à un message envoyé dans un sujet Pub/Sub. Créer un sujet Pub/Sub à utiliser dans ce cours Exemple:
Console
Dans la console Google Cloud, accédez à la page Sujets Pub/Sub.
Cliquez sur Créer un sujet.
Dans le champ ID du sujet, saisissez
dag-topic-trigger
comme ID pour votre sujet.Conservez les valeurs par défaut des autres options.
Cliquez sur Créer un sujet.
gcloud
Pour créer un sujet, exécutez la gcloud pubsub topics create dans la Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Ajoutez les définitions de ressources suivantes à votre script Terraform :
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Remplacez <PROJECT_ID>
par l'ID du projet.
de votre projet. Par exemple, example-project
.
Importer vos DAG
Importez des DAG dans votre environnement:
- Enregistrez le fichier DAG suivant sur votre ordinateur local.
- Remplacez
<PROJECT_ID>
par l'ID du projet. de votre projet. Par exemple,example-project
. - Importez le fichier DAG modifié dans votre environnement.
L'exemple de code contient deux DAG : trigger_dag
et target_dag
.
Le DAG trigger_dag
s'abonne à un sujet Pub/Sub, extrait
messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID du DAG
des données de message Pub/Sub. Dans cet exemple, trigger_dag
déclenche
le DAG target_dag
, qui génère des messages dans les journaux des tâches.
Le DAG trigger_dag
contient les tâches suivantes :
subscribe_task
: Abonnez-vous à un sujet Pub/Sub.pull_messages_operator
: lit les données d'un message Pub/Sub. avecPubSubPullOperator
.trigger_target_dag
: déclenchez un autre DAG (dans cet exemple,target_dag
). selon les données des messages extraits sur ce sujet.
Le DAG target_dag
ne contient qu'une seule tâche: output_to_logs
. Cette tâche
imprime des messages dans le journal des tâches avec un délai d'une seconde.
Déployer une fonction Cloud Run qui publie des messages sur un sujet Pub/Sub
Dans cette section, vous allez déployer une fonction Cloud Run qui publie des messages sur un sujet Pub/Sub.
Créer une fonction Cloud Run et spécifier sa configuration
Console
Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.
Cliquez sur Créer une fonction.
Dans le champ Environnement, sélectionnez 1re génération.
Dans le champ Nom de la fonction, saisissez le nom de votre fonction:
pubsub-publisher
Dans le champ Trigger (Déclencheur), sélectionnez HTTP.
Dans la section Authentification, sélectionnez Autorisez les appels non authentifiés. Cette option permet aux utilisateurs non authentifiés d'appeler une fonction HTTP.
Cliquez sur Enregistrer.
Cliquez sur Suivant pour passer à l'étape Code.
Terraform
Envisagez d'utiliser la console Google Cloud pour cette étape, car il n'existe aucun moyen simple de gérer le code source de la fonction à partir de Terraform.
Cet exemple montre comment importer une fonction Cloud Run à partir d'un fichier d'archive ZIP local en créant un bucket Cloud Storage, de stocker le fichier dans ce bucket, puis d'utiliser le fichier du bucket source de la fonction Cloud Run. Si vous utilisez cette approche, Terraform ne met pas automatiquement à jour le code source de votre fonction, même si vous créez un nouveau fichier d'archive. Pour réimporter le code de la fonction, peut changer le nom de fichier de l’archive.
- Téléchargez le
pubsub_publisher.py
etrequirements.txt
. - Dans le fichier
pubsub_publisher.py
, remplacez<PROJECT_ID>
par l'ID de projet de votre projet. Par exemple,example-project
. - Créez une archive ZIP nommée
pubsub_function.zip
avec les fichierspbusub_publisner.py
etrequirements.txt
. - Enregistrez l'archive ZIP dans un répertoire où est stocké votre script Terraform.
- Ajoutez les définitions de ressources suivantes à votre script Terraform et
Remplacez
<PROJECT_ID>
par l'ID de votre projet.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Spécifier les paramètres du code de la fonction Cloud Run
Console
À l'étape Code, dans le champ Environnement d'exécution, sélectionnez l'environnement d'exécution de langage utilisé par votre fonction. Dans cet exemple, sélectionnez Python 3.10.
Dans le champ Point d'entrée, saisissez
pubsub_publisher
. Il s'agit du code qui est exécuté lors de l'exécution de votre fonction Cloud Run. La valeur cet indicateur doit être un nom de fonction ou un nom de classe complet dans votre code source.
Terraform
Ignorez cette étape. Les paramètres de la fonction Cloud Run sont déjà définis
la ressource google_cloudfunctions_function
.
Importer le code de votre fonction Cloud Run
Console
Dans le champ Code source, sélectionnez l'option appropriée en fonction de la manière dont vous fournir le code source de la fonction. Dans ce tutoriel, vous allez ajouter le code de votre fonction à l'aide de l'éditeur intégré des fonctions Cloud Run. Vous pouvez également importer un fichier ZIP ou utiliser Cloud Source Repositories.
- Placez l'exemple de code suivant dans le fichier main.py.
- Remplacez
<PROJECT_ID>
par l'ID du projet. de votre projet. Par exemple,example-project
.
Terraform
Ignorez cette étape. Les paramètres de la fonction Cloud Run sont déjà définis dans la ressource google_cloudfunctions_function
.
Spécifier les dépendances de votre fonction Cloud Run
Console
Spécifiez les dépendances de la fonction dans le fichier de métadonnées requirements.txt :
Lorsque vous déployez votre fonction, Cloud Run Functions télécharge et installe les dépendances déclarées dans le fichier requirements.txt, une ligne par package.
Ce fichier doit se trouver dans le même répertoire que le fichier main.py qui contient le code de votre fonction. Pour en savoir plus, consultez la section Fichiers de spécifications dans la documentation pip
.
Terraform
Ignorez cette étape. Les dépendances des fonctions Cloud Run sont définies
le fichier requirements.txt
dans l'archive pubsub_function.zip
.
Déployer votre fonction Cloud Run
Console
Cliquez sur Déployer. Une fois le déploiement terminé, la fonction apparaît avec une coche verte sur la page Fonctions Cloud Run de la console Google Cloud.
Assurez-vous que le compte de service qui exécute votre fonction Cloud Run dispose des autorisations nécessaires dans votre projet pour accéder Pub/Sub.
Terraform
Initialisez Terraform :
terraform init
Examinez la configuration et vérifiez que les ressources que Terraform va créer ou mettre à jour correspondent à vos attentes :
terraform plan
Pour vérifier si votre configuration est valide, exécutez la commande suivante : :
terraform validate
Appliquez la configuration Terraform en exécutant la commande suivante et en saisissant "yes" lorsque vous y êtes invité :
terraform apply
Attendez que Terraform affiche le message "Apply completed!" (Application terminée).
Dans la console Google Cloud, accédez à vos ressources dans l'interface utilisateur pour vous assurer que Terraform les a créées ou mises à jour.
Tester votre fonction Cloud Run
Vérifier que votre fonction publie un message sur un sujet Pub/Sub et que les exemples de DAG fonctionnent comme prévu:
Vérifiez que les DAG sont actifs :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet DAG.
Vérifiez les valeurs de la colonne État pour les DAG nommés
trigger_dag
ettarget_dag
. Les deux DAG doivent être à l'étatActive
.
Envoyez un message Pub/Sub test. Vous pouvez le faire dans Cloud Shell :
Dans la console Google Cloud, accédez à la page Fonctions.
Cliquez sur le nom de votre fonction,
pubsub-publisher
.Accédez à l'onglet Test.
Dans la section Configurer l'événement déclencheur, saisissez les valeurs suivantes : Clé-valeur JSON:
{"message": "target_dag"}
. Ne modifiez pas la paire clé-valeur, car ce message déclenche le DAG de test plus tard.Dans la section Tester la commande, cliquez sur Tester dans Cloud Shell.
Dans le terminal Cloud Shell, attendez qu'une commande s'affiche automatiquement. Exécutez cette commande en appuyant sur
Enter
.Si le message Autoriser Cloud Shell s'affiche, cliquez sur Autoriser.
Vérifiez que le contenu du message correspond à l'instance Pub/Sub . Dans cet exemple, le message de sortie doit commencer par
Message b'target_dag' with message_length 10 published to
en tant que de votre fonction.
Vérifiez que
target_dag
a été déclenché :Attendez au moins une minute pour qu'une nouvelle exécution du DAG de
trigger_dag
se termine.Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet DAG.
Cliquez sur
trigger_dag
pour accéder à la page Détails du DAG. Sur Runs (Exécutions) , la liste des exécutions de DAG pour le DAGtrigger_dag
s'affiche.Ce DAG s'exécute toutes les minutes et traite tous les messages Pub/Sub envoyés depuis la fonction. Si aucun message n'a été envoyé, la tâche
trigger_target
est marquée commeSkipped
dans les journaux d'exécution du DAG. Si des DAG ont été déclenchés, la tâchetrigger_target
est marquée commeSuccess
.Examinez plusieurs exécutions de DAG récentes pour trouver une exécution de DAG où les trois tâches (
subscribe_task
,pull_messages_operator
ettrigger_target
) sont à l'étatSuccess
.Revenez à l'onglet DAG et vérifiez que la colonne Exécutions réussies du DAG
target_dag
indique une exécution réussie.
Résumé
Dans ce tutoriel, vous avez appris à utiliser des fonctions Cloud Run pour publier des messages sur un sujet Pub/Sub et déployer un DAG qui s'abonne à un sujet Pub/Sub, extrait des messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID de DAG des données de message.
Il existe également d'autres moyens Créer et gérer des abonnements Pub/Sub et de déclencher des DAG externes de ce tutoriel. Par exemple, vous pouvez Utiliser des fonctions Cloud Run pour déclencher des DAG Airflow lorsqu'un événement spécifié se produit. Consultez nos tutoriels pour tester les autres fonctionnalités Google Cloud.
Effectuer un nettoyage
Pour éviter que les ressources soient facturées sur votre compte Google Cloud, utilisées dans ce tutoriel, supprimez le projet qui contient les ressources ou conserver le projet et supprimer les ressources individuelles.
Supprimer le projet
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Supprimer des ressources individuelles
Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.
Console
- Supprimez l'environnement Cloud Composer. Vous devez également supprimer le bucket de l'environnement au cours de cette procédure.
- Supprimez le sujet Pub/Sub,
dag-topic-trigger
. Supprimez la fonction Cloud Run.
Dans la console Google Cloud, accédez à "Fonctions Cloud Run".
Cochez la case correspondant à la fonction que vous souhaitez supprimer :
pubsub-publisher
.Cliquez sur Supprimer, puis suivez les instructions.
Terraform
- Assurez-vous que votre script Terraform ne contient pas d'entrées pour les ressources toujours requises par votre projet. Par exemple, vous pouvez choisir de conserver certaines API activées et les autorisations IAM attribuées (si vous avez ajouté de telles définitions à votre script Terraform).
- Exécutez
terraform destroy
. - Supprimer manuellement le bucket de l'environnement. Cloud Composer ne le supprime pas automatiquement. Vous pouvez le faire depuis la console Google Cloud ou la Google Cloud CLI.
Étape suivante
- Tester les DAG
- Test des fonctions HTTP
- Déployer une fonction Cloud Run
- Testez d'autres fonctionnalités de Google Cloud. Découvrez nos tutoriels.