Déclencher des DAG avec Cloud Functions et des messages Pub/Sub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment créer une architecture push basée sur des événements Déclencher des DAG Cloud Composer en réponse à Pub/Sub les changements de sujet. Les exemples de ce tutoriel montrent comment gérer l'intégralité du cycle de la gestion Pub/Sub, y compris la gestion des abonnements, dans le cadre du processus DAG. Il convient à certains des cas d'utilisation courants doivent déclencher des DAG, mais ne souhaitent pas configurer d'autorisations d'accès supplémentaires.

Par exemple, les messages envoyés via Pub/Sub peuvent être utilisés comme solution si vous ne souhaitez pas fournir un accès direct à une instance Cloud Composer pour des raisons de sécurité. Vous pouvez configurer un Cloud Functions qui crée des messages Pub/Sub et les publie sur un sujet Pub/Sub. Vous pouvez ensuite créer un DAG extrait et traite les messages Pub/Sub.

Dans cet exemple spécifique, vous allez créer une fonction Cloud deux DAG. Le premier DAG extrait les messages Pub/Sub et déclenche un second DAG en fonction du contenu du message Pub/Sub.

Dans ce tutoriel, nous partons du principe que vous connaissez bien Python et la console Google Cloud.

Objectifs

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Une fois ce tutoriel terminé, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir, plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Pour ce tutoriel, vous avez besoin d'un project. Configurez le projet comme suit:

  1. Dans la console Google Cloud, sélectionnez ou créez un projet:

    Accéder au sélecteur de projet

  2. Assurez-vous que la facturation est activée pour votre projet. Découvrez comment vérifier si la facturation est activée sur un projet.

  3. Assurez-vous que l'utilisateur de votre projet Google Cloud dispose des rôles suivants pour créer les ressources nécessaires:

    • Utilisateur du compte de service (roles/iam.serviceAccountUser)
    • Éditeur Pub/Sub (roles/pubsub.editor)
    • Administrateur de l'environnement et des objets Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrateur Cloud Functions (roles/cloudfunctions.admin)
    • Lecteur de journaux (roles/logging.viewer)
  4. Vérifiez que la Compte de service qui exécute votre fonction Cloud dispose des autorisations suffisantes dans votre projet pour accéder à Pub/Sub. Par Cloud Functions utilise par défaut Compte de service App Engine par défaut. Ce compte de service dispose du rôle Éditeur, qui dispose de suffisamment de pour ce tutoriel.

Activer les API pour votre projet.

Console

Activer les API Cloud Composer, Cloud Functions, and Pub/Sub.

Activer les API

gcloud

Activer les API Cloud Composer, Cloud Functions, and Pub/Sub :

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Activez l'API Cloud Composer dans votre projet en ajoutant ce qui suit : de ressources à votre script Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Exemple : example-project.

Créer votre environnement Cloud Composer

Créez un environnement Cloud Composer 2.

Dans le cadre de cette procédure, vous accordez l'extension d'agent de service de l'API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à l'agent de service Composer de service. Cloud Composer utilise ce compte pour effectuer des opérations dans votre projet Google Cloud.

Créer un sujet Pub/Sub

Cet exemple déclenche un DAG en réponse à un message envoyé à un sujet Pub/Sub. Créer un sujet Pub/Sub à utiliser dans ce cours Exemple:

Console

  1. Dans la console Google Cloud, accédez à la page Sujets Pub/Sub.

    Accéder à Sujets Pub/Sub

  2. Cliquez sur Créer un sujet.

  3. Dans le champ ID du sujet, saisissez dag-topic-trigger comme ID pour votre sur ce sujet.

  4. Conservez les valeurs par défaut des autres options.

  5. Cliquez sur Créer un sujet.

gcloud

Pour créer un sujet, exécutez la gcloud pubsub topics create dans la Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Ajoutez les définitions de ressources suivantes à votre script Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Exemple : example-project.

Importer vos DAG

Importez des DAG dans votre environnement:

  1. Enregistrez le fichier DAG suivant sur votre ordinateur local.
  2. Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Exemple : example-project.
  3. Importez le fichier DAG modifié dans votre environnement.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

L'exemple de code contient deux DAG: trigger_dag et target_dag.

Le DAG trigger_dag s'abonne à un sujet Pub/Sub, extrait messages Pub/Sub et déclenche un autre DAG spécifié dans l'ID du DAG des données de message Pub/Sub. Dans cet exemple, trigger_dag déclenche le DAG target_dag, qui génère des messages dans les journaux des tâches.

Le DAG trigger_dag contient les tâches suivantes:

  • subscribe_task: abonnement à un sujet Pub/Sub.
  • pull_messages_operator: lit les données d'un message Pub/Sub. avec PubSubPullOperator.
  • trigger_target_dag: déclenchez un autre DAG (dans cet exemple, target_dag). selon les données des messages extraits sur ce sujet.

Le DAG target_dag ne contient qu'une seule tâche: output_to_logs. Cette tâche imprime des messages dans le journal des tâches avec un délai d'une seconde.

Déployer une fonction Cloud qui publie des messages sur un sujet Pub/Sub

