MongoDB to BigQuery テンプレート(Stream)

このテンプレートは、MongoDB 変更ストリームと連携するストリーミング パイプラインを作成します。このテンプレートを使用するには、変更ストリーム データを Pub/Sub に公開します。パイプラインは Pub/Sub から JSON レコードを読み取り、BigQuery に書き込みます。BigQuery に書き込まれるレコードは、MongoDB to BigQuery バッチ テンプレートと同じ形式になります。

パイプラインの要件

  • ターゲット BigQuery データセットが存在すること。
  • ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。
  • 変更ストリームを読み取るには、Pub/Sub トピックを作成する必要があります。パイプラインの実行中に、MongoDB 変更ストリームで変更データ キャプチャ(CDC)イベントをリッスンし、それらを JSON レコードとして Pub/Sub に公開します。Pub/Sub へのメッセージのパブリッシュの詳細については、メッセージのトピックへのパブリッシュをご覧ください。
  • このテンプレートでは MongoDB 変更ストリームを使用します。BigQuery の変更データ キャプチャはサポートされていません。

テンプレートのパラメータ

必須パラメータ

  • mongoDbUri: MongoDB 接続 URI。形式は mongodb+srv://:@.
  • database: コレクションを読み取る MongoDB 内のデータベース例: my-db
  • collection: MongoDB データベース内のコレクションの名前。例: my-collection
  • userOption: FLATTENJSONNONEFLATTEN は、ドキュメントを単一レベルにフラット化します。JSON は、ドキュメントを BigQuery JSON 形式で保存します。NONE は、ドキュメント全体を JSON 形式の文字列として保存します。デフォルトは NONE です。
  • inputTopic: 読み取る Pub/Sub 入力トピック。projects/<PROJECT_ID>/topics/<TOPIC_NAME> の形式で指定します。
  • outputTableSpec: 書き込み先の BigQuery テーブル。例: bigquery-project:dataset.output_table

オプション パラメータ

  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • KMSEncryptionKey: MongoDB URI 接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、MongoDB URI 接続文字列はすべて暗号化されて渡されます例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • filter: JSON 形式の Bson フィルタ例: { "val": { $gt: 0, $lt: 9 }}
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
  • bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス。例: gs://your-bucket/your-schema.json
  • javascriptDocumentTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI例: gs://your-bucket/your-transforms/*.js
  • javascriptDocumentTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください例: transform

ユーザー定義関数

必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。

UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。

パラメータ説明
javascriptDocumentTransformGcsPath JavaScript ファイルの Cloud Storage の場所。
javascriptDocumentTransformFunctionName JavaScript 関数の名前。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: MongoDB ドキュメント。
  • 出力: JSON 文字列としてシリアル化されたオブジェクト。
  • テンプレートを実行する

    コンソールgcloudAPI
    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the MongoDB (CDC) to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN、JSON、NONE。
    • INPUT_TOPIC: Pub/Sub 入力トピック。

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN、JSON、NONE。
    • INPUT_TOPIC: Pub/Sub 入力トピック。
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.metadata.TemplateParameter;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.PubSubOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbCdcToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * The {@link MongoDbCdcToBigQuery} pipeline is a streaming pipeline which reads data pushed to
     * PubSub from MongoDB Changestream and outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery_CDC.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery_CDC",
        category = TemplateCategory.STREAMING,
        displayName = "MongoDB (CDC) to BigQuery",
        description =
            "The MongoDB CDC (Change Data Capture) to BigQuery template is a streaming pipeline that works together with MongoDB change streams. "
                + "The pipeline reads the JSON records pushed to Pub/Sub via a MongoDB change stream and writes them to BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery-cdc",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-change-stream-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines.",
          "The change stream pushing changes from MongoDB to Pub/Sub should be running."
        },
        streaming = true,
        supportsAtLeastOnce = true)
    public class MongoDbCdcToBigQuery {
    
      private static final Logger LOG = LoggerFactory.getLogger(MongoDbCdcToBigQuery.class);
    
      /** Options interface. */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              PubSubOptions,
              BigQueryWriteOptions,
              JavascriptDocumentTransformerOptions,
              BigQueryStorageApiStreamingOptions {
    
        // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
        // on when pipeline is running on ALO mode and using the Storage Write API
        @TemplateParameter.Boolean(
            order = 1,
            optional = true,
            parentName = "useStorageWriteApi",
            parentTriggerValues = {"true"},
            description = "Use at at-least-once semantics in BigQuery Storage Write API",
            helpText =
                "When using the Storage Write API, specifies the write semantics. To"
                    + " use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-"
                    + " once semantics, set the parameter to `false`. This parameter applies only when"
                    + " `useStorageWriteApi` is `true`. The default value is `false`.",
            hiddenUi = true)
        @Default.Boolean(false)
        @Override
        Boolean getUseStorageWriteApiAtLeastOnce();
    
        void setUseStorageWriteApiAtLeastOnce(Boolean value);
      }
    
      /** class ParseAsDocumentsFn. */
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
    
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      /**
       * Main entry point for pipeline execution.
       *
       * @param args Command line arguments to the pipeline.
       */
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
        run(options);
      }
    
      /** Pipeline to read data from PubSub and write to MongoDB. */
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
        String inputOption = options.getInputTopic();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        pipeline
            .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(inputOption))
            .apply(
                "RTransform string to document",
                ParDo.of(
                    new DoFn<String, Document>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = Document.parse(c.element());
                        c.output(document);
                      }
                    }))
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Read and transform data",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    次のステップ