Cloud Storage to Elasticsearch 템플릿은 Cloud Storage 버킷에 저장된 CSV 파일에서 데이터를 읽고 Elasticsearch에 데이터를 JSON 문서로 쓰는 일괄 파이프라인입니다.
파이프라인 요구사항
- Cloud Storage 버킷이 있어야 합니다.
- Dataflow에서 액세스할 수 있는 Google Cloud 인스턴스 또는 Elasticsearch Cloud의 Elasticsearch 호스트가 있어야 합니다.
- 오류 출력용 BigQuery 테이블이 있어야 합니다.
CSV 스키마
CSV 파일에 헤더가 포함된 경우 containsHeaders
템플릿 매개변수를 true
로 설정합니다.
그렇지 않으면 데이터를 기술하는 JSON 스키마 파일을 만듭니다. jsonSchemaPath
템플릿 매개변수에서 스키마 파일의 Cloud Storage URI를 지정합니다. 다음 예시는 JSON 스키마를 보여줍니다.
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
또는 CSV 텍스트를 파싱하고 Elasticsearch 문서를 출력하는 사용자 정의 함수(UDF)를 제공할 수 있습니다.
템플릿 매개변수
매개변수 | 설명 |
---|---|
inputFileSpec |
CSV 파일을 검색하는 Cloud Storage 파일 패턴입니다. 예: gs://mybucket/test-*.csv |
connectionUrl |
Elasticsearch URL(https://hostname:[port] 형식) 또는 Elastic Cloud를 사용하는 경우 CloudID를 지정합니다. |
apiKey |
인증에 사용되는 Base64로 인코딩된 API 키입니다. |
index |
요청을 실행할 대상이 되는 Elasticsearch 색인입니다(예: my-index ). |
deadletterTable |
실패한 삽입을 전송할 BigQuery Deadletter 테이블입니다. 예: <your-project>:<your-dataset>.<your-table-name> |
containsHeaders |
(선택사항) 헤더가 CSV에 포함되어 있는지 여부를 나타내는 불리언입니다. 기본값: false |
delimiter |
(선택사항) CSV에서 사용되는 구분 기호입니다. 예: , |
csvFormat |
(선택사항) Apache Commons CSV 형식에 따른 CSV 형식입니다. 기본값: Default |
jsonSchemaPath |
(선택사항) JSON 스키마의 경로입니다. 기본값: null |
largeNumFiles |
(선택사항) 파일 수가 수만 개인 경우 true로 설정합니다. 기본값: false |
javascriptTextTransformGcsPath |
(선택사항)
사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js 입니다.
|
javascriptTextTransformFunctionName |
(선택사항)
사용할 JavaScript 사용자 정의 함수(UDF)의 이름입니다.
예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ } 이면 함수 이름은 myTransform 입니다. 샘플 JavaScript UDF는 UDF 예시를 참조하세요.
|
batchSize |
(선택사항) 문서 수의 배치 크기입니다. 기본값: 1000 . |
batchSizeBytes |
(선택사항) 바이트 수의 배치 크기입니다. 기본값은 5242880 (5MB)입니다. |
maxRetryAttempts |
(선택사항) 최대 재시도 횟수이며 0 이상이어야 합니다. 기본값: no retries. |
maxRetryDuration |
(선택사항) 밀리초 단위의 최대 재시도 시간이며 0 이상이어야 합니다. 기본값: no retries. |
csvFileEncoding |
(선택사항) CSV 파일 인코딩입니다. |
propertyAsIndex |
(선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정하는 값이 있는 문서의 속성입니다(_index UDF보다 우선 적용됨). 기본값: none. |
propertyAsId |
(선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정하는 값이 있는 문서의 속성입니다(_id UDF보다 우선 적용됨). 기본값: none. |
javaScriptIndexFnGcsPath |
(선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정할 JavaScript UDF 소스의 Cloud Storage 경로입니다. 기본값: none. |
javaScriptIndexFnName |
(선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정할 함수의 UDF JavaScript 함수 이름입니다. 기본값: none. |
javaScriptIdFnGcsPath |
(선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정할 JavaScript UDF 소스의 Cloud Storage 경로입니다. 기본값: none. |
javaScriptIdFnName |
(선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정할 함수의 UDF JavaScript 함수 이름입니다. 기본값: none. |
javaScriptTypeFnGcsPath |
(선택사항) 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정할 JavaScript UDF 소스의 Cloud Storage 경로입니다. 기본값: none. |
javaScriptTypeFnName |
(선택사항) 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정할 함수의 UDF JavaScript 함수 이름입니다. 기본값: none. |
javaScriptIsDeleteFnGcsPath |
(선택사항) 문서를 삽입하거나 업데이트하는 대신 삭제해야 하는지 여부를 결정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값 "true" 또는 "false" 를 반환해야 합니다. 기본값: none. |
javaScriptIsDeleteFnName |
(선택사항) 문서를 삽입하거나 업데이트하지 않고 삭제할지 여부를 결정하는 함수의 UDF JavaScript 함수 이름입니다. 이 함수는 문자열 값 "true" 또는 "false" 를 반환해야 합니다. 기본값: none. |
usePartialUpdate |
(선택사항) Elasticsearch 요청에서 부분 업데이트 (만들기 또는 색인 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값: false . |
bulkInsertMethod |
(선택사항) Elasticsearch 일괄 요청에서 INDEX (색인, 업데이트 허용) 또는 CREATE (만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값: CREATE |
사용자 정의 함수
이 템플릿은 아래 설명된 파이프라인의 여러 포인트에서 사용자 정의 함수(UDF)를 지원합니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.
텍스트 변환 함수
CSV 데이터를 Elasticsearch 문서로 변환합니다.
템플릿 매개변수:
javascriptTextTransformGcsPath
: 자바스크립트 파일의 Cloud Storage URI입니다.javascriptTextTransformFunctionName
: 자바스크립트 함수의 이름입니다.
함수 사양:
- 입력: 입력 CSV 파일의 한 줄입니다.
- 출력: Elasticsearch에 삽입할 문자열화된 JSON 문서입니다.
색인 함수
문서가 속한 색인을 반환합니다.
템플릿 매개변수:
javaScriptIndexFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIndexFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_index
메타데이터 필드 값입니다.
문서 ID 함수
문서 ID를 반환합니다.
템플릿 매개변수:
javaScriptIdFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIdFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_id
메타데이터 필드 값입니다.
문서 삭제 함수
문서 삭제 여부를 지정합니다. 이 함수를 사용하려면 대량 삽입 모드를 INDEX
로 설정하고 문서 ID 함수를 제공합니다.
템플릿 매개변수:
javaScriptIsDeleteFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIsDeleteFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서를 삭제하려면 문자열을
"true"
로 반환하고 문서를 삽입/업데이트하려면"false"
로 반환합니다.
매핑 유형 함수
문서의 매핑 유형을 반환합니다.
템플릿 매개변수:
javaScriptTypeFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptTypeFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_type
메타데이터 필드 값입니다.
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Storage to Elasticsearch template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
REGION_NAME
: Dataflow 작업을 배포할 리전(예:us-central1
)INPUT_FILE_SPEC
: Cloud Storage 파일 패턴CONNECTION_URL
: Elasticsearch URL입니다.APIKEY
: 인증을 위한 base64 인코딩 API 키입니다.INDEX
: Elasticsearch 색인입니다.DEADLETTER_TABLE
: 사용자의 BigQuery 테이블입니다.
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
LOCATION
: Dataflow 작업을 배포할 리전(예:us-central1
)INPUT_FILE_SPEC
: Cloud Storage 파일 패턴CONNECTION_URL
: Elasticsearch URL입니다.APIKEY
: 인증을 위한 base64 인코딩 API 키입니다.INDEX
: Elasticsearch 색인입니다.DEADLETTER_TABLE
: 사용자의 BigQuery 테이블입니다.
다음 단계
- Dataflow 템플릿 알아보기
- Google 제공 템플릿 목록 참조