수신된 이벤트 변환

CEL을 사용하여 변환 표현식을 작성하여 이벤트 데이터를 변환할 수 있습니다. 예를 들어 대상의 특정 API 계약을 충족하도록 이벤트 페이로드를 수정할 수 있습니다.

메시지 바인딩을 지정하지 않는 한 이벤트는 항상 바이너리 콘텐츠 모드에서 HTTP 요청을 사용하는 CloudEvents 형식으로 전송됩니다.

입력 및 출력 데이터 형식 설정

CEL로 변환 표현식을 작성하는 것 외에도 원하는 경우 수신되는 이벤트 데이터의 데이터 형식을 지정할 수 있습니다. 이렇게 하면 Eventarc Advanced에서 이벤트의 페이로드를 파싱하는 방법을 알 수 있습니다. 데이터를 한 형식에서 다른 형식으로 변환할 수도 있습니다.

Avro, JSON, Protobuf 형식이 지원됩니다. 자세한 내용은 수신된 이벤트 형식 지정을 참고하세요.

변환 표현식

이벤트를 변환할 때 모든 이벤트 속성은 사전 정의된 message 객체를 통해 CEL 표현식에서 변수로 액세스할 수 있습니다. 이러한 변수는 런타임 시 이벤트 데이터에 따른 값으로 채워집니다. 예를 들면 다음과 같습니다.

  • message.id는 이벤트의 id 속성을 반환합니다.
  • message.data는 이벤트 페이로드의 CEL 값 표현을 반환합니다.
  • message.data.some-key는 이벤트 페이로드에서 some-key라는 필드의 콘텐츠를 반환합니다.

message.data의 필드는 항상 String 유형으로 표시되며 값은 입력 데이터 형식을 설정할 때 지정된 스키마를 사용하여 원본 이벤트에서 매핑됩니다.

변환 표현식은 이벤트 컨텍스트 속성과 이벤트 데이터 페이로드가 포함된 전체 이벤트를 표현해야 합니다. 표현식은 JSON으로 작성되지만 사전 정의된 CEL 함수, 매크로, 연산자, RE2를 사용하는 정규 표현식은 지원됩니다. Eventarc Advanced는 이벤트 데이터를 변환하는 데 사용할 수 있는 특정 확장 함수도 지원합니다.

다음은 CEL 표현식을 사용하여 이벤트 데이터를 변환하는 두 가지 예입니다. 자세한 사용 사례 및 예시는 변환 예시를 참고하세요.

예: 속성 값 형식 지정

다음 예에서는 정규 표현식 함수를 사용하여 phone_number 속성 값의 형식을 지정합니다. (다른 속성은 생략되었습니다.)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

다음은 위 예에서 사용된 정규 표현식 함수입니다.

  • re.capture: 이름이 지정되지 않았거나 이름이 지정된 첫 번째 그룹 값을 캡처합니다. 인수는 다음과 같습니다.
    • target: 파싱해야 하는 문자열
    • regex: 값을 캡처하는 데 사용되는 정규 표현식입니다.

    첫 번째로 캡처된 그룹 값의 문자열을 반환합니다.

  • re.captureN: 지정된 문자열과 정규 표현식을 완전히 일치시킵니다. 인수는 다음과 같습니다.
    • target: 파싱해야 하는 문자열
    • regex: 값을 캡처하는 데 사용되는 정규 표현식입니다.

    이름이 지정된 그룹 (그룹 이름, 캡처된 문자열) 또는 이름이 지정되지 않은 그룹 (그룹 색인, 캡처된 문자열)의 키와 값 쌍이 포함된 맵을 반환합니다.

  • re.extract: 지정된 대상 문자열의 그룹 값을 일치시키고 문자열을 다시 작성합니다. 인수는 다음과 같습니다.
    • target: 파싱해야 하는 문자열
    • regex: 값을 추출하는 데 사용되는 정규 표현식
    • rewrite: 결과 형식 지정 방법에 관한 정규식

    rewrite 인수를 기반으로 형식이 지정된 추출된 값의 문자열을 반환합니다.

