Transformer les événements reçus

Vous pouvez transformer vos données d'événement en écrivant des expressions de transformation à l'aide du CEL. Par exemple, vous pouvez modifier les charges utiles d'événements pour respecter le contrat d'API spécifique d'une destination.

Notez que les événements sont toujours envoyés au format CloudEvents à l'aide d'une requête HTTP en mode "contenu binaire", sauf si vous spécifiez une liaison de message.

Définir les formats de données d'entrée et de sortie

En plus d'écrire une expression de transformation en CEL, vous pouvez éventuellement spécifier le format de données des données d'événement entrantes. Cela permet à Eventarc Advanced de savoir comment analyser la charge utile de l'événement. Vous pouvez également convertir les données d'un format à un autre.

Les formats suivants sont acceptés: Avro, JSON et Protobuf. Pour en savoir plus, consultez la section Formater les événements reçus.

Expressions de transformation

Lors de la transformation d'événements, tous les attributs d'événement peuvent être accessibles dans une expression CEL en tant que variables via un objet message prédéfini. Ces variables sont renseignées avec des valeurs basées sur les données d'événement lors de l'exécution. Exemple :

  • message.id renvoie l'attribut id de l'événement
  • message.data renvoie une représentation de la valeur CEL de la charge utile de l'événement.
  • message.data.some-key renvoie le contenu d'un champ nommé some-key à partir de la charge utile de l'événement.

Les champs de message.data sont toujours représentés sous forme de types String, et les valeurs sont mappées à partir de l'événement d'origine à l'aide du schéma spécifié lors de la définition du format de données d'entrée.

L'expression de transformation doit exprimer un événement complet qui inclut les attributs de contexte de l'événement et la charge utile des données de l'événement. Les expressions sont écrites en JSON, mais les fonctions, macros et opérateurs CEL prédéfinis, ainsi que les expressions régulières utilisant RE2 sont acceptés. Eventarc Advanced est également compatible avec certaines fonctions d'extension qui peuvent être utilisées pour transformer les données d'événement.

Vous trouverez ci-dessous deux exemples d'utilisation d'expressions CEL pour transformer vos données d'événement. Pour en savoir plus sur les cas d'utilisation et les exemples, consultez la section Exemples de transformations.

Exemple: Mettre en forme les valeurs d'attribut

L'exemple suivant met en forme les valeurs d'attribut phone_number à l'aide de fonctions d'expression régulière. (Les autres attributs ont été omis.)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Voici les fonctions d'expression régulière utilisées dans l'exemple précédent:

  • re.capture: capture la première valeur de groupe sans nom ou nommé. Les arguments sont les suivants :
    • target: chaîne à analyser
    • regex: expression régulière utilisée pour capturer des valeurs

    Renvoie une chaîne de la première valeur de groupe capturée.

  • re.captureN: effectue une correspondance complète sur la chaîne et l'expression régulière données. Les arguments sont les suivants:
    • target: chaîne à analyser
    • regex: expression régulière utilisée pour capturer des valeurs

    Renvoie une mappe avec des paires clé-valeur pour un groupe nommé (nom du groupe, chaîne capturée) ou un groupe sans nom (indice de groupe, chaîne capturée).

  • re.extract: correspond aux valeurs de groupe de la chaîne cible donnée et réécrit la chaîne. Les arguments sont les suivants:
    • target : chaîne à analyser
    • regex: expression régulière utilisée pour extraire des valeurs
    • rewrite: expression régulière pour le formatage du résultat

    Renvoie une chaîne des valeurs extraites, mise en forme en fonction de l'argument rewrite.

Exemple: Mappeur un tableau à un tableau d'objets

L'exemple suivant met en correspondance un tableau d'entiers avec un tableau d'objets. (Les autres attributs ont été omis.)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configurer un pipeline pour transformer des événements

Vous pouvez configurer un pipeline pour transformer les données d'événement dans la console Google Cloud ou à l'aide de gcloud CLI.

Notez qu'une seule médiation par pipeline est acceptée.

