Datastream to BigQuery(Stream)テンプレート

Datastream to BigQuery テンプレートは、Datastream データを読み取り、BigQuery に複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、時間でパーティション分割された BigQuery ステージング テーブルに複製します。レプリケーションに続いて、このテンプレートは BigQuery で MERGE を実行し、すべての変更データ キャプチャ(CDC)の変更をソーステーブルのレプリカに upsert します。

このテンプレートは、レプリケーションによって管理される BigQuery テーブルの作成と更新を処理します。データ定義言語(DDL)が必要な場合、Datastream に対するコールバックによってソーステーブル スキーマが抽出され、BigQuery のデータ型に変換されます。サポートされているオペレーションは次のとおりです。

  • データが挿入されると、新しいテーブルが作成される。
  • null の初期値を持つ BigQuery テーブルに新しい列が追加される。
  • ドロップされた列は BigQuery で無視され、将来の値が null になる。
  • 名前が変更された列が BigQuery に新しい列として追加される。
  • 型の変更が BigQuery に伝播されない。

このパイプラインは、1 回以上のストリーミング モードで実行することをおすすめします。このテンプレートは、一時 BigQuery テーブルからメインの BigQuery テーブルにデータをマージするときに重複排除を実行します。パイプラインのこのステップのため、1 回限りのストリーミング モードを使用するメリットはありません。

パイプラインの要件

  • データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • BigQuery の宛先データセットが作成され、Compute Engine サービス アカウントにこれらのデータセットへの管理者権限が付与されている。
  • 作成される宛先レプリカ テーブルでは、ソーステーブルに主キーが必要です。
  • MySQL または Oracle のソース データベース。PostgreSQL データベースはサポートされていません。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。
gcsPubSubSubscription Datastream ファイル通知を含む Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream によって生成された出力ファイルの形式(例: avro,json)。デフォルトは avro です。
outputStagingDatasetTemplate ステージング テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset}_log)。
outputDatasetTemplate レプリカ テーブルを含む既存のデータセットの名前。ソース データセット / スキーマの名前に置き換えられるプレースホルダとして、テンプレート {_metadata_dataset} を含めることができます(例: {_metadata_dataset})。
deadLetterQueueDirectory 処理されなかった理由とともに、未処理のメッセージを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
outputStagingTableNameTemplate 省略可: ステージング テーブルの名前のテンプレート。デフォルトは {_metadata_table}_log です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table}_log をおすすめします。
outputTableNameTemplate 省略可: レプリカ テーブルの名前のテンプレート。デフォルトは {_metadata_table} です。複数のスキーマを複製している場合は、{_metadata_schema}_{_metadata_table} をおすすめします。
outputProjectId 省略可: データを出力する BigQuery データセットのプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
streamName 省略可: スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。
mergeFrequencyMinutes 省略可: 特定のテーブルのマージ間隔の分数。デフォルトは 5 です。
dlqRetryMinutes 省略可: デッドレター キューの再試行間隔(分)。デフォルトは 10 です。
javascriptTextTransformGcsPath 省略可: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName 省略可: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
useStorageWriteApi 省略可: true の場合、パイプラインは BigQuery Storage Write API を使用します。デフォルト値は false です。詳細については、Storage Write API の使用をご覧ください。
useStorageWriteApiAtLeastOnce 省略可: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクスを使用するには、このパラメータを true に設定します。1 回限りのセマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
numStorageWriteApiStreams 省略可: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue で、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
storageWriteApiTriggeringFrequencySec 省略可: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue で、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
applyMerge 省略可: ステージング テーブルにデータを複製した後、テンプレートで BigQuery の MERGE ステートメントを実行するかどうかを指定します。デフォルト: true.
fileReadConcurrency 省略可: 同時に読み込む Datastream ファイルの数。デフォルト: 10。
mergeConcurrency 省略可: 同時に実行する BigQuery の MERGE ステートメントの数。デフォルト: 30。
partitionRetentionDays 省略可: BigQuery の MERGE ステートメントを実行するときに、パーティションの保持に使用する日数。デフォルト: 1。
rfcStartDateTime 省略可: Cloud Storage からファイルを読み取る開始時間(RFC 3339 の日時の値)。デフォルト: 1970-01-01T00:00:00.00Z

ユーザー定義関数

必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。

UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。

パラメータ説明
javascriptTextTransformGcsPath JavaScript ファイルの Cloud Storage の場所。
javascriptTextTransformFunctionName JavaScript 関数の名前。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された CDC データ。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、the Datastream to BigQuery template を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. 省略可: 1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
    8. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET: BigQuery データセット名。
    • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET: BigQuery データセット名。
    • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log

    次のステップ