Formatear eventos recibidos

Una canalización conecta un bus con un destino y dirige los mensajes de eventos a ese destino. Puede configurar una canalización para que espere datos de eventos en un formato específico o, antes de que los eventos se envíen a un destino, puede convertir los datos de eventos de un formato admitido a otro. Por ejemplo, puede que tengas que enrutar eventos a un endpoint que solo acepte datos Avro.

Formatos admitidos

Se admiten las siguientes conversiones de formato:

  • De Avro a JSON
  • De Avro a Protobuf
  • De JSON a Avro
  • De JSON a Protobuf
  • De Protobuf a Avro
  • De Protobuf a JSON

Ten en cuenta lo siguiente:

  • Cuando conviertes el formato de los eventos, solo se convierte la carga útil del evento, no todo el mensaje del evento.

  • Si se especifica un formato de datos de entrada para una canalización, todos los eventos deben coincidir con ese formato. Los eventos que no coincidan con el formato esperado se tratarán como errores persistentes.

  • Si no se especifica un formato de datos de entrada para una canalización, no se puede definir un formato de salida.

  • Antes de convertir un formato de evento para un destino específico, se aplica primero cualquier transformación de datos que se haya configurado.

  • Los eventos siempre se envían en formato CloudEvents mediante una solicitud HTTP en modo de contenido binario, a menos que especifiques un enlace de mensajes.

  • Los esquemas JSON se detectan de forma dinámica. En las definiciones de esquemas de Protobuf, solo puedes definir un tipo de nivel superior y no se admiten las instrucciones de importación que hacen referencia a otros tipos. Las definiciones de esquema sin un identificador syntax tienen el valor predeterminado proto2. Ten en cuenta que hay un límite de tamaño del esquema.

Configurar un flujo de procesamiento para dar formato a los eventos

Puedes configurar una canalización para que espere datos de eventos en un formato específico o para convertir datos de eventos de un formato a otro en la Google Cloud consola o mediante la CLI de gcloud.

Consola

  1. En la Google Cloud consola, ve a la página Eventarc > Pipelines.

    Ir a Pipelines

  2. Puedes crear una canalización o, si vas a actualizar una, hacer clic en su nombre.

  3. En la página Detalles de la canalización, haz clic en Editar.

  4. En el panel Mediación de eventos, haga lo siguiente:

    1. Seleccione la casilla Aplicar una transformación.
    2. En la lista Formato de entrada, seleccione el formato aplicable.

      Ten en cuenta que, si se especifica un formato de datos de entrada para una canalización, todos los eventos deben coincidir con ese formato. Los eventos que no coincidan con el formato esperado se tratarán como errores persistentes.

    3. En el caso de los formatos Avro o Protobuf, debe especificar un esquema de entrada. (Opcional: en lugar de especificarlo directamente, puedes subir un esquema de entrada).

    4. En el campo Expresión CEL, escribe una expresión de transformación con CEL.

    5. Haz clic en Continuar.

  5. En el panel Destino, haga lo siguiente:

    1. Si procede, en la lista Formato de salida, seleccione un formato.

      Ten en cuenta que, si no se especifica un formato de datos de entrada para una canalización, no se podrá definir un formato de salida.

    2. Opcional: aplica un enlace de mensaje. Para obtener más información, consulta el artículo Vinculación de mensajes.

  6. Haz clic en Guardar.

    Una canalización puede tardar un par de minutos en actualizarse.

gcloud

  1. Abre la terminal.

  2. Puedes crear una canalización o actualizarla con el comando gcloud eventarc pipelines update:

    gcloud eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    Haz los cambios siguientes:

    • PIPELINE_NAME: el ID de la canalización o un nombre completo
    • REGION: una ubicación de Eventarc Advanced compatible

      También puedes definir la propiedad de ubicación de gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG: una marca de formato de datos de entrada que puede ser una de las siguientes:

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      Ten en cuenta que, si se especifica un formato de datos de entrada para una canalización, todos los eventos deben coincidir con ese formato. Los eventos que no coincidan con el formato esperado se tratarán como errores persistentes.

    • OUTPUT_PAYLOAD_KEY: una clave de formato de datos de salida que puede ser una de las siguientes:

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      Tenga en cuenta que, si define una clave de formato de datos de salida, también debe especificar una marca de formato de datos de entrada.

    Una canalización puede tardar un par de minutos en actualizarse.

    Ejemplos:

    En el siguiente ejemplo se usa una marca --input-payload-format-protobuf-schema-definition para especificar que la canalización debe esperar eventos en formato de datos Protobuf con un esquema específico:

    gcloud eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition \
    '
      syntax = "proto3";
      message schema {
        string name = 1;
        string severity = 2;
      }
    '

    En el siguiente ejemplo se usan una clave output_payload_format_avro_schema_definition y una marca --input-payload-format-avro-schema-definition para crear una canalización que espera eventos en formato Avro y los genera en el mismo formato:

    gcloud eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    En el siguiente ejemplo se usan una clave output_payload_format_protobuf_schema_definition y una marca --input-payload-format-avro-schema-definition para actualizar una canalización y convertir sus datos de eventos de Avro a Protobuf mediante definiciones de esquema:

    gcloud eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition= \
        '
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" },
          ]
        }
        '