受信したイベントをフォーマットする

パイプラインは、バスをターゲット デスティネーションに接続し、イベント メッセージをそのデスティネーションに転送します。特定の形式のイベントデータを想定するようにパイプラインを構成することも、イベントを宛先に配信する前に、イベントデータをサポートされている形式から別の形式に変換することもできます。たとえば、Avro データのみを受け入れることができるエンドポイントにイベントを転送する必要がある場合があります。

サポートされているファイル形式

次の形式変換がサポートされています。

  • Avro から JSON
  • Avro から Protobuf
  • JSON to Avro
  • JSON to Protobuf
  • Protobuf から Avro
  • Protobuf to JSON

次の点にご注意ください。

  • イベントの形式を変換する場合、イベント メッセージ全体ではなく、イベント ペイロードのみが変換されます。

  • パイプラインにインバウンド データ形式が指定されている場合、すべてのイベントがその形式と一致している必要があります。想定される形式と一致しないイベントは、永続的なエラーとして扱われます。

  • パイプラインにインバウンド データ形式が指定されていない場合、アウトバウンド形式は設定できません。

  • イベント形式が特定の宛先用に変換される前に、構成されているデータ変換が適用されます。

  • メッセージ バインディングを指定しない限り、イベントは常に バイナリ コンテンツ モードで HTTP リクエストを使用して CloudEvents 形式で配信されます。

  • JSON スキーマは動的に検出されます。Protobuf スキーマ定義では、定義できる最上位の型は 1 つだけです。他の型を参照するインポート ステートメントはサポートされていません。syntax 識別子のないスキーマ定義は、デフォルトで proto2 になります。スキーマのサイズには上限があります。

イベントをフォーマットするパイプラインを構成する

特定の形式のイベントデータを想定するようにパイプラインを構成したり、イベントデータをある形式から別の形式に変換したりするには、Google Cloud コンソールまたは gcloud CLI を使用します。

Console

  1. Google Cloud コンソールで、[Eventarc] > [パイプライン] ページに移動します。

    [Pipelines] に移動

  2. パイプラインを作成するか、パイプラインを更新する場合はパイプライン名をクリックします。

    パイプラインの更新には 10 分以上かかる場合があります。

  3. [パイプラインの詳細] ページで、 [編集] をクリックします。

  4. [イベント メディエーション] ペインで、次の操作を行います。

    1. [変換を適用] チェックボックスをオンにします。
    2. [インバウンド形式] リストで、該当する形式を選択します。

      パイプラインにインバウンド データ形式が指定されている場合、すべてのイベントがその形式と一致している必要があります。想定される形式と一致しないイベントは、永続的なエラーとして扱われます。

    3. Avro 形式または Protobuf 形式の場合は、受信スキーマを指定する必要があります。(必要に応じて、直接指定する代わりに、インバウンド スキーマをアップロードすることもできます)。

    4. [CEL 式] フィールドに、CEL を使用して変換式を記述します。

    5. [続行] をクリックします。

  5. [宛先] ペインで、次の操作を行います。

    1. 必要に応じて、[アウトバウンド形式] リストで形式を選択します。

      パイプラインにインバウンド データ形式が指定されていない場合、アウトバウンド形式を設定することはできません。

    2. 省略可: メッセージ バインディングを適用します。詳細については、メッセージ バインディングをご覧ください。

  6. [保存] をクリックします。

gcloud

  1. ターミナルを開きます。

  2. パイプラインを作成するか、gcloud beta eventarc pipelines update コマンドを使用してパイプラインを更新できます。

    パイプラインの更新には 10 分以上かかる場合があります。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    次のように置き換えます。

    • PIPELINE_NAME: パイプラインの ID または完全修飾名
    • REGION: サポートされている Eventarc Advanced のロケーション

      または、gcloud CLI の location プロパティを設定することもできます。

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG: 入力データ形式フラグ。次のいずれかになります。

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      パイプラインに入力データ形式が指定されている場合、すべてのイベントがその形式と一致している必要があります。想定される形式と一致しないイベントは、永続的なエラーとして扱われます。

    • OUTPUT_PAYLOAD_KEY: 出力データ形式キー。次のいずれかになります。

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      出力データ形式キーを設定する場合は、入力データ形式フラグも指定する必要があります。

    例:

    次の例では、--input-payload-format-protobuf-schema-definition フラグを使用して、パイプラインで特定のスキーマを持つ Protobuf データ形式のイベントを想定するように指定します。

    gcloud beta eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition \
    '
      syntax = "proto3";
      message schema {
        string name = 1;
        string severity = 2;
      }
    '

    次の例では、output_payload_format_avro_schema_definition キーと --input-payload-format-avro-schema-definition フラグを使用して、Avro 形式のイベントを想定し、同じ形式で出力するパイプラインを作成します。

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    次の例では、output_payload_format_protobuf_schema_definition キーと --input-payload-format-avro-schema-definition フラグを使用してパイプラインを更新し、スキーマ定義を使用してイベントデータを Avro から Protobuf に変換します。

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition= \
        '
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" },
          ]
        }
        '