MongoDB to BigQuery テンプレート

このテンプレートは、MongoDB からドキュメントを読み取り、BigQuery に書き込むバッチ パイプラインを作成します。

MongoDB 変更ストリーム データをキャプチャする場合は、MongoDB to BigQuery(CDC)テンプレートを使用できます。

パイプラインの要件

  • ターゲット BigQuery データセットが存在すること。
  • ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。

出力形式

出力レコードの形式は、userOption パラメータの値によって異なります。userOptionNONE の場合、出力のスキーマは次のとおりです。source_data フィールドには、JSON 形式のドキュメントが含まれます。

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

userOptionFLATTEN の場合、パイプラインはドキュメントをフラット化し、最上位のフィールドをテーブルの列として書き込みます。たとえば、MongoDB コレクションのドキュメントに次のフィールドがあるとします。

  • "_id"string
  • "title"string
  • "genre"string

FLATTEN を使用すると、出力のスキーマは次のようになります。timestamp フィールドはテンプレートによって追加されます。

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

テンプレートのパラメータ

パラメータ 説明
mongoDbUri MongoDB 接続 URI。形式は mongodb+srv://:@
database コレクションを読み取る MongoDB 内のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
outputTableSpec 書き込み先の BigQuery テーブル。例: bigquery-project:dataset.output_table
userOption FLATTEN または NONEFLATTEN: ドキュメントを第 1 レベルでフラット化します。NONE は、ドキュメント全体を JSON 文字列として保存します。
javascriptDocumentTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI例: gs://my-bucket/my-udfs/my_file.js。 )
javascriptDocumentTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
useStorageWriteApi (省略可)true の場合、パイプラインは BigQuery Storage Write API を使用します。デフォルト値は false です。詳細については、Storage Write API の使用をご覧ください。
useStorageWriteApiAtLeastOnce (省略可)Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクスを使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。

ユーザー定義関数

必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。

UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。

パラメータ説明
javascriptDocumentTransformGcsPath JavaScript ファイルの Cloud Storage の場所。
javascriptDocumentTransformFunctionName JavaScript 関数の名前。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: MongoDB ドキュメント。
  • 出力: JSON 文字列としてシリアル化されたオブジェクト。userOptionNONE の場合、JSON オブジェクトにはドキュメント ID を含む _id という名前のプロパティが含まれている必要があります。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the MongoDB to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION
    

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN または NONE。

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN または NONE。

    次のステップ