Pub/Sub to Elasticsearch 템플릿은 Pub/Sub 구독에서 메시지를 읽고 사용자 정의 함수(UDF)를 실행하고 Elasticsearch에 문서로 쓰는 스트리밍 파이프라인입니다. Dataflow 템플릿은 Elasticsearch의 데이터 스트림 기능을 사용하여 여러 색인에 시계열 데이터를 저장하면서 요청에 대한 이름이 지정된 단일 리소스를 제공합니다. 데이터 스트림은 로그, 측정항목, trace, Pub/Sub에 저장된 기타 지속적으로 생성된 데이터에 적합합니다.
템플릿은 logs-gcp.DATASET-NAMESPACE
라는 데이터 스트림을 만듭니다. 여기서
- DATASET는
dataset
템플릿 매개변수의 값 또는 지정되지 않은 경우pubsub
입니다. - NAMESPACE는
namespace
템플릿 매개변수의 값 또는 지정되지 않은 경우default
입니다.
파이프라인 요구사항
- 소스 Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
- Google Cloud 인스턴스 또는 Elasticsearch 버전 7.0 이상을 사용하는 Elastic Cloud에 공개적으로 연결 가능한 Elasticsearch 호스트가 있어야 합니다. 자세한 내용은 Google Cloud의 Elastic 통합을 참조하세요.
- 오류 출력을 위한 Pub/Sub 주제가 있어야 합니다.
템플릿 매개변수
필수 매개변수
- inputSubscription: 입력을 사용할 Pub/Sub 구독입니다. 예를 들면
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
입니다. - errorOutputTopic: 실패한 레코드를 게시하는 Pub/Sub 출력 주제이며
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
형식입니다. - connectionUrl:
https://hostname:[port]
형식의 Elasticsearch URL입니다. Elastic Cloud를 사용하는 경우 CloudID를 지정합니다. 예를 들면https://elasticsearch-host:9200
입니다. - apiKey: 인증에 사용할 Base64로 인코딩된 API 키입니다.
선택적 매개변수
- 데이터 세트: Pub/Sub을 사용하여 전송된 로그 유형으로, 즉시 사용 가능한 대시보드가 있습니다. 알려진 로그 유형 값은
audit
,vpcflow
,firewall
입니다. 기본값은pubsub
입니다. - 네임스페이스: 환경 (dev, prod, qa), 팀 또는 전략적 사업부와 같은 임의의 그룹화입니다. 기본값은
default
입니다. - elasticsearchTemplateVersion: 일반적으로 Google Cloud에서 정의하는 Dataflow 템플릿 버전 식별자입니다. 기본값은 1.0.0입니다.
- javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면
gs://my-bucket/my-udfs/my_file.js
입니다. - javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은myTransform
입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요. - javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이
0
이면 UDF 새로고침이 사용 중지됩니다. 기본값은0
입니다. - elasticsearchUsername: 인증할 Elasticsearch 사용자 이름입니다. 지정하면
apiKey
값이 무시됩니다. - elasticsearchPassword: 인증할 Elasticsearch 비밀번호입니다. 지정하면
apiKey
값이 무시됩니다. - batchSize: 문서 수의 배치 크기입니다. 기본값은
1000
입니다. - batchSizeBytes: 바이트 수의 배치 크기입니다. 기본값은
5242880
(5MB)입니다. - maxRetryAttempts: 최대 재시도 횟수입니다. 0보다 커야 합니다. 기본값은
no retries
입니다. - maxRetryDuration: 최대 재시도 시간(밀리초)입니다. 0보다 커야 합니다. 기본값은
no retries
입니다. - propertyAsIndex: 색인이 생성되는 문서의 속성으로, 값이 일괄 요청에서 문서에 포함할
_index
메타데이터를 지정합니다._index
UDF보다 우선 적용됩니다. 기본값은none
입니다. - javaScriptIndexFnGcsPath: 일괄 요청에서 문서에 포함할
_index
메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은none
입니다. - javaScriptIndexFnName: 일괄 요청에서 문서에 포함할
_index
메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은none
입니다. - propertyAsId: 색인이 생성되는 문서의 속성으로, 값이 일괄 요청에서 문서에 포함할
_id
메타데이터를 지정합니다._id
UDF보다 우선 적용됩니다. 기본값은none
입니다. - javaScriptIdFnGcsPath: 일괄 요청에서 문서에 포함할
_id
메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은none
입니다. - javaScriptIdFnName: 일괄 요청에서 문서에 포함할
_id
메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은none
입니다. - javaScriptTypeFnGcsPath: 일괄 요청에서 문서에 포함할
_type
메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은none
입니다. - javaScriptTypeFnName: 일괄 요청에서 문서에 포함할
_type
메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은none
입니다. - javaScriptIsDeleteFnGcsPath: 문서를 삽입하거나 업데이트하는 대신 삭제할지 여부를 결정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값
true
또는false
를 반환합니다. 기본값은none
입니다. - javaScriptIsDeleteFnName: 문서를 삽입 또는 업데이트하는 대신 삭제할지 여부를 결정하는 UDF JavaScript 함수의 이름입니다. 이 함수는 문자열 값
true
또는false
를 반환합니다. 기본값은none
입니다. - usePartialUpdate: Elasticsearch 요청에서 부분 업데이트 (만들기 또는 색인 생성 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값은
false
입니다. - bulkInsertMethod: Elasticsearch 일괄 요청에서
INDEX
(색인, 업데이트 허용) 또는CREATE
(만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값은CREATE
입니다. - trustSelfSignedCerts: 자체 서명 인증서를 신뢰할지 여부입니다. 설치된 Elasticsearch 인스턴스에 자체 서명 인증서가 있을 수 있습니다. SSL 인증서 유효성 검사를 우회하려면 이를 true로 사용 설정합니다. 기본값은
false
입니다. - disableCertificateValidation:
true
인 경우 자체 서명 SSL 인증서를 신뢰합니다. Elasticsearch 인스턴스에는 자체 서명 인증서가 있을 수 있습니다. 인증서 유효성 검사를 우회하려면 이 매개변수를true
로 설정합니다. 기본값은false
입니다. - apiKeyKMSEncryptionKey: API 키를 복호화하는 Cloud KMS 키입니다.
apiKeySource
가KMS
로 설정된 경우 이 매개변수는 필수입니다. 이 매개변수가 제공되면 암호화된apiKey
문자열을 전달합니다. KMS API 암호화 엔드포인트를 사용하여 매개변수를 암호화합니다. 키에는projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
형식을 사용합니다. https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt를 참고하세요. 예를 들어projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
를 사용하세요. - apiKeySecretId: apiKey의 Secret Manager 보안 비밀 ID입니다.
apiKeySource
가SECRET_MANAGER
로 설정된 경우 이 매개변수를 제공합니다.projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version` 형식을 사용합니다. - apiKeySource: API 키의 소스입니다. 허용되는 값은
PLAINTEXT
,KMS
,SECRET_MANAGER
입니다. 이 매개변수는 Secret Manager 또는 KMS를 사용할 때 필요합니다.apiKeySource
가KMS
로 설정된 경우apiKeyKMSEncryptionKey
및 암호화된 apiKey를 제공해야 합니다.apiKeySource
가SECRET_MANAGER
로 설정된 경우apiKeySecretId
를 제공해야 합니다.apiKeySource
가PLAINTEXT
로 설정된 경우apiKey
를 제공해야 합니다. 기본값은 PLAINTEXT입니다. - socketTimeout: 설정하면 Elastic RestClient의 기본 최대 재시도 제한 시간 및 기본 소켓 제한 시간 (30000ms)을 덮어씁니다.
사용자 정의 함수
이 템플릿은 아래 설명된 파이프라인의 여러 포인트에서 사용자 정의 함수(UDF)를 지원합니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.
텍스트 변환 함수
Pub/Sub 메시지를 Elasticsearch 문서로 변환합니다.
템플릿 매개변수:
javascriptTextTransformGcsPath
: 자바스크립트 파일의 Cloud Storage URI입니다.javascriptTextTransformFunctionName
: 자바스크립트 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Pub/Sub 메시지 데이터 필드입니다.
- 출력: Elasticsearch에 삽입할 문자열화된 JSON 문서입니다.
색인 함수
문서가 속한 색인을 반환합니다.
템플릿 매개변수:
javaScriptIndexFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIndexFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_index
메타데이터 필드 값입니다.
문서 ID 함수
문서 ID를 반환합니다.
템플릿 매개변수:
javaScriptIdFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIdFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_id
메타데이터 필드 값입니다.
문서 삭제 함수
문서 삭제 여부를 지정합니다. 이 함수를 사용하려면 대량 삽입 모드를 INDEX
로 설정하고 문서 ID 함수를 제공합니다.
템플릿 매개변수:
javaScriptIsDeleteFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIsDeleteFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서를 삭제하려면 문자열을
"true"
로 반환하고 문서를 삽입/업데이트하려면"false"
로 반환합니다.
매핑 유형 함수
문서의 매핑 유형을 반환합니다.
템플릿 매개변수:
javaScriptTypeFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptTypeFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_type
메타데이터 필드 값입니다.
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Elasticsearch template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름REGION_NAME
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
ERROR_OUTPUT_TOPIC
: 오류 출력을 위한 Pub/Sub 주제SUBSCRIPTION_NAME
: Pub/Sub 구독 이름CONNECTION_URL
: Elasticsearch URLDATASET
: 로그 유형NAMESPACE
: 데이터 세트의 네임스페이스APIKEY
: 인증을 위한 base64로 인코딩된 API 키
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름LOCATION
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
ERROR_OUTPUT_TOPIC
: 오류 출력을 위한 Pub/Sub 주제SUBSCRIPTION_NAME
: Pub/Sub 구독 이름CONNECTION_URL
: Elasticsearch URLDATASET
: 로그 유형NAMESPACE
: 데이터 세트의 네임스페이스APIKEY
: 인증을 위한 base64로 인코딩된 API 키
다음 단계
- Dataflow 템플릿 알아보기
- Google 제공 템플릿 목록 참조