Déclencher des DAG à l'aide de Cloud Functions et de messages Pub/Sub

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment créer une architecture push basée sur des événements en déclenchant des DAG Cloud Composer en réponse aux modifications du sujet Pub/Sub. Les exemples de ce tutoriel illustrent la gestion du cycle complet de gestion de Pub/Sub, y compris la gestion des abonnements, dans le cadre du processus DAG. Elle convient à certains des cas d'utilisation courants lorsque vous devez déclencher des DAG sans configurer d'autorisations d'accès supplémentaires.

Par exemple, les messages envoyés via Pub/Sub peuvent être utilisés comme solution si vous ne souhaitez pas fournir un accès direct à un environnement Cloud Composer pour des raisons de sécurité. Vous pouvez configurer une fonction Cloud qui crée des messages Pub/Sub et les publie dans un sujet Pub/Sub. Vous pouvez ensuite créer un DAG qui extrait les messages Pub/Sub, puis les gère.

Dans cet exemple spécifique, vous allez créer une fonction Cloud et déployer deux DAG. Le premier DAG extrait les messages Pub/Sub et déclenche le deuxième DAG en fonction du contenu du message Pub/Sub.

Dans ce tutoriel, nous partons du principe que vous connaissez Python et la console Google Cloud.

Objectifs

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Une fois que vous avez terminé ce tutoriel, supprimez les ressources que vous avez créées afin d'éviter de continuer à payer des frais. Pour en savoir, plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Pour ce tutoriel, vous avez besoin d'un projet Google Cloud. Configurez le projet de la manière suivante:

  1. Dans la console Google Cloud, sélectionnez ou créez un projet:

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.

  3. Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires:

    • Utilisateur du compte de service (roles/iam.serviceAccountUser)
    • Éditeur Pub/Sub (roles/pubsub.editor)
    • Administrateur de l'environnement et des objets Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrateur Cloud Functions (roles/cloudfunctions.admin)
    • Lecteur de journaux (roles/logging.viewer)
  4. Assurez-vous que le compte de service qui exécute votre fonction Cloud dispose des autorisations suffisantes dans votre projet pour accéder à Pub/Sub. Par défaut, Cloud Functions utilise le compte de service App Engine par défaut. Ce compte de service dispose du rôle Éditeur, qui dispose des autorisations suffisantes pour ce tutoriel.

Activer les API pour votre projet.

Console

Activer les API Cloud Composer, Cloud Functions, and Pub/Sub.

Activer les API

gcloud

Activer les API Cloud Composer, Cloud Functions, and Pub/Sub :

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Activez l'API Cloud Composer dans votre projet en ajoutant les définitions de ressources suivantes à votre script Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Remplacez <PROJECT_ID> par l'ID de votre projet. Exemple :example-project

Créer l'environnement Cloud Composer

Créez un environnement Cloud Composer 2.

Dans le cadre de cette procédure, vous attribuez le rôle Extension d'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) au compte d'agent de service Composer. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.

Créer un sujet Pub/Sub

Cet exemple déclenche un DAG en réponse à un message transmis à un sujet Pub/Sub. Créez un sujet Pub/Sub à utiliser dans cet exemple:

Console

  1. Dans la console Google Cloud, accédez à la page Sujets Pub/Sub.

    Accéder à la page "Sujets Pub/Sub"

  2. Cliquez sur Créer un sujet.

  3. Dans le champ ID du sujet, saisissez dag-topic-trigger comme ID pour votre sujet.

  4. Conservez les valeurs par défaut des autres options.

  5. Cliquez sur Créer un sujet.

gcloud

Pour créer un sujet, exécutez la commande gcloud pubsub topics create dans Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Ajoutez les définitions de ressources suivantes à votre script Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Remplacez <PROJECT_ID> par l'ID de votre projet. Exemple :example-project

Importer vos DAG

Importez des DAG dans votre environnement:

  1. Enregistrez le fichier DAG suivant sur votre ordinateur local.
  2. Remplacez <PROJECT_ID> par l'ID de votre projet. Exemple :example-project
  3. Importez le fichier DAG modifié dans votre environnement.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"

def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids

# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)

def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")

# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

L'exemple de code contient deux DAG: trigger_dag et target_dag.

Le DAG trigger_dag s'abonne à un sujet Pub/Sub, extrait les messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID du DAG des données du message Pub/Sub. Dans cet exemple, trigger_dag déclenche le DAG target_dag, qui génère des messages dans les journaux de la tâche.

Le DAG trigger_dag contient les tâches suivantes:

  • subscribe_task: s'abonner à un sujet Pub/Sub.
  • pull_messages_operator: lire les données d'un message Pub/Sub avec PubSubPullOperator.
  • trigger_target_dag: déclenche un autre DAG (dans cet exemple, target_dag) en fonction des données des messages extraits du sujet Pub/Sub.

