Transforma los eventos recibidos

Puedes transformar tus datos de eventos escribiendo expresiones de transformación con CEL. Por ejemplo, puedes modificar las cargas útiles de eventos para satisfacer un contrato de API específico de un destino.

Ten en cuenta que los eventos siempre se entregan en un formato de CloudEvents con una solicitud HTTP en modo de contenido binario a menos que especifiques una vinculación de mensajes.

Establece los formatos de datos de entrada y salida

Además de escribir una expresión de transformación en CEL, de manera opcional, puedes especificar el formato de datos de los datos de eventos entrantes. Esto permite que Eventarc Advanced sepa cómo analizar la carga útil del evento. También puedes convertir los datos de un formato a otro.

Se admiten los siguientes formatos: Avro, JSON y Protobuf. Para obtener más información, consulta Cómo formatear los eventos recibidos.

Expresiones de transformación

Cuando se transforman los eventos, se puede acceder a todos los atributos de eventos en una expresión CEL como variables a través de un objeto message predefinido. Estas variables se propagan con valores en función de los datos del evento en el entorno de ejecución. Por ejemplo:

  • message.id muestra el atributo id del evento.
  • message.data muestra una representación del valor de CEL de la carga útil del evento.
  • message.data.some-key muestra el contenido de un campo llamado some-key de la carga útil del evento.

Los campos en message.data siempre se representan como tipos String, y los valores se asignan desde el evento original con el esquema especificado cuando se establece el formato de datos de entrada.

La expresión de transformación debe expresar un evento completo que incluya los atributos del contexto del evento y la carga útil de datos del evento. Las expresiones se escriben en JSON, pero se admiten funciones, macros y operadores de CEL predefinidos, así como expresiones regulares con RE2. Eventarc Advanced también admite ciertas funciones de extensión que se pueden usar para transformar los datos de eventos.

A continuación, se muestran dos ejemplos del uso de expresiones CEL para transformar tus datos de eventos. Para ver más casos de uso y ejemplos, consulta Ejemplos de transformación.

Ejemplo: Da formato a los valores de los atributos

En el siguiente ejemplo, se da formato a los valores del atributo phone_number con funciones de expresión regular. (Se omitieron otros atributos).

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Estas son las funciones de expresión regular que se usaron en el ejemplo anterior:

  • re.capture: Captura el primer valor de grupo sin nombre o con nombre. Los argumentos son los siguientes:
    • target: Es la cadena que se debe analizar.
    • regex: Es una expresión regular que se usa para capturar valores.

    Muestra una cadena del primer valor de grupo capturado.

  • re.captureN: Realiza una coincidencia completa en la cadena y la expresión regular proporcionadas. Los argumentos son los siguientes:
    • target: Es la cadena que se debe analizar.
    • regex: Es una expresión regular que se usa para capturar valores.

    Muestra un mapa con pares clave-valor para un grupo con nombre (nombre del grupo, cadena capturada) o un grupo sin nombre (índice del grupo, cadena capturada).

  • re.extract: Coincide con los valores de grupo de la cadena de destino proporcionada y reescribe la cadena. Los argumentos son los siguientes:
    • target: Es la cadena que se debe analizar.
    • regex: Es una expresión regular que se usa para extraer valores.
    • rewrite: Es una expresión regular para indicar cómo se debe dar formato al resultado.

    Muestra una cadena de los valores extraídos con formato según el argumento rewrite.

Ejemplo: Asigna un array a un array de objetos

En el siguiente ejemplo, se asigna un array de números enteros a un array de objetos. (Se omitieron otros atributos).

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configura una canalización para transformar eventos

Puedes configurar una canalización para transformar los datos de eventos en la consola de Google Cloud o con gcloud CLI.

Ten en cuenta que solo se admite una mediación por canalización.

Console

  1. En la consola de Google Cloud, ve a la página Eventarc > Canales.

    Ir a Canalizaciones

  2. Puedes crear una canalización o, si quieres actualizar una, haz clic en su nombre.

    Ten en cuenta que la actualización de una canalización puede tardar más de 10 minutos.

  3. En la página Detalles de la canalización, haz clic en Editar.

  4. En el panel Mediación de eventos, haz lo siguiente:

    1. Selecciona la casilla de verificación Aplicar una transformación.
    2. En la lista Formato entrante, selecciona el formato aplicable.

      Para obtener más información, consulta Cómo formatear los eventos recibidos.

    3. En el campo Expresión de CEL, escribe una expresión de transformación en JSON. Se admiten funciones, macros y operadores predefinidos de CEL, así como expresiones regulares. Por ejemplo:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      En el ejemplo anterior, se hace lo siguiente:

      • Quita todos los atributos del evento original, excepto su id.
      • Establece el atributo datacontenttype en application/json.
      • Reemplaza la carga útil del evento por una cadena JSON estática.
    4. Haga clic en Continuar.

  5. En el panel Destino, haz lo siguiente:

    1. Si corresponde, selecciona un formato en la lista Formato de salida.

      Para obtener más información, consulta Cómo formatear los eventos recibidos.

    2. De manera opcional, aplica una vinculación de mensajes. Para obtener más información, consulta la sección Cómo definir una vinculación de mensajes en este documento.

  6. Haz clic en Guardar.