예시: 배열을 객체 배열에 매핑

다음 예에서는 정수 배열을 객체 배열에 매핑합니다. (다른 속성은 생략되었습니다.)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

이벤트를 변환하는 파이프라인 구성

Google Cloud 콘솔 또는 gcloud CLI를 사용하여 이벤트 데이터를 변환하는 파이프라인을 구성할 수 있습니다.

파이프라인당 하나의 미디에이션만 지원됩니다.

콘솔

  1. Google Cloud 콘솔에서 Eventarc > 파이프라인 페이지로 이동합니다.

    파이프라인으로 이동

  2. 파이프라인을 만들거나 파이프라인을 업데이트하는 경우 파이프라인 이름을 클릭합니다.

    파이프라인을 업데이트하는 데 10분 이상 걸릴 수 있습니다.

  3. 파이프라인 세부정보 페이지에서 수정을 클릭합니다.

  4. 이벤트 미디에이션 창에서 다음을 수행합니다.

    1. 변환 적용 체크박스를 선택합니다.
    2. 인바운드 형식 목록에서 해당하는 형식을 선택합니다.

      자세한 내용은 수신된 이벤트 형식 지정을 참고하세요.

    3. CEL 표현식 필드에 JSON으로 변환 표현식을 작성합니다. 사전 정의된 CEL 함수, 매크로, 연산자와 정규 표현식이 지원됩니다. 예를 들면 다음과 같습니다.

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      위의 예에서는 다음을 실행합니다.

      • 원래 이벤트에서 id를 제외한 모든 속성을 삭제합니다.
      • datacontenttype 속성을 application/json로 설정합니다.
      • 이벤트 페이로드를 정적 JSON 문자열로 바꿉니다.
    4. 계속을 클릭합니다.

  5. 대상 창에서 다음을 수행합니다.

    1. 해당하는 경우 외부 형식 목록에서 형식을 선택합니다.

      자세한 내용은 수신된 이벤트 형식 지정을 참고하세요.

    2. 원하는 경우 메시지 결합을 적용합니다. 자세한 내용은 이 문서의 메시지 결합 정의 섹션을 참고하세요.

  6. 저장을 클릭합니다.

gcloud

  1. 터미널을 엽니다.

  2. 파이프라인을 만들거나 gcloud beta eventarc pipelines update 명령어를 사용하여 파이프라인을 업데이트할 수 있습니다.

    파이프라인을 업데이트하는 데 10분 이상 걸릴 수 있습니다.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    다음을 바꿉니다.

    • PIPELINE_NAME: 파이프라인의 ID 또는 정규화된 이름
    • REGION: 지원되는 Eventarc Advanced 위치

      또는 gcloud CLI 위치 속성을 설정할 수 있습니다.

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: JSON으로 작성된 표현식입니다. 사전 정의된 CEL 함수, 매크로, 연산자, 정규 표현식이 지원됩니다. mediations 플래그는 transformation_template 키를 적용하는 데 사용됩니다.

    예:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    위의 예에서는 다음을 실행합니다.

    • 원래 이벤트에서 id를 제외한 모든 속성을 삭제합니다.
    • datacontenttype 속성을 application/json로 설정합니다.
    • 이벤트 페이로드를 정적 JSON 문자열로 바꿉니다.

확장 함수

Eventarc Advanced는 버스를 통해 수신된 이벤트 데이터를 변환하는 데 사용할 수 있는 다음 확장 함수를 지원합니다.

함수 설명
denormalize

중복 데이터를 추가하여 지도 또는 목록을 비정규화하여 읽기 성능을 개선합니다. 결과 맵의 필드 이름은 마침표(.)를 사용하여 구분됩니다. 목록 색인은 0부터 시작하는 문자열 키로 변환됩니다.

