Cloud Storage to Elasticsearch 템플릿

Cloud Storage to Elasticsearch 템플릿은 Cloud Storage 버킷에 저장된 CSV 파일에서 데이터를 읽고 Elasticsearch에 데이터를 JSON 문서로 쓰는 일괄 파이프라인입니다.

파이프라인 요구사항

  • Cloud Storage 버킷이 있어야 합니다.
  • Dataflow에서 액세스할 수 있는 Google Cloud 인스턴스 또는 Elasticsearch Cloud의 Elasticsearch 호스트가 있어야 합니다.
  • 오류 출력용 BigQuery 테이블이 있어야 합니다.

CSV 스키마

CSV 파일에 헤더가 포함된 경우 containsHeaders 템플릿 매개변수를 true로 설정합니다.

그렇지 않으면 데이터를 기술하는 JSON 스키마 파일을 만듭니다. jsonSchemaPath 템플릿 매개변수에서 스키마 파일의 Cloud Storage URI를 지정합니다. 다음 예시는 JSON 스키마를 보여줍니다.

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

또는 CSV 텍스트를 파싱하고 Elasticsearch 문서를 출력하는 사용자 정의 함수(UDF)를 제공할 수 있습니다.

템플릿 매개변수

필수 매개변수

  • deadletterTable: 실패한 삽입을 전송할 BigQuery 데드 레터 테이블입니다. 예를 들면 your-project:your-dataset.your-table-name입니다.
  • inputFileSpec: CSV 파일을 검색하는 Cloud Storage 파일 패턴입니다. 예를 들면 gs://mybucket/test-*.csv입니다.
  • connectionUrl: https://hostname:[port] 형식의 Elasticsearch URL입니다. Elastic Cloud를 사용하는 경우 CloudID를 지정합니다. 예를 들면 https://elasticsearch-host:9200입니다.
  • apiKey: Base64로 인코딩된 API 키로, 인증에 사용됩니다.
  • index: 요청이 실행되는 Elasticsearch 색인입니다. 예를 들면 my-index입니다.

