Transforme eventos recebidos

Pode transformar os dados de eventos escrevendo expressões de transformação com a CEL. Por exemplo, pode modificar os payloads de eventos para satisfazer o contrato de API específico de um destino.

Tenha em atenção que os eventos são sempre enviados num formato CloudEvents através de um pedido HTTP no modo de conteúdo binário a menos que especifique uma associação de mensagens.

Defina os formatos de dados de entrada e saída

Além de escrever uma expressão de transformação em CEL, pode, opcionalmente, especificar o formato de dados dos dados de eventos recebidos. Isto permite que o Eventarc Advanced saiba como analisar o payload do evento. Também pode converter os dados de um formato para outro.

São suportados os seguintes formatos: Avro, JSON e Protobuf. Para mais informações, consulte o artigo Formate eventos recebidos.

Expressões de transformação

Quando transforma eventos, pode aceder a todos os atributos de eventos numa expressão CEL como variáveis através de um objeto message predefinido. Estas variáveis são preenchidas com valores baseados nos dados de eventos em tempo de execução. Por exemplo:

  • message.id devolve o atributo id do evento
  • message.data devolve uma representação de valor CEL da carga útil do evento
  • message.data.some-key devolve o conteúdo de um campo denominado some-key do payload do evento

Os campos em message.data são sempre representados como tipos String e os valores são mapeados a partir do evento original através do esquema especificado quando define o formato de dados de entrada.

A expressão de transformação deve expressar um evento completo que inclua os atributos de contexto do evento e a carga útil de dados do evento. As expressões são escritas em JSON, mas as funções, as macros e os operadores CEL predefinidos, bem como as expressões regulares que usam RE2, são suportados. O Eventarc Advanced também suporta determinadas funções de extensão que podem ser usadas para transformar os dados de eventos.

Seguem-se dois exemplos de utilização de expressões de IEC para transformar os dados de eventos. Para ver mais exemplos de utilização, consulte os exemplos de transformações.

Exemplo: formatar valores de atributos

O exemplo seguinte formata os phone_numbervalores dos atributos com funções de expressões regulares. (Outros atributos foram omitidos.)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Seguem-se as funções de expressão regular usadas no exemplo anterior:

  • re.capture: captura o primeiro valor do grupo sem nome ou com nome. Os argumentos são os seguintes:
    • target: string que deve ser analisada
    • regex: expressão regular usada para capturar valores

    Devolve uma string do valor do primeiro grupo capturado.

  • re.captureN: faz uma correspondência completa na string e na expressão regular dadas. Os argumentos são os seguintes:
    • target: string que deve ser analisada
    • regex: expressão regular usada para capturar valores

    Devolve um mapa com pares de chave e valor para um grupo com nome (nome do grupo, string capturada) ou um grupo sem nome (índice do grupo, string capturada).

  • re.extract: faz corresponder os valores do grupo da string de destino fornecida e reescreve a string. Os argumentos são os seguintes:
    • target: string que deve ser analisada
    • regex: expressão regular usada para extrair valores
    • rewrite: expressão regular para como o resultado deve ser formatado

    Devolve uma string dos valores extraídos formatada com base no argumento rewrite.

Exemplo: mapeie uma matriz para uma matriz de objetos

O exemplo seguinte mapeia uma matriz de números inteiros numa matriz de objetos. (Outros atributos foram omitidos.)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configure um pipeline para transformar eventos

Pode configurar um pipeline para transformar dados de eventos na Google Cloud consola ou através da CLI gcloud.

Tenha em atenção que só é suportada uma mediação por pipeline.

Consola

  1. Na Google Cloud consola, aceda à página Eventarc > Pipelines.

    Aceda a Pipelines

  2. Pode criar um pipeline ou, se estiver a atualizar um pipeline, clicar no nome do pipeline.

  3. Na página Detalhes do pipeline, clique em Editar.

  4. No painel Mediação de eventos, faça o seguinte:

    1. Selecione a caixa de verificação Aplicar uma transformação.
    2. Na lista Formato de entrada, selecione o formato aplicável.

      Para mais informações, consulte o artigo Formate eventos recebidos.

    3. No campo Expressão CEL, escreva uma expressão de transformação em JSON. As funções, as macros e os operadores CEL predefinidos, bem como as expressões regulares, são suportados. Por exemplo:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      O exemplo anterior faz o seguinte:

      • Remove todos os atributos do evento original, exceto o respetivo id
      • Define o atributo datacontenttype como application/json
      • Substitui o payload do evento por uma string JSON estática
    4. Clique em Continuar.

  5. No painel Destino, faça o seguinte:

    1. Se aplicável, na lista Formato de saída, selecione um formato.

      Para mais informações, consulte o artigo Formate eventos recebidos.

    2. Opcionalmente, aplique uma vinculação de mensagens. Para mais informações, consulte a secção Defina uma associação de mensagens neste documento.

  6. Clique em Guardar.

    A atualização de um pipeline pode demorar alguns minutos.

