Déclencher des DAG à l'aide de Cloud Functions et de messages Pub/Sub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment créer une architecture push basée sur des événements Déclencher des DAG Cloud Composer en réponse à Pub/Sub les changements de sujet. Les exemples de ce tutoriel montrent comment gérer le cycle complet de gestion de Pub/Sub, y compris la gestion des abonnements, dans le cadre du processus DAG. Il convient à certains cas d'utilisation courants lorsque vous devez déclencher des DAG, mais que vous ne souhaitez pas configurer d'autorisations d'accès supplémentaires.

Par exemple, les messages envoyés via Pub/Sub peuvent être utilisés comme solution si vous ne souhaitez pas fournir un accès direct à une instance Cloud Composer pour des raisons de sécurité. Vous pouvez configurer un Fonction Cloud Run qui crée des messages Pub/Sub et les publie sur un sujet Pub/Sub. Vous pouvez ensuite créer un DAG qui extrait les messages Pub/Sub, puis les traite.

Dans cet exemple spécifique, vous créez une fonction Cloud Run et déployez deux DAG. Le premier DAG extrait les messages Pub/Sub et déclenche un second DAG en fonction du contenu du message Pub/Sub.

Dans ce tutoriel, nous partons du principe que vous connaissez bien Python et la console Google Cloud.

Objectifs

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Une fois que vous avez terminé ce tutoriel, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir, plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet comme suit:

  1. Dans la console Google Cloud, sélectionnez ou créez un projet :

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.

  3. Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires :

    • Utilisateur du compte de service (roles/iam.serviceAccountUser)
    • Éditeur Pub/Sub (roles/pubsub.editor)
    • Administrateur de l'environnement et des objets Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrateur des fonctions Cloud Run (roles/cloudfunctions.admin)
    • Visionneuse de journaux (roles/logging.viewer)
  4. Assurez-vous que la propriété Compte de service qui exécute votre fonction Cloud Run dispose des autorisations suffisantes dans votre projet pour accéder à Pub/Sub. Par défaut, les fonctions Cloud Run utilisent le compte de service App Engine par défaut. Ce compte de service dispose du rôle Éditeur, qui dispose de suffisamment de pour ce tutoriel.

Activer les API pour votre projet.

Console

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Activez l'API Cloud Composer dans votre projet en ajoutant les définitions de ressources suivantes à votre script Terraform :

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Remplacez <PROJECT_ID> par l'ID de projet de votre projet. Par exemple, example-project.

Créer votre environnement Cloud Composer

Créez un environnement Cloud Composer 2.

Dans le cadre de cette procédure, vous attribuez le rôle Extension de l'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.

Créer un sujet Pub/Sub

Cet exemple déclenche un DAG en réponse à un message envoyé dans un sujet Pub/Sub. Créer un sujet Pub/Sub à utiliser dans ce cours Exemple:

Console

  1. Dans la console Google Cloud, accédez à la page Sujets Pub/Sub.

    Accéder à Sujets Pub/Sub

  2. Cliquez sur Créer un sujet.

  3. Dans le champ ID du sujet, saisissez dag-topic-trigger comme ID pour votre sujet.

  4. Conservez les valeurs par défaut des autres options.

  5. Cliquez sur Créer un sujet.

gcloud

Pour créer un sujet, exécutez la gcloud pubsub topics create dans la Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Ajoutez les définitions de ressources suivantes à votre script Terraform :

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Par exemple, example-project.

Importer vos DAG

Importez des DAG dans votre environnement:

  1. Enregistrez le fichier DAG suivant sur votre ordinateur local.
  2. Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Par exemple, example-project.
  3. Importez le fichier DAG modifié dans votre environnement.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

L'exemple de code contient deux DAG : trigger_dag et target_dag.

Le DAG trigger_dag s'abonne à un sujet Pub/Sub, extrait messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID du DAG des données de message Pub/Sub. Dans cet exemple, trigger_dag déclenche le DAG target_dag, qui génère des messages dans les journaux des tâches.

Le DAG trigger_dag contient les tâches suivantes :

  • subscribe_task : Abonnez-vous à un sujet Pub/Sub.
  • pull_messages_operator: lit les données d'un message Pub/Sub. avec PubSubPullOperator.
  • trigger_target_dag: déclenchez un autre DAG (dans cet exemple, target_dag). selon les données des messages extraits sur ce sujet.

Le DAG target_dag ne contient qu'une seule tâche: output_to_logs. Cette tâche imprime des messages dans le journal des tâches avec un délai d'une seconde.

Déployer une fonction Cloud Run qui publie des messages sur un sujet Pub/Sub

Dans cette section, vous allez déployer une fonction Cloud Run qui publie des messages sur un sujet Pub/Sub.

Créer une fonction Cloud Run et spécifier sa configuration

