Cloud Storage to Elasticsearch テンプレートは、Cloud Storage バケットに保存されている CSV ファイルからデータを読み取り、データを JSON ドキュメントとして Elasticsearch に書き込むバッチ パイプラインです。
パイプラインの要件
- Cloud Storage バケットが存在している必要があります。
- Dataflow からアクセス可能な Google Cloud インスタンスまたは Elasticsearch Cloud に Elasticsearch ホストが存在している必要があります。
- エラー出力用の BigQuery テーブルが存在している必要があります。
CSV スキーマ
CSV ファイルにヘッダーが含まれている場合は、containsHeaders
テンプレート パラメータを true
に設定します。
それ以外の場合は、データを記述する JSON スキーマ ファイルを作成します。jsonSchemaPath
テンプレート パラメータに、スキーマ ファイルの Cloud Storage URI を指定します。次の例は、JSON スキーマを示しています。
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
また、CSV テキストを解析し、Elasticsearch ドキュメントを出力するユーザー定義関数(UDF)を指定することもできます。
テンプレートのパラメータ
必須パラメータ
- deadletterTable: 挿入の送信に失敗した BigQuery の Deadletter テーブル。例:
your-project:your-dataset.your-table-name
- inputFileSpec: CSV ファイルを検索する Cloud Storage ファイル パターン。例:
gs://mybucket/test-*.csv
- connectionUrl: Elasticsearch URL(
https://hostname:[port]
形式)。Elastic Cloud を使用している場合は、CloudID を指定します。例:https://elasticsearch-host:9200
- apiKey: 認証に使用する Base64 でエンコードされた API キー。
- index: リクエストが発行される Elasticsearch インデックス。例:
my-index
オプション パラメータ
- inputFormat: 入力ファイル形式。デフォルトは
CSV
です。 - containsHeaders: 入力 CSV ファイルにヘッダー レコードが含まれているかどうか(true/false)。CSV ファイルを読み込む場合にのみ必要です。デフォルトは false です。
- delimiter: 入力テキスト ファイルの列区切り文字。デフォルトは
,
です。例:,
- csvFormat: レコードの解析に使用する CSV 形式の仕様。デフォルトは
Default
です。詳しくは、https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html をご覧ください。https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html で指定されている形式名と完全に一致している必要があります。 - jsonSchemaPath: JSON スキーマのパス。デフォルトは
null
です。例:gs://path/to/schema
- largeNumFiles: ファイルの数が数万個の場合は、true に設定します。デフォルトは
false
です。 - csvFileEncoding: CSV ファイルの文字エンコード形式。使用できる値は
US-ASCII
、ISO-8859-1
、UTF-8
、UTF-16
です。デフォルトは UTF-8 です。 - logDetailedCsvConversionErrors: CSV 解析が失敗したときに詳細なエラーロギングを有効にするには、
true
に設定します。ログに機密データが含まれる可能性があります(CSV ファイルにパスワードが含まれている場合など)。デフォルトはfalse
です。 - elasticsearchUsername: 認証に使用する Elasticsearch のユーザー名。指定すると、
apiKey
の値は無視されます。 - elasticsearchPassword: 認証に使用する Elasticsearch のパスワード。指定すると、
apiKey
の値は無視されます。 - batchSize: バッチサイズ(ドキュメント数)。デフォルトは
1000
です。 - batchSizeBytes: バッチサイズ(バイト数)。デフォルト値は
5242880
(5 MB)です。 - maxRetryAttempts: 再試行の最大回数。0 より大きい値にする必要があります。デフォルトは
no retries
です。 - maxRetryDuration: 最大再試行時間(ミリ秒)。0 より大きい値にする必要があります。デフォルトは
no retries
です。 - propertyAsIndex: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる
_index
メタデータを指定します。_index
UDF よりも優先されます。デフォルトはnone
です。 - javaScriptIndexFnGcsPath: 一括リクエストでドキュメントに含まれる
_index
メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトはnone
です。 - javaScriptIndexFnName: 一括リクエストでドキュメントに含まれる
_index
メタデータを指定する UDF JavaScript 関数の名前。デフォルトはnone
です。 - propertyAsId: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる
_id
メタデータを指定します。_id
UDF よりも優先されます。デフォルトはnone
です。 - javaScriptIdFnGcsPath: 一括リクエストでドキュメントに含まれる
_id
メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトはnone
です。 - javaScriptIdFnName: 一括リクエストでドキュメントに含まれる
_id
メタデータを指定する UDF JavaScript 関数の名前。デフォルトはnone
です。 - javaScriptTypeFnGcsPath: 一括リクエストでドキュメントに含まれる
_type
メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトはnone
です。 - javaScriptTypeFnName: 一括リクエストでドキュメントに含まれる
_type
メタデータを指定する UDF JavaScript 関数の名前。デフォルトはnone
です。 - javaScriptIsDeleteFnGcsPath: ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値
true
またはfalse
を返します。デフォルトはnone
です。 - javaScriptIsDeleteFnName: ドキュメントを挿入または更新する代わりに削除するかどうかを決定する UDF JavaScript 関数の名前。この関数は、文字列値
true
またはfalse
を返します。デフォルトはnone
です。 - usePartialUpdate: Elasticsearch リクエストで部分的な更新(作成やインデックス登録ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルトは
false
です。 - bulkInsertMethod: Elasticsearch 一括リクエストで
INDEX
(インデックス登録、upserts を許可する)またはCREATE
(作成、duplicate _id でエラー)を使用するかどうか。デフォルトはCREATE
です。 - trustSelfSignedCerts: 自己署名証明書を信頼するかどうか。インストールされた Elasticsearch インスタンスに自己署名証明書が存在する場合があります。SSL 証明書の検証をバイパスするには、この値を True に設定します(デフォルトは
false
です)。 - disableCertificateValidation:
true
の場合、自己署名 SSL 証明書を信頼します。Elasticsearch インスタンスには自己署名証明書が存在する場合があります。証明書の検証をバイパスするには、このパラメータをtrue
に設定します。デフォルトはfalse
です。 - apiKeyKMSEncryptionKey: API キーを復号するための Cloud KMS 鍵。
apiKeySource
がKMS
に設定されている場合、このパラメータは必須です。このパラメータを指定する場合は、暗号化されたapiKey
文字列を渡します。KMS API 暗号化エンドポイントを使用してパラメータを暗号化します。キーにはprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
の形式を使用します。https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt をご覧ください。例:projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
- apiKeySecretId: API キーの Secret Manager シークレット ID。
apiKeySource
がSECRET_MANAGER
に設定されている場合は、このパラメータを指定します。projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version` の形式を使用します。 - apiKeySource: API キーのソース。使用できる値は
PLAINTEXT
、KMS
、SECRET_MANAGER
です。Secret Manager または KMS を使用する場合、このパラメータは必須です。apiKeySource
がKMS
に設定されている場合は、apiKeyKMSEncryptionKey
と暗号化された apiKey を指定する必要があります。apiKeySource
がSECRET_MANAGER
に設定されている場合は、apiKeySecretId
を指定する必要があります。apiKeySource
がPLAINTEXT
に設定されている場合は、apiKey
を指定する必要があります。デフォルトは PLAINTEXT です。 - socketTimeout: 設定すると、Elastic RestClient のデフォルトの最大再試行タイムアウトとデフォルトのソケット タイムアウト(30,000 ms)が上書きされます。
- javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例:
gs://my-bucket/my-udfs/my_file.js
- javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
ユーザー定義の関数
次のように、このテンプレートでは、パイプライン内の複数のポイントでユーザー定義関数(UDF)をサポートしています。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。
テキスト変換関数
CSV データを Elasticsearch ドキュメントに変換します。
テンプレートのパラメータ:
javascriptTextTransformGcsPath
: JavaScript ファイルの Cloud Storage URI。javascriptTextTransformFunctionName
: JavaScript 関数の名前。
関数の仕様:
- 入力: 入力 CSV ファイルの 1 行。
- 出力: Elasticsearch に挿入する文字列化された JSON ドキュメント。
インデックス関数
ドキュメントが属するインデックスを返します。
テンプレートのパラメータ:
javaScriptIndexFnGcsPath
: JavaScript ファイルの Cloud Storage URI。javaScriptIndexFnName
: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントの
_index
メタデータ フィールドの値。
ドキュメント ID 関数
ドキュメント ID を返します。
テンプレートのパラメータ:
javaScriptIdFnGcsPath
: JavaScript ファイルの Cloud Storage URI。javaScriptIdFnName
: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントの
_id
メタデータ フィールドの値。
ドキュメント削除関数
ドキュメントを削除するかどうかを指定します。この関数を使用するには、一括挿入モードを INDEX
に設定し、ドキュメント ID 関数を指定します。
テンプレートのパラメータ:
javaScriptIsDeleteFnGcsPath
: JavaScript ファイルの Cloud Storage URI。javaScriptIsDeleteFnName
: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントを削除する場合は文字列
"true"
を、ドキュメントをアップサートする場合は"false"
を返します。
マッピング タイプ関数
ドキュメントのマッピング タイプを返します。
テンプレートのパラメータ:
javaScriptTypeFnGcsPath
: JavaScript ファイルの Cloud Storage URI。javaScriptTypeFnName
: JavaScript 関数の名前。
関数の仕様:
- 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
- 出力: ドキュメントの
_type
メタデータ フィールドの値。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Storage to Elasticsearch template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)INPUT_FILE_SPEC
: Cloud Storage ファイル パターン。CONNECTION_URL
: Elasticsearch の URL。APIKEY
: 認証用に Base64 でエンコードされた API キー。INDEX
: Elasticsearch インデックス。DEADLETTER_TABLE
: BigQuery テーブル。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)INPUT_FILE_SPEC
: Cloud Storage ファイル パターン。CONNECTION_URL
: Elasticsearch の URL。APIKEY
: 認証用に Base64 でエンコードされた API キー。INDEX
: Elasticsearch インデックス。DEADLETTER_TABLE
: BigQuery テーブル。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。