Trasforma gli eventi ricevuti

Puoi trasformare i dati degli eventi scrivendo espressioni di trasformazione utilizzando CEL. Ad esempio, puoi modificare i payload degli eventi in modo da soddisfare il contratto API specifico di una destinazione.

Tieni presente che gli eventi vengono sempre pubblicati in un formato CloudEvents utilizzando una richiesta HTTP in modalità di contenuti binari a meno che tu non specifichi una associazione di messaggi.

Impostare i formati dei dati di input e output

Oltre a scrivere un'espressione di trasformazione in CEL, facoltativamente puoi specificare il formato dei dati degli eventi in entrata. In questo modo, Eventarc Advanced sa come analizzare il payload dell'evento. Puoi anche convertire i dati da un formato all'altro.

Sono supportati i seguenti formati: Avro, JSON e Protobuf. Per scoprire di più, consulta la sezione Formattare gli eventi ricevuti.

Espressioni di trasformazione

Quando si trasformano gli eventi, è possibile accedere a tutti gli attributi dell'evento in un'espressione CEL come variabili tramite un oggetto message predefinito. Queste variabili vengono completate con valori basati sui dati sugli eventi in fase di esecuzione. Ad esempio:

  • message.id restituisce l'attributo id dell'evento
  • message.data restituisce una rappresentazione del valore CEL del payload dell'evento
  • message.data.some-key restituisce i contenuti di un campo denominato some-key dal payload dell'evento

I campi in message.data sono sempre rappresentati come tipi String e i valori vengono mappati dall'evento originale utilizzando lo schema specificato durante l'impostazione del formato dei dati di input.

L'espressione di trasformazione deve esprimere un evento completo che includa gli attributi del contesto dell'evento e il payload dei dati dell'evento. Le espressioni sono scritte in JSON, ma sono supportate funzioni, macro e operatori CEL predefiniti, nonché espressioni regolari che utilizzano RE2. Eventarc Advanced supporta anche determinate funzioni di estensione che possono essere utilizzate per trasformare i dati sugli eventi.

Di seguito sono riportati due esempi di utilizzo di espressioni CEL per trasformare i dati degli eventi. Per altri casi d'uso ed esempi, consulta Esempi di trasformazione.

Esempio: formatta i valori degli attributi

L'esempio seguente formatta i valori dell'attributo phone_number utilizzando le funzioni di espressione regolare. (Gli altri attributi sono stati omessi).

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Di seguito sono riportate le funzioni di espressione regolare utilizzate nell'esempio precedente:

  • re.capture: acquisisce il primo valore del gruppo senza nome o denominato. Gli argomenti sono i seguenti:
    • target: la stringa da analizzare
    • regex: espressione regolare utilizzata per acquisire i valori

    Restituisce una stringa del primo valore del gruppo acquisito.

  • re.captureN: esegue una corrispondenza completa sulla stringa e sull'espressione regolare specificate. Gli argomenti sono i seguenti:
    • target: la stringa da analizzare
    • regex: espressione regolare utilizzata per acquisire i valori

    Restituisce una mappa con coppie chiave/valore per un gruppo denominato (nome gruppo, stringa acquisita) o un gruppo senza nome (indice gruppo, stringa acquisita).

  • re.extract: abbina i valori di gruppo della stringa di destinazione specificata e riscrivi la stringa. Gli argomenti sono i seguenti:
    • target: stringa da analizzare
    • regex: espressione regolare impiegata per estrarre i valori
    • rewrite: espressione regolare per la formattazione del risultato

    Restituisce una stringa dei valori estratti formattata in base all'argomento rewrite.

Esempio: mappare un array a un array di oggetti

L'esempio seguente mappa un array di interi in un array di oggetti. (Gli altri attributi sono stati omessi).

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configurare una pipeline per trasformare gli eventi

Puoi configurare una pipeline per trasformare i dati sugli eventi nella console Google Cloud o utilizzando l'interfaccia a riga di comando gcloud.

Tieni presente che è supportata una sola mediazione per pipeline.

Console

  1. Nella console Google Cloud, vai alla pagina Eventarc > Pipeline.

    Vai a Pipeline

  2. Puoi creare una pipeline o, se stai aggiornando una pipeline, fai clic sul nome della pipeline.

    Tieni presente che l'aggiornamento di una pipeline potrebbe richiedere più di 10 minuti.

  3. Nella pagina Dettagli della pipeline, fai clic su Modifica.

  4. Nel riquadro Mediazione eventi, segui questi passaggi:

    1. Seleziona la casella di controllo Applica una trasformazione.
    2. Nell'elenco Formato in entrata, seleziona il formato applicabile.

      Per ulteriori informazioni, consulta Formattare gli eventi ricevuti.

    3. Nel campo Espressione CEL, scrivi un'espressione di trasformazione in JSON. Sono supportate funzioni, macro e operatori CEL predefinite, nonché espressioni regolari. Ad esempio:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      L'esempio precedente esegue le seguenti operazioni:

      • Rimuove tutti gli attributi dall'evento originale, ad eccezione di id
      • Imposta l'attributo datacontenttype su application/json
      • Sostituisce il payload dell'evento con una stringa JSON statica
    4. Fai clic su Continua.

  5. Nel riquadro Destinazione, segui questi passaggi:

    1. Se applicabile, seleziona un formato dall'elenco Formato in uscita.

      Per ulteriori informazioni, consulta Formattare gli eventi ricevuti.

    2. Se vuoi, applica una associazione di messaggi. Per ulteriori informazioni, consulta la sezione Definire una definizione di messaggio di questo documento.

  6. Fai clic su Salva.