Console

  1. Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.

    Accéder aux fonctions Cloud Run

  2. Cliquez sur Créer une fonction.

  3. Dans le champ Environnement, sélectionnez 1re génération.

  4. Dans le champ Nom de la fonction, saisissez le nom de votre fonction: pubsub-publisher

  5. Dans le champ Trigger (Déclencheur), sélectionnez HTTP.

  6. Dans la section Authentification, sélectionnez Autorisez les appels non authentifiés. Cette option permet aux utilisateurs non authentifiés d'appeler une fonction HTTP.

  7. Cliquez sur Enregistrer.

  8. Cliquez sur Suivant pour passer à l'étape Code.

Terraform

Envisagez d'utiliser la console Google Cloud pour cette étape, car il n'existe aucun moyen simple de gérer le code source de la fonction à partir de Terraform.

Cet exemple montre comment importer une fonction Cloud Run à partir d'un fichier d'archive ZIP local en créant un bucket Cloud Storage, de stocker le fichier dans ce bucket, puis d'utiliser le fichier du bucket source de la fonction Cloud Run. Si vous utilisez cette approche, Terraform ne met pas automatiquement à jour le code source de votre fonction, même si vous créez un nouveau fichier d'archive. Pour réimporter le code de la fonction, peut changer le nom de fichier de l’archive.

  1. Téléchargez le pubsub_publisher.py et requirements.txt .
  2. Dans le fichier pubsub_publisher.py, remplacez <PROJECT_ID> par l'ID de projet de votre projet. Par exemple, example-project.
  3. Créez une archive ZIP nommée pubsub_function.zip avec les fichiers pbusub_publisner.py et requirements.txt.
  4. Enregistrez l'archive ZIP dans un répertoire où est stocké votre script Terraform.
  5. Ajoutez les définitions de ressources suivantes à votre script Terraform et Remplacez <PROJECT_ID> par l'ID de votre projet.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Spécifier les paramètres du code de la fonction Cloud Run

Console

  1. À l'étape Code, dans le champ Environnement d'exécution, sélectionnez l'environnement d'exécution de langage utilisé par votre fonction. Dans cet exemple, sélectionnez Python 3.10.

  2. Dans le champ Point d'entrée, saisissez pubsub_publisher. Il s'agit du code qui est exécuté lors de l'exécution de votre fonction Cloud Run. La valeur cet indicateur doit être un nom de fonction ou un nom de classe complet dans votre code source.

Terraform

Ignorez cette étape. Les paramètres de la fonction Cloud Run sont déjà définis la ressource google_cloudfunctions_function.

Importer le code de votre fonction Cloud Run

Console

Dans le champ Code source, sélectionnez l'option appropriée en fonction de la manière dont vous fournir le code source de la fonction. Dans ce tutoriel, vous allez ajouter le code de votre fonction à l'aide de l'éditeur intégré des fonctions Cloud Run. Vous pouvez également importer un fichier ZIP ou utiliser Cloud Source Repositories.

  1. Placez l'exemple de code suivant dans le fichier main.py.
  2. Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Par exemple, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Ignorez cette étape. Les paramètres de la fonction Cloud Run sont déjà définis dans la ressource google_cloudfunctions_function.

Spécifier les dépendances de votre fonction Cloud Run

Console

Spécifiez les dépendances de la fonction dans le fichier de métadonnées requirements.txt :

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

Lorsque vous déployez votre fonction, Cloud Run Functions télécharge et installe les dépendances déclarées dans le fichier requirements.txt, une ligne par package. Ce fichier doit se trouver dans le même répertoire que le fichier main.py qui contient le code de votre fonction. Pour en savoir plus, consultez la section Fichiers de spécifications dans la documentation pip.

Terraform

Ignorez cette étape. Les dépendances des fonctions Cloud Run sont définies le fichier requirements.txt dans l'archive pubsub_function.zip.

Déployer votre fonction Cloud Run

Console

Cliquez sur Déployer. Une fois le déploiement terminé, la fonction apparaît avec une coche verte sur la page Fonctions Cloud Run de la console Google Cloud.

Assurez-vous que le compte de service qui exécute votre fonction Cloud Run dispose des autorisations nécessaires dans votre projet pour accéder Pub/Sub.

Terraform

  1. Initialisez Terraform :

    terraform init
    
  2. Examinez la configuration et vérifiez que les ressources que Terraform va créer ou mettre à jour correspondent à vos attentes :

    terraform plan
    
  3. Pour vérifier si votre configuration est valide, exécutez la commande suivante : :

    terraform validate
    
  4. Appliquez la configuration Terraform en exécutant la commande suivante et en saisissant "yes" lorsque vous y êtes invité :

    terraform apply
    

Attendez que Terraform affiche le message "Apply completed!" (Application terminée).