gcloud

  1. Abra um terminal.

  2. Pode criar um pipeline ou atualizar um pipeline através do comando gcloud eventarc pipelines update:

    gcloud eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Substitua o seguinte:

    • PIPELINE_NAME: o ID do pipeline ou um nome totalmente qualificado
    • REGION: uma localização avançada do Eventarc suportada

      Em alternativa, pode definir a propriedade de localização da CLI gcloud:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: uma expressão escrita em JSON. As funções, as macros e os operadores CEL predefinidos, bem como as expressões regulares, são suportados. É usada uma flag mediations para aplicar uma chave transformation_template.

    A atualização de um pipeline pode demorar alguns minutos.

    Exemplo:

    gcloud eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    O exemplo anterior faz o seguinte:

    • Remove todos os atributos do evento original, exceto o respetivo id
    • Define o atributo datacontenttype como application/json
    • Substitui o payload do evento por uma string JSON estática

Funções de extensões

O Eventarc Advanced suporta as seguintes funções de extensão que podem ser usadas para transformar os dados de eventos recebidos através de um barramento.

Função Descrição
denormalize

Desnormaliza um mapa ou uma lista adicionando dados redundantes para melhorar o desempenho de leitura. Os nomes dos campos no mapa resultante são delimitados com um ponto (.). O índice da lista é convertido numa chave de string, começando em 0.

Tenha em atenção que, como não pode usar um ponto (.) nos nomes dos campos Avro e Protobuf, use apenas esta função para segmentar dados JSON.

Por exemplo: map.() -> map(string, dyn) ou list() -> map(string, dyn)

merge

Junta dois campos e devolve o campo combinado. Os campos com nomes duplicados são unidos.

Por exemplo: message.(message) -> message

removeFields

Remove campos específicos de um evento. Os nomes dos campos são resolvidos como caminhos. O caráter de ponto (.) é usado como delimitador.

Tenha em atenção que é esperado JSON não processado. Se serializar o JSON, a transformação pode ser aplicada a uma string JSON e resultar num erro.

Por exemplo: message.(list(string)) -> message

setField

Adiciona ou substitui um campo do evento por uma determinada chave. O nome do campo é resolvido como um caminho. O caráter de ponto (.) é usado como delimitador.

Por exemplo: message.(string, dyn) -> message

Exemplo: adicione um atributo ao payload do evento sem modificar outros dados

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Exemplo: desnormalizar a lista de artigos da carga útil do evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Exemplo: remova o campo do payload do evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Defina uma associação de mensagens

Por predefinição, os eventos são sempre enviados para um destino num formato CloudEvents através de um pedido HTTP no modo de conteúdo binário. Opcionalmente, pode substituir este comportamento definindo uma associação de mensagens e criando um novo pedido HTTP.

Todos os cabeçalhos HTTP introduzidos por outras políticas ou controlos (por exemplo, tokens OAuth ou OIDC) são preservados e unidos aos cabeçalhos resultantes da expressão de associação.

Pode definir uma associação de mensagens quando configura um pipeline na Google Cloud consola ou através da CLI gcloud.

Consola

  1. Na Google Cloud consola, aceda à página Eventarc > Pipelines.

    Aceda a Pipelines

  2. Pode criar um pipeline ou, se estiver a atualizar um pipeline, clicar no nome do pipeline.

    Tenha em atenção que a atualização de um pipeline pode demorar mais de 10 minutos.

  3. Na página Detalhes do pipeline, clique em Editar.

  4. No painel Destino, aplique uma Associação de mensagens, que é uma expressão CEL escrita em JSON. Isto resulta num pedido HTTP recém-criado que é enviado para o destino do pipeline.

    Para mais informações, consulte as secções Aceda a mensagens recebidas e Crie pedidos HTTP neste documento.

  5. Clique em Guardar.

