Pub/Sub to Elasticsearch 템플릿

Pub/Sub to Elasticsearch 템플릿은 Pub/Sub 구독에서 메시지를 읽고 사용자 정의 함수(UDF)를 실행하고 Elasticsearch에 문서로 쓰는 스트리밍 파이프라인입니다. Dataflow 템플릿은 Elasticsearch의 데이터 스트림 기능을 사용하여 여러 색인에 시계열 데이터를 저장하면서 요청에 대한 이름이 지정된 단일 리소스를 제공합니다. 데이터 스트림은 로그, 측정항목, trace, Pub/Sub에 저장된 기타 지속적으로 생성된 데이터에 적합합니다.

파이프라인 요구사항

  • 소스 Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • GCP 인스턴스 또는 Elasticsearch 버전 7.0 이상을 사용하는 Elastic Cloud에 공개적으로 연결 가능한 Elasticsearch 호스트가 있어야 합니다. 자세한 내용은 Google Cloud의 Elastic 통합을 참조하세요.
  • 오류 출력을 위한 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 사용할 Pub/Sub 구독입니다. 이름은 projects/<project-id>/subscriptions/<subscription-name> 형식이어야 합니다.
connectionUrl Elasticsearch URL(https://hostname:[port] 형식) 또는 Elastic Cloud를 사용하는 경우 CloudID를 지정합니다.
apiKey 인증에 사용되는 Base64로 인코딩된 API 키입니다.
errorOutputTopic 실패한 레코드를 projects/<project-id>/topics/<topic-name> 형식으로 게시하는 Pub/Sub 출력 주제입니다.
dataset (선택사항) 즉시 사용 가능한 대시보드가 있는 Pub/Sub를 통해 전송된 로그 유형입니다. 알려진 로그 유형 값은 audit, vpcflow, firewall입니다. 기본값: pubsub
namespace (선택사항) 환경(dev, prod, qa), 팀 또는 전략적 사업부와 같은 임의의 그룹화입니다. 기본값: default
batchSize (선택사항) 문서 수의 배치 크기입니다. 기본값: 1000.
batchSizeBytes (선택사항) 바이트 수의 배치 크기입니다. 기본값은 5242880(5MB)입니다.
maxRetryAttempts (선택사항) 최대 재시도 횟수이며 0 이상이어야 합니다. 기본값: no retries.
maxRetryDuration (선택사항) 밀리초 단위의 최대 재시도 시간이며 0 이상이어야 합니다. 기본값: no retries.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
propertyAsIndex (선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정하는 값이 있는 문서의 속성입니다(_index UDF보다 우선 적용됨). 기본값: none.
propertyAsId (선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정하는 값이 있는 문서의 속성입니다(_id UDF보다 우선 적용됨). 기본값: none.
javaScriptIndexFnGcsPath (선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정할 자바스크립트 UDF 소스의 Cloud Storage 경로입니다. 기본값: none.
javaScriptIndexFnName (선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정할 함수의 UDF 자바스크립트 함수 이름입니다. 기본값: none.
javaScriptIdFnGcsPath (선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정할 자바스크립트 UDF 소스의 Cloud Storage 경로입니다. 기본값: none.
javaScriptIdFnName (선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정할 함수의 UDF 자바스크립트 함수 이름입니다. 기본값: none.
javaScriptTypeFnGcsPath (선택사항) 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정할 자바스크립트 UDF 소스의 Cloud Storage 경로입니다. 기본값: none.
javaScriptTypeFnName (선택사항) 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정할 함수의 UDF 자바스크립트 함수 이름입니다. 기본값: none.
javaScriptIsDeleteFnGcsPath (선택사항) 문서를 삽입하거나 업데이트하는 대신 삭제해야 하는지 여부를 결정하는 함수의 자바스크립트 UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값 "true" 또는 "false"를 반환해야 합니다. 기본값: none.
javaScriptIsDeleteFnName (선택사항) 문서를 삽입하거나 업데이트하지 않고 삭제할지 여부를 결정하는 함수의 UDF 자바스크립트 함수 이름입니다. 이 함수는 문자열 값 "true" 또는 "false"를 반환해야 합니다. 기본값: none.
usePartialUpdate (선택사항) Elasticsearch 요청에서 부분 업데이트 (만들기 또는 색인 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값: false.
bulkInsertMethod (선택사항) Elasticsearch 일괄 요청에서 INDEX(색인, 업데이트 허용) 또는 CREATE(만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값: CREATE

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Elasticsearch template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • ERROR_OUTPUT_TOPIC: 오류 출력을 위한 Pub/Sub 주제
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • CONNECTION_URL: Elasticsearch URL
  • DATASET: 로그 유형
  • NAMESPACE: 데이터 세트의 네임스페이스
  • APIKEY: 인증을 위한 base64로 인코딩된 API 키

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • ERROR_OUTPUT_TOPIC: 오류 출력을 위한 Pub/Sub 주제
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • CONNECTION_URL: Elasticsearch URL
  • DATASET: 로그 유형
  • NAMESPACE: 데이터 세트의 네임스페이스
  • APIKEY: 인증을 위한 base64로 인코딩된 API 키