Pub/Sub to MongoDB テンプレート

Pub/Sub to MongoDB テンプレートは、Pub/Sub サブスクリプションから JSON エンコードのメッセージを読み取り、ドキュメントとして MongoDB に書き込むストリーミング パイプラインです。このパイプラインでは、必要に応じて JavaScript ユーザー定義関数(UDF)を使用して組み込むことができる追加の変換がサポートされています。

レコードの処理中にエラーが発生した場合、テンプレートは入力メッセージとともに BigQuery テーブルにエラーを書き込みます。たとえば、スキーマの不一致、JSON の形式の誤り、変換の実行中にエラーが発生することがあります。deadletterTable パラメータにテーブル名を指定します。テーブルが存在しない場合は、パイプラインによって自動的に作成されます。

パイプラインの要件

  • Pub/Sub サブスクリプションが存在し、有効な JSON 形式でメッセージがエンコードされている必要があります。
  • MongoDB クラスタが存在し、Dataflow ワーカーマシンからアクセス可能である必要があります。

テンプレートのパラメータ

必須パラメータ

  • inputSubscription: Pub/Sub サブスクリプションの名前(例: projects/your-project-id/subscriptions/your-subscription-name)。
  • mongoDBUri: MongoDB サーバーのカンマ区切りのリスト(例: host1:port,host2:port,host3:port)。
  • database: コレクションを格納する MongoDB のデータベース(例: my-db)。
  • collection: MongoDB データベース内のコレクションの名前(例: my-collection)。
  • deadletterTable: スキーマの不一致、JSON の形式の誤りなど、エラーによって発生したメッセージを保存する BigQuery テーブル(例: your-project-id:your-dataset.your-table-name)。

オプション パラメータ

  • batchSize: MongoDB へのドキュメントのバッチ挿入に使用するバッチサイズ。デフォルトは 1000 です。
  • batchSizeBytes: バッチサイズ(バイト単位)。デフォルトは 5242880 です。
  • maxConnectionIdleTime: 接続タイムアウトが発生するまでの最大アイドル時間(秒単位)。デフォルトは 60000 です。
  • sslEnabled: MongoDB への接続が SSL に対応しているかどうかを示すブール値。デフォルトは true です。
  • ignoreSSLCertificate: SSL 証明書を無視するかどうかを示すブール値。デフォルトは true です。
  • withOrdered: MongoDB への順序付けされた一括挿入を有効にするブール値。デフォルトは true です。
  • withSSLInvalidHostNameAllowed: SSL 接続で無効なホスト名を許可するかどうかを示すブール値。デフォルトは true です。
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
  • javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が 0 の場合、UDF の再読み込みは無効になります。デフォルト値は 0 です。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: 入力 CSV ファイルの 1 行。
  • 出力: MongoDB に挿入する文字列化された JSON ドキュメント。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub to MongoDB template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • INPUT_SUBSCRIPTION: Pub/Sub サブスクリプション(例: projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI: MongoDB サーバーのアドレス(例: 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE: MongoDB データベースの名前(例: users
  • COLLECTION: MongoDB コレクションの名前(例: profiles
  • UNPROCESSED_TABLE: BigQuery テーブルの名前(例: your-project:your-dataset.your-table-name

次のステップ