Console

  1. Dans la console Google Cloud, accédez à la page Eventarc > Pipelines.

    Accéder à la page Pipelines

  2. Vous pouvez créer un pipeline ou, si vous mettez à jour un pipeline, cliquer sur son nom.

    Notez que la mise à jour d'un pipeline peut prendre plus de 10 minutes.

  3. Sur la page Détails du pipeline, cliquez sur Modifier.

  4. Dans le volet Médiation des événements, procédez comme suit:

    1. Cochez la case Appliquer une transformation.
    2. Dans la liste Format entrant, sélectionnez le format applicable.

      Pour en savoir plus, consultez la section Formater les événements reçus.

    3. Dans le champ Expression CEL, écrivez une expression de transformation au format JSON. Les fonctions, macros et opérateurs CEL prédéfinis, ainsi que les expressions régulières, sont acceptés. Exemple :

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      L'exemple précédent effectue les opérations suivantes:

      • Supprime tous les attributs de l'événement d'origine, à l'exception de son id.
      • Définit l'attribut datacontenttype sur application/json.
      • Remplace la charge utile de l'événement par une chaîne JSON statique
    4. Cliquez sur Continuer.

  5. Dans le volet Destination, procédez comme suit:

    1. Le cas échéant, sélectionnez un format dans la liste Format sortant.

      Pour en savoir plus, consultez la section Formater les événements reçus.

    2. Vous pouvez également appliquer une liaison de message. Pour en savoir plus, consultez la section Définir une liaison de message de ce document.

  6. Cliquez sur Enregistrer.

gcloud

  1. Ouvrez un terminal.

  2. Vous pouvez créer un pipeline ou le mettre à jour à l'aide de la commande gcloud beta eventarc pipelines update:

    Notez que la mise à jour d'un pipeline peut prendre plus de 10 minutes.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Remplacez les éléments suivants :

    • PIPELINE_NAME: ID du pipeline ou nom complet
    • REGION: un emplacement Eventarc Advanced compatible

      Vous pouvez également définir la propriété d'emplacement de gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: expression écrite en JSON. Les fonctions, macros et opérateurs CEL prédéfinis, ainsi que les expressions régulières, sont acceptés. Un indicateur mediations permet d'appliquer une clé transformation_template.

    Exemple :

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    L'exemple précédent effectue les opérations suivantes:

    • Supprime tous les attributs de l'événement d'origine, à l'exception de son id.
    • Définit l'attribut datacontenttype sur application/json.
    • Remplace la charge utile de l'événement par une chaîne JSON statique

Fonctions d'extension

Eventarc Advanced prend en charge les fonctions d'extension suivantes, qui peuvent être utilisées pour transformer les données d'événement reçues via un bus.

Fonction Description
denormalize

Dénormalise une carte ou une liste en ajoutant des données redondantes pour améliorer les performances de lecture. Les noms de champ dans le mappage obtenu sont délimités par un point (.). L'index de la liste est converti en clé de chaîne, à partir de 0.

Notez que vous ne pouvez pas utiliser de point (.) dans les noms de champs Avro et Protobuf. Utilisez donc uniquement cette fonction pour cibler des données JSON.

Par exemple: map.() -> map(string, dyn) ou list() -> map(string, dyn)

merge

Joint deux champs et renvoie le champ combiné. Les champs avec des noms en double sont fusionnés.

Par exemple : message.(message) -> message

removeFields

Supprime des champs spécifiques d'un événement. Les noms de champ sont résolus en tant que chemins d'accès. Le caractère point (.) est utilisé comme délimiteur.

Notez que le format JSON brut est attendu. Si vous marshallez le code JSON, la transformation peut être appliquée à une chaîne JSON et entraîner une erreur.

Par exemple : message.(list(string)) -> message

setField

Ajoute ou remplace un champ de l'événement par une clé donnée. Le nom du champ est résolu en tant que chemin d'accès. Le caractère point (.) est utilisé comme délimiteur.

Par exemple : message.(string, dyn) -> message

Exemple: Ajouter un attribut à la charge utile de l'événement sans modifier d'autres données

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Exemple: Dénormaliser la liste d'éléments à partir de la charge utile de l'événement

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Exemple: Supprimer un champ de la charge utile de l'événement

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Définir une liaison de message

Par défaut, les événements sont toujours envoyés à une destination au format CloudEvents à l'aide d'une requête HTTP en mode "contenu binaire". Vous pouvez modifier ce comportement en définissant une liaison de message et en construisant une nouvelle requête HTTP.

Tous les en-têtes HTTP introduits par d'autres règles ou contrôles (par exemple, des jetons OAuth ou OIDC) sont conservés et fusionnés avec les en-têtes issus de l'expression de liaison.

Vous pouvez définir une liaison de messages lorsque vous configurez un pipeline dans la console Google Cloud ou à l'aide de gcloud CLI.

Console

  1. Dans la console Google Cloud, accédez à la page Eventarc > Pipelines.

    Accéder à la page Pipelines

  2. Vous pouvez créer un pipeline ou, si vous mettez à jour un pipeline, cliquer sur son nom.

    Notez que la mise à jour d'un pipeline peut prendre plus de 10 minutes.

  3. Sur la page Détails du pipeline, cliquez sur Modifier.

  4. Dans le volet Destination, appliquez une liaison de message, qui est une expression CEL écrite en JSON. Une nouvelle requête HTTP est alors créée et envoyée à la destination du pipeline.

    Pour en savoir plus, consultez les sections Accéder aux messages entrants et Créer des requêtes HTTP de ce document.

  5. Cliquez sur Enregistrer.