gcloud

  1. Apri un terminale.

  2. Puoi creare una pipeline o aggiornarla utilizzando il comando gcloud beta eventarc pipelines update:

    Tieni presente che l'aggiornamento di una pipeline potrebbe richiedere più di 10 minuti.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Sostituisci quanto segue:

    • PIPELINE_NAME: l'ID della pipeline o un nome completamente qualificato
    • REGION: un'ubicazione Eventarc Advanced supportata

      In alternativa, puoi impostare la proprietà della posizione gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: un'espressione scritta in JSON. Sono supportate funzioni, macro e operatori predefinite di CEL, nonché espressioni regolari. Un mediations flag viene utilizzato per applicare una chiave transformation_template.

    Esempio:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    L'esempio precedente esegue le seguenti operazioni:

    • Rimuove tutti gli attributi dall'evento originale, ad eccezione di id
    • Imposta l'attributo datacontenttype su application/json
    • Sostituisce il payload dell'evento con una stringa JSON statica

Funzioni di estensione

Eventarc Advanced supporta le seguenti funzioni di estensione che possono essere utilizzate per trasformare i dati sugli eventi ricevuti tramite un bus.

Funzione Descrizione
denormalize

Denormalizza una mappa o un elenco aggiungendo dati ridondanti per migliorare le prestazioni di lettura. I nomi dei campi nella mappa risultante sono delimitati da un punto (.). L'indice dell'elenco viene convertito in una chiave di stringa, a partire da 0.

Tieni presente che, poiché non puoi utilizzare un punto (.) nei nomi dei campi Avro e Protobuf, utilizza questa funzione solo per scegliere come target i dati JSON.

Ad esempio: map.() -> map(string, dyn) o list() -> map(string, dyn)

merge

Unisce due campi e restituisce il campo combinato. I campi con nomi duplicati vengono uniti.

Ad esempio: message.(message) -> message

removeFields

Rimuove campi specifici da un evento. I nomi dei campi vengono risolti come percorsi. Il carattere punto (.) viene utilizzato come delimitatore.

Tieni presente che è previsto un JSON non elaborato. Se esegui il marshalling del JSON, la trasformazione potrebbe essere applicata a una stringa JSON e generare un errore.

Ad esempio: message.(list(string)) -> message

setField

Aggiunge o sostituisce un campo dell'evento con una determinata chiave. Il nome del campo viene risolto come percorso. Il carattere punto (.) viene utilizzato come delimitatore.

Ad esempio: message.(string, dyn) -> message

Esempio: aggiungi un attributo al payload dell'evento senza modificare altri dati

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Esempio: denormalizza l'elenco di elementi dal payload dell'evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Esempio: rimuovi un campo dal payload dell'evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Definire un'associazione di messaggi

Per impostazione predefinita, gli eventi vengono sempre pubblicati in una destinazione in formato CloudEvents utilizzando una richiesta HTTP in modalità di contenuti binari. Facoltativamente, puoi ignorare questo comportamento definendo un collegamento del messaggio e costruendo una nuova richiesta HTTP.

Eventuali intestazioni HTTP introdotte da altri criteri o controlli (ad esempio, token OAuth o OIDC) vengono conservate e unite alle intestazioni risultanti dall'espressione di associazione.

Puoi definire una definizione dei messaggi quando configuri una pipeline nella console Google Cloud o utilizzando la riga di comando gcloud.

Console

  1. Nella console Google Cloud, vai alla pagina Eventarc > Pipeline.

    Vai a Pipeline

  2. Puoi creare una pipeline o, se stai aggiornando una pipeline, fai clic sul nome della pipeline.

    Tieni presente che l'aggiornamento di una pipeline potrebbe richiedere più di 10 minuti.

  3. Nella pagina Dettagli della pipeline, fai clic su Modifica.

  4. Nel riquadro Destinazione, applica una associazione messaggio, ovvero un'espressione CEL scritta in JSON. Il risultato è una richiesta HTTP appena creata che viene poi inviata alla destinazione della pipeline.

    Per ulteriori informazioni, consulta le sezioni Accedere ai messaggi in entrata e Creare richieste HTTP di questo documento.

  5. Fai clic su Salva.