선택적 매개변수

  • inputFormat: 입력 파일 형식입니다. 기본값은 CSV입니다.
  • containsHeaders: 헤더 레코드(true/false)가 포함된 입력 CSV 파일입니다. CSV 파일을 읽는 경우에만 필요합니다. 기본값은 false입니다.
  • delimiter: 입력 텍스트 파일의 열 구분 기호입니다. 기본값은 ,입니다(예: ,).
  • csvFormat: 레코드 파싱에 사용할 CSV 형식 사양입니다. 기본값은 Default입니다. 자세한 내용은 https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html을 참조하세요. https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html에 있는 형식 이름과 정확하게 일치해야 합니다.
  • jsonSchemaPath: JSON 스키마의 경로입니다. 기본값은 null입니다. 예를 들면 gs://path/to/schema입니다.
  • largeNumFiles: 파일 수가 수만 개이면 true로 설정합니다. 기본값은 false입니다.
  • csvFileEncoding: CSV 파일 문자 인코딩 형식입니다. 허용되는 값은 US-ASCII, ISO-8859-1, UTF-8, UTF-16입니다. 기본값은 UTF-8입니다.
  • logDetailedCsvConversionErrors: CSV 파싱이 실패한 경우에 자세한 오류 로깅을 사용 설정하려면 true로 설정합니다. 이로 인해 로그에 민감한 정보가 노출될 수 있습니다(예: CSV 파일에 비밀번호가 포함된 경우). 기본값: false
  • elasticsearchUsername: 인증할 Elasticsearch 사용자 이름입니다. 지정하면 apiKey 값이 무시됩니다.
  • elasticsearchPassword: 인증할 Elasticsearch 비밀번호입니다. 지정하면 apiKey 값이 무시됩니다.
  • batchSize: 문서 수의 배치 크기입니다. 기본값은 1000입니다.
  • batchSizeBytes: 바이트 수의 배치 크기입니다. 기본값은 5242880(5MB)입니다.
  • maxRetryAttempts: 최대 재시도 시도 횟수입니다. 0보다 커야 합니다. 기본값은 no retries입니다.
  • maxRetryDuration: 최대 재시도 시간(밀리초)입니다. 0보다 커야 합니다. 기본값은 no retries입니다.
  • propertyAsIndex: 색인이 생성되는 문서의 속성으로, 값에서 일괄 요청 시 문서에 포함할 _index 메타데이터를 지정합니다. _index UDF보다 우선 적용됩니다. 기본값은 none입니다.
  • javaScriptIndexFnGcsPath: 일괄 요청 시 문서에 포함할 _index 메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은 none입니다.
  • javaScriptIndexFnName: 일괄 요청 시 문서에 포함할 _index 메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은 none입니다.
  • propertyAsId: 색인이 생성되는 문서의 속성으로, 값에서 일괄 요청 시 문서에 포함할 _id 메타데이터를 지정합니다. _id UDF보다 우선 적용됩니다. 기본값은 none입니다.
  • javaScriptIdFnGcsPath: 일괄 요청 시 문서에 포함할 _id 메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은 none입니다.
  • javaScriptIdFnName: 일괄 요청 시 문서에 포함할 _id 메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은 none입니다.
  • javaScriptTypeFnGcsPath: 일괄 요청 시 문서에 포함할 _type 메타데이터를 지정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 기본값은 none입니다.
  • javaScriptTypeFnName: 일괄 요청 시 문서에 포함할 _type 메타데이터를 지정하는 UDF JavaScript 함수의 이름입니다. 기본값은 none입니다.
  • javaScriptIsDeleteFnGcsPath: 문서를 삽입하거나 업데이트하는 대신 삭제할지 여부를 결정하는 함수의 JavaScript UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값 true 또는 false를 반환합니다. 기본값은 none입니다.
  • javaScriptIsDeleteFnName: 문서를 삽입하거나 업데이트하는 대신 삭제할지 여부를 결정하는 UDF JavaScript 함수의 이름입니다. 이 함수는 문자열 값 true 또는 false를 반환합니다. 기본값은 none입니다.
  • usePartialUpdate: Elasticsearch 요청에서 부분 업데이트(만들기 또는 색인 생성 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값은 false입니다.
  • bulkInsertMethod: Elasticsearch 일괄 요청에서 INDEX(색인, 삽입/업데이트(upsert) 허용) 또는 CREATE(만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값은 CREATE입니다.
  • trustSelfSignedCerts: 자체 서명 인증서를 신뢰할지 여부입니다. 설치된 Elasticsearch 인스턴스에 자체 서명 인증서가 있을 수 있습니다. SSL 인증서 유효성 검사를 우회하려면 이를 true로 사용 설정합니다. 기본값은 false입니다.
  • disableCertificateValidation: true이면 자체 서명 SSL 인증서를 신뢰합니다. Elasticsearch 인스턴스에는 자체 서명 인증서가 있을 수 있습니다. 인증서 유효성 검사를 우회하려면 이 파라미터를 true로 설정합니다. 기본값은 false입니다.
  • apiKeyKMSEncryptionKey: API 키를 복호화하는 Cloud KMS 키입니다. apiKeySourceKMS로 설정되면 이 파라미터는 필수입니다. 이 파라미터가 제공되면 암호화된 apiKey 문자열을 전달합니다. KMS API 암호화 엔드포인트를 사용하여 매개변수를 암호화합니다. 키에는 projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME> 형식을 사용합니다. https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt를 참조하세요(예: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId: apiKey의 Secret Manager 보안 비밀 ID입니다. apiKeySourceSECRET_MANAGER로 설정된 경우에 이 파라미터를 제공합니다. projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version` 형식을 사용합니다.
  • apiKeySource: API 키 소스입니다. 허용되는 값은 PLAINTEXT, KMS, SECRET_MANAGER입니다. 이 파라미터는 Secret Manager 또는 KMS를 사용할 때 필요합니다. apiKeySourceKMS, apiKeyKMSEncryptionKey로 설정되고 암호화되면 apiKey를 제공해야 합니다. apiKeySourceSECRET_MANAGER로 설정되면 apiKeySecretId를 제공해야 합니다. apiKeySourcePLAINTEXT로 설정되면 apiKey를 제공해야 합니다. 기본값은 PLAINTEXT입니다.
  • socketTimeout: 설정하면 Elastic RestClient의 기본 최대 재시도 제한 시간과 기본 소켓 제한 시간 (30000ms)을 덮어씁니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수(UDF) 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.

사용자 정의 함수

이 템플릿은 아래 설명된 파이프라인의 여러 포인트에서 사용자 정의 함수(UDF)를 지원합니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

텍스트 변환 함수

CSV 데이터를 Elasticsearch 문서로 변환합니다.

템플릿 매개변수:

  • javascriptTextTransformGcsPath: 자바스크립트 파일의 Cloud Storage URI입니다.
  • javascriptTextTransformFunctionName: 자바스크립트 함수의 이름입니다.

함수 사양:

  • 입력: 입력 CSV 파일의 한 줄입니다.
  • 출력: Elasticsearch에 삽입할 문자열화된 JSON 문서입니다.

색인 함수

문서가 속한 색인을 반환합니다.

템플릿 매개변수:

  • javaScriptIndexFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptIndexFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서의 _index 메타데이터 필드 값입니다.

문서 ID 함수

문서 ID를 반환합니다.

템플릿 매개변수:

  • javaScriptIdFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptIdFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서의 _id 메타데이터 필드 값입니다.

문서 삭제 함수

문서 삭제 여부를 지정합니다. 이 함수를 사용하려면 대량 삽입 모드를 INDEX로 설정하고 문서 ID 함수를 제공합니다.

템플릿 매개변수:

  • javaScriptIsDeleteFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptIsDeleteFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서를 삭제하려면 문자열을 "true"로 반환하고 문서를 삽입/업데이트하려면 "false"로 반환합니다.

매핑 유형 함수

문서의 매핑 유형을 반환합니다.

템플릿 매개변수:

  • javaScriptTypeFnGcsPath: JavaScript 파일의 Cloud Storage URI입니다.
  • javaScriptTypeFnName: JavaScript 함수의 이름입니다.

함수 사양:

  • 입력: JSON 문자열로 직렬화된 Elasticsearch 문서입니다.
  • 출력: 문서의 _type 메타데이터 필드 값입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Storage to Elasticsearch template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • INPUT_FILE_SPEC: Cloud Storage 파일 패턴
  • CONNECTION_URL: Elasticsearch URL입니다.
  • APIKEY: 인증을 위한 base64 인코딩 API 키입니다.
  • INDEX: Elasticsearch 색인입니다.
  • DEADLETTER_TABLE: 사용자의 BigQuery 테이블입니다.

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • INPUT_FILE_SPEC: Cloud Storage 파일 패턴
  • CONNECTION_URL: Elasticsearch URL입니다.
  • APIKEY: 인증을 위한 base64 인코딩 API 키입니다.
  • INDEX: Elasticsearch 색인입니다.
  • DEADLETTER_TABLE: 사용자의 BigQuery 테이블입니다.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.GCSToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.CsvConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link GCSToElasticsearch} pipeline exports data from one or more CSV files in Cloud Storage
 * to Elasticsearch.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_GCS_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_to_Elasticsearch",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch",
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Javascript user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "gcs-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      }),
  @Template(
      name = "GCS_to_Elasticsearch_Xlang",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch with Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Python user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      flexContainerName = "gcs-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      })
})
public class GCSToElasticsearch {

  /** The tag for the headers of the CSV if required. */
  static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the dead-letter output of the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(GCSToElasticsearch.class);

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    GCSToElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GCSToElasticsearchOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  private static PipelineResult run(GCSToElasticsearchOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    // Throw error if containsHeaders is true and a schema or Udf is also set.
    if (options.getContainsHeaders()) {
      checkArgument(
          options.getJavascriptTextTransformGcsPath() == null
              && options.getJsonSchemaPath() == null
              && options.getPythonExternalTextTransformGcsPath() == null,
          "Cannot parse file containing headers with UDF or Json schema.");
    }

    // Throw error if only one retry configuration parameter is set.
    checkArgument(
        (options.getMaxRetryAttempts() == null && options.getMaxRetryDuration() == null)
            || (options.getMaxRetryAttempts() != null && options.getMaxRetryDuration() != null),
        "To specify retry configuration both max attempts and max duration must be set.");

    // Throw error if both Javascript UDF and Python UDF are set. We can only apply one or the
    // other.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps: 1) Read records from CSV(s) via {@link CsvConverters.ReadCsv}.
     *        2) Convert lines to JSON strings via {@link CsvConverters.LineToFailsafeJson}.
     *        3a) Write JSON strings as documents to Elasticsearch via {@link ElasticsearchIO}.
     *        3b) Write elements that failed processing to {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
     */
    PCollectionTuple readCsvLines =
        pipeline
            /*
             * Step 1: Read CSV file(s) from Cloud Storage using {@link CsvConverters.ReadCsv}.
             */
            .apply(
            "ReadCsv",
            CsvConverters.ReadCsv.newBuilder()
                .setCsvFormat(options.getCsvFormat())
                .setDelimiter(options.getDelimiter())
                .setHasHeaders(options.getContainsHeaders())
                .setInputFileSpec(options.getInputFileSpec())
                .setHeaderTag(CSV_HEADERS)
                .setLineTag(CSV_LINES)
                .setFileEncoding(options.getCsvFileEncoding())
                .build());
    /*
     * Step 2: Convert lines to Elasticsearch document.
     */
    CsvConverters.LineToFailsafeJson.Builder lineToFailsafeJsonBuilder =
        CsvConverters.LineToFailsafeJson.newBuilder()
            .setDelimiter(options.getDelimiter())
            .setJsonSchemaPath(options.getJsonSchemaPath())
            .setHeaderTag(CSV_HEADERS)
            .setLineTag(CSV_LINES)
            .setUdfOutputTag(PROCESSING_OUT)
            .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT);
    if (options.getPythonExternalTextTransformGcsPath() != null) {
      lineToFailsafeJsonBuilder
          .setPythonUdfFileSystemPath(options.getPythonExternalTextTransformGcsPath())
          .setPythonUdfFunctionName(options.getPythonExternalTextTransformFunctionName());
    } else {
      lineToFailsafeJsonBuilder
          .setJavascriptUdfFileSystemPath(options.getJavascriptTextTransformGcsPath())
          .setJavascriptUdfFunctionName(options.getJavascriptTextTransformFunctionName());
    }
    PCollectionTuple convertedCsvLines =
        readCsvLines.apply("ConvertLine", lineToFailsafeJsonBuilder.build());
    /*
     * Step 3a: Write elements that were successfully processed to Elasticsearch using {@link WriteToElasticsearch}.
     */
    convertedCsvLines
        .get(PROCESSING_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-gcs-to-elasticsearch-template/v2")
                .setOptions(options.as(GCSToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
     */
    convertedCsvLines
        .get(PROCESSING_DEADLETTER_OUT)
        .apply(
            "AddTimestamps",
            WithTimestamps.of((FailsafeElement<String, String> failures) -> new Instant()))
        .apply(
            "WriteFailedElementsToBigQuery",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    return pipeline.run();
  }
}

다음 단계