gcloud

  1. Abre una terminal.

  2. Puedes crear una canalización o actualizarla con el comando gcloud beta eventarc pipelines update:

    Ten en cuenta que la actualización de una canalización puede tardar más de 10 minutos.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Reemplaza lo siguiente:

    • PIPELINE_NAME: El ID de la canalización o un nombre completamente calificado
    • REGION: Una ubicación de Eventarc Advanced compatible

      Como alternativa, puedes configurar la propiedad de ubicación de gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: Es una expresión escrita en JSON. Se admiten funciones, macros y operadores predefinidos de CEL, así como expresiones regulares. Se usa una marca mediations para aplicar una clave transformation_template.

    Ejemplo:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    En el ejemplo anterior, se hace lo siguiente:

    • Quita todos los atributos del evento original, excepto su id.
    • Establece el atributo datacontenttype en application/json.
    • Reemplaza la carga útil del evento por una cadena JSON estática.

Funciones de extensión

Eventarc Advanced admite las siguientes funciones de extensión, que se pueden usar para transformar los datos de eventos recibidos a través de un bus.

Función Descripción
denormalize

Desnormaliza un mapa o una lista agregando datos redundantes para mejorar el rendimiento de la lectura. Los nombres de los campos en el mapa resultante se delimitan con un punto (.). El índice de la lista se convierte en una clave de cadena, a partir de 0.

Ten en cuenta que, como no puedes usar un punto (.) en los nombres de campos de Avro y Protobuf, usa esta función solo para segmentar datos JSON.

Por ejemplo, map.() -> map(string, dyn) o list() -> map(string, dyn).

merge

Une dos campos y muestra el campo combinado. Se combinan los campos con nombres duplicados.

Por ejemplo: message.(message) -> message

removeFields

Quita campos específicos de un evento. Los nombres de campo se resuelven como rutas de acceso. El carácter de punto (.) se usa como delimitador.

Ten en cuenta que se espera un JSON sin procesar. Si ordenas el JSON, la transformación se podría aplicar a una cadena JSON y generar un error.

Por ejemplo: message.(list(string)) -> message

setField

Agrega o reemplaza un campo del evento con una clave determinada. El nombre del campo se resuelve como una ruta de acceso. El carácter de punto (.) se usa como delimitador.

Por ejemplo: message.(string, dyn) -> message

Ejemplo: Agrega un atributo a la carga útil del evento sin modificar otros datos

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Ejemplo: Cómo desnormalizar la lista de elementos de la carga útil del evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Ejemplo: Quita un campo de la carga útil del evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Define una vinculación de mensajes

De forma predeterminada, los eventos siempre se entregan a un destino en un formato de CloudEvents con una solicitud HTTP en modo de contenido binario. De forma opcional, puedes definir una vinculación de mensajes y crear una nueva solicitud HTTP para anular este comportamiento.

Cualquier encabezado HTTP que introduzcan otras políticas o controles (por ejemplo, tokens de OAuth o OIDC) se conserva y se combina con los encabezados que resultan de la expresión de vinculación.

Puedes definir una vinculación de mensajes cuando configures una canalización en la console de Google Cloud o con gcloud CLI.

Console

  1. En la consola de Google Cloud, ve a la página Eventarc > Canales.

    Ir a Canalizaciones

  2. Puedes crear una canalización o, si quieres actualizar una, haz clic en su nombre.

    Ten en cuenta que la actualización de una canalización puede tardar más de 10 minutos.

  3. En la página Detalles de la canalización, haz clic en Editar.

  4. En el panel Destino, aplica una vinculación de mensajes, que es una expresión de CEL escrito en JSON. Esto genera una solicitud HTTP recién creada que, luego, se envía al destino de la canalización.

    Para obtener más información, consulta las secciones Cómo acceder a los mensajes entrantes y Cómo crear solicitudes HTTP en este documento.

  5. Haz clic en Guardar.

gcloud

  1. Abre una terminal.

  2. Puedes crear una canalización o actualizarla con el comando gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Reemplaza lo siguiente:

    • PIPELINE_NAME: El ID de la canalización o un nombre completamente calificado
    • REGION: Una ubicación de Eventarc Advanced compatible

      Como alternativa, puedes configurar la propiedad de ubicación de gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: Es una expresión CEL escrita en JSON que genera una solicitud HTTP recién creada que, luego, se envía al destino de la canalización.

      Para obtener más información, consulta las secciones Cómo acceder a los mensajes entrantes y Cómo crear solicitudes HTTP en este documento.

    Ejemplo:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Ten en cuenta que, si usas una clave http_endpoint_message_binding_template, también debes configurar las claves http_endpoint_uri y network_attachment.

Accede a los mensajes entrantes

Puedes usar una expresión CEL para acceder a un mensaje entrante de CloudEvents de la siguiente manera:

  • Usa el valor message.data para acceder al campo data del mensaje entrante.
  • Usa los valores message.key (donde key es el nombre del atributo) para acceder a los atributos del mensaje entrante.
  • Usa una variable headers para acceder a los encabezados que las mediaciones anteriores de la cadena de procesamiento agregaron a la solicitud HTTP. Esta variable define un mapa de parejas clave-valor que corresponden a los encabezados HTTP adicionales y no a los encabezados originales de la solicitud entrante inicial.

    Por ejemplo, la siguiente expresión CEL se puede usar para crear una solicitud HTTP solo de encabezados agregando un encabezado adicional a los que se agregaron en las mediaciones de canalización anteriores:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Cómo crear solicitudes HTTP

El resultado de la expresión CEL debe ser un mapa de pares clave-valor cuyos campos headers y body se usan para construir la solicitud HTTP de la siguiente manera.

Para los campos headers:

  • Si existe un mapa headers como resultado de la expresión CEL, sus pares clave-valor se asignan directamente a los encabezados de la solicitud HTTP, y sus valores se construyen con la codificación de cadena canónica del tipo de datos correspondiente.
  • Si no existe un campo headers, la solicitud HTTP resultante no contendrá ningún encabezado.

Para los campos body:

  • Si existe un campo body como resultado de la expresión CEL, su valor se asigna directamente al cuerpo de la solicitud HTTP.
  • Si el valor del campo body es del tipo bytes o string, se usa como cuerpo de la solicitud HTTP tal como está; de lo contrario, se convierte en una cadena JSON.
  • Si no existe el campo body, el cuerpo de la solicitud HTTP resultante es el cuerpo de la vinculación de mensajes HTTP de CloudEvents final en modo de contenido binario.

Se ignoran todos los demás campos como resultado de la expresión CEL.

Funciones de extensión

Eventarc Advanced admite las siguientes funciones de extensión, que se pueden usar para transformar los datos del evento cuando se especifica una vinculación de mensajes.

Función Descripción
merge

Combina un mapa CEL pasado en el mapa CEL al que se aplica la función. Si la misma clave existe en ambos mapas o si el valor de la clave es del tipo map, se combinan ambos mapas. De lo contrario, se usa el valor del mapa pasado.

Ejemplo: map1.merge(map2) -> map3

toBase64

Convierte un valor de CEL en una cadena codificada en base64 con URL.

Ejemplo: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Convierte un mensaje en un mapa CEL que corresponde a una representación en JSON de un mensaje de CloudEvents y aplica toDestinationPayloadFormat a los datos del mensaje. También establece el datacontenttype del evento en el formato saliente especificado (output_payload_format_*). Si no se establece un formato saliente, se usa cualquier datacontenttype existente; de lo contrario, no se establece el datacontenttype. Si el mensaje no cumple con la especificación de CloudEvents, la función falla. Ten en cuenta que, para convertir los datos en una cadena JSON, puedes usar toJsonString.

Ejemplo: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Convierte message.data al formato de salida especificado (output_payload_format_*). Si no se establece un formato de salida, message.data se muestra sin cambios.

Ejemplo: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Convierte un valor CEL en una cadena JSON.

Por ejemplo: map.toJsonString() -> string

toMap

Convierte una lista de mapas CEL en un solo mapa CEL.

Ejemplo: list(map).toMap() -> map

Ejemplo: Retener encabezados, agregar un encabezado nuevo y establecer el cuerpo en el formato de destino

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

¿Qué sigue?