Datastream to BigQuery(Stream)テンプレート

Datastream to BigQuery テンプレートは、Datastream データを読み取り、BigQuery に複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、時間でパーティション分割された BigQuery ステージング テーブルに複製します。レプリケーションに続いて、このテンプレートは BigQuery で MERGE を実行し、すべての変更データ キャプチャ(CDC)の変更をソーステーブルのレプリカに upsert します。

このテンプレートは、レプリケーションによって管理される BigQuery テーブルの作成と更新を処理します。データ定義言語(DDL)が必要な場合、Datastream に対するコールバックによってソーステーブル スキーマが抽出され、BigQuery のデータ型に変換されます。サポートされているオペレーションは次のとおりです。

  • データが挿入されると、新しいテーブルが作成される。
  • null の初期値を持つ BigQuery テーブルに新しい列が追加される。
  • ドロップされた列は BigQuery で無視され、将来の値が null になる。
  • 名前が変更された列が BigQuery に新しい列として追加される。
  • 型の変更が BigQuery に伝播されない。

このパイプラインは、1 回以上のストリーミング モードで実行することをおすすめします。このテンプレートは、一時 BigQuery テーブルからメインの BigQuery テーブルにデータをマージするときに重複排除を実行します。パイプラインのこのステップのため、1 回限りのストリーミング モードを使用するメリットはありません。

パイプラインの要件

  • データを複製する準備ができている、またはすでに複製されている Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • BigQuery の宛先データセットが作成され、Compute Engine サービス アカウントにこれらのデータセットへの管理者権限が付与されている。
  • 作成される宛先レプリカ テーブルでは、ソーステーブルに主キーが必要です。
  • MySQL または Oracle のソース データベース。PostgreSQL データベースはサポートされていません。

テンプレートのパラメータ

必須パラメータ

  • inputFilePattern: Cloud Storage で Datastream ファイル出力用のファイルの場所。gs://<BUCKET_NAME>/<ROOT_PATH>/ の形式。
  • inputFileFormat: Datastream によって生成された出力ファイルの形式。値は avro または json です。デフォルトは avro です。
  • gcsPubSubSubscription: 処理可能な新しいファイルを Dataflow に通知するために Cloud Storage で使用される Pub/Sub サブスクリプション。projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> の形式で指定します。
  • outputStagingDatasetTemplate: ステージング テーブルを含むデータセットの名前。このパラメータは、{_metadata_dataset}_log や my_dataset_log などのテンプレートをサポートしています。通常、このパラメータはデータセット名です。デフォルトは {_metadata_dataset} です。
  • outputDatasetTemplate: レプリカ テーブルを含むデータセットの名前。このパラメータは、{_metadata_dataset} や my_dataset などのテンプレートをサポートしています。通常、このパラメータはデータセット名です。デフォルトは {_metadata_dataset} です。
  • deadLetterQueueDirectory: Dataflow がデッドレター キューの出力の書き込みに使用するパス。このパスは、Datastream ファイルの出力と同じパスにすることはできません。デフォルトは空です。

オプション パラメータ

  • streamName: スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルトは {_metadata_stream} です。通常はデフォルト値で十分です。
  • rfcStartDateTime: Cloud Storage(https://tools.ietf.org/html/rfc3339)からのフェッチに使用される開始日時。デフォルトは 1970-01-01T00:00:00.00Z です。
  • fileReadConcurrency: 同時に読み取る DataStream ファイルの数。デフォルトは 10 です。
  • outputProjectId: データを出力する BigQuery データセットを含む Google Cloud プロジェクトの ID。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
  • outputStagingTableNameTemplate: ステージング テーブルに使用するテンプレート。たとえば、{_metadata_table})。デフォルトは {_metadata_table}_log です。
  • outputTableNameTemplate: レプリカ テーブルの名前に使用するテンプレート。たとえば、{_metadata_table}。デフォルトは {_metadata_table} です。
  • ignoreFields: BigQuery で無視するフィールド(カンマ区切り)。デフォルトは _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count です(例: _metadata_stream,_metadata_schema)。
  • mergeFrequencyMinutes: 特定のテーブルのマージ間隔の分数。デフォルトは 5 です。
  • dlqRetryMinutes: DLQ の再試行間隔(分)。デフォルトは 10 です。
  • dataStreamRootUrl: Datastream API のルート URL。デフォルトは https://datastream.googleapis.com/ です。
  • applyMerge: ジョブの MERGE クエリを無効にするかどうか。デフォルトは true です。
  • mergeConcurrency: 同時実行する BigQuery MERGE クエリの数。applyMerge が true に設定されている場合にのみ有効です。デフォルトは 30 です。
  • partitionRetentionDays: BigQuery の統合を実行するときに、パーティションの保持に使用する日数。デフォルトは 1 です。
  • useStorageWriteApiAtLeastOnce: このパラメータは、BigQuery Storage Write API の使用が有効になっている場合にのみ有効になります。true の場合、Storage Write API に at-least-once セマンティクスが使用されます。それ以外の場合は、exactly-once セマンティクスが使用されます。デフォルトは false です。
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
  • javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が 0 の場合、UDF の再読み込みは無効になります。デフォルト値は 0 です。
  • pythonTextTransformGcsPath: ユーザー定義関数を含む Python コードの Cloud Storage パスパターン(例: gs://your-bucket/your-transforms/*.py)。
  • pythonRuntimeVersion: この Python UDF に使用するランタイム バージョン。
  • pythonTextTransformFunctionName: JavaScript ファイルから呼び出す関数の名前。使用できるのは英字、数字、アンダースコアのみです(例: transform_udf1)。
  • runtimeRetries: 失敗するまでにランタイムが再試行される回数。デフォルトは 5 です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された CDC データ。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the Datastream to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
    8. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET: BigQuery データセット名。
    • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET: BigQuery データセット名。
    • BIGQUERY_TABLE: BigQuery テーブル テンプレート。例: {_metadata_schema}_{_metadata_table}_log

    次のステップ