Le DAG target_dag ne contient qu'une seule tâche: output_to_logs. Cette tâche imprime des messages dans le journal des tâches avec un délai d'une seconde.

Déployer une fonction Cloud qui publie des messages sur un sujet Pub/Sub

Dans cette section, vous allez déployer une fonction Cloud qui publie des messages dans un sujet Pub/Sub.

Créer une fonction Cloud et spécifier sa configuration

Console

  1. Dans la console Google Cloud, accédez à la page Cloud Functions.

    Accéder à Cloud Functions

  2. Cliquez sur Créer une fonction.

  3. Dans le champ Environnement, sélectionnez 1re génération.

  4. Dans le champ Nom de la fonction, saisissez le nom de votre fonction : pubsub-publisher.

  5. Dans le champ Trigger (Déclencheur), sélectionnez HTTP.

  6. Dans la section Authentification, sélectionnez Autoriser les appels non authentifiés. Cette option permet aux utilisateurs non authentifiés d'appeler une fonction HTTP.

  7. Cliquez sur Enregistrer.

  8. Cliquez sur Suivant pour passer à l'étape Code.

Terraform

Pensez à utiliser la console Google Cloud pour cette étape, car il n'existe pas de moyen simple de gérer le code source de la fonction depuis Terraform.

Cet exemple montre comment importer une fonction Cloud à partir d'un fichier d'archive ZIP local en créant un bucket Cloud Storage, en stockant le fichier dans ce bucket, puis en utilisant le fichier du bucket comme source pour la fonction Cloud. Si vous utilisez cette approche, Terraform ne met pas automatiquement à jour le code source de votre fonction, même si vous créez un fichier d'archive. Pour réimporter le code de la fonction, vous pouvez modifier le nom du fichier de l'archive.

  1. Téléchargez les fichiers pubsub_publisher.py et requirements.txt.
  2. Dans le fichier pubsub_publisher.py, remplacez <PROJECT_ID> par l'ID de votre projet. Par exemple, example-project.
  3. Créez une archive ZIP nommée pubsub_function.zip avec le fichier pbusub_publisner.py et le fichier requirements.txt.
  4. Enregistrez l'archive ZIP dans un répertoire où est stocké votre script Terraform.
  5. Ajoutez les définitions de ressources suivantes à votre script Terraform et remplacez <PROJECT_ID> par l'ID de votre projet.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Spécifier les paramètres de code de la fonction Cloud

Console

  1. À l'étape Code, dans le champ Environnement d'exécution, sélectionnez l'environnement d'exécution du langage que votre fonction utilise. Dans cet exemple, sélectionnez Python 3.10.

  2. Dans le champ Point d'entrée, saisissez pubsub_publisher. Il s'agit du code qui est exécuté lors de l'exécution de votre fonction Cloud. La valeur de cette option doit être un nom de fonction ou un nom de classe complet existant dans votre code source.

Terraform

Ignorez cette étape. Les paramètres de la fonction Cloud sont déjà définis dans la ressource google_cloudfunctions_function.

Importer le code de votre fonction Cloud

Console

Dans le champ Code source, sélectionnez l'option appropriée pour fournir le code source de la fonction. Dans ce tutoriel, vous allez ajouter le code de votre fonction à l'aide de l'éditeur intégré Cloud Functions. Vous pouvez également importer un fichier ZIP ou utiliser Cloud Source Repositories.

  1. Placez l'exemple de code suivant dans le fichier main.py.
  2. Remplacez <PROJECT_ID> par l'ID de votre projet. Exemple :example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"

def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Ignorez cette étape. Les paramètres de la fonction Cloud sont déjà définis dans la ressource google_cloudfunctions_function.

Spécifier les dépendances de la fonction Cloud

Console

Spécifiez les dépendances des fonctions dans le fichier de métadonnées requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

Lorsque vous déployez votre fonction, Cloud Functions télécharge et installe des dépendances déclarées dans le fichier requirements.txt, à raison d'une ligne par package. Ce fichier doit se trouver dans le même répertoire que le fichier main.py qui contient le code de votre fonction. Pour en savoir plus, consultez la section Fichiers d'exigences dans la documentation pip.

Terraform

Ignorez cette étape. Les dépendances de la fonction Cloud sont définies dans le fichier requirements.txt de l'archive pubsub_function.zip.

Déployer votre fonction Cloud

Console

Cliquez sur Deploy (Déployer). Une fois le déploiement terminé, la fonction apparaît avec une coche verte sur la page Cloud Functions de la console Google Cloud.

Assurez-vous que le compte de service qui exécute votre fonction Cloud dispose d'autorisations suffisantes dans votre projet pour accéder à Pub/Sub.

Terraform

  1. Initialisez Terraform :

    terraform init
    
  2. Examinez la configuration et vérifiez que les ressources que Terraform va créer ou mettre à jour correspondent à vos attentes:

    terraform plan
    
  3. Pour vérifier si votre configuration est valide, exécutez la commande suivante:

    terraform validate
    
  4. Appliquez la configuration Terraform en exécutant la commande suivante et en saisissant "yes" à l'invite:

    terraform apply
    

