Attendre des événements à l'aide de rappels et de déclencheurs Eventarc

Votre workflow peut avoir besoin d'attendre un processus externe. Vous pouvez utiliser des rappels HTTP pour attendre qu'un autre service envoie une requête à un point de terminaison de rappel. Cette requête reprend l'exécution du workflow. Vous pouvez également attendre à l'aide de l'interrogation.

Au lieu d'utiliser l'interrogation, ce tutoriel explique comment attendre des événements ou des messages Pub/Sub à l'aide de rappels HTTP et de déclencheurs Eventarc. Bien que vous puissiez déclencher un workflow avec des événements ou des messages Pub/Sub, vous pouvez souhaiter interrompre cette exécution pour attendre un autre événement avant de continuer. Par exemple, un événement déclenche un workflow pour lancer un processus, mais le workflow doit attendre un autre événement qui signale que le processus est terminé. Pour ce faire, vous pouvez faire en sorte qu'un workflow rappelle un autre workflow.

Créer une base de données Firestore

Firestore stocke vos données dans des documents contenant des champs mappés à des valeurs. Ces documents sont stockés dans des collections, qui sont des conteneurs pour vos documents que vous pouvez utiliser pour organiser vos données et créer des requêtes. En savoir plus sur Firestore

Notez que chaque projet Google Cloud est limité à une base de données Firestore. Suivez les étapes ci-dessous si vous devez créer une base de données.

Console

  1. Dans la console Google Cloud , accédez à la page Premiers pas de Firestore.

    Accéder à "Commencer"

  2. Cliquez sur Sélectionner le mode natif.

    Pour savoir comment sélectionner un mode de base de données et pour une comparaison détaillée des fonctionnalités de chaque option, consultez Choisir entre le mode natif et le mode Datastore.

  3. Dans la liste Sélectionnez un emplacement, sélectionnez nam5 (États-Unis).

    L'emplacement s'applique à la base de données Firestore et à l'application App Engine de votre projet Google Cloud . Une fois votre base de données créée, vous ne pouvez plus modifier son emplacement.

  4. Cliquez sur Créer une base de données.

gcloud

Pour créer une base de données Firestore, vous devez d'abord créer une application App Engine, puis exécuter la commande gcloud firestore databases create :

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

Vous pouvez ignorer l'avertissement us-central is not a valid Firestore location. App Engine et Firestore acceptent les mêmes emplacements, mais la région App Engine us-central (Iowa) est mappée vers la région multirégionale Firestore nam5 (États-Unis).

Créer un sujet Pub/Sub

Ce tutoriel utilise Pub/Sub comme source d'événements. Créez un sujet Pub/Sub pour pouvoir y publier un message. Découvrez comment créer et gérer des thèmes.

Console

  1. Dans la console Google Cloud , accédez à la page Sujets de Pub/Sub.

    Accéder aux sujets

  2. Cliquez sur Créer un sujet.

  3. Dans le champ ID du sujet, saisissez topic-callback.

  4. Acceptez les autres valeurs par défaut.

  5. Cliquez sur Create topic (Créer un sujet).

gcloud

Pour créer un sujet, exécutez la commande gcloud pubsub topics create :

gcloud pubsub topics create topic-callback

Créer un bucket Cloud Storage

Ce tutoriel utilise Cloud Storage comme source d'événements. Créez un bucket Cloud Storage pour pouvoir y importer un fichier. Découvrez comment créer des buckets de stockage.

Console

  1. Dans la console Google Cloud , accédez à la page Buckets de Cloud Storage.

    Accéder à Cloud Storage

  2. Cliquez sur  Créer.

  3. Dans le champ Nom de votre bucket, saisissez PROJECT_ID-bucket-callback.

    L'ID du projet est utilisé dans le workflow callback-event-sample pour identifier le bucket.

  4. Cliquez sur Continuer.

  5. Pour le type d'emplacement, sélectionnez Région, puis us-central1 (Iowa).

  6. Acceptez les autres valeurs par défaut.

  7. Cliquez sur Créer.

gcloud

Pour créer un bucket, exécutez la commande gcloud storage buckets create :

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

L'ID du projet est utilisé dans le workflow callback-event-sample pour identifier le bucket.

Une fois les sources d'événements créées, vous pouvez déployer le workflow du récepteur d'événements.

Déployer un workflow qui écoute les événements

Le workflow callback-event-listener est déclenché lorsqu'un message est publié dans un sujet Pub/Sub ou lorsqu'un fichier est importé dans un bucket Cloud Storage. Le workflow reçoit l'événement, récupère les détails du rappel appropriés dans la base de données Firestore, puis envoie une requête HTTP au point de terminaison de rappel.