Dans cette section, vous allez déployer une fonction Cloud qui publie des messages sur un sujet Pub/Sub.

Créer une fonction Cloud et spécifier sa configuration

Console

  1. Dans la console Google Cloud, accédez à la page Cloud Functions.

    Accéder à Cloud Functions

  2. Cliquez sur Créer une fonction.

  3. Dans le champ Environnement, sélectionnez 1re génération.

  4. Dans le champ Nom de la fonction, saisissez le nom de votre fonction: pubsub-publisher

  5. Dans le champ Trigger (Déclencheur), sélectionnez HTTP.

  6. Dans la section Authentification, sélectionnez Autorisez les appels non authentifiés. Cette option accorde utilisateurs non authentifiés la possibilité d'appeler une fonction HTTP.

  7. Cliquez sur Enregistrer.

  8. Cliquez sur Suivant pour passer à l'étape Code.

Terraform

Nous vous conseillons d'utiliser la console Google Cloud pour cette étape, car il n'y a pas simple de gérer le code source de la fonction à partir de Terraform.

Cet exemple montre comment importer une fonction Cloud à partir d'un fichier d'archive ZIP local en créant un bucket Cloud Storage, de stocker le fichier dans ce bucket, puis d'utiliser le fichier du bucket source de la fonction Cloud. Si vous utilisez cette approche, Terraform ne met pas automatiquement à jour le code source de votre fonction, même si vous créez un nouveau fichier d'archive. Pour réimporter le code de la fonction, peut changer le nom de fichier de l’archive.

  1. Téléchargez le pubsub_publisher.py et requirements.txt .
  2. Dans le fichier pubsub_publisher.py, remplacez <PROJECT_ID> par le ID de projet de votre projet. Par exemple, example-project.
  3. Créez une archive ZIP nommée pubsub_function.zip avec le paramètre pbusub_publisner.py et le fichier requirements.txt.
  4. Enregistrez l'archive ZIP dans un répertoire où est stocké votre script Terraform.
  5. Ajoutez les définitions de ressources suivantes à votre script Terraform et Remplacez <PROJECT_ID> par l'ID de votre projet.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Spécifier les paramètres du code de la fonction Cloud

Console

  1. À l'étape Code, sélectionnez le langage dans le champ Runtime (Environnement d'exécution). de l'environnement d'exécution utilisé par votre fonction. Dans cet exemple, sélectionnez Python 3.10.

  2. Dans le champ Point d'entrée, saisissez pubsub_publisher. Il s'agit du code qui s'exécute en même temps que votre fonction Cloud. La valeur cet indicateur doit être un nom de fonction ou un nom de classe complet dans votre code source.

Terraform

Ignorez cette étape. Les paramètres de la fonction Cloud sont déjà définis la ressource google_cloudfunctions_function.

Importer le code de votre fonction Cloud

Console

Dans le champ Code source, sélectionnez l'option appropriée en fonction de la manière dont vous fournir le code source de la fonction. Dans ce tutoriel, vous allez ajouter le code de votre fonction à l'aide de l'API Cloud Functions Éditeur intégré : Vous pouvez également importer un fichier ZIP ou utiliser Cloud Source Repositories.

  1. Placez l'exemple de code suivant dans le fichier main.py.
  2. Remplacez <PROJECT_ID> par l'ID du projet. de votre projet. Exemple : example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Ignorez cette étape. Les paramètres de la fonction Cloud sont déjà définis la ressource google_cloudfunctions_function.

Spécifier les dépendances de votre fonction Cloud

Console

Spécifiez les dépendances des fonctions dans le fichier de métadonnées requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

Lorsque vous déployez votre fonction, Cloud Functions télécharge et installe déclarées dans le fichier requirements.txt, à raison d'une ligne par package. Ce fichier doit se trouver dans le même répertoire que le fichier main.py qui contient le code de votre fonction. Pour en savoir plus, consultez Fichiers d'exigences dans la documentation pip.

Terraform

Ignorez cette étape. Les dépendances Cloud Functions sont définies le fichier requirements.txt dans l'archive pubsub_function.zip.

Déployer votre fonction Cloud

Console

Cliquez sur Déployer. Une fois le déploiement terminé, la fonction s'affiche par une coche verte sur la page Cloud Functions dans le console Google Cloud.

Assurez-vous que le compte de service qui exécute votre fonction Cloud dispose des autorisations nécessaires dans votre projet pour accéder Pub/Sub.

Terraform

  1. Initialisez Terraform :

    terraform init
    
  2. Examiner la configuration et vérifier que les ressources gérées par Terraform que vous allez créer ou mettre à jour, répondent à vos attentes:

    terraform plan
    
  3. Pour vérifier si votre configuration est valide, exécutez la commande suivante : commande:

    terraform validate
    
  4. Appliquez la configuration Terraform en exécutant la commande suivante : en saisissant "yes" à l'invite:

    terraform apply
    

Attendez que Terraform affiche le message "Apply completed!" (Application terminée).

Dans la console Google Cloud, accédez à vos ressources dans l'UI pour effectuer vous assurer que Terraform les a créées ou mises à jour.