Attendez que Terraform affiche le message "Apply completed!" (Application terminée).

Dans la console Google Cloud, accédez à vos ressources dans l'interface utilisateur pour vous assurer que Terraform les a créées ou mises à jour.

Tester la fonction Cloud

Pour vérifier que votre fonction publie un message sur un sujet Pub/Sub et que les exemples de DAG fonctionnent comme prévu, procédez comme suit:

  1. Vérifiez que les DAG sont actifs:

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    3. Accédez à l'onglet DAG.

    4. Vérifiez les valeurs de la colonne State pour les DAG nommés trigger_dag et target_dag. Les deux DAG doivent être à l'état Active.

  2. Envoyez un message Pub/Sub de test. Vous pouvez le faire dans Cloud Shell:

    1. Dans la console Google Cloud, accédez à la page Fonctions.

      Accéder à Cloud Functions

    2. Cliquez sur le nom de votre fonction, pubsub-publisher.

    3. Accédez à l'onglet Test.

    4. Dans la section Configurer l'événement de déclenchement, saisissez la clé-valeur JSON suivante: {"message": "target_dag"}. Ne modifiez pas la paire clé-valeur, car ce message déclenche le DAG de test ultérieurement.

    5. Dans la section Tester la commande, cliquez sur Tester dans Cloud Shell.

    6. Dans le terminal Cloud Shell, attendez qu'une commande s'affiche automatiquement. Exécutez cette commande en appuyant sur Enter.

    7. Si le message Autoriser Cloud Shell s'affiche, cliquez sur Autoriser.

    8. Vérifiez que le contenu du message correspond au message Pub/Sub. Dans cet exemple, le message de sortie doit commencer par Message b'target_dag' with message_length 10 published to comme réponse de votre fonction.

  3. Vérifiez que target_dag a été déclenché:

    1. Attendez au moins une minute pour qu'une nouvelle exécution du DAG trigger_dag se termine.

    2. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    3. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    4. Accédez à l'onglet DAG.

    5. Cliquez sur trigger_dag pour accéder à la page Détails du DAG. Dans l'onglet Exécutions, la liste des exécutions du DAG trigger_dag s'affiche.

      Ce DAG s'exécute toutes les minutes et traite tous les messages Pub/Sub envoyés à partir de la fonction. Si aucun message n'a été envoyé, la tâche trigger_target est marquée comme Skipped dans les journaux d'exécution du DAG. Si des DAG ont été déclenchés, la tâche trigger_target est marquée comme Success.

    6. Parcourez plusieurs exécutions de DAG récentes afin de localiser une exécution de DAG dans laquelle les trois tâches (subscribe_task, pull_messages_operator et trigger_target) sont à l'état Success.

    7. Revenez à l'onglet DAG et vérifiez que la colonne Réussies du DAG target_dag indique une exécution réussie.

Résumé

Dans ce tutoriel, vous avez appris à utiliser Cloud Functions pour publier des messages sur un sujet Pub/Sub et déployer un DAG qui s'abonne à un sujet Pub/Sub, extrait les messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID du DAG des données du message.

Il existe d'autres moyens de créer et gérer des abonnements Pub/Sub et de déclencher des DAG qui sortent du cadre de ce tutoriel. Par exemple, vous pouvez utiliser Cloud Functions pour déclencher les DAG Airflow lorsqu'un événement spécifié se produit. Consultez nos tutoriels pour tester par vous-même les autres fonctionnalités de Google Cloud.

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

    Supprimez un projet Google Cloud :

    gcloud projects delete PROJECT_ID

Supprimer des ressources individuelles

Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.

Console

  1. Supprimez l'environnement Cloud Composer. Vous allez également supprimer le bucket de l'environnement au cours de cette procédure.
  2. Supprimez le sujet Pub/Sub, dag-topic-trigger.
  3. Supprimez la fonction Cloud.

    1. Dans la console Google Cloud, accédez à Cloud Functions.

      Accéder à Cloud Functions

    2. Cochez la case correspondant à la fonction que vous souhaitez supprimer, pubsub-publisher.

    3. Cliquez sur Supprimer, puis suivez les instructions.

Terraform

  1. Assurez-vous que votre script Terraform ne contient pas d'entrées pour les ressources toujours requises par votre projet. Par exemple, vous pouvez conserver certaines API activées et l'attribution d'autorisations IAM (si vous avez ajouté de telles définitions à votre script Terraform).
  2. Exécutez terraform destroy.
  3. Supprimez manuellement le bucket de l'environnement. Cloud Composer ne le supprime pas automatiquement. Vous pouvez le faire à partir de la console Google Cloud ou de Google Cloud CLI.

Étapes suivantes