Console

  1. Dans la console Google Cloud , accédez à la page Workflows :

    Accéder à "Workflows"

  2. Cliquez sur  Créer.

  3. Saisissez un nom pour le nouveau workflow : callback-event-listener.

  4. Dans la liste Région, sélectionnez us-central1.

  5. Sélectionnez le compte de service que vous avez créé précédemment.

  6. Cliquez sur Suivant.

  7. Dans l'éditeur de workflow, saisissez la définition suivante pour votre workflow :

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  8. Cliquez sur Déployer.

gcloud

  1. Créez un fichier de code source pour votre workflow :

    touch callback-event-listener.yaml
  2. Dans un éditeur de texte, copiez le workflow suivant dans votre fichier de code source :

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  3. Déployez le workflow en saisissant la commande suivante :

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Remplacez SERVICE_ACCOUNT_NAME par le nom du compte de service que vous avez créé précédemment.

Déployer un workflow qui attend des événements

Le workflow callback-event-sample stocke les détails de son rappel dans une base de données Firestore, interrompt son exécution, puis attend que des événements spécifiques se produisent.

Console

  1. Dans la console Google Cloud , accédez à la page Workflows :

    Accéder à "Workflows"

  2. Cliquez sur  Créer.

  3. Saisissez un nom pour le nouveau workflow : callback-event-sample.

  4. Dans la liste Région, sélectionnez us-central1.

  5. Sélectionnez le compte de service que vous avez créé précédemment.

  6. Cliquez sur Suivant.

  7. Dans l'éditeur de workflow, saisissez la définition suivante pour votre workflow :

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  8. Cliquez sur Déployer.

gcloud

  1. Créez un fichier de code source pour votre workflow :

    touch callback-event-sample.yaml
  2. Dans un éditeur de texte, copiez le workflow suivant dans votre fichier de code source :

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  3. Déployez le workflow en saisissant la commande suivante :

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Remplacez SERVICE_ACCOUNT_NAME par le nom du compte de service que vous avez créé précédemment.

Créer un déclencheur Eventarc pour acheminer les événements Pub/Sub

Un déclencheur Eventarc vous permet de router des événements en spécifiant des filtres pour le déclencheur, y compris la source de l'événement et le workflow cible. Créez un déclencheur Eventarc pour exécuter le workflow callback-event-listener suite à la publication d'un message dans un sujet Pub/Sub. En savoir plus sur le déclenchement d'un workflow

Console

  1. Dans la console Google Cloud , accédez à la page Eventarc.

    Accéder à Eventarc

  2. Cliquez sur Créer un déclencheur.

  3. Saisissez un nom de déclencheur

    Par exemple, trigger-pubsub-events-listener.

  4. Dans la liste Fournisseur d'événements, sélectionnez Cloud Pub/Sub.

  5. Dans la liste Événement, sous Personnalisé, sélectionnez google.cloud.pubsub.topic.v1.messagePublished.

  6. Dans la liste Sélectionner un sujet Cloud Pub/Sub, sélectionnez le sujet que vous avez créé précédemment.

  7. Dans la liste Région, sélectionnez us-central1 (Iowa).

  8. Si vous y êtes invité, attribuez le rôle iam.serviceAccountTokenCreator au compte de service Pub/Sub.

  9. Sélectionnez le compte de service que vous avez créé précédemment.

  10. Dans la liste Destination de l'événement, sélectionnez Workflows.

  11. Dans la liste Sélectionner un workflow, sélectionnez le workflow callback-event-listener.

  12. Cliquez sur Créer.

gcloud

Pour créer un déclencheur, exécutez la commande gcloud eventarc triggers create :

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Les événements sont transformés et transmis à l'exécution du workflow en tant qu'arguments d'exécution. Notez que l'activation du nouveau déclencheur peut prendre jusqu'à deux minutes.

Créer un déclencheur Eventarc pour acheminer les événements Cloud Storage

Un déclencheur Eventarc vous permet de router des événements en spécifiant des filtres pour le déclencheur, y compris la source de l'événement et le workflow cible. Créez un déclencheur Eventarc pour exécuter le workflow callback-event-listener lorsqu'un fichier est importé dans un bucket Cloud Storage. En savoir plus sur le déclenchement d'un workflow

