Dataflow 템플릿에 대한 사용자 정의 함수 만들기

일부 Google 제공 Dataflow 템플릿에서는 사용자 정의 함수(UDF)를 지원합니다. UDF를 사용하면 템플릿 코드를 수정하지 않고 템플릿 기능을 확장할 수 있습니다.

개요

UDF를 만들려면 템플릿에 따라 JavaScript 함수 또는 Python 함수를 작성합니다. UDF 코드 파일을 Cloud Storage에 저장하고 위치를 템플릿 매개변수로 지정합니다. 각 입력 요소에 대해 템플릿이 함수를 호출합니다. 이 함수는 요소를 변환하거나 다른 커스텀 로직을 수행하고 결과를 다시 템플릿에 반환합니다.

예를 들어 UDF를 사용하여 다음 작업을 수행할 수 있습니다.

  • 타겟 스키마와 일치하도록 입력 데이터 형식을 다시 지정합니다.
  • 민감한 정보를 수정합니다.
  • 출력에서 일부 요소를 필터링합니다.

UDF 함수의 입력은 JSON 문자열로 직렬화된 단일 데이터 요소입니다. 이 함수는 직렬화된 JSON 문자열을 출력으로 반환합니다. 데이터 형식은 템플릿에 따라 다릅니다. 예를 들어 Pub/Sub Subscription to BigQuery 템플릿에서 입력은 JSON 객체로 직렬화된 Pub/Sub 메시지 데이터이고 출력은 BigQuery 테이블 행을 나타내는 직렬화된 JSON 객체입니다. 자세한 내용은 각 템플릿의 문서를 참조하세요.

UDF를 사용하여 템플릿 실행

UDF로 템플릿을 실행하려면 템플릿 매개변수로 JavaScript 파일의 Cloud Storage 위치와 함수 이름을 지정합니다.

일부 Google 제공 템플릿을 사용하면 다음과 같이 Google Cloud 콘솔에서 직접 UDF를 만들 수 있습니다.

  1. Google Cloud 콘솔의 Dataflow 페이지로 이동합니다.

    Dataflow 페이지로 이동

  2. 템플릿에서 작업 만들기를 클릭합니다.

  3. 실행하려는 Google 제공 템플릿을 선택합니다.

  4. 선택적 매개변수를 펼칩니다. 템플릿이 UDF를 지원하는 경우 UDF의 Cloud Storage 위치에 대한 매개변수와 함수 이름의 다른 매개변수가 포함됩니다.

  5. 템플릿 매개변수 옆에 있는 UDF 만들기를 클릭합니다.

  6. 사용자 정의 함수(UDF) 선택 또는 만들기 패널에서 다음 작업을 수행합니다.

    1. 파일 이름을 입력합니다. 예: my_udf.js
    2. Cloud Storage 폴더를 선택합니다. 예: gs://your-bucket/your-folder
    3. 인라인 코드 편집기를 사용하여 함수를 작성합니다. 편집기에는 출발점으로 사용할 수 있는 상용구 코드가 자동 입력됩니다.
    4. UDF 만들기를 클릭합니다.

      Google Cloud 콘솔이 UDF 파일을 저장하고 Cloud Storage 위치를 자동 입력합니다.

    5. 해당 필드에 함수 이름을 입력합니다.

JavaScript UDF 작성

다음 코드는 시작할 수 있는 노옵스(no-ops) JavaScript UDF를 보여줍니다.

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

JavaScript 코드는 Nashorn JavaScript 엔진에서 실행됩니다. 배포하기 전에 Nashorn 엔진에서 UDF를 테스트하는 것이 좋습니다. Nashorn 엔진은 JavaScript의 Node.js 구현과 정확하게 일치하지 않습니다. 흔히 범하는 실수는 console.log() 또는 Number.isNaN()를 사용하는 것입니다. 이 둘은 모두 Nashorn 엔진에 정의되어 있지 않습니다.

