BigQuery to Elasticsearch テンプレート

BigQuery to Elasticsearch テンプレートは、BigQuery テーブルから Elasticsearch にデータをドキュメントとして取り込むバッチ パイプラインです。テンプレートでは、テーブル全体を読み取ることも、クエリを使用して特定のレコードを読み取ることもできます。

パイプラインの要件

  • ソース BigQuery テーブルが存在すること。
  • Google Cloud インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上の Elasticsearch ホスト。Dataflow ワーカーマシンからアクセス可能であること。

テンプレートのパラメータ

必須パラメータ

オプション パラメータ

  • inputTableSpec: BigQuery ソーステーブルの仕様(例: bigquery-project:dataset.input_table)。
  • outputDeadletterTable: さまざまな理由(スキーマの不一致、JSON の形式の誤りなど)で出力テーブルに到達できなかったメッセージが書き込まれるテーブル。このテーブルが存在しない場合は、パイプラインの実行中に作成されます(例: your-project-id:your-dataset.your-table-name)。
  • query: データを抽出するためにソース上で実行されるクエリ(例: select * from sampledb.sample_table)。
  • useLegacySql: レガシー SQL を使用するには true に設定します(クエリを提供する場合のみ)。デフォルトは false です。
  • queryLocation: 基となるテーブルの権限なしで承認済みビューから読み取る場合は必須です(例: US)。
  • elasticsearchUsername: 認証に使用する Elasticsearch のユーザー名。指定すると、「apiKey」の値は無視されます。
  • elasticsearchPassword: 認証に使用する Elasticsearch のパスワード。指定すると、「apiKey」の値は無視されます。
  • batchSize: バッチサイズ(ドキュメント数)。デフォルト値は 1,000 です。
  • batchSizeBytes: elasticsearch へのメッセージのバッチ挿入に使用されるバッチサイズ(バイト単位)。デフォルト: 「5,242,880(5 MB)」。
  • maxRetryAttempts: 最大再試行回数。0 より大きな値にする必要があります。デフォルト: 「再試行なし」。
  • maxRetryDuration: 最大再試行時間(ミリ秒)。0 より大きな値にする必要があります。デフォルト: 「再試行なし」。
  • propertyAsIndex: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる「_index」メタデータを指定します(「_index」UDF よりも優先されます)。デフォルト: none
  • javaScriptIndexFnGcsPath: 一括リクエストでドキュメントに含まれる「_index」メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
  • javaScriptIndexFnName : 一括リクエストでドキュメントに含まれる「_index」メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
  • propertyAsId: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる「_id」メタデータを指定します(「_id」UDF よりも優先されます)。デフォルト: none
  • javaScriptIdFnGcsPath: 一括リクエストでドキュメントに含まれる「_id」メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none。
  • javaScriptIdFnName: 一括リクエストでドキュメントに含まれる「_id」メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
  • javaScriptTypeFnGcsPath: 一括リクエストでドキュメントに含まれる「_type」メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
  • javaScriptTypeFnName: 一括リクエストでドキュメントに含まれる「_type」メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
  • javaScriptIsDeleteFnGcsPath: ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値「true」または「false」を返す必要があります。デフォルト: none
  • javaScriptIsDeleteFnName: ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値「true」または「false」を返す必要があります。デフォルト: none
  • usePartialUpdate: Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: 「false」。
  • bulkInsertMethod: Elasticsearch 一括リクエストで「INDEX」(インデックス作成、upsert を許可)または「CREATE」(作成、duplicate _id でエラー)を使用するかどうか。デフォルト: 「CREATE」。
  • trustSelfSignedCerts: 自己署名証明書を信頼するかどうか。インストールされた Elasticsearch インスタンスに自己署名証明書が存在する場合があります。SSL 証明書の検証をバイパスするには、この値を True に設定します(デフォルトは False です)。
  • disableCertificateValidation: 「true」の場合に、自己署名 SSL 証明書を信頼します。Elasticsearch インスタンスには自己署名証明書が存在する場合があります。証明書の検証をバイパスするには、このパラメータを「true」に設定します。デフォルト: false。
  • apiKeyKMSEncryptionKey: API キーを復号するための Cloud KMS 鍵。apiKeySource が KMS に設定されている場合は、このパラメータを指定する必要があります。このパラメータを指定する場合は、apiKey 文字列を暗号化して渡す必要があります。KMS API 暗号化エンドポイントを使用してパラメータを暗号化します。鍵は、projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name} の形式にする必要があります。https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt をご覧ください(例: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name)。
  • apiKeySecretId: apiKey の Secret Manager シークレット ID。apiKeySource が SECRET_MANAGER に設定されている場合は、このパラメータを指定する必要があります。projects/{project}/secrets/{secret}/versions/{secret_version} の形式にする必要があります(例: projects/your-project-id/secrets/your-secret/versions/your-secret-version)。
  • apiKeySource: API キーのソース。PLAINTEXT、KMS、SECRET_MANAGER のいずれかです。Secret Manager または KMS を使用する場合は、このパラメータを指定する必要があります。apiKeySource が KMS に設定されている場合は、apiKeyKMSEncryptionKey と暗号化された apiKey を指定する必要があります。apiKeySource が SECRET_MANAGER に設定されている場合は、apiKeySecretId を指定する必要があります。apiKeySource が PLAINTEXT に設定されている場合は、apiKey を指定する必要があります。デフォルト: PLAINTEXT。
  • javascriptTextTransformGcsPath: ユーザー定義関数を含む JavaScript コードの Cloud Storage パスパターン(例: gs://your-bucket/your-function.js)。
  • javascriptTextTransformFunctionName: JavaScript ファイルから呼び出す関数の名前。英字、数字、アンダースコアのみ使用できます(例: transform、transform_udf1)。

ユーザー定義関数

次のように、このテンプレートでは、パイプライン内の複数のポイントでユーザー定義関数(UDF)をサポートしています。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

インデックス関数

ドキュメントが属するインデックスを返します。

テンプレートのパラメータ:

  • javaScriptIndexFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIndexFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _index メタデータ フィールドの値。

ドキュメント ID 関数

ドキュメント ID を返します。

テンプレートのパラメータ:

  • javaScriptIdFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIdFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _id メタデータ フィールドの値。

ドキュメント削除関数

ドキュメントを削除するかどうかを指定します。この関数を使用するには、一括挿入モードを INDEX に設定し、ドキュメント ID 関数を指定します。

テンプレートのパラメータ:

  • javaScriptIsDeleteFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIsDeleteFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントを削除する場合は文字列 "true" を、ドキュメントをアップサートする場合は "false" を返します。

マッピング タイプ関数

ドキュメントのマッピング タイプを返します。

テンプレートのパラメータ:

  • javaScriptTypeFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptTypeFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _type メタデータ フィールドの値。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the BigQuery to Elasticsearch template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • INPUT_TABLE_SPEC: BigQuery テーブル名。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • INPUT_TABLE_SPEC: BigQuery テーブル名。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。

次のステップ