gcloud

  1. Ouvrez un terminal.

  2. Vous pouvez créer un pipeline ou le mettre à jour à l'aide de la commande gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Remplacez les éléments suivants :

    • PIPELINE_NAME: ID du pipeline ou nom complet
    • REGION: un emplacement Eventarc Advanced compatible

      Vous pouvez également définir la propriété d'emplacement de gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: expression CEL écrite en JSON qui génère une nouvelle requête HTTP, qui est ensuite envoyée à la destination du pipeline.

      Pour en savoir plus, consultez les sections Accéder aux messages entrants et Créer des requêtes HTTP de ce document.

    Exemple :

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Notez que si vous utilisez une clé http_endpoint_message_binding_template, vous devez également définir les clés http_endpoint_uri et network_attachment.

Accéder aux messages entrants

Vous pouvez utiliser une expression CEL pour accéder à un message CloudEvents entrant comme suit:

  • Utilisez la valeur message.data pour accéder au champ data du message entrant.
  • Utilisez les valeurs message.key (key étant le nom de l'attribut) pour accéder aux attributs du message entrant.
  • Utilisez une variable headers pour accéder à tous les en-têtes ajoutés à la requête HTTP par les médiations précédentes dans la chaîne de traitement. Cette variable définit un mappage de paires clé-valeur correspondant aux en-têtes HTTP supplémentaires et non aux en-têtes d'origine de la requête entrante initiale.

    Par exemple, l'expression CEL suivante peut être utilisée pour créer une requête HTTP composée uniquement d'en-têtes en ajoutant un en-tête supplémentaire à ceux ajoutés dans les médiations de pipeline précédentes:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Créer des requêtes HTTP

Le résultat de l'expression CEL doit être un mappage de paires clé-valeur dont les champs headers et body sont utilisés pour créer la requête HTTP comme suit.

Pour les champs headers:

  • Si une carte headers existe en raison de l'expression CEL, ses paires clé-valeur sont mappées directement sur les en-têtes de requête HTTP, et ses valeurs sont construites à l'aide de l'encodage de chaîne canonique du type de données correspondant.
  • Si un champ headers n'existe pas, la requête HTTP générée ne contient aucun en-tête.

Pour les champs body:

  • Si un champ body existe en raison de l'expression CEL, sa valeur est mappée directement sur le corps de la requête HTTP.
  • Si la valeur du champ body est de type bytes ou string, elle est utilisée comme corps de la requête HTTP telle quelle. Sinon, elle est convertie en chaîne JSON.
  • Si le champ body n'existe pas, le corps de la requête HTTP résultant est le corps de la liaison de message HTTP CloudEvents finale en mode contenu binaire.

Tous les autres champs résultant de l'expression CEL sont ignorés.

Fonctions d'extension

Eventarc Advanced prend en charge les fonctions d'extension suivantes, qui peuvent être utilisées pour transformer les données d'événement lorsque vous spécifiez une liaison de message.

Fonction Description
merge

Fusionne une carte CEL transmise dans la carte CEL à laquelle la fonction est appliquée. Si la même clé existe dans les deux mappages ou si la valeur de la clé est de type map, les deux mappages sont fusionnés. Sinon, la valeur du mappage transmis est utilisée.

Exemple : map1.merge(map2) -> map3

toBase64

Convertit une valeur CEL en chaîne encodée en base64.

Exemple : map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Convertit un message en mappe CEL correspondant à une représentation JSON d'un message CloudEvents, puis applique toDestinationPayloadFormat aux données du message. Définit également le datacontenttype de l'événement sur le format sortant spécifié (output_payload_format_*). Si aucun format sortant n'est défini, tous les datacontenttype existants sont utilisés. Sinon, le datacontenttype n'est pas défini. Si le message ne respecte pas la spécification CloudEvents, la fonction échoue. Notez que pour convertir les données en chaîne JSON, vous pouvez utiliser toJsonString.

Exemple : message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Convertit message.data au format sortant spécifié (output_payload_format_*). Si aucun format sortant n'est défini, message.data est renvoyé tel quel.

Exemple : message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Convertit une valeur CEL en chaîne JSON.

Par exemple : map.toJsonString() -> string

toMap

Convertit une liste CEL de cartes CEL en une seule carte CEL.

Exemple : list(map).toMap() -> map

Exemple: conserver les en-têtes, ajouter un nouvel en-tête, définir le corps sur le format de destination

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

Étape suivante