Cloud Storage to Elasticsearch 템플릿은 Cloud Storage 버킷에 저장된 CSV 파일에서 데이터를 읽고 Elasticsearch에 데이터를 JSON 문서로 쓰는 일괄 파이프라인입니다.
파이프라인 요구사항
- Cloud Storage 버킷이 있어야 합니다.
- Dataflow에서 액세스할 수 있는 Google Cloud 인스턴스 또는 Elasticsearch Cloud의 Elasticsearch 호스트가 있어야 합니다.
- 오류 출력용 BigQuery 테이블이 있어야 합니다.
CSV 스키마
CSV 파일에 헤더가 포함된 경우 containsHeaders
템플릿 매개변수를 true
로 설정합니다.
그렇지 않으면 데이터를 기술하는 JSON 스키마 파일을 만듭니다. jsonSchemaPath
템플릿 매개변수에서 스키마 파일의 Cloud Storage URI를 지정합니다. 다음 예시는 JSON 스키마를 보여줍니다.
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
또는 CSV 텍스트를 파싱하고 Elasticsearch 문서를 출력하는 사용자 정의 함수(UDF)를 제공할 수 있습니다.
템플릿 매개변수
필수 매개변수
- deadletterTable: 실패한 삽입을 전송할 BigQuery 데드 레터 테이블입니다. 예를 들면
your-project:your-dataset.your-table-name
입니다. - inputFileSpec: CSV 파일을 검색하는 Cloud Storage 파일 패턴입니다. 예를 들면
gs://mybucket/test-*.csv
입니다. - connectionUrl:
https://hostname:[port]
형식의 Elasticsearch URL입니다. Elastic Cloud를 사용하는 경우 CloudID를 지정합니다. 예를 들면https://elasticsearch-host:9200
입니다. - apiKey: 인증에 사용할 Base64로 인코딩된 API 키입니다.
- 색인: 요청이 실행되는 Elasticsearch 색인입니다. 예를 들면
my-index
입니다.
선택적 매개변수
- inputFormat: 입력 파일 형식입니다. 기본값은
CSV
입니다. - containsHeaders: 입력 CSV 파일에는 헤더 레코드가 포함됩니다 (true/false). CSV 파일을 읽는 경우에만 필요합니다. 기본값은 false입니다.
- delimiter: 입력 텍스트 파일의 열 구분 기호입니다. 기본값:
,
(예:,
) - csvFormat: 레코드 파싱에 사용할 CSV 형식 사양입니다. 기본값은
Default
입니다. 자세한 내용은 https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html을 참고하세요. https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html에 있는 형식 이름과 정확히 일치해야 합니다. - jsonSchemaPath: JSON 스키마의 경로입니다. 기본값은
null
입니다. 예를 들면gs://path/to/schema
입니다. - largeNumFiles: 파일 수가 수만 개인 경우 true로 설정합니다. 기본값은
false
입니다. - csvFileEncoding: CSV 파일 문자 인코딩 형식입니다. 허용되는 값은
US-ASCII
,ISO-8859-1
,UTF-8
,UTF-16
입니다. 기본값은 UTF-8입니다. - logDetailedCsvConversionErrors: CSV 파싱이 실패할 때 자세한 오류 로깅을 사용 설정하려면
true
로 설정합니다. 이로 인해 로그에 민감한 정보가 노출될 수 있습니다(예: CSV 파일에 비밀번호가 포함된 경우). 기본값:false
- elasticsearchUsername: 인증할 Elasticsearch 사용자 이름입니다. 지정하면
apiKey
값이 무시됩니다. - elasticsearchPassword: 인증할 Elasticsearch 비밀번호입니다. 지정하면
apiKey
값이 무시됩니다. - batchSize: 문서 수의 배치 크기입니다. 기본값은
1000
입니다. - batchSizeBytes: 바이트 수의 배치 크기입니다. 기본값은
5242880
(5MB)입니다. - maxRetryAttempts: 최대 재시도 횟수입니다. 0보다 커야 합니다. 기본값은
no retries
입니다. - maxRetryDuration: 최대 재시도 시간(밀리초)입니다. 0보다 커야 합니다. 기본값은
no retries
입니다. - propertyAsIndex: 색인이 생성되는 문서의 속성으로, 값이 일괄 요청에서 문서에 포함할
_index
메타데이터를 지정합니다._index
UDF보다 우선 적용됩니다. 기본값은none
입니다. - javaScriptIndexFnGcsPath: 일괄 요청에서 문서에 포함할
_index
메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은none
입니다. - javaScriptIndexFnName: 일괄 요청에서 문서에 포함할
_index
메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은none
입니다. - propertyAsId: 색인이 생성되는 문서의 속성으로, 값이 일괄 요청에서 문서에 포함할
_id
메타데이터를 지정합니다._id
UDF보다 우선 적용됩니다. 기본값은none
입니다. - javaScriptIdFnGcsPath: 일괄 요청에서 문서에 포함할
_id
메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은none
입니다. - javaScriptIdFnName: 일괄 요청에서 문서에 포함할
_id
메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은none
입니다. - javaScriptTypeFnGcsPath: 일괄 요청에서 문서에 포함할
_type
메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은none
입니다. - javaScriptTypeFnName: 일괄 요청에서 문서에 포함할
_type
메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은none
입니다. - javaScriptIsDeleteFnGcsPath: 문서를 삽입하거나 업데이트하는 대신 삭제할지 여부를 결정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값
true
또는false
를 반환합니다. 기본값은none
입니다. - javaScriptIsDeleteFnName: 문서를 삽입 또는 업데이트하는 대신 삭제할지 여부를 결정하는 UDF JavaScript 함수의 이름입니다. 이 함수는 문자열 값
true
또는false
를 반환합니다. 기본값은none
입니다. - usePartialUpdate: Elasticsearch 요청에서 부분 업데이트 (만들기 또는 색인 생성 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값은
false
입니다. - bulkInsertMethod: Elasticsearch 일괄 요청에서
INDEX
(색인, 업데이트 허용) 또는CREATE
(만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값은CREATE
입니다. - trustSelfSignedCerts: 자체 서명 인증서를 신뢰할지 여부입니다. 설치된 Elasticsearch 인스턴스에 자체 서명 인증서가 있을 수 있습니다. SSL 인증서 유효성 검사를 우회하려면 이를 true로 사용 설정합니다. 기본값은
false
입니다. - disableCertificateValidation:
true
인 경우 자체 서명 SSL 인증서를 신뢰합니다. Elasticsearch 인스턴스에는 자체 서명 인증서가 있을 수 있습니다. 인증서 유효성 검사를 우회하려면 이 매개변수를true
로 설정합니다. 기본값은false
입니다. - apiKeyKMSEncryptionKey: API 키를 복호화하는 Cloud KMS 키입니다.
apiKeySource
가KMS
로 설정된 경우 이 매개변수는 필수입니다. 이 매개변수가 제공되면 암호화된apiKey
문자열을 전달합니다. KMS API 암호화 엔드포인트를 사용하여 매개변수를 암호화합니다. 키에는projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
형식을 사용합니다. https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt를 참고하세요. 예를 들어projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
를 사용하세요. - apiKeySecretId: apiKey의 Secret Manager 보안 비밀 ID입니다.
apiKeySource
가SECRET_MANAGER
로 설정된 경우 이 매개변수를 제공합니다.projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version` 형식을 사용합니다. - apiKeySource: API 키의 소스입니다. 허용되는 값은
PLAINTEXT
,KMS
,SECRET_MANAGER
입니다. 이 매개변수는 Secret Manager 또는 KMS를 사용할 때 필요합니다.apiKeySource
가KMS
로 설정된 경우apiKeyKMSEncryptionKey
및 암호화된 apiKey를 제공해야 합니다.apiKeySource
가SECRET_MANAGER
로 설정된 경우apiKeySecretId
를 제공해야 합니다.apiKeySource
가PLAINTEXT
로 설정된 경우apiKey
를 제공해야 합니다. 기본값은 PLAINTEXT입니다. - socketTimeout: 설정하면 Elastic RestClient의 기본 최대 재시도 제한 시간 및 기본 소켓 제한 시간 (30000ms)을 덮어씁니다.
- javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면
gs://my-bucket/my-udfs/my_file.js
입니다. - javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은myTransform
입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.
사용자 정의 함수
이 템플릿은 아래 설명된 파이프라인의 여러 포인트에서 사용자 정의 함수(UDF)를 지원합니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.
텍스트 변환 함수
CSV 데이터를 Elasticsearch 문서로 변환합니다.
템플릿 매개변수:
javascriptTextTransformGcsPath
: 자바스크립트 파일의 Cloud Storage URI입니다.javascriptTextTransformFunctionName
: 자바스크립트 함수의 이름입니다.
함수 사양:
- 입력: 입력 CSV 파일의 한 줄입니다.
- 출력: Elasticsearch에 삽입할 문자열화된 JSON 문서입니다.
색인 함수
문서가 속한 색인을 반환합니다.
템플릿 매개변수:
javaScriptIndexFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIndexFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_index
메타데이터 필드 값입니다.
문서 ID 함수
문서 ID를 반환합니다.
템플릿 매개변수:
javaScriptIdFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIdFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_id
메타데이터 필드 값입니다.
문서 삭제 함수
문서 삭제 여부를 지정합니다. 이 함수를 사용하려면 대량 삽입 모드를 INDEX
로 설정하고 문서 ID 함수를 제공합니다.
템플릿 매개변수:
javaScriptIsDeleteFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptIsDeleteFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서를 삭제하려면 문자열을
"true"
로 반환하고 문서를 삽입/업데이트하려면"false"
로 반환합니다.
매핑 유형 함수
문서의 매핑 유형을 반환합니다.
템플릿 매개변수:
javaScriptTypeFnGcsPath
: JavaScript 파일의 Cloud Storage URI입니다.javaScriptTypeFnName
: JavaScript 함수의 이름입니다.
함수 사양:
- 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
- 출력: 문서의
_type
메타데이터 필드 값입니다.
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Storage to Elasticsearch template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
REGION_NAME
: Dataflow 작업을 배포할 리전(예:us-central1
)INPUT_FILE_SPEC
: Cloud Storage 파일 패턴CONNECTION_URL
: Elasticsearch URL입니다.APIKEY
: 인증을 위한 base64 인코딩 API 키입니다.INDEX
: Elasticsearch 색인입니다.DEADLETTER_TABLE
: 사용자의 BigQuery 테이블입니다.
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
LOCATION
: Dataflow 작업을 배포할 리전(예:us-central1
)INPUT_FILE_SPEC
: Cloud Storage 파일 패턴CONNECTION_URL
: Elasticsearch URL입니다.APIKEY
: 인증을 위한 base64 인코딩 API 키입니다.INDEX
: Elasticsearch 색인입니다.DEADLETTER_TABLE
: 사용자의 BigQuery 테이블입니다.
다음 단계
- Dataflow 템플릿 알아보기
- Google 제공 템플릿 목록 참조