Pub/Sub to BigQuery テンプレート

Pub/Sub to BigQuery テンプレートは、Pub/Sub から JSON 形式のメッセージを読み取り、BigQuery テーブルに書き込むストリーミング パイプラインです。必要に応じて、JavaScript で記述されたユーザー定義関数(UDF)を指定して、受信メッセージを処理できます。

パイプラインの要件

  • BigQuery テーブルが存在し、かつスキーマが存在している必要があります。
  • Pub/Sub メッセージ データで JSON 形式を使用するか、メッセージ データを JSON に変換する UDF を指定する必要があります。JSON データは BigQuery テーブル スキーマと一致している必要があります。たとえば、JSON ペイロードが {"k1":"v1", "k2":"v2"} としてフォーマットされている場合、BigQuery テーブルには k1k2 という名前の文字列の列が必要です。
  • inputSubscription パラメータまたは inputTopic パラメータを指定してください。両方を指定することはできません。

テンプレートのパラメータ

必須パラメータ

  • outputTableSpec: 書き込み先の BigQuery テーブル("PROJECT_ID:DATASET_NAME.TABLE_NAME" 形式)。

オプション パラメータ

  • inputTopic: 読み取り元の Pub/Sub トピック("projects/<PROJECT_ID>/topics/<TOPIC_NAME>" 形式)。
  • inputSubscription: 読み取り元の Pub/Sub サブスクリプション("projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>" 形式)。
  • outputDeadletterTable: 出力テーブルに到達できなかったメッセージに使用する BigQuery テーブル("PROJECT_ID:DATASET_NAME.TABLE_NAME" 形式)。テーブルが存在しない場合は、パイプラインの実行時に作成されます。このパラメータが指定されていない場合は、代わりに値 "OUTPUT_TABLE_SPEC_error_records" が使用されます。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
  • javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が 0 の場合、UDF の再読み込みは無効になります。デフォルト値は 0 です。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された Pub/Sub メッセージ データ フィールド。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
    8. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    次のように置き換えます。

    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
    • TOPIC_NAME: Pub/Sub トピック名
    • DATASET: BigQuery データセット
    • TABLE_NAME: BigQuery テーブル名

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
    • TOPIC_NAME: Pub/Sub トピック名
    • DATASET: BigQuery データセット
    • TABLE_NAME: BigQuery テーブル名

    次のステップ