Avro 및 Protobuf 필드 이름에는 마침표 (.)를 사용할 수 없으므로 이 함수는 JSON 데이터를 타겟팅하는 데만 사용하세요.

예를 들면 map.() -> map(string, dyn) 또는 list() -> map(string, dyn)입니다.

merge

두 필드를 조인하고 결합된 필드를 반환합니다. 이름이 중복된 필드는 병합됩니다.

예를 들면 다음과 같습니다. message.(message) -> message

removeFields

이벤트에서 특정 필드를 삭제합니다. 필드 이름은 경로로 확인됩니다. 마침표 문자 (.)가 구분 기호로 사용됩니다.

원시 JSON이 예상됩니다. JSON을 마샬하면 변환이 JSON 문자열에 적용되어 오류가 발생할 수 있습니다.

예: message.(list(string)) -> message

setField

이벤트의 필드를 지정된 키로 추가하거나 바꿉니다. 필드 이름은 경로로 확인됩니다. 마침표 문자 (.)가 구분 기호로 사용됩니다.

예: message.(string, dyn) -> message

예: 다른 데이터를 수정하지 않고 이벤트 페이로드에 속성 추가

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

예: 이벤트 페이로드에서 항목 목록 비정규화

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

예: 이벤트 페이로드에서 필드 삭제

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

메시지 결합 정의

기본적으로 이벤트는 항상 바이너리 콘텐츠 모드에서 HTTP 요청을 사용하여 CloudEvents 형식으로 대상에 전달됩니다. 메시지 바인딩을 정의하고 새 HTTP 요청을 구성하여 이 동작을 재정의할 수도 있습니다.

다른 정책 또는 컨트롤 (예: OAuth 또는 OIDC 토큰)에서 도입한 모든 HTTP 헤더는 보존되고 결합 표현식으로 인해 생성된 헤더와 병합됩니다.

Google Cloud 콘솔에서 파이프라인을 구성할 때 또는 gcloud CLI를 사용하여 메시지 바인딩을 정의할 수 있습니다.

콘솔

  1. Google Cloud 콘솔에서 Eventarc > 파이프라인 페이지로 이동합니다.

    파이프라인으로 이동

  2. 파이프라인을 만들거나 파이프라인을 업데이트하는 경우 파이프라인 이름을 클릭합니다.

    파이프라인을 업데이트하는 데 10분 이상 걸릴 수 있습니다.

  3. 파이프라인 세부정보 페이지에서 수정을 클릭합니다.

  4. 대상 창에서 JSON으로 작성된 CEL 표현식인 메시지 결합을 적용합니다. 그러면 새로 생성된 HTTP 요청이 파이프라인의 대상에 전송됩니다.

    자세한 내용은 이 문서의 수신 메시지 액세스HTTP 요청 구성 섹션을 참고하세요.

  5. 저장을 클릭합니다.

gcloud

  1. 터미널을 엽니다.

  2. 파이프라인을 만들거나 gcloud beta eventarc pipelines update 명령어를 사용하여 파이프라인을 업데이트할 수 있습니다.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    다음을 바꿉니다.

    • PIPELINE_NAME: 파이프라인의 ID 또는 정규화된 이름
    • REGION: 지원되는 Eventarc Advanced 위치

      또는 gcloud CLI 위치 속성을 설정할 수 있습니다.

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: JSON으로 작성된 CEL 표현식으로, 새로 생성된 HTTP 요청을 생성하여 파이프라인의 대상에 전송합니다.

      자세한 내용은 이 문서의 수신 메시지 액세스HTTP 요청 구성 섹션을 참고하세요.

    예:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    http_endpoint_message_binding_template 키를 사용하는 경우 http_endpoint_urinetwork_attachment 키도 설정해야 합니다.

수신 메일에 액세스

