Pub/Sub Proto から BigQuery への Python UDF を含むテンプレート

Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

データを変換するための Python ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。

パイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 出力 Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。
  • BigQuery テーブルが存在する場合は、createDisposition 値にかかわらず、proto データに一致するスキーマが必要です。

テンプレートのパラメータ

パラメータ 説明
protoSchemaPath 自己完結型の proto スキーマ ファイルの Cloud Storage の場所。例: gs://path/to/my/file.pbこのファイルは、protoc コマンドの --descriptor_set_out フラグを使用して生成できます。--include_imports フラグを使用すると、ファイルが自己完結することを保証できます。
fullMessageName proto メッセージの完全な名前。たとえば、package.name.MessageName の場合、package.name は、java_package ステートメントではなく、package ステートメントに指定された値です。
inputSubscription 読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription>
outputTopic 未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 出力テーブルの場所。例: my-project:my_dataset.my_tablecreateDisposition の指定によっては、入力スキーマ ファイルを使用して出力テーブルが自動的に作成されます。
preserveProtoFieldNames 省略可: JSON で元の Proto フィールド名を保持する場合は true を指定します。false にすると、標準の JSON 名を使用されます。たとえば、false と指定すると field_namefieldName に変更されます。(デフォルト: false
bigQueryTableSchemaPath 省略可: BigQuery スキーマパスへの Cloud Storage パス。たとえば、gs://path/to/my/schema.json のようにします。指定されていない場合、スキーマは Proto スキーマから推測されます。
pythonExternalTextTransformGcsPath 省略可: 使用するユーザー定義関数(UDF)を定義する Python コードファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 省略可: 使用する Python ユーザー定義関数(UDF)の名前。
udfOutputTopic 省略可: UDF エラーを保存する Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> です。指定されていない場合、UDF エラーは outputTopic と同じトピックに送信されます。
writeDisposition 省略可: BigQuery WriteDispositionWRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE など。デフォルト: WRITE_APPEND
createDisposition 省略可: BigQuery CreateDisposition。例: CREATE_IF_NEEDEDCREATE_NEVERデフォルト: CREATE_IF_NEEDED
useStorageWriteApi 省略可: true の場合、パイプラインは BigQuery Storage Write API を使用します。デフォルト値は false です。詳細については、Storage Write API の使用をご覧ください。
useStorageWriteApiAtLeastOnce 省略可: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクスを使用するには、このパラメータを true に設定します。1 回限りのセマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
numStorageWriteApiStreams 省略可: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue で、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
storageWriteApiTriggeringFrequencySec 省略可: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue で、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された Pub/Sub メッセージ データ フィールド。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Proto to BigQuery with Python UDF template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    次のように置き換えます。

    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
    • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
    • BIGQUERY_TABLE: BigQuery 出力テーブル名
    • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
    • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
    • BIGQUERY_TABLE: BigQuery 出力テーブル名
    • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

    次のステップ