Tester la fonction Cloud

Vérifier que votre fonction publie un message sur un sujet Pub/Sub et que les exemples de DAG fonctionnent comme prévu:

  1. Vérifiez que les DAG sont actifs:

    1. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    2. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    3. Accédez à l'onglet DAG.

    4. Vérifiez les valeurs de la colonne State (État) pour les DAG nommés trigger_dag et target_dag Les deux DAG doivent être à l'état Active.

  2. Envoyez un message Pub/Sub test. Vous pouvez le faire dans Cloud Shell:

    1. Dans la console Google Cloud, accédez à la page Fonctions.

      Accéder à Cloud Functions

    2. Cliquez sur le nom de votre fonction, pubsub-publisher.

    3. Accédez à l'onglet Test.

    4. Dans la section Configurer l'événement déclencheur, saisissez les valeurs suivantes : Clé-valeur JSON: {"message": "target_dag"}. Ne modifiez pas la clé-valeur car ce message déclenche ultérieurement le DAG de test.

    5. Dans la section Tester la commande, cliquez sur Tester dans Cloud Shell.

    6. Dans le terminal Cloud Shell, attendez qu'une commande s'affiche. automatiquement. Exécutez cette commande en appuyant sur Enter.

    7. Si le message Autoriser Cloud Shell s'affiche, cliquez sur Autoriser.

    8. Vérifiez que le contenu du message correspond à l'instance Pub/Sub . Dans cet exemple, le message de sortie doit commencer par Message b'target_dag' with message_length 10 published to en tant que de votre fonction.

  3. Vérifiez que target_dag a été déclenché:

    1. Attendez au moins une minute pour qu'une nouvelle exécution du DAG trigger_dag terminé.

    2. Dans la console Google Cloud, accédez à la page Environnements.

      Accéder à la page Environnements

    3. Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.

    4. Accédez à l'onglet DAG.

    5. Cliquez sur trigger_dag pour accéder à la page Détails du DAG. Dans Runs (Exécutions) , la liste des exécutions du DAG trigger_dag s'affiche.

      Ce DAG s'exécute toutes les minutes et traite l'ensemble des données Pub/Sub les messages envoyés à partir de la fonction. Si aucun message n'a été envoyé, le La tâche trigger_target est marquée comme Skipped dans les journaux d'exécution du DAG. Si DAG ont été déclenchés, la tâche trigger_target est marquée comme Success

    6. Examinez plusieurs exécutions de DAG récentes pour localiser une exécution de DAG où tous trois tâches (subscribe_task, pull_messages_operator et trigger_target) sont à l'état Success.

    7. Revenez à l'onglet DAGs (DAG) et vérifiez que la boîte de dialogue Exécutions réussies du DAG target_dag indique une exécution réussie.

Résumé

Dans ce tutoriel, vous avez appris à utiliser Cloud Functions pour publier messages sur un sujet Pub/Sub et déployer un DAG qui s'abonne à un Sujet Pub/Sub, extrait les messages Pub/Sub et déclencheurs un autre DAG spécifié dans l'ID du DAG des données du message.

Il existe également d'autres moyens Créer et gérer des abonnements Pub/Sub et de déclencher des DAG externes de ce tutoriel. Par exemple, vous pouvez Utiliser Cloud Functions pour déclencher les DAG Airflow lorsqu'un événement spécifié se produit. Consultez nos tutoriels pour essayer les autres les fonctionnalités Google Cloud.

Effectuer un nettoyage

Pour éviter que les ressources soient facturées sur votre compte Google Cloud, utilisées dans ce tutoriel, supprimez le projet qui contient les ressources ou conserver le projet et supprimer les ressources individuelles.

Supprimer le projet

    Supprimez un projet Google Cloud :

    gcloud projects delete PROJECT_ID

Supprimer des ressources individuelles

Si vous envisagez d'explorer plusieurs tutoriels et guides de démarrage rapide, réutiliser des projets peut vous aider à ne pas dépasser les limites de quotas des projets.

Console

  1. Supprimez l'environnement Cloud Composer. Vous avez également supprimer le bucket de l'environnement au cours de cette procédure.
  2. Supprimez le sujet Pub/Sub, dag-topic-trigger.
  3. Supprimez la fonction Cloud.

    1. Dans la console Google Cloud, accédez à Cloud Functions.

      Accéder à Cloud Functions

    2. Cochez la case correspondant à la fonction que vous souhaitez supprimer, pubsub-publisher

    3. Cliquez sur Supprimer, puis suivez les instructions.

Terraform

  1. Assurez-vous que votre script Terraform ne contient pas d'entrées pour ressources qui restent requises par votre projet. Par exemple : vous pouvez laisser certaines API activées autorisations toujours attribuées (si vous avez ajouté ce type de définitions à vos script Terraform).
  2. Exécutez terraform destroy.
  3. Supprimer manuellement le bucket de l'environnement. Cloud Composer ne la supprime pas automatiquement. Vous pouvez le faire depuis la console Google Cloud ou la Google Cloud CLI.

Étape suivante