JDK 11이 사전 설치된 Cloud Shell을 사용하여 Nashorn 엔진에서 UDF를 테스트할 수 있습니다. 다음과 같이 대화형 모드에서 Nashorn을 실행합니다.

jjs --language=es6

Nashorn 대화형 셸에서 다음 단계를 수행합니다.

  1. load를 호출하여 UDF JavaScript 파일을 로드합니다.
  2. 파이프라인의 예상 메시지에 따라 입력 JSON 객체를 정의합니다.
  3. JSON.stringify 함수를 사용하여 입력을 JSON 문자열로 직렬화합니다.
  4. UDF 함수를 호출하여 JSON 문자열을 처리합니다.
  5. JSON.parse를 호출하여 출력을 역직렬화합니다.
  6. 결과를 확인합니다.

예:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Python UDF 작성

다음 코드는 시작할 수 있는 노옵스(no-ops) Python UDF를 보여줍니다.

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python UDF는 Python 및 Apache Beam의 표준 종속 항목 패키지를 지원합니다. 타사 패키지는 사용할 수 없습니다.

오류 처리

일반적으로 UDF 실행 중에 오류가 발생하면 오류가 데드 레터 위치에 기록됩니다. 세부정보는 템플릿에 따라 다릅니다. 예를 들어 Pub/Sub Subscription to BigQuery 템플릿은 _error_records 테이블을 만들고 여기에 오류를 기록합니다. 런타임 UDF 오류는 구문 오류 또는 발견되지 않은 예외로 인해 발생할 수 있습니다. 구문 오류를 확인하려면 로컬에서 UDF를 테스트합니다.

처리되지 않아야 하는 요소에 대해 프로그래매틱 방식으로 예외를 발생시킬 수 있습니다. 이 경우 템플릿에서 데드 레터를 지원한다면 데드 레터 위치에 요소가 기록됩니다. 이 접근 방식을 보여주는 예시는 경로 이벤트를 참조하세요.

사용 사례 예시

이 섹션에서는 실제 사용 사례에 따라 일반적인 UDF 패턴 몇 가지를 설명합니다.

이벤트 보강

UDF를 사용하여 추가 컨텍스트 정보를 담은 새 필드로 이벤트를 보강합니다.

예:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

변환 이벤트

UDF를 사용하여 대상의 예상에 따라 전체 이벤트 형식을 변환합니다.

다음 예시에서는 가능한 경우 Cloud Logging 로그 항목(LogEntry)을 원래 로그 문자열로 되돌립니다. 로그 소스에 따라 원본 로그 문자열이 textPayload 필드에 채워지는 경우가 있습니다. Cloud Logging에서 전체 LogEntry를 보내는 대신 이 패턴을 사용하면 원시 로그를 원래 형식으로 보낼 수 있습니다.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

이벤트 데이터 수정 또는 삭제

UDF를 사용하여 이벤트의 일부를 수정하거나 삭제합니다.

다음 예시에서는 필드 이름 sensitiveField를 수정하여 값을 교체하고 redundantField라는 필드를 완전히 삭제합니다.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

경로 이벤트

UDF를 사용하여 다운스트림 싱크에서 개별 대상으로 이벤트를 라우팅합니다.

다음 예시에서는 Pub/Sub to Splunk 템플릿을 기반으로 각 이벤트를 올바른 Splunk 색인으로 라우팅합니다. 사용자 정의 로컬 함수를 호출하여 이벤트를 색인에 매핑합니다.

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

다음 예시에서는 템플릿이 데드 레터 큐를 지원한다고 가정하고 인식되지 않은 이벤트를 데드 레터 큐로 라우팅합니다. (예시는 Pub/Sub to JDBC 템플릿을 참조하세요.) 이 패턴을 사용하면 대상에 쓰기 전에 예기치 못한 항목을 필터링할 수 있습니다.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

이벤트 필터링

UDF를 사용하여 출력에서 원치 않거나 인식되지 않는 이벤트를 필터링합니다.

다음 예시는 data.severity"DEBUG"인 이벤트를 삭제합니다.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

다음 단계