gcloud

  1. Apri un terminale.

  2. Puoi creare una pipeline o aggiornarla utilizzando il comando gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Sostituisci quanto segue:

    • PIPELINE_NAME: l'ID della pipeline o un nome completamente qualificato
    • REGION: un'ubicazione Eventarc Advanced supportata

      In alternativa, puoi impostare la proprietà della posizione gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: un'espressione CEL scritta in JSON che genera una richiesta HTTP appena creata che viene poi inviata alla destinazione della pipeline.

      Per ulteriori informazioni, consulta le sezioni Accedere ai messaggi in entrata e Creare richieste HTTP di questo documento.

    Esempio:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Tieni presente che se utilizzi una chiave http_endpoint_message_binding_template, devi impostare anche le chiavi http_endpoint_uri e network_attachment.

Accedere ai messaggi in arrivo

Puoi utilizzare un'espressione CEL per accedere a un messaggio CloudEvents in entrata nel seguente modo:

  • Utilizza il valore message.data per accedere al campo data del messaggio in entrata.
  • Utilizza i valori message.key (dove key è il nome dell'attributo) per accedere agli attributi del messaggio in entrata.
  • Utilizza una variabile headers per accedere a eventuali intestazioni aggiunte alla richiesta HTTP dalle mediazioni precedenti nella catena di elaborazione. Questa variabile definisce una mappa di coppie chiave-valore corrispondenti alle intestazioni HTTP aggiuntive e non alle intestazioni originali della richiesta in entrata iniziale.

    Ad esempio, la seguente espressione CEL può essere utilizzata per creare una richiesta HTTP solo con intestazioni aggiungendo un'intestazione aggiuntiva a quelle aggiunte nelle mediazioni della pipeline precedenti:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Creare richieste HTTP

Il risultato dell'espressione CEL deve essere una mappa di coppie chiave-valore i cui campi headers e body vengono utilizzati per costruire la richiesta HTTP come segue.

Per i campi headers:

  • Se esiste una mappa headers come risultato dell'espressione CEL, le relative coppie chiave-valore vengono mappate direttamente alle intestazioni di richiesta HTTP e i relativi valori vengono costruiti utilizzando la codifica di stringa canonica del tipo di dati corrispondente.
  • Se non esiste un campo headers, la richiesta HTTP risultante non conterrà alcuna intestazione.

Per i campi body:

  • Se esiste un campo body come risultato dell'espressione CEL, il relativo valore viene mappato direttamente al corpo della richiesta HTTP.
  • Se il valore del campo body è di tipo bytes o string, viene utilizzato come corpo della richiesta HTTP così com'è; in caso contrario, viene convertito in una stringa JSON.
  • Se il campo body non esiste, il corpo della richiesta HTTP risultante è il corpo dell'associazione del messaggio HTTP CloudEvents finale in modalità di contenuti binari.

Eventuali altri campi risultanti dall'espressione CEL vengono ignorati.

Funzioni di estensione

Eventarc Advanced supporta le seguenti funzioni di estensione che possono essere utilizzate per trasformare i dati sugli eventi quando si specifica una definizione di messaggio.

Funzione Descrizione
merge

Unisce una mappa CEL passata alla mappa CEL a cui viene applicata la funzione. Se la stessa chiave esiste in entrambe le mappe o se il valore della chiave è di tipo map, entrambe le mappe vengono unite; in caso contrario, viene utilizzato il valore della mappa passata.

Esempio: map1.merge(map2) -> map3

toBase64

Converte un valore CEL in una stringa con codifica URL base64.

Esempio: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Converte un messaggio in una mappa CEL che corrisponde a una rappresentazione JSON di un messaggio CloudEvents e applica toDestinationPayloadFormat ai dati del messaggio. Imposta inoltre il datacontenttype dell'evento sul formato di esportazione specificato (output_payload_format_*). Se non è impostato un formato di esportazione, viene utilizzato qualsiasi datacontenttype esistente; in caso contrario, il datacontenttype non viene impostato. Se il messaggio non è conforme alla specifica CloudEvents, la funzione non va a buon fine. Tieni presente che per convertire i dati in una stringa JSON, puoi utilizzare toJsonString.

Esempio: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Converte message.data nel formato in uscita specificato (output_payload_format_*). Se non è impostato un formato in uscita, message.data viene restituito invariato.

Esempio: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Converte un valore CEL in una stringa JSON.

Ad esempio: map.toJsonString() -> string

toMap

Converte un elenco CEL di mappe CEL in una singola mappa CEL.

Esempio: list(map).toMap() -> map

Esempio: mantieni le intestazioni, aggiungi una nuova intestazione, imposta il corpo sul formato di destinazione

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

Passaggi successivi