Dans la console Google Cloud, accédez à vos ressources dans l'interface utilisateur pour vous assurer que Terraform les a créées ou mises à jour.

Tester votre fonction Cloud Run

Vérifier que votre fonction publie un message sur un sujet Pub/Sub et que les exemples de DAG fonctionnent comme prévu:

  1. Vérifiez que les DAG sont actifs :

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    3. Accédez à l'onglet DAG.

    4. Vérifiez les valeurs de la colonne État pour les DAG nommés trigger_dag et target_dag. Les deux DAG doivent être à l'état Active.

  2. Envoyez un message Pub/Sub test. Vous pouvez le faire dans Cloud Shell :

    1. Dans la console Google Cloud, accédez à la page Fonctions.

      Accéder aux fonctions Cloud Run

    2. Cliquez sur le nom de votre fonction, pubsub-publisher.

    3. Accédez à l'onglet Test.

    4. Dans la section Configurer l'événement déclencheur, saisissez les valeurs suivantes : Clé-valeur JSON: {"message": "target_dag"}. Ne modifiez pas la paire clé-valeur, car ce message déclenche le DAG de test plus tard.

    5. Dans la section Tester la commande, cliquez sur Tester dans Cloud Shell.

    6. Dans le terminal Cloud Shell, attendez qu'une commande s'affiche automatiquement. Exécutez cette commande en appuyant sur Enter.

    7. Si le message Autoriser Cloud Shell s'affiche, cliquez sur Autoriser.

    8. Vérifiez que le contenu du message correspond à l'instance Pub/Sub . Dans cet exemple, le message de sortie doit commencer par Message b'target_dag' with message_length 10 published to en tant que de votre fonction.

  3. Vérifiez que target_dag a été déclenché :

    1. Attendez au moins une minute pour qu'une nouvelle exécution du DAG de trigger_dag se termine.

    2. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    3. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    4. Accédez à l'onglet DAG.

    5. Cliquez sur trigger_dag pour accéder à la page Détails du DAG. Sur Runs (Exécutions) , la liste des exécutions de DAG pour le DAG trigger_dag s'affiche.

      Ce DAG s'exécute toutes les minutes et traite tous les messages Pub/Sub envoyés depuis la fonction. Si aucun message n'a été envoyé, la tâche trigger_target est marquée comme Skipped dans les journaux d'exécution du DAG. Si des DAG ont été déclenchés, la tâche trigger_target est marquée comme Success.

    6. Examinez plusieurs exécutions de DAG récentes pour trouver une exécution de DAG où les trois tâches (subscribe_task, pull_messages_operator et trigger_target) sont à l'état Success.

    7. Revenez à l'onglet DAG et vérifiez que la colonne Exécutions réussies du DAG target_dag indique une exécution réussie.

Résumé

Dans ce tutoriel, vous avez appris à utiliser des fonctions Cloud Run pour publier des messages sur un sujet Pub/Sub et déployer un DAG qui s'abonne à un sujet Pub/Sub, extrait des messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID de DAG des données de message.

Il existe également d'autres moyens Créer et gérer des abonnements Pub/Sub et de déclencher des DAG externes de ce tutoriel. Par exemple, vous pouvez Utiliser des fonctions Cloud Run pour déclencher des DAG Airflow lorsqu'un événement spécifié se produit. Consultez nos tutoriels pour tester les autres fonctionnalités Google Cloud.

Effectuer un nettoyage

Pour éviter que les ressources soient facturées sur votre compte Google Cloud, utilisées dans ce tutoriel, supprimez le projet qui contient les ressources ou conserver le projet et supprimer les ressources individuelles.

Supprimer le projet

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Supprimer des ressources individuelles

Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.

Console

  1. Supprimez l'environnement Cloud Composer. Vous devez également supprimer le bucket de l'environnement au cours de cette procédure.
  2. Supprimez le sujet Pub/Sub, dag-topic-trigger.
  3. Supprimez la fonction Cloud Run.

    1. Dans la console Google Cloud, accédez à "Fonctions Cloud Run".

      Accéder aux fonctions Cloud Run

    2. Cochez la case correspondant à la fonction que vous souhaitez supprimer : pubsub-publisher.

    3. Cliquez sur Supprimer, puis suivez les instructions.

Terraform

  1. Assurez-vous que votre script Terraform ne contient pas d'entrées pour les ressources toujours requises par votre projet. Par exemple, vous pouvez choisir de conserver certaines API activées et les autorisations IAM attribuées (si vous avez ajouté de telles définitions à votre script Terraform).
  2. Exécutez terraform destroy.
  3. Supprimer manuellement le bucket de l'environnement. Cloud Composer ne le supprime pas automatiquement. Vous pouvez le faire depuis la console Google Cloud ou la Google Cloud CLI.

Étape suivante