Streaming Data Generator to Pub/Sub, BigQuery, and Cloud Storage テンプレート

このストリーミング データ生成ツール テンプレートは、ユーザーが指定したスキーマに基づいて、指定されたレートで無制限または固定数の合成レコードまたはメッセージを生成するために使用されます。対応している宛先には、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットがあります。

次のようなユースケースが考えられます。

  • Pub/Sub トピックへの大規模でリアルタイムのイベント公開をシミュレーションし、公開されたイベントを処理するために必要な受信者の数と規模を測定して判断します。
  • パフォーマンス ベンチマークを評価する、または概念実証として機能するには、BigQuery テーブルまたは Cloud Storage バケットに合成データを生成します。

サポートされているシンクとエンコード形式

次の表は、このテンプレートでサポートされるシンクおよびエンコード形式を示したものです。
JSON Avro Parquet
Pub/Sub ×
BigQuery いいえ ×
Cloud Storage はい

パイプラインの要件

  • ワーカー サービス アカウントを使用するには、Dataflow ワーカー(roles/dataflow.worker)にロールが割り当てられている必要があります。詳細については、IAM の概要をご覧ください。
  • 生成されたデータの JSON テンプレートを含むスキーマ ファイルを作成します。このテンプレートは JSON データ生成ツール ライブラリを使用しているため、スキーマの各フィールドにさまざまな faker 関数を指定できます。詳細については、json-data-generator ドキュメントをご覧ください。

    次に例を示します。

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • スキーマ ファイルを Cloud Storage バケットにアップロードします。
  • 実行する前に出力ターゲットが存在している必要があります。ターゲットは、シンクタイプに応じて、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットのいずれかである必要があります。
  • 出力エンコードが Avro または Parquet の場合は、Avro スキーマ ファイルを作成し、Cloud Storage の場所に保存します。
  • 目的の宛先に応じて、ワーカー サービス アカウントに追加の IAM ロールを割り当てます。
    宛先 さらに必要な IAM ロール 適用先のリソース
    Pub/Sub Pub/Sub パブリッシャー(roles/pubsub.publisher
    (詳細については、IAM による Pub/Sub のアクセス制御をご覧ください)
    Pub/Sub トピック
    BigQuery BigQuery データ編集者(roles/bigquery.dataEditor
    (詳細については、IAM による BigQuery のアクセス制御をご覧ください)
    BigQuery データセット
    Cloud Storage Cloud Storage オブジェクト管理者(roles/storage.objectAdmin
    (詳細については、IAM による Cloud Storage のアクセス制御をご覧ください)
    Cloud Storage バケット

テンプレートのパラメータ

パラメータ 説明
schemaLocation スキーマ ファイルの場所。例: gs://mybucket/filename.json
qps 1 秒あたりにパブリッシュされるメッセージ数。例: 100
sinkType (省略可)出力シンクのタイプ。指定可能な値は PUBSUBBIGQUERYGCS です。デフォルトは PUBSUB です。
outputType (省略可)出力エンコード タイプ。指定可能な値は JSONAVROPARQUET です。デフォルトは JSON です。
avroSchemaLocation (省略可)AVRO スキーマ ファイルの場所。outputType が AVRO または PARQUET の場合は必須。例: gs://mybucket/filename.avsc
topic (省略可)パイプラインがデータを公開する Pub/Sub トピックの名前。sinkType が Pub/Sub の場合は必須。例: projects/my-project-id/topics/my-topic-id
outputTableSpec (省略可)出力 BigQuery テーブルの名前。sinkType が BigQuery の場合は必須。例: my-project-ID:my_dataset_name.my-table-name
writeDisposition (省略可)BigQuery の書き込み処理。指定可能な値は WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE です。デフォルトは WRITE_APPEND です。
outputDeadletterTable (省略可)失敗したレコードを格納する出力 BigQuery テーブルの名前。指定されていない場合、パイプラインは実行中に {output_table_name}_error_records という名前のテーブルを作成します。例: my-project-ID:my_dataset_name.my-table-name
outputDirectory (省略可)出力される Cloud Storage の場所のパス。sinkType が Cloud Storage の場合は必須。例: gs://mybucket/pathprefix/
outputFilenamePrefix (省略可)Cloud Storage に書き込まれる出力ファイルのファイル名の接頭辞。デフォルトは output- です。
windowDuration (省略可)出力が Cloud Storage に書き込まれる時間間隔。デフォルトは 1m(つまり 1 分)です。
numShards (省略可)出力シャードの最大数。sinkType が Cloud Storage の場合に必須で、1 以上の数値に設定する必要があります。
messagesLimit (省略可)出力メッセージの最大数。デフォルトは 0 で、制限がないことを示します。
autoscalingAlgorithm (省略可)ワーカーの自動スケーリングに使用されるアルゴリズム。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。
maxNumWorkers (省略可)ワーカーマシンの最大数。例: 10

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Streaming Data Generator template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒あたりにパブリッシュされるメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/my-project-id/topics/my-topic-id

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒あたりにパブリッシュされるメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/my-project-id/topics/my-topic-id

次のステップ