BigQuery to Elasticsearch 템플릿

BigQuery to Elasticsearch 템플릿은 BigQuery 테이블의 데이터를 Elasticsearch에 문서로 수집하는 일괄 파이프라인입니다. 이 템플릿에서는 전체 테이블을 읽거나 제공된 쿼리를 사용하여 특정 레코드를 읽을 수 있습니다.

파이프라인 요구사항

  • 소스 BigQuery 테이블이 있어야 합니다.
  • Google Cloud 인스턴스 또는 Elasticsearch 버전 7.0 이상을 사용하는 Elastic Cloud의 Elasticsearch 호스트. Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • connectionUrl: https://hostname:[port] 형식의 Elasticsearch URL입니다. Elastic Cloud를 사용하는 경우 CloudID를 지정합니다. 예를 들면 https://elasticsearch-host:9200입니다.
  • apiKey: 인증에 사용할 Base64로 인코딩된 API 키입니다.
  • 색인: 요청이 실행되는 Elasticsearch 색인입니다. 예를 들면 my-index입니다.

선택적 매개변수

  • inputTableSpec: 읽을 BigQuery 테이블입니다. inputTableSpec을 지정하는 경우 템플릿은 BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage)를 사용하여 BigQuery 스토리지에서 직접 데이터를 읽습니다. Storage Read API의 제한사항에 대한 자세한 내용은 https://cloud.google.com/bigquery/docs/reference/storage#limitations를 참고하세요. inputTableSpec 또는 query를 지정해야 합니다. 두 매개변수를 모두 설정하면 템플릿에서 query 매개변수를 사용합니다. 예를 들면 <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>입니다.
  • outputDeadletterTable: 출력 테이블에 도달하지 못한 메시지의 BigQuery 테이블입니다. 테이블이 없으면 파이프라인 실행 중에 생성됩니다. 지정하지 않으면 <outputTableSpec>_error_records이 사용됩니다. 예를 들면 <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>입니다.
  • query: BigQuery에서 데이터를 읽는 데 사용할 SQL 쿼리입니다. BigQuery 데이터 세트가 Dataflow 작업과 다른 프로젝트에 있는 경우 SQL 쿼리에 전체 데이터 세트 이름을 지정합니다(예: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>). useLegacySqltrue가 아닌 한 기본적으로 query 매개변수는 GoogleSQL(https://cloud.google.com/bigquery/docs/introduction-sql)을 사용합니다. inputTableSpec 또는 query를 지정해야 합니다. 두 매개변수를 모두 설정하면 템플릿에서 query 매개변수를 사용합니다. 예를 들면 select * from sampledb.sample_table입니다.
  • useLegacySql: legacy SQL을 사용하려면 true로 설정합니다. 이 매개변수는 query 매개변수를 사용할 때만 적용됩니다. 기본값은 false입니다.
  • queryLocation: 기본 테이블 권한 없이 승인된 뷰에서 읽을 때 필요합니다. 예를 들면 US입니다.
  • queryTempDataset: 이 옵션을 사용하면 기존 데이터 세트를 설정하여 쿼리 결과를 저장할 임시 테이블을 만들 수 있습니다. 예를 들면 temp_dataset입니다.
  • KMSEncryptionKey: 쿼리 소스를 사용하여 BigQuery에서 읽는 경우 이 Cloud KMS 키를 사용하여 생성된 임시 테이블을 암호화합니다. 예를 들면 projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key입니다.
  • elasticsearchUsername: 인증할 Elasticsearch 사용자 이름입니다. 지정하면 apiKey 값이 무시됩니다.
  • elasticsearchPassword: 인증할 Elasticsearch 비밀번호입니다. 지정하면 apiKey 값이 무시됩니다.
  • batchSize: 문서 수의 배치 크기입니다. 기본값은 1000입니다.
  • batchSizeBytes: 바이트 수의 배치 크기입니다. 기본값은 5242880 (5MB)입니다.
  • maxRetryAttempts: 최대 재시도 횟수입니다. 0보다 커야 합니다. 기본값은 no retries입니다.
  • maxRetryDuration: 최대 재시도 시간(밀리초)입니다. 0보다 커야 합니다. 기본값은 no retries입니다.
  • propertyAsIndex: 색인이 생성되는 문서의 속성으로, 값이 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정합니다. _index UDF보다 우선 적용됩니다. 기본값은 none입니다.
  • javaScriptIndexFnGcsPath: 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은 none입니다.
  • javaScriptIndexFnName: 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은 none입니다.
  • propertyAsId: 색인이 생성되는 문서의 속성으로, 값이 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정합니다. _id UDF보다 우선 적용됩니다. 기본값은 none입니다.
  • javaScriptIdFnGcsPath: 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은 none입니다.
  • javaScriptIdFnName: 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은 none입니다.
  • javaScriptTypeFnGcsPath: 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은 none입니다.
  • javaScriptTypeFnName: 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은 none입니다.
  • javaScriptIsDeleteFnGcsPath: 문서를 삽입하거나 업데이트하는 대신 삭제할지 여부를 결정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값 true 또는 false를 반환합니다. 기본값은 none입니다.
  • javaScriptIsDeleteFnName: 문서를 삽입 또는 업데이트하는 대신 삭제할지 여부를 결정하는 UDF JavaScript 함수의 이름입니다. 이 함수는 문자열 값 true 또는 false를 반환합니다. 기본값은 none입니다.
  • usePartialUpdate: Elasticsearch 요청에서 부분 업데이트 (만들기 또는 색인 생성 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값은 false입니다.
  • bulkInsertMethod: Elasticsearch 일괄 요청에서 INDEX (색인, 업데이트 허용) 또는 CREATE (만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값은 CREATE입니다.
  • trustSelfSignedCerts: 자체 서명 인증서를 신뢰할지 여부입니다. 설치된 Elasticsearch 인스턴스에 자체 서명 인증서가 있을 수 있습니다. SSL 인증서 유효성 검사를 우회하려면 이를 true로 사용 설정합니다. 기본값은 false입니다.
  • disableCertificateValidation: true인 경우 자체 서명 SSL 인증서를 신뢰합니다. Elasticsearch 인스턴스에는 자체 서명 인증서가 있을 수 있습니다. 인증서 유효성 검사를 우회하려면 이 매개변수를 true로 설정합니다. 기본값은 false입니다.
  • apiKeyKMSEncryptionKey: API 키를 복호화하는 Cloud KMS 키입니다. apiKeySourceKMS로 설정된 경우 이 매개변수는 필수입니다. 이 매개변수가 제공되면 암호화된 apiKey 문자열을 전달합니다. KMS API 암호화 엔드포인트를 사용하여 매개변수를 암호화합니다. 키에는 projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME> 형식을 사용합니다. https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt를 참고하세요. 예를 들어 projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name를 사용하세요.
  • apiKeySecretId: apiKey의 Secret Manager 보안 비밀 ID입니다. apiKeySourceSECRET_MANAGER로 설정된 경우 이 매개변수를 제공합니다. projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version` 형식을 사용합니다.
  • apiKeySource: API 키의 소스입니다. 허용되는 값은 PLAINTEXT, KMS, SECRET_MANAGER입니다. 이 매개변수는 Secret Manager 또는 KMS를 사용할 때 필요합니다. apiKeySourceKMS로 설정된 경우 apiKeyKMSEncryptionKey 및 암호화된 apiKey를 제공해야 합니다. apiKeySourceSECRET_MANAGER로 설정된 경우 apiKeySecretId를 제공해야 합니다. apiKeySourcePLAINTEXT로 설정된 경우 apiKey를 제공해야 합니다. 기본값은 PLAINTEXT입니다.
  • socketTimeout: 설정하면 Elastic RestClient의 기본 최대 재시도 제한 시간 및 기본 소켓 제한 시간 (30000ms)을 덮어씁니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.

사용자 정의 함수

이 템플릿은 아래 설명된 파이프라인의 여러 포인트에서 사용자 정의 함수(UDF)를 지원합니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

색인 함수

문서가 속한 색인을 반환합니다.

템플릿 매개변수:

  • javaScriptIndexFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptIndexFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서의 _index 메타데이터 필드 값입니다.

문서 ID 함수

문서 ID를 반환합니다.

템플릿 매개변수:

  • javaScriptIdFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptIdFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서의 _id 메타데이터 필드 값입니다.

문서 삭제 함수

문서 삭제 여부를 지정합니다. 이 함수를 사용하려면 대량 삽입 모드를 INDEX로 설정하고 문서 ID 함수를 제공합니다.

템플릿 매개변수:

  • javaScriptIsDeleteFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptIsDeleteFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서를 삭제하려면 문자열을 "true"로 반환하고 문서를 삽입/업데이트하려면 "false"로 반환합니다.

매핑 유형 함수

문서의 매핑 유형을 반환합니다.

템플릿 매개변수:

  • javaScriptTypeFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptTypeFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서의 _type 메타데이터 필드 값입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the BigQuery to Elasticsearch template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID입니다.
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • INPUT_TABLE_SPEC: BigQuery 테이블 이름입니다.
  • CONNECTION_URL: Elasticsearch URL입니다.
  • APIKEY: 인증을 위한 base64 인코딩 API 키입니다.
  • INDEX: Elasticsearch 색인입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID입니다.
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • INPUT_TABLE_SPEC: BigQuery 테이블 이름입니다.
  • CONNECTION_URL: Elasticsearch URL입니다.
  • APIKEY: 인증을 위한 base64 인코딩 API 키입니다.
  • INDEX: Elasticsearch 색인입니다.

다음 단계