流水线可将总线连接到目标目的地,并将事件消息路由到该目的地。您可以将流水线配置为以特定格式预期事件数据,也可以在将事件传送到目标之前,将事件数据从一种受支持的格式转换为另一种受支持的格式。例如,您可能需要将事件路由到仅接受 Avro 数据的端点。
支持的格式
支持以下格式转换:
- Avro 到 JSON
- Avro 到 Protobuf
- JSON 到 Avro
- JSON 到 Protobuf
- Protobuf 到 Avro
- Protobuf 转换为 JSON
请注意以下几点:
转换事件格式时,仅会转换事件载荷,而不会转换整个事件消息。
如果为流水线指定了入站数据格式,则所有事件都必须与该格式匹配。任何与预期格式不匹配的事件都将被视为持续性错误。
如果未为流水线指定入站数据格式,则无法设置出站格式。
在为特定目标转换事件格式之前,系统会先应用配置的所有数据转换。
除非您指定了消息绑定,否则事件始终以 二进制内容模式的 HTTP 请求的 CloudEvents 格式传送。
JSON 架构是动态检测的。对于 Protobuf 架构定义,您只能定义一个顶级类型,并且不支持引用其他类型的导入语句。没有
syntax
标识符的架构定义默认为proto2
。请注意,架构大小存在限制。
配置流水线以设置事件格式
您可以在 Google Cloud 控制台中或使用 gcloud CLI 配置数据流,以便其预期事件数据采用特定格式,或将事件数据从一种格式转换为另一种格式。
打开终端。
您可以创建流水线,也可以使用
gcloud beta eventarc pipelines update
命令更新流水线:请注意,更新流水线可能需要 10 分钟以上。
gcloud beta eventarc pipelines update
PIPELINE_NAME \ --location=REGION \ --INPUT_PAYLOAD_FLAG \ --destinations=OUTPUT_PAYLOAD_KEY 替换以下内容:
PIPELINE_NAME
:流水线的 ID 或完全限定名称REGION
:受支持的 Eventarc Advanced 位置或者,您也可以设置 gcloud CLI 位置属性:
gcloud config set eventarc/location
REGION INPUT_PAYLOAD_FLAG
:输入数据格式标志,可以是以下各项之一:--input-payload-format-avro-schema-definition
--input-payload-format-json
--input-payload-format-protobuf-schema-definition
请注意,如果为流水线指定了输入数据格式,则所有事件都必须采用该格式。所有与预期格式不匹配的事件都将被视为持续性错误。
OUTPUT_PAYLOAD_KEY
:输出数据格式键,可以是以下任一项:output_payload_format_avro_schema_definition
output_payload_format_json
output_payload_format_protobuf_schema_definition
请注意,如果您设置输出数据格式键,则还必须指定输入数据格式标志。
示例:
以下示例使用
--input-payload-format-protobuf-schema-definition
标志指定流水线应预期采用特定架构的 Protobuf 数据格式的事件:gcloud beta eventarc pipelines update my-pipeline \ --input-payload-format-protobuf-schema-definition \ ' syntax = "proto3"; message schema { string name = 1; string severity = 2; } '
以下示例使用
output_payload_format_avro_schema_definition
键和--input-payload-format-avro-schema-definition
标志创建了一个流水线,该流水线预期事件采用 Avro 格式,并以相同的格式输出事件:gcloud beta eventarc pipelines create my-pipeline \ --location=us-central1 \ --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \ --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
以下示例使用
output_payload_format_protobuf_schema_definition
键和--input-payload-format-avro-schema-definition
标志更新数据流,并使用架构定义将其事件数据从 Avro 转换为 Protobuf:gcloud beta eventarc pipelines update my-pipeline \ --location=us-central1 \ --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \ --input-payload-format-avro-schema-definition= \ ' { "type": "record", "name": "MessageProto", "fields": [ { "name" : "prop1", "type": "string" }, { "name" : "prop2", "type": "string" }, ] } '