gcloud

  1. Abra um terminal.

  2. Pode criar um pipeline ou atualizar um pipeline através do comando gcloud eventarc pipelines update:

    gcloud eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Substitua o seguinte:

    • PIPELINE_NAME: o ID do pipeline ou um nome totalmente qualificado
    • REGION: uma localização avançada do Eventarc suportada

      Em alternativa, pode definir a propriedade de localização da CLI gcloud:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: uma expressão CEL escrita em JSON que resulta num pedido HTTP recém-criado que é, em seguida, enviado para o destino do pipeline.

      Para mais informações, consulte as secções Aceda a mensagens recebidas e Construa pedidos HTTP neste documento.

    Exemplo:

    gcloud eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com', \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Tenha em atenção que, se estiver a usar uma chave http_endpoint_message_binding_template, também tem de definir a chave http_endpoint_uri.

Aceda às mensagens recebidas

Pode usar uma expressão de IEC para aceder a uma mensagem CloudEvents de entrada da seguinte forma:

  • Use o valor message.data para aceder ao campo data da mensagem recebida.
  • Use os valores message.key (em que key é o nome do atributo) para aceder aos atributos da mensagem recebida.
  • Use uma variável headers para aceder a quaisquer cabeçalhos adicionados ao pedido HTTP por mediações anteriores na cadeia de processamento. Esta variável define um mapa de pares chave-valor correspondentes aos cabeçalhos HTTP adicionais e não aos cabeçalhos originais do pedido de entrada inicial.

    Por exemplo, a seguinte expressão CEL pode ser usada para criar um pedido HTTP apenas com cabeçalhos adicionando um cabeçalho adicional aos cabeçalhos adicionados nas mediações de pipeline anteriores:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Construa pedidos HTTP

O resultado da expressão CEL tem de ser um mapa de pares de chave/valor cujos campos headers e body são usados para criar o pedido HTTP da seguinte forma.

Para headers campos:

  • Se existir um mapa headers como resultado da expressão CEL, os respetivos pares de chave-valor são mapeados diretamente para os cabeçalhos dos pedidos HTTP, e os respetivos valores são construídos através da codificação de strings canónica do tipo de dados correspondente.
  • Se um campo headers não existir, o pedido HTTP resultante não contém cabeçalhos.

Para body campos:

  • Se existir um campo body como resultado da expressão CEL, o respetivo valor é mapeado diretamente para o corpo do pedido HTTP.
  • Se o valor do campo body for do tipo bytes ou string, é usado como o corpo do pedido HTTP tal como está. Caso contrário, é convertido numa string JSON.
  • Se o campo body não existir, o corpo do pedido HTTP resultante é o corpo da associação de mensagens HTTP CloudEvents final no modo de conteúdo binário.

Quaisquer outros campos resultantes da expressão de IEC são ignorados.

Funções de extensões

O Eventarc Advanced suporta as seguintes funções de extensão que podem ser usadas para transformar os dados de eventos quando especifica uma associação de mensagens.

Função Descrição
merge

Mescla um mapa CEL transmitido no mapa CEL ao qual a função é aplicada. Se a mesma chave existir em ambos os mapas ou se o valor da chave for do tipo map, ambos os mapas são unidos; caso contrário, é usado o valor do mapa transmitido.

Exemplo: map1.merge(map2) -> map3

toBase64

Converte um valor CEL numa string codificada em URL base64.

Exemplo: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Converte uma mensagem num mapa CEL que corresponde a uma representação JSON de uma mensagem CloudEvents e aplica toDestinationPayloadFormat aos dados da mensagem. Também define o datacontenttype do evento para o formato de saída especificado (output_payload_format_*). Se não for definido um formato de saída, é usado qualquer datacontenttype existente; caso contrário, o datacontenttype não é definido. Se a mensagem não cumprir a especificação CloudEvents, a função falha. Tenha em atenção que, para converter os dados numa string JSON, pode usar toJsonString.

Exemplo: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Converte message.data para o formato de saída especificado (output_payload_format_*). Se não for definido um formato de saída, message.data é devolvido inalterado.

Exemplo: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Converte um valor CEL numa string JSON.

Por exemplo: map.toJsonString() -> string

toMap

Converte uma lista IEC de mapas IEC num único mapa IEC.

Exemplo: list(map).toMap() -> map

Exemplo: manter cabeçalhos, adicionar novo cabeçalho, definir corpo para formato de destino

gcloud eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

O que se segue?