Cloud Storage to Elasticsearch テンプレート

Cloud Storage to Elasticsearch テンプレートは、Cloud Storage バケットに保存されている CSV ファイルからデータを読み取り、データを JSON ドキュメントとして Elasticsearch に書き込むバッチ パイプラインです。

パイプラインの要件

  • Cloud Storage バケットが存在している必要があります。
  • Dataflow からアクセス可能な Google Cloud インスタンスまたは Elasticsearch Cloud に Elasticsearch ホストが存在している必要があります。
  • エラー出力用の BigQuery テーブルが存在している必要があります。

CSV スキーマ

CSV ファイルにヘッダーが含まれている場合は、containsHeaders テンプレート パラメータを true に設定します。

それ以外の場合は、データを記述する JSON スキーマ ファイルを作成します。jsonSchemaPath テンプレート パラメータに、スキーマ ファイルの Cloud Storage URI を指定します。次の例は、JSON スキーマを示しています。

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

また、CSV テキストを解析し、Elasticsearch ドキュメントを出力するユーザー定義関数(UDF)を指定することもできます。

テンプレートのパラメータ

パラメータ 説明
inputFileSpec CSV ファイルを検索する Cloud Storage ファイル パターン。例: gs://mybucket/test-*.csv
connectionUrl https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。
apiKey 認証に使用される Base64 エンコードの API キー。
index リクエストが発行される Elasticsearch インデックス(my-index など)。
deadletterTable 挿入先の送信に失敗した BigQuery の Deadletter テーブル。例: <your-project>:<your-dataset>.<your-table-name>
containsHeaders (省略可)CSV にヘッダーが含まれているかどうかを示すブール値。デフォルトは false です。
delimiter (省略可)CSV ファイルの区切り文字。例: ,
csvFormat (省略可)Apache Commons CSV 形式に準拠する CSV 形式。デフォルト: Default
jsonSchemaPath (省略可)JSON スキーマのパス。デフォルト: null
largeNumFiles (省略可)ファイルの数が数万個の場合は、true に設定します。デフォルト: false
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
batchSize (省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト数)。デフォルト: 5242880(5 MB)。
maxRetryAttempts (省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: 再試行なし
maxRetryDuration (省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: 再試行なし
csvFileEncoding (省略可)CSV ファイルのエンコード。
propertyAsIndex (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none
propertyAsId (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none
javaScriptIndexFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIndexFnName (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIdFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIdFnName (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptTypeFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptTypeFnName (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIsDeleteFnGcsPath (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
javaScriptIsDeleteFnName (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
usePartialUpdate (省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false
bulkInsertMethod (省略可)INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE

ユーザー定義関数

次のように、このテンプレートでは、パイプライン内の複数のポイントでユーザー定義関数(UDF)をサポートしています。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

テキスト変換関数

CSV データを Elasticsearch ドキュメントに変換します。

テンプレートのパラメータ:

  • javascriptTextTransformGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javascriptTextTransformFunctionName: JavaScript 関数の名前。

関数の仕様:

  • 入力: 入力 CSV ファイルの 1 行。
  • 出力: Elasticsearch に挿入する文字列化された JSON ドキュメント。

インデックス関数

ドキュメントが属するインデックスを返します。

テンプレートのパラメータ:

  • javaScriptIndexFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIndexFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _index メタデータ フィールドの値。

ドキュメント ID 関数

ドキュメント ID を返します。

テンプレートのパラメータ:

  • javaScriptIdFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIdFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _id メタデータ フィールドの値。

ドキュメント削除関数

ドキュメントを削除するかどうかを指定します。この関数を使用するには、一括挿入モードを INDEX に設定し、ドキュメント ID 関数を指定します。

テンプレートのパラメータ:

  • javaScriptIsDeleteFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIsDeleteFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントを削除する場合は文字列 "true" を、ドキュメントをアップサートする場合は "false" を返します。

マッピング タイプ関数

ドキュメントのマッピング タイプを返します。

テンプレートのパラメータ:

  • javaScriptTypeFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptTypeFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _type メタデータ フィールドの値。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Storage to Elasticsearch template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • INPUT_FILE_SPEC: Cloud Storage ファイル パターン。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。
  • DEADLETTER_TABLE: BigQuery テーブル。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • INPUT_FILE_SPEC: Cloud Storage ファイル パターン。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。
  • DEADLETTER_TABLE: BigQuery テーブル。

次のステップ