Cloud Storage to Elasticsearch テンプレート

Cloud Storage to Elasticsearch テンプレートは、Cloud Storage バケットに保存されている CSV ファイルからデータを読み取り、データを JSON ドキュメントとして Elasticsearch に書き込むバッチ パイプラインです。

パイプラインの要件

  • Cloud Storage バケットが存在している必要があります。
  • Dataflow からアクセス可能な Google Cloud インスタンスまたは Elasticsearch Cloud に Elasticsearch ホストが存在している必要があります。
  • エラー出力用の BigQuery テーブルが存在している必要があります。

CSV スキーマ

CSV ファイルにヘッダーが含まれている場合は、containsHeaders テンプレート パラメータを true に設定します。

それ以外の場合は、データを記述する JSON スキーマ ファイルを作成します。jsonSchemaPath テンプレート パラメータに、スキーマ ファイルの Cloud Storage URI を指定します。次の例は、JSON スキーマを示しています。

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

また、CSV テキストを解析し、Elasticsearch ドキュメントを出力するユーザー定義関数(UDF)を指定することもできます。

テンプレートのパラメータ

必須パラメータ

  • deadletterTable: 挿入の送信に失敗した BigQuery の Deadletter テーブル(例: your-project:your-dataset.your-table-name)。
  • inputFileSpec: CSV ファイルを検索する Cloud Storage ファイル パターン。例: gs://mybucket/test-*.csv。
  • connectionUrl: Elasticsearch URL(https://hostname:[port] 形式)。Elastic Cloud を使用している場合は、CloudID を指定します(例: https://elasticsearch-host:9200)。
  • apiKey: 認証に使用する Base64 でエンコードされた API キー。
  • index: リクエストが発行される Elasticsearch インデックス(my-index. など)。例: my-index。

オプション パラメータ

  • inputFormat: 入力ファイルの形式。デフォルトは CSV です。
  • containsHeaders: 入力 CSV ファイルにはヘッダー レコード(true/false)が含まれています。CSV ファイルを読み込む場合にのみ必要です。デフォルトは false です。
  • delimiter: 入力テキスト ファイルの列区切り文字。デフォルト: csvFormat で指定された区切り文字を使用します(例: ,)。
  • csvFormat: レコードの解析に使用する CSV 形式の仕様。デフォルトは Default です。詳しくは、https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html をご覧ください。https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html で指定されている形式名と完全に一致している必要があります。
  • jsonSchemaPath: JSON スキーマのパス。デフォルトは null です(例: gs://path/to/schema)。
  • largeNumFiles: ファイルの数が数万個の場合は、true に設定します。デフォルトは false です。
  • csvFileEncoding: CSV ファイルの文字エンコード形式。許可される値は、US-ASCII、ISO-8859-1、UTF-8、UTF-16 です。デフォルトは UTF-8 です。
  • logDetailedCsvConversionErrors: CSV 解析が失敗したときに詳細なエラーロギングを有効にするには、true に設定します。ログに機密データが含まれる可能性があります(CSV ファイルにパスワードが含まれている場合など)。デフォルトは false です。
  • elasticsearchUsername: 認証に使用する Elasticsearch のユーザー名。指定すると、apiKey の値は無視されます。
  • elasticsearchPassword: 認証に使用する Elasticsearch のパスワード。指定すると、apiKey の値は無視されます。
  • batchSize: バッチサイズ(ドキュメント数)。デフォルトは 1000 です。
  • batchSizeBytes: バッチサイズ(バイト数)。デフォルト値は 5242880(5 MB)です。
  • maxRetryAttempts: 再試行の最大回数。0 より大きい値にする必要があります。デフォルトは「再試行なし」です。
  • maxRetryDuration: 最大再試行時間(ミリ秒)。0 より大きい値にする必要があります。デフォルトは「再試行なし」です。
  • propertyAsIndex: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる _index メタデータを指定します。_index UDF よりも優先されます。デフォルトは none です。
  • javaScriptIndexFnGcsPath: 一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトは none です。
  • javaScriptIndexFnName: 一括リクエストでドキュメントに含める _index メタデータを指定する UDF JavaScript 関数の名前。デフォルトは none です。
  • propertyAsId: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる _id メタデータを指定します。_id UDF よりも優先されます。デフォルトは none です。
  • javaScriptIdFnGcsPath: 一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトは none です。
  • javaScriptIdFnName: 一括リクエストでドキュメントに含める _id メタデータを指定する UDF JavaScript 関数の名前。デフォルトは none です。
  • javaScriptTypeFnGcsPath: 一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトは none です。
  • javaScriptTypeFnName: 一括リクエストでドキュメントに含まれる _type メタデータを指定する UDF JavaScript 関数の名前。デフォルトは none です。
  • javaScriptIsDeleteFnGcsPath: ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 true または false を返します。デフォルトは none です。
  • javaScriptIsDeleteFnName: ドキュメントを挿入または更新する代わりに削除するかどうかを決定する UDF JavaScript 関数の名前。この関数は、文字列値 true または false を返します。デフォルトは none です。
  • usePartialUpdate: Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルトは false です。
  • bulkInsertMethod: INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルトは CREATE です。
  • trustSelfSignedCerts: 自己署名証明書を信頼するかどうか。インストールされた Elasticsearch インスタンスに自己署名証明書が存在する場合があります。SSL 証明書の検証をバイパスするには、この値を True に設定します(デフォルトは False です)。
  • disableCertificateValidation: 「true」の場合に、自己署名 SSL 証明書を信頼します。Elasticsearch インスタンスには自己署名証明書が存在する場合があります。証明書の検証をバイパスするには、このパラメータを「true」に設定します。デフォルトは false です。
  • apiKeyKMSEncryptionKey: API キーを復号するための Cloud KMS 鍵。apiKeySource が KMS に設定されている場合は、このパラメータを指定する必要があります。このパラメータを指定する場合は、apiKey 文字列を暗号化して渡す必要があります。KMS API 暗号化エンドポイントを使用してパラメータを暗号化します。鍵は、projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name} の形式にする必要があります。https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt をご覧ください(例: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name)。
  • apiKeySecretId: apiKey の Secret Manager シークレット ID。apiKeySource が SECRET_MANAGER に設定されている場合は、このパラメータを指定する必要があります。projects/{project}/secrets/{secret}/versions/{secret_version} の形式にする必要があります(例: projects/your-project-id/secrets/your-secret/versions/your-secret-version)。
  • apiKeySource: API キーのソース。PLAINTEXT、KMS、SECRET_MANAGER のいずれかです。Secret Manager または KMS を使用する場合は、このパラメータを指定する必要があります。apiKeySource が KMS に設定されている場合は、apiKeyKMSEncryptionKey と暗号化された apiKey を指定する必要があります。apiKeySource が SECRET_MANAGER に設定されている場合は、apiKeySecretId を指定する必要があります。apiKeySource が PLAINTEXT に設定されている場合は、apiKey を指定する必要があります。デフォルト: PLAINTEXT。
  • socketTimeout: 設定すると、Elastic RestClient のデフォルトの最大再試行タイムアウトとデフォルトのソケット タイムアウト(30000ms)が上書きされます。
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。

ユーザー定義の関数

次のように、このテンプレートでは、パイプライン内の複数のポイントでユーザー定義関数(UDF)をサポートしています。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

テキスト変換関数

CSV データを Elasticsearch ドキュメントに変換します。

テンプレートのパラメータ:

  • javascriptTextTransformGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javascriptTextTransformFunctionName: JavaScript 関数の名前。

関数の仕様:

  • 入力: 入力 CSV ファイルの 1 行。
  • 出力: Elasticsearch に挿入する文字列化された JSON ドキュメント。

インデックス関数

ドキュメントが属するインデックスを返します。

テンプレートのパラメータ:

  • javaScriptIndexFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIndexFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _index メタデータ フィールドの値。

ドキュメント ID 関数

ドキュメント ID を返します。

テンプレートのパラメータ:

  • javaScriptIdFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIdFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _id メタデータ フィールドの値。

ドキュメント削除関数

ドキュメントを削除するかどうかを指定します。この関数を使用するには、一括挿入モードを INDEX に設定し、ドキュメント ID 関数を指定します。

テンプレートのパラメータ:

  • javaScriptIsDeleteFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIsDeleteFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントを削除する場合は文字列 "true" を、ドキュメントをアップサートする場合は "false" を返します。

マッピング タイプ関数

ドキュメントのマッピング タイプを返します。

テンプレートのパラメータ:

  • javaScriptTypeFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptTypeFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _type メタデータ フィールドの値。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Storage to Elasticsearch template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • INPUT_FILE_SPEC: Cloud Storage ファイル パターン。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。
  • DEADLETTER_TABLE: BigQuery テーブル。

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • INPUT_FILE_SPEC: Cloud Storage ファイル パターン。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。
  • DEADLETTER_TABLE: BigQuery テーブル。
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.GCSToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.CsvConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link GCSToElasticsearch} pipeline exports data from one or more CSV files in Cloud Storage
 * to Elasticsearch.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_GCS_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_to_Elasticsearch",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch",
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Javascript user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "gcs-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      }),
  @Template(
      name = "GCS_to_Elasticsearch_Xlang",
      category = TemplateCategory.BATCH,
      displayName = "Cloud Storage to Elasticsearch with Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.",
        "If the CSV files contain headers, set the <code>containsHeaders</code> template parameter to <code>true</code>.\n"
            + "Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. "
            + "The following example shows a JSON schema:\n"
            + "<code>[{\"name\":\"id\", \"type\":\"text\"}, {\"name\":\"age\", \"type\":\"integer\"}]</code>\n"
            + "Alternatively, you can provide a Python user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents."
      },
      optionsClass = GCSToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      flexContainerName = "gcs-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Cloud Storage bucket must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elasticsearch Cloud that is accessible from Dataflow must exist.",
        "A BigQuery table for error output must exist."
      })
})
public class GCSToElasticsearch {

  /** The tag for the headers of the CSV if required. */
  static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};

  /** The tag for the lines of the CSV. */
  static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};

  /** The tag for the dead-letter output of the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  static final TupleTag<FailsafeElement<String, String>> PROCESSING_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(GCSToElasticsearch.class);

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    GCSToElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GCSToElasticsearchOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  private static PipelineResult run(GCSToElasticsearchOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    // Throw error if containsHeaders is true and a schema or Udf is also set.
    if (options.getContainsHeaders()) {
      checkArgument(
          options.getJavascriptTextTransformGcsPath() == null
              && options.getJsonSchemaPath() == null
              && options.getPythonExternalTextTransformGcsPath() == null,
          "Cannot parse file containing headers with UDF or Json schema.");
    }

    // Throw error if only one retry configuration parameter is set.
    checkArgument(
        (options.getMaxRetryAttempts() == null && options.getMaxRetryDuration() == null)
            || (options.getMaxRetryAttempts() != null && options.getMaxRetryDuration() != null),
        "To specify retry configuration both max attempts and max duration must be set.");

    // Throw error if both Javascript UDF and Python UDF are set. We can only apply one or the
    // other.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps: 1) Read records from CSV(s) via {@link CsvConverters.ReadCsv}.
     *        2) Convert lines to JSON strings via {@link CsvConverters.LineToFailsafeJson}.
     *        3a) Write JSON strings as documents to Elasticsearch via {@link ElasticsearchIO}.
     *        3b) Write elements that failed processing to {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
     */
    PCollectionTuple readCsvLines =
        pipeline
            /*
             * Step 1: Read CSV file(s) from Cloud Storage using {@link CsvConverters.ReadCsv}.
             */
            .apply(
            "ReadCsv",
            CsvConverters.ReadCsv.newBuilder()
                .setCsvFormat(options.getCsvFormat())
                .setDelimiter(options.getDelimiter())
                .setHasHeaders(options.getContainsHeaders())
                .setInputFileSpec(options.getInputFileSpec())
                .setHeaderTag(CSV_HEADERS)
                .setLineTag(CSV_LINES)
                .setFileEncoding(options.getCsvFileEncoding())
                .build());
    /*
     * Step 2: Convert lines to Elasticsearch document.
     */
    CsvConverters.LineToFailsafeJson.Builder lineToFailsafeJsonBuilder =
        CsvConverters.LineToFailsafeJson.newBuilder()
            .setDelimiter(options.getDelimiter())
            .setJsonSchemaPath(options.getJsonSchemaPath())
            .setHeaderTag(CSV_HEADERS)
            .setLineTag(CSV_LINES)
            .setUdfOutputTag(PROCESSING_OUT)
            .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT);
    if (options.getPythonExternalTextTransformGcsPath() != null) {
      lineToFailsafeJsonBuilder
          .setPythonUdfFileSystemPath(options.getPythonExternalTextTransformGcsPath())
          .setPythonUdfFunctionName(options.getPythonExternalTextTransformFunctionName());
    } else {
      lineToFailsafeJsonBuilder
          .setJavascriptUdfFileSystemPath(options.getJavascriptTextTransformGcsPath())
          .setJavascriptUdfFunctionName(options.getJavascriptTextTransformFunctionName());
    }
    PCollectionTuple convertedCsvLines =
        readCsvLines.apply("ConvertLine", lineToFailsafeJsonBuilder.build());
    /*
     * Step 3a: Write elements that were successfully processed to Elasticsearch using {@link WriteToElasticsearch}.
     */
    convertedCsvLines
        .get(PROCESSING_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-gcs-to-elasticsearch-template/v2")
                .setOptions(options.as(GCSToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to deadletter table via {@link BigQueryIO}.
     */
    convertedCsvLines
        .get(PROCESSING_DEADLETTER_OUT)
        .apply(
            "AddTimestamps",
            WithTimestamps.of((FailsafeElement<String, String> failures) -> new Instant()))
        .apply(
            "WriteFailedElementsToBigQuery",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    return pipeline.run();
  }
}

次のステップ