Una canalización conecta un bus con un destino y dirige los mensajes de eventos a ese destino. Puede configurar una canalización para que espere datos de eventos en un formato específico o, antes de que los eventos se envíen a un destino, puede convertir los datos de eventos de un formato admitido a otro. Por ejemplo, puede que tengas que enrutar eventos a un endpoint que solo acepte datos Avro.
Formatos admitidos
Se admiten las siguientes conversiones de formato:
- De Avro a JSON
- De Avro a Protobuf
- De JSON a Avro
- De JSON a Protobuf
- De Protobuf a Avro
- De Protobuf a JSON
Ten en cuenta lo siguiente:
Cuando conviertes el formato de los eventos, solo se convierte la carga útil del evento, no todo el mensaje del evento.
Si se especifica un formato de datos de entrada para una canalización, todos los eventos deben coincidir con ese formato. Los eventos que no coincidan con el formato esperado se tratarán como errores persistentes.
Si no se especifica un formato de datos de entrada para una canalización, no se puede definir un formato de salida.
Antes de convertir un formato de evento para un destino específico, se aplica primero cualquier transformación de datos que se haya configurado.
Los eventos siempre se envían en formato CloudEvents mediante una solicitud HTTP en modo de contenido binario, a menos que especifiques un enlace de mensajes.
Los esquemas JSON se detectan de forma dinámica. En las definiciones de esquemas de Protobuf, solo puedes definir un tipo de nivel superior y no se admiten las instrucciones de importación que hacen referencia a otros tipos. Las definiciones de esquema sin un identificador
syntax
tienen el valor predeterminadoproto2
. Ten en cuenta que hay un límite de tamaño del esquema.
Configurar un flujo de procesamiento para dar formato a los eventos
Puedes configurar una canalización para que espere datos de eventos en un formato específico o para convertir datos de eventos de un formato a otro en la Google Cloud consola o mediante la CLI de gcloud.
Consola
En la Google Cloud consola, ve a la página Eventarc > Pipelines.
Puedes crear una canalización o, si vas a actualizar una, hacer clic en su nombre.
En la página Detalles de la canalización, haz clic en
Editar.En el panel Mediación de eventos, haga lo siguiente:
- Seleccione la casilla Aplicar una transformación.
En la lista Formato de entrada, seleccione el formato aplicable.
Ten en cuenta que, si se especifica un formato de datos de entrada para una canalización, todos los eventos deben coincidir con ese formato. Los eventos que no coincidan con el formato esperado se tratarán como errores persistentes.
En el caso de los formatos Avro o Protobuf, debe especificar un esquema de entrada. (Opcional: en lugar de especificarlo directamente, puedes subir un esquema de entrada).
En el campo Expresión CEL, escribe una expresión de transformación con CEL.
Haz clic en Continuar.
En el panel Destino, haga lo siguiente:
Si procede, en la lista Formato de salida, seleccione un formato.
Ten en cuenta que, si no se especifica un formato de datos de entrada para una canalización, no se podrá definir un formato de salida.
Opcional: aplica un enlace de mensaje. Para obtener más información, consulta el artículo Vinculación de mensajes.
Haz clic en Guardar.
Una canalización puede tardar un par de minutos en actualizarse.
gcloud
Abre la terminal.
Puedes crear una canalización o actualizarla con el comando
gcloud eventarc pipelines update
:gcloud eventarc pipelines update PIPELINE_NAME \ --location=REGION \ --INPUT_PAYLOAD_FLAG \ --destinations=OUTPUT_PAYLOAD_KEY
Haz los cambios siguientes:
PIPELINE_NAME
: el ID de la canalización o un nombre completoREGION
: una ubicación de Eventarc Advanced compatibleTambién puedes definir la propiedad de ubicación de gcloud CLI:
gcloud config set eventarc/location REGION
INPUT_PAYLOAD_FLAG
: una marca de formato de datos de entrada que puede ser una de las siguientes:--input-payload-format-avro-schema-definition
--input-payload-format-json
--input-payload-format-protobuf-schema-definition
Ten en cuenta que, si se especifica un formato de datos de entrada para una canalización, todos los eventos deben coincidir con ese formato. Los eventos que no coincidan con el formato esperado se tratarán como errores persistentes.
OUTPUT_PAYLOAD_KEY
: una clave de formato de datos de salida que puede ser una de las siguientes:output_payload_format_avro_schema_definition
output_payload_format_json
output_payload_format_protobuf_schema_definition
Tenga en cuenta que, si define una clave de formato de datos de salida, también debe especificar una marca de formato de datos de entrada.
Una canalización puede tardar un par de minutos en actualizarse.
Ejemplos:
En el siguiente ejemplo se usa una marca
--input-payload-format-protobuf-schema-definition
para especificar que la canalización debe esperar eventos en formato de datos Protobuf con un esquema específico:gcloud eventarc pipelines update my-pipeline \ --input-payload-format-protobuf-schema-definition \ ' syntax = "proto3"; message schema { string name = 1; string severity = 2; } '
En el siguiente ejemplo se usan una clave
output_payload_format_avro_schema_definition
y una marca--input-payload-format-avro-schema-definition
para crear una canalización que espera eventos en formato Avro y los genera en el mismo formato:gcloud eventarc pipelines create my-pipeline \ --location=us-central1 \ --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \ --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
En el siguiente ejemplo se usan una clave
output_payload_format_protobuf_schema_definition
y una marca--input-payload-format-avro-schema-definition
para actualizar una canalización y convertir sus datos de eventos de Avro a Protobuf mediante definiciones de esquema:gcloud eventarc pipelines update my-pipeline \ --location=us-central1 \ --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \ --input-payload-format-avro-schema-definition= \ ' { "type": "record", "name": "MessageProto", "fields": [ { "name" : "prop1", "type": "string" }, { "name" : "prop2", "type": "string" }, ] } '