다음과 같이 CEL 표현식을 사용하여 수신 CloudEvents 메시지에 액세스할 수 있습니다.

  • message.data 값을 사용하여 수신 메시지의 data 필드에 액세스합니다.
  • message.key 값 (key는 속성 이름)을 사용하여 수신 메시지의 속성에 액세스합니다.
  • headers 변수를 사용하여 처리 체인의 이전 미디에이션에서 HTTP 요청에 추가한 헤더에 액세스합니다. 이 변수는 초기 인바운드 요청의 원래 헤더가 아닌 추가 HTTP 헤더에 해당하는 키-값 쌍 매핑을 정의합니다.

    예를 들어 다음 CEL 표현식을 사용하면 이전 파이프라인 미디에이션에서 추가된 헤더에 헤더를 추가하여 헤더 전용 HTTP 요청을 구성할 수 있습니다.

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

HTTP 요청 생성

CEL 표현식의 결과는 다음과 같이 HTTP 요청을 구성하는 데 사용되는 headersbody 필드가 있는 키-값 쌍의 맵이어야 합니다.

headers 필드:

  • CEL 표현식의 결과로 headers 맵이 있는 경우 키-값 쌍이 HTTP 요청 헤더에 직접 매핑되고 값은 해당 데이터 유형의 표준 문자열 인코딩을 사용하여 생성됩니다.
  • headers 필드가 없으면 결과 HTTP 요청에 헤더가 포함되지 않습니다.

body 필드:

  • CEL 표현식의 결과로 body 필드가 있으면 값이 HTTP 요청 본문에 직접 매핑됩니다.
  • body 필드 값이 bytes 또는 string 유형인 경우 그대로 HTTP 요청 본문으로 사용됩니다. 그렇지 않으면 JSON 문자열로 변환됩니다.
  • body 필드가 없으면 결과 HTTP 요청 본문이 바이너리 콘텐츠 모드의 최종 CloudEvents HTTP 메시지 바인딩의 본문이 됩니다.

CEL 표현식의 결과로 생성된 다른 모든 필드는 무시됩니다.

확장 함수

Eventarc Advanced는 메시지 바인딩을 지정할 때 이벤트 데이터를 변환하는 데 사용할 수 있는 다음 확장 함수를 지원합니다.

함수 설명
merge

전달된 CEL 맵을 함수가 적용된 CEL 맵에 병합합니다. 두 맵에 동일한 키가 있거나 키의 값이 map 유형인 경우 두 맵이 병합됩니다. 그 외의 경우에는 전달된 맵의 값이 사용됩니다.

예: map1.merge(map2) -> map3

toBase64

CEL 값을 base64 URL로 인코딩된 문자열로 변환합니다.

예: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

메시지를 CloudEvents 메시지의 JSON 표현에 해당하는 CEL 맵으로 변환하고 메시지 데이터에 toDestinationPayloadFormat를 적용합니다. 또한 이벤트의 datacontenttype를 지정된 아웃바운드 형식(output_payload_format_*)으로 설정합니다. 아웃바운드 형식이 설정되지 않은 경우 기존 datacontenttype가 사용되며, 그렇지 않으면 datacontenttype가 설정되지 않습니다. 메시지가 CloudEvents 사양을 준수하지 않으면 함수가 실패합니다. 데이터를 JSON 문자열로 변환하려면 toJsonString를 사용하면 됩니다.

예: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

message.data를 지정된 아웃바운드 형식(output_payload_format_*)으로 변환합니다. 아웃바운드 형식이 설정되지 않은 경우 message.data가 변경되지 않고 반환됩니다.

예: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

CEL 값을 JSON 문자열로 변환합니다.

예를 들면 다음과 같습니다. map.toJsonString() -> string

toMap

CEL 맵의 CEL 목록을 단일 CEL 맵으로 변환합니다.

예: list(map).toMap() -> map

예: 헤더 유지, 새 헤더 추가, 본문 대상 형식으로 설정

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

다음 단계