Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page vous explique comment créer une architecture push basée sur des événements en déclenchant des DAG Cloud Composer en réponse aux modifications de sujets Pub/Sub. Les exemples de ce tutoriel montrent comment gérer l'intégralité du cycle de la gestion Pub/Sub, y compris la gestion des abonnements, dans le cadre du processus DAG. Il convient à certains cas d'utilisation courants lorsque vous devez déclencher des DAG, mais que vous ne souhaitez pas configurer d'autorisations d'accès supplémentaires.
Par exemple, les messages envoyés via Pub/Sub peuvent être utilisés comme solution si vous ne souhaitez pas fournir un accès direct à un environnement Cloud Composer pour des raisons de sécurité. Vous pouvez configurer une fonction Cloud Run qui crée des messages Pub/Sub et les publie sur un sujet Pub/Sub. Vous pouvez ensuite créer un DAG qui extrait les messages Pub/Sub, puis les traite.
Dans cet exemple spécifique, vous créez une fonction Cloud Run et déployez deux DAG. Le premier DAG extrait les messages Pub/Sub et déclenche le deuxième DAG en fonction du contenu du message Pub/Sub.
Dans ce tutoriel, nous partons du principe que vous connaissez bien Python et la console Google Cloud.
Objectifs
Coûts
Ce tutoriel utilise les composants facturables suivants de Google Cloud :
- Cloud Composer (consultez également frais supplémentaires).
- Pub/Sub
- Fonctions Cloud Run
Une fois que vous avez terminé ce tutoriel, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir, plus, consultez la section Effectuer un nettoyage.
Avant de commencer
Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit:
Dans la console Google Cloud, sélectionnez ou créez un projet:
Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.
Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires :
- Utilisateur du compte de service (
roles/iam.serviceAccountUser
) - Éditeur Pub/Sub (
roles/pubsub.editor
) - Administrateur de l'environnement et des objets Storage (
roles/composer.environmentAndStorageObjectAdmin
) - Administrateur des fonctions Cloud Run (
roles/cloudfunctions.admin
) - Lecteur de journaux (
roles/logging.viewer
)
- Utilisateur du compte de service (
Assurez-vous que la propriété Compte de service qui exécute votre fonction Cloud Run dispose des autorisations suffisantes dans votre projet pour accéder à Pub/Sub. Par par défaut, les fonctions Cloud Run utilisent Compte de service App Engine par défaut. Ce compte de service dispose du rôle Éditeur, qui dispose d'autorisations suffisantes pour ce tutoriel.
Activer les API pour votre projet.
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Activez l'API Cloud Composer dans votre projet en ajoutant les définitions de ressources suivantes à votre script Terraform :
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Remplacez <PROJECT_ID>
par l'ID de projet de votre projet. Par exemple, example-project
.
Créer votre environnement Cloud Composer
Créez un environnement Cloud Composer 2.
Dans le cadre de cette procédure, vous attribuez le rôle Extension de l'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations
dans votre projet Google Cloud.
Créer un sujet Pub/Sub
Cet exemple déclenche un DAG en réponse à un message envoyé dans un sujet Pub/Sub. Créez un sujet Pub/Sub à utiliser dans cet exemple :
Console
Dans la console Google Cloud, accédez à la page Sujets Pub/Sub.
Cliquez sur Créer un sujet.
Dans le champ ID du sujet, saisissez
dag-topic-trigger
comme ID pour votre sujet.Conservez les valeurs par défaut des autres options.
Cliquez sur Créer un sujet.
gcloud
Pour créer un sujet, exécutez la gcloud pubsub topics create dans la Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Ajoutez les définitions de ressources suivantes à votre script Terraform :
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Remplacez <PROJECT_ID>
par l'ID de projet de votre projet. Par exemple, example-project
.
Importer vos DAG
Importez des DAG dans votre environnement:
- Enregistrez le fichier DAG suivant sur votre ordinateur local.
- Remplacez
<PROJECT_ID>
par l'ID du projet. de votre projet. Par exemple,example-project
. - Importez le fichier DAG modifié dans votre environnement.
L'exemple de code contient deux DAG : trigger_dag
et target_dag
.
Le DAG trigger_dag
s'abonne à un sujet Pub/Sub, extrait
messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID du DAG
des données de message Pub/Sub. Dans cet exemple, trigger_dag
déclenche
le DAG target_dag
, qui génère des messages dans les journaux des tâches.
Le DAG trigger_dag
contient les tâches suivantes :
subscribe_task
: Abonnez-vous à un sujet Pub/Sub.pull_messages_operator
: lisez les données d'un message Pub/Sub avecPubSubPullOperator
.trigger_target_dag
: déclenchez un autre DAG (dans cet exemple,target_dag
). selon les données des messages extraits sur ce sujet.
Le DAG target_dag
ne contient qu'une seule tâche : output_to_logs
. Cette tâche
imprime des messages dans le journal des tâches avec un délai d'une seconde.
Déployer une fonction Cloud Run qui publie des messages sur un sujet Pub/Sub
Dans cette section, vous allez déployer une fonction Cloud Run qui publie des messages dans un sujet Pub/Sub.
Créer une fonction Cloud Run et spécifier sa configuration
Console
Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.
Cliquez sur Créer une fonction.
Dans le champ Environnement, sélectionnez 1re génération.
Dans le champ Nom de la fonction, saisissez le nom de votre fonction :
pubsub-publisher
.Dans le champ Trigger (Déclencheur), sélectionnez HTTP.
Dans la section Authentification, sélectionnez Autorisez les appels non authentifiés. Cette option accorde utilisateurs non authentifiés la possibilité d'appeler une fonction HTTP.
Cliquez sur Enregistrer.
Cliquez sur Suivant pour passer à l'étape Code.
Terraform
Nous vous conseillons d'utiliser la console Google Cloud pour cette étape, car il n'y a pas simple de gérer le code source de la fonction à partir de Terraform.
Cet exemple montre comment importer une fonction Cloud Run à partir d'un fichier d'archive ZIP local en créant un bucket Cloud Storage, de stocker le fichier dans ce bucket, puis d'utiliser le fichier du bucket source de la fonction Cloud Run. Si vous utilisez cette approche, Terraform ne met pas automatiquement à jour le code source de votre fonction, même si vous créez un nouveau fichier d'archive. Pour réimporter le code de la fonction, peut changer le nom de fichier de l’archive.
- Téléchargez le
pubsub_publisher.py
etrequirements.txt
. - Dans le fichier
pubsub_publisher.py
, remplacez<PROJECT_ID>
par l'ID de projet de votre projet. Par exemple,example-project
. - Créez une archive ZIP nommée
pubsub_function.zip
avec les fichierspbusub_publisner.py
etrequirements.txt
. - Enregistrez l'archive ZIP dans un répertoire où est stocké votre script Terraform.
- Ajoutez les définitions de ressources suivantes à votre script Terraform et
Remplacez
<PROJECT_ID>
par l'ID de votre projet.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Spécifier les paramètres de code de fonction Cloud Run
Console
À l'étape Code, dans le champ Environnement d'exécution, sélectionnez l'environnement d'exécution de langage utilisé par votre fonction. Dans cet exemple, sélectionnez Python 3.10.
Dans le champ Point d'entrée, saisissez
pubsub_publisher
. Il s'agit du code qui s'exécute lorsque votre fonction Cloud Run s'exécute. La valeur de cette option doit être un nom de fonction ou un nom de classe complet qui existe dans votre code source.
Terraform
Ignorez cette étape. Les paramètres de la fonction Cloud Run sont déjà définis
la ressource google_cloudfunctions_function
.
Importer le code de votre fonction Cloud Run
Console
Dans le champ Code source, sélectionnez l'option appropriée en fonction de la manière dont vous fournir le code source de la fonction. Dans ce tutoriel, vous allez ajouter le code de votre fonction à l'aide des fonctions Cloud Run. Éditeur intégré : Vous pouvez également importer un fichier ZIP ou utiliser Cloud Source Repositories.
- Placez l'exemple de code suivant dans le fichier main.py.
- Remplacez
<PROJECT_ID>
par l'ID de projet de votre projet. Par exemple,example-project
.
Terraform
Ignorez cette étape. Les paramètres de la fonction Cloud Run sont déjà définis dans la ressource google_cloudfunctions_function
.
Spécifier les dépendances de votre fonction Cloud Run
Console
Spécifiez les dépendances des fonctions dans le fichier de métadonnées requirements.txt:
Lorsque vous déployez votre fonction, les fonctions Cloud Run téléchargent et installent
déclarées dans le fichier requirements.txt, à raison d'une ligne par package.
Ce fichier doit se trouver dans le même répertoire que le fichier main.py qui contient le code de votre fonction. Pour en savoir plus, consultez la section Fichiers de spécifications dans la documentation pip
.
Terraform
Ignorez cette étape. Les dépendances des fonctions Cloud Run sont définies dans le fichier requirements.txt
de l'archive pubsub_function.zip
.
Déployer votre fonction Cloud Run
Console
Cliquez sur Déployer. Une fois le déploiement terminé, la fonction apparaît avec une coche verte sur la page Fonctions Cloud Run de la console Google Cloud.
Assurez-vous que le compte de service qui exécute votre fonction Cloud Run dispose des autorisations nécessaires dans votre projet pour accéder Pub/Sub.
Terraform
Initialisez Terraform :
terraform init
Examiner la configuration et vérifier que les ressources gérées par Terraform que vous allez créer ou mettre à jour, répondent à vos attentes:
terraform plan
Pour vérifier si votre configuration est valide, exécutez la commande suivante : :
terraform validate
Appliquez la configuration Terraform en exécutant la commande suivante : en saisissant "yes" à l'invite:
terraform apply
Attendez que Terraform affiche le message "Apply completed!" (Application terminée).
Dans la console Google Cloud, accédez à vos ressources dans l'interface utilisateur pour vous assurer que Terraform les a créées ou mises à jour.
Tester votre fonction Cloud Run
Vérifier que votre fonction publie un message sur un sujet Pub/Sub et que les exemples de DAG fonctionnent comme prévu:
Vérifiez que les DAG sont actifs :
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet DAG.
Vérifiez les valeurs de la colonne State (État) pour les DAG nommés
trigger_dag
ettarget_dag
Les deux DAG doivent être à l'étatActive
.
Envoyer un message Pub/Sub de test. Vous pouvez le faire dans Cloud Shell :
Dans la console Google Cloud, accédez à la page Fonctions.
Cliquez sur le nom de votre fonction,
pubsub-publisher
.Accédez à l'onglet Test.
Dans la section Configurer l'événement déclencheur, saisissez les valeurs suivantes : Clé-valeur JSON:
{"message": "target_dag"}
. Ne modifiez pas la clé-valeur car ce message déclenche ultérieurement le DAG de test.Dans la section Tester la commande, cliquez sur Tester dans Cloud Shell.
Dans le terminal Cloud Shell, attendez qu'une commande s'affiche automatiquement. Exécutez cette commande en appuyant sur
Enter
.Si le message Autoriser Cloud Shell s'affiche, cliquez sur Autoriser.
Vérifiez que le contenu du message correspond à l'instance Pub/Sub . Dans cet exemple, le message de sortie doit commencer par
Message b'target_dag' with message_length 10 published to
en tant que de votre fonction.
Vérifiez que
target_dag
a été déclenché:Attendez au moins une minute pour qu'une nouvelle exécution du DAG de
trigger_dag
se termine.Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet DAG.
Cliquez sur
trigger_dag
pour accéder à la page Détails du DAG. Dans l'onglet Exécutions, une liste des exécutions de DAG pour le DAGtrigger_dag
s'affiche.Ce DAG s'exécute toutes les minutes et traite tous les messages Pub/Sub envoyés depuis la fonction. Si aucun message n'a été envoyé, la tâche
trigger_target
est marquée commeSkipped
dans les journaux d'exécution du DAG. Si DAG ont été déclenchés, la tâchetrigger_target
est marquée commeSuccess
Examinez plusieurs exécutions de DAG récentes pour trouver une exécution de DAG où les trois tâches (
subscribe_task
,pull_messages_operator
ettrigger_target
) sont à l'étatSuccess
.Revenez à l'onglet DAG et vérifiez que la colonne Exécutions réussies du DAG
target_dag
indique une exécution réussie.
Résumé
Dans ce tutoriel, vous avez appris à utiliser les fonctions Cloud Run pour publier messages sur un sujet Pub/Sub et déployer un DAG qui s'abonne à un Sujet Pub/Sub, extrait les messages Pub/Sub et déclencheurs un autre DAG spécifié dans l'ID du DAG des données du message.
Il existe également d'autres moyens Créer et gérer des abonnements Pub/Sub et de déclencher des DAG externes de ce tutoriel. Par exemple, vous pouvez Utiliser des fonctions Cloud Run pour déclencher des DAG Airflow lorsqu'un événement spécifié se produit. Consultez nos tutoriels pour tester les autres fonctionnalités Google Cloud.
Effectuer un nettoyage
Pour éviter que les ressources soient facturées sur votre compte Google Cloud, utilisées dans ce tutoriel, supprimez le projet qui contient les ressources ou conserver le projet et supprimer les ressources individuelles.
Supprimer le projet
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Supprimer des ressources individuelles
Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.
Console
- Supprimez l'environnement Cloud Composer. Vous avez également supprimer le bucket de l'environnement au cours de cette procédure.
- Supprimez le sujet Pub/Sub,
dag-topic-trigger
. Supprimez la fonction Cloud Run.
Dans la console Google Cloud, accédez à "Fonctions Cloud Run".
Cochez la case correspondant à la fonction que vous souhaitez supprimer :
pubsub-publisher
.Cliquez sur Supprimer, puis suivez les instructions.
Terraform
- Assurez-vous que votre script Terraform ne contient pas d'entrées pour ressources qui restent requises par votre projet. Par exemple : vous pouvez laisser certaines API activées autorisations toujours attribuées (si vous avez ajouté ce type de définitions à vos script Terraform).
- Exécutez
terraform destroy
. - Supprimer manuellement le bucket de l'environnement. Cloud Composer ne la supprime pas automatiquement. Vous pouvez le faire depuis la console Google Cloud ou la Google Cloud CLI.
Étape suivante
- Tester les DAG
- Test des fonctions HTTP
- Déployer une fonction Cloud Run
- Testez d'autres fonctionnalités de Google Cloud. Découvrez nos tutoriels.