Console

  1. Dans la console Google Cloud , accédez à la page Eventarc.

    Accéder à Eventarc

  2. Cliquez sur Créer un déclencheur.

  3. Saisissez un nom de déclencheur

    Par exemple, trigger-storage-events-listener.

  4. Dans la liste Fournisseur d'événements, sélectionnez Cloud Storage.

  5. Dans la liste Événement, sous Direct, sélectionnez google.cloud.storage.object.v1.finalized.

  6. Dans la liste Bucket, recherchez le bucket que vous avez créé précédemment, puis sélectionnez-le.

  7. Dans la liste Région, acceptez la valeur par défaut us-central1 (Iowa) en fonction de votre bucket Cloud Storage.

  8. Si vous y êtes invité, attribuez le rôle iam.serviceAccountTokenCreator au compte de service Pub/Sub.

  9. Sélectionnez le compte de service que vous avez créé précédemment.

  10. Dans la liste Destination de l'événement, sélectionnez Workflows.

  11. Dans la liste Sélectionner un workflow, sélectionnez le workflow callback-event-listener.

  12. Cliquez sur Créer.

gcloud

Pour créer un déclencheur, exécutez la commande gcloud eventarc triggers create :

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Les événements sont transformés et transmis à l'exécution du workflow en tant qu'arguments d'exécution. Notez que l'activation du nouveau déclencheur peut prendre jusqu'à deux minutes.

Exécuter le workflow principal

L'exécution d'un workflow exécute la définition actuelle du workflow associé au workflow. Exécutez le workflow callback-event-sample. Il s'agit du workflow principal. Il attend que des événements spécifiques se produisent et ne reprend son exécution que lorsque le workflow secondaire effectue les demandes de rappel appropriées.

Console

  1. Dans la console Google Cloud , accédez à la page Workflows.

    Accéder à "Workflows"

  2. Sur la page Workflows, cliquez sur le workflow callback-event-sample pour accéder à sa page d'informations.

  3. Sur la page Détails du workflow, cliquez sur Exécuter.

  4. Cliquez à nouveau sur Exécuter.

    L'exécution du workflow commence. Pendant l'exécution, vous devriez voir un état d'exécution Running et une entrée de journal semblable à la suivante : Started waiting 1hr for an event from source topic-callback.

gcloud

Pour exécuter un workflow, exécutez la commande gcloud workflows run :

gcloud workflows run callback-event-sample \
    --location=us-central1

L'exécution du workflow commence. Pendant l'exécution, vous devriez voir un état d'exécution semblable à ce qui suit :

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...

Générer des événements et vérifier l'état de l'exécution

Vous pouvez vérifier que les résultats sont conformes à vos attentes en générant des événements, en affichant les entrées de journal et en vérifiant l'état d'exécution du workflow.

Publier un message

Publiez un message dans le sujet Pub/Sub que vous avez créé précédemment.

Console

  1. Dans la console Google Cloud , accédez à la page Sujets de Pub/Sub.

    Accéder aux sujets

  2. Cliquez sur topic-callback.

  3. Cliquez sur l'onglet Messages.

  4. Cliquez sur Publier un message.

  5. Dans le champ Corps du message, saisissez Hello World.

  6. Cliquez sur Publier.

gcloud

Pour publier un message, exécutez la commande gcloud pubsub topics publish :

gcloud pubsub topics publish topic-callback \
    --message="Hello World"

Importer un objet

Importez un fichier dans le bucket Cloud Storage que vous avez créé précédemment.

Console

  1. Dans la console Google Cloud , accédez à la page Buckets de Cloud Storage.

    Accéder à la page "Buckets"

  2. Cliquez sur le nom du bucket que vous avez créé précédemment.

  3. Dans l'onglet Objets, effectuez l'une des actions suivantes :

    • Effectuez un glisser-déposer du fichier souhaité depuis votre bureau ou votre gestionnaire de fichiers vers le volet principal de la console Google Cloud .

    • Cliquez sur Importer des fichiers, sélectionnez le fichier que vous souhaitez importer, puis cliquez sur Ouvrir.

gcloud

Pour importer un fichier, exécutez la commande gcloud storage cp :

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

Remplacez OBJECT_LOCATION par le chemin d'accès local à votre objet. Exemple :random.txt

Afficher les entrées de journal et l'état d'exécution

Vérifiez que le workflow callback-event-sample s'est terminé correctement.

Console

  1. Dans la console Google Cloud , accédez à la page Workflows.

    Accéder à "Workflows"

  2. Sur la page Workflows, cliquez sur le workflow callback-event-sample pour accéder à sa page d'informations.

  3. Sur la page Détails du workflow, cliquez sur l'ID d'exécution approprié pour récupérer les détails d'une exécution spécifique.

    L'état d'exécution doit être Réussite. Dans le volet "Sortie", vous devriez voir les événements Pub/Sub et Cloud Storage reçus.

gcloud

  1. Filtrez les entrées de journal et renvoyez la sortie au format JSON :

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND textPayload:calling OR textPayload:waiting" \
        --format=json
  2. Recherchez des entrées de journal semblables à ceci :

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. Vérifiez l'état de la dernière tentative d'exécution :

    gcloud workflows executions wait-last

    Le résultat doit ressembler à ce qui suit :

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED