Transformar eventos recebidos

É possível transformar os dados de eventos escrevendo expressões de transformação usando a CEL. Por exemplo, é possível modificar os payloads de eventos para atender a um contrato de API específico de um destino.

Os eventos são sempre entregues em um formato CloudEvents usando uma solicitação HTTP no modo de conteúdo binário, a menos que você especifique uma vinculação de mensagens.

Definir os formatos de dados de entrada e saída

Além de escrever uma expressão de transformação em CEL, você pode especificar o formato de dados dos dados de eventos recebidos. Isso permite que o Eventarc Advanced saiba como analisar o payload do evento. Também é possível converter os dados de um formato para outro.

Os seguintes formatos são aceitos: Avro, JSON e Protobuf. Para mais informações, consulte Formatar eventos recebidos.

Expressões de transformação

Ao transformar eventos, todos os atributos de evento podem ser acessados em uma expressão CEL como variáveis usando um objeto message predefinido. Essas variáveis são preenchidas com valores com base nos dados do evento no momento da execução. Exemplo:

  • message.id retorna o atributo id do evento
  • message.data retorna uma representação de valor CEL do payload do evento
  • message.data.some-key retorna o conteúdo de um campo chamado some-key do payload do evento

Os campos em message.data são sempre representados como tipos String, e os valores são mapeados do evento original usando o esquema especificado ao definir o formato de dados de entrada.

A expressão de transformação precisa expressar um evento completo que inclua os atributos de contexto do evento e o payload de dados do evento. As expressões são escritas em JSON, mas há suporte para funções, macros e operadores CEL predefinidos, bem como expressões regulares que usam RE2. O Eventarc Advanced também oferece suporte a determinadas funções de extensão que podem ser usadas para transformar os dados do evento.

Confira a seguir dois exemplos de uso de expressões de CEL para transformar os dados de eventos. Para mais casos de uso e exemplos, consulte Exemplos de transformação.

Exemplo: formatar valores de atributos

O exemplo a seguir formata os valores de atributo phone_number usando funções de expressão regular. Outros atributos foram omitidos.

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Estas são as funções de expressão regular usadas no exemplo anterior:

  • re.capture: captura o primeiro valor de grupo sem nome ou nomeado. Os argumentos são os seguintes:
    • target: string que precisa ser analisada
    • regex: expressão regular usada para capturar valores

    Retorna uma string do primeiro valor de grupo capturado.

  • re.captureN: faz uma correspondência completa na string e na expressão regular fornecidas. Os argumentos são os seguintes:
    • target: string que precisa ser analisada
    • regex: expressão regular usada para capturar valores

    Retorna um mapa com pares de chave e valor para um grupo nomeado (nome do grupo, string capturada) ou um grupo sem nome (índice do grupo, string capturada).

  • re.extract: corresponde aos valores de grupo da string de destino especificada e grava a string novamente. Os argumentos são os seguintes:
    • target: string que precisa ser analisada
    • regex: expressão regular usada para extrair valores
    • rewrite: expressão regular para formatar o resultado

    Retorna uma string dos valores extraídos formatada com base no argumento rewrite.

Exemplo: mapear uma matriz para uma matriz de objetos

O exemplo a seguir mapeia uma matriz de números inteiros para uma matriz de objetos. Outros atributos foram omitidos.

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configurar um pipeline para transformar eventos

É possível configurar um pipeline para transformar dados de eventos no console do Google Cloud ou usando a CLI gcloud.

Somente uma mediação por pipeline é aceita.

Console

  1. No console do Google Cloud, acesse a página Eventarc > Pipelines.

    Acessar "Pipelines"

  2. Você pode criar um pipeline ou, se estiver atualizando um, clique no nome dele.

    A atualização de um pipeline pode levar mais de 10 minutos.

  3. Na página Detalhes do pipeline, clique em Editar.

  4. No painel Mediação de eventos, faça o seguinte:

    1. Marque a caixa de seleção Aplicar uma transformação.
    2. Na lista Formato de entrada, selecione o formato aplicável.

      Para mais informações, consulte Formatar eventos recebidos.

    3. No campo Expressão CEL, escreva uma expressão de transformação em JSON. Há suporte para funções, macros e operadores predefinidos da CEL, bem como para expressões regulares. Exemplo:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      O exemplo anterior faz o seguinte:

      • Remove todos os atributos do evento original, exceto o id
      • Define o atributo datacontenttype como application/json.
      • Substitui o payload do evento por uma string JSON estática
    4. Clique em Continuar.

  5. No painel Destino, faça o seguinte:

    1. Se aplicável, selecione um formato na lista Formato de saída.

      Para mais informações, consulte Formatar eventos recebidos.

    2. Como alternativa, aplique uma Vinculação de mensagem. Para mais informações, consulte a seção Definir uma vinculação de mensagem neste documento.

  6. Clique em Salvar.

gcloud

  1. Abra um terminal.

  2. É possível criar um pipeline ou atualizar um usando o comando gcloud beta eventarc pipelines update:

    A atualização de um pipeline pode levar mais de 10 minutos.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Substitua:

    • PIPELINE_NAME: o ID do pipeline ou um nome totalmente qualificado
    • REGION: um local do Eventarc Advanced compatível.

      Como alternativa, defina a propriedade de local da CLI gcloud:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: uma expressão escrita em JSON. Há suporte para funções, macros e operadores predefinidos da CEL, bem como para expressões regulares. Uma flag mediations é usada para aplicar uma chave transformation_template.

    Exemplo:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    O exemplo anterior faz o seguinte:

    • Remove todos os atributos do evento original, exceto o id
    • Define o atributo datacontenttype como application/json.
    • Substitui o payload do evento por uma string JSON estática

Funções de extensão

O Eventarc Advanced oferece suporte às seguintes funções de extensão, que podem ser usadas para transformar os dados de eventos recebidos por um barramento.

Função Descrição
denormalize

Desnormaliza um mapa ou uma lista adicionando dados redundantes para melhorar o desempenho de leitura. Os nomes de campo no mapa resultante são delimitados usando um ponto (.). O índice da lista é convertido em uma chave de string, começando em 0.

Como não é possível usar um ponto (.) nos nomes de campo do Avro e do Protobuf, use essa função apenas para dados JSON.

Por exemplo: map.() -> map(string, dyn) ou list() -> map(string, dyn)

merge

Une dois campos e retorna o campo combinado. Os campos com nomes duplicados são mesclados.

Por exemplo: message.(message) -> message

removeFields

Remove campos específicos de um evento. Os nomes dos campos são resolvidos como caminhos. O caractere de ponto (.) é usado como delimitador.

O JSON bruto é esperado. Se você agrupar o JSON, a transformação poderá ser aplicada a uma string JSON e resultar em um erro.

Por exemplo: message.(list(string)) -> message

setField

Adiciona ou substitui um campo do evento com uma chave específica. O nome do campo é resolvido como um caminho. O caractere de ponto (.) é usado como delimitador.

Por exemplo: message.(string, dyn) -> message

Exemplo: adicionar um atributo ao payload do evento sem modificar outros dados

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Exemplo: desnormalizar a lista de itens do payload do evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Exemplo: remover um campo do payload do evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Definir uma vinculação de mensagem

Por padrão, os eventos são sempre entregues a um destino em um formato CloudEvents usando uma solicitação HTTP no modo de conteúdo binário. Também é possível definir uma vinculação de mensagens e criar uma nova solicitação HTTP para modificar esse comportamento.

Todos os cabeçalhos HTTP introduzidos por outras políticas ou controles (por exemplo, tokens OAuth ou OIDC) são preservados e mesclados com os cabeçalhos resultantes da expressão de vinculação.

É possível definir uma vinculação de mensagem ao configurar um pipeline no Console do Google Cloud ou usando a CLI gcloud.

Console

  1. No console do Google Cloud, acesse a página Eventarc > Pipelines.

    Acessar "Pipelines"

  2. Você pode criar um pipeline ou, se estiver atualizando um, clique no nome dele.

    A atualização de um pipeline pode levar mais de 10 minutos.

  3. Na página Detalhes do pipeline, clique em Editar.

  4. No painel Destino, aplique uma Vinculação de mensagem, que é uma expressão CEL gravada em JSON. Isso resulta em uma solicitação HTTP recém-construída, que é enviada ao destino do pipeline.

    Para mais informações, consulte as seções Acessar mensagens de entrada e Criar solicitações HTTP neste documento.

  5. Clique em Salvar.

gcloud

  1. Abra um terminal.

  2. É possível criar um pipeline ou atualizar um usando o comando gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Substitua:

    • PIPELINE_NAME: o ID do pipeline ou um nome totalmente qualificado
    • REGION: um local do Eventarc Advanced compatível.

      Como alternativa, defina a propriedade de local da CLI gcloud:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: uma expressão CEL gravada em JSON que resulta em uma solicitação HTTP recém-criada, que é enviada ao destino do pipeline.

      Para mais informações, consulte as seções Acessar mensagens de entrada e Criar solicitações HTTP neste documento.

    Exemplo:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Se você estiver usando uma chave http_endpoint_message_binding_template, também será necessário definir as chaves http_endpoint_uri e network_attachment.

Acessar mensagens recebidas

É possível usar uma expressão CEL para acessar uma mensagem de entrada do CloudEvents da seguinte maneira:

  • Use o valor message.data para acessar o campo data da mensagem de entrada.
  • Use os valores message.key (em que key é o nome do atributo) para acessar os atributos da mensagem de entrada.
  • Use uma variável headers para acessar todos os cabeçalhos adicionados à solicitação HTTP por mediações anteriores na cadeia de processamento. Essa variável define um mapa de pares de chave-valor correspondentes aos cabeçalhos HTTP adicionais, e não aos cabeçalhos originais da solicitação de entrada inicial.

    Por exemplo, a expressão CEL a seguir pode ser usada para construir uma solicitação HTTP somente com cabeçalhos, adicionando um cabeçalho adicional aos adicionados nas mediações de pipeline anteriores:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Criar solicitações HTTP

O resultado da expressão CEL precisa ser um mapa de pares de chave-valor cujos campos headers e body são usados para construir a solicitação HTTP da seguinte maneira:

Para campos headers:

  • Se um mapa headers existir como resultado da expressão CEL, os pares de chave-valor dele serão mapeados diretamente para os cabeçalhos de solicitação HTTP, e os valores serão construídos usando a codificação de string canônica do tipo de dados correspondente.
  • Se um campo headers não existir, a solicitação HTTP resultante não conterá cabeçalhos.

Para campos body:

  • Se um campo body existir como resultado da expressão CEL, o valor dele será mapeado diretamente para o corpo da solicitação HTTP.
  • Se o valor do campo body for do tipo bytes ou string, ele será usado como o corpo da solicitação HTTP. Caso contrário, ele será convertido em uma string JSON.
  • Se o campo body não existir, o corpo da solicitação HTTP resultante será o corpo da vinculação de mensagem HTTP do CloudEvents final no modo de conteúdo binário.

Todos os outros campos como resultado da expressão CEL são ignorados.

Funções de extensão

O Eventarc Advanced oferece suporte às seguintes funções de extensão, que podem ser usadas para transformar os dados do evento ao especificar uma vinculação de mensagem.

Função Descrição
merge

Mescla um mapa CEL transmitido no mapa CEL em que a função é aplicada. Se a mesma chave existir nos dois mapas ou se o valor da chave for do tipo map, os dois mapas serão mesclados. Caso contrário, o valor do mapa transmitido será usado.

Exemplo: map1.merge(map2) -> map3

toBase64

Converte um valor de CEL em uma string codificada em base64.

Exemplo: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Converte uma mensagem em um mapa CEL que corresponde a uma representação JSON de uma mensagem do CloudEvents e aplica toDestinationPayloadFormat aos dados da mensagem. Também define o datacontenttype do evento para o formato de saída especificado (output_payload_format_*). Se um formato de saída não estiver definido, qualquer datacontenttype existente será usado. Caso contrário, o datacontenttype não será definido. Se a mensagem não seguir a especificação do CloudEvents, a função vai falhar. Para converter os dados em uma string JSON, use toJsonString.

Exemplo: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Converte message.data para o formato de saída especificado (output_payload_format_*). Se um formato de saída não estiver definido, message.data será retornado sem alterações.

Exemplo: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Converte um valor CEL em uma string JSON.

Por exemplo: map.toJsonString() -> string

toMap

Converte uma lista de mapas CEL em um único mapa CEL.

Exemplo: list(map).toMap() -> map

Exemplo: manter cabeçalhos, adicionar um novo cabeçalho e definir o corpo para o formato de destino

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

A seguir