MongoDB to BigQuery テンプレート

このテンプレートは、MongoDB からドキュメントを読み取り、BigQuery に書き込むバッチ パイプラインを作成します。

MongoDB 変更ストリーム データをキャプチャする場合は、MongoDB to BigQuery(CDC)テンプレートを使用できます。

パイプラインの要件

  • ターゲット BigQuery データセットが存在すること。
  • ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。

出力形式

出力レコードの形式は、userOption パラメータの値によって異なります。userOptionNONE の場合、出力のスキーマは次のとおりです。source_data フィールドには、JSON 形式のドキュメントが含まれます。

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

userOptionFLATTEN の場合、パイプラインはドキュメントをフラット化し、最上位のフィールドをテーブルの列として書き込みます。たとえば、MongoDB コレクションのドキュメントに次のフィールドがあるとします。

  • "_id"string
  • "title"string
  • "genre"string

FLATTEN を使用すると、出力のスキーマは次のようになります。timestamp フィールドはテンプレートによって追加されます。

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

テンプレートのパラメータ

必須パラメータ

  • mongoDbUri: MongoDB 接続 URI。形式は mongodb+srv://:@.
  • database: コレクションを読み取る MongoDB 内のデータベース(例: my-db)。
  • collection: MongoDB データベース内のコレクションの名前(例: my-collection)。
  • userOption: FLATTEN または NONEFLATTEN は、ドキュメントを単一レベルにフラット化します。NONE は、ドキュメント全体を JSON 文字列として保存します。デフォルトは NONE です。
  • outputTableSpec: 書き込み先の BigQuery テーブル。たとえば、bigquery-project:dataset.output_table のようにします。

オプション パラメータ

  • KMSEncryptionKey: MongoDB URI 接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、MongoDB URI 接続文字列はすべて暗号化されて渡されます(例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key)。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • javascriptDocumentTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://your-bucket/your-transforms/*.js)。
  • javascriptDocumentTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください(例: transform)。
  • bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス(例: gs://your-bucket/your-schema.json)。

ユーザー定義関数

必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。

UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。

パラメータ説明
javascriptDocumentTransformGcsPath JavaScript ファイルの Cloud Storage の場所。
javascriptDocumentTransformFunctionName JavaScript 関数の名前。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: MongoDB ドキュメント。
  • 出力: JSON 文字列としてシリアル化されたオブジェクト。userOptionNONE の場合、JSON オブジェクトにはドキュメント ID を含む _id という名前のプロパティが含まれている必要があります。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the MongoDB to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION
    

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN または NONE。

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN または NONE。

    次のステップ