Google 提供のユーティリティ テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、Dataflow テンプレートをご覧ください。Google が提供するテンプレートのリストについては、Google 提供のテンプレートの概要をご覧ください。

このガイドでは、ユーティリティ テンプレートについて説明します。

File Format Conversion(Avro、Parquet、CSV)

File Format Conversion テンプレートは、Cloud Storage に格納されたファイルをサポートされている形式から別の形式に変換するバッチ パイプラインです。

次の形式変換がサポートされています。

  • CSV から Avro
  • CSV から Parquet
  • Avro から Parquet
  • Parquet から Avro

このパイプラインの要件:

  • パイプラインを実行する前に、出力先の Cloud Storage バケットが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputFileFormat 入力ファイルの形式。[csv, avro, parquet] のいずれかにする必要があります。
outputFileFormat 出力ファイルの形式。[avro, parquet] のいずれかにする必要があります。
inputFileSpec 入力ファイルの Cloud Storage パスのパターン。例: gs://bucket-name/path/*.csv
outputBucket 出力ファイルを書き込む Cloud Storage フォルダ。このパスはスラッシュで終わる必要があります。例: gs://bucket-name/output/
schema Avro スキーマ ファイルへの Cloud Storage パス。例: gs://bucket-name/schema/my-schema.avsc
containsHeaders (省略可)入力 CSV ファイルにはヘッダー レコード(true/false)が含まれています。デフォルト値は false です。CSV ファイルを読み込む場合にのみ必要です。
csvFormat (省略可)レコードの解析に使用する CSV 形式の仕様。デフォルト値は Default です。詳細については、Apache Commons CSV 形式をご覧ください。
delimiter (省略可)入力 CSV ファイルで使用されるフィールド区切り文字。
outputFilePrefix (省略可)出力ファイルの接頭辞。デフォルト値は output です。
numShards (省略可)出力ファイルのシャード数。

File Format Conversion テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Convert file formats template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • INPUT_FORMAT: 入力ファイルの形式。[csv, avro, parquet] のいずれかにする必要があります。
  • OUTPUT_FORMAT: 出力ファイルの形式。[avro, parquet] のいずれかにする必要があります。
  • INPUT_FILES: 入力ファイルのパスパターン
  • OUTPUT_FOLDER: 出力ファイルを格納する Cloud Storage フォルダ
  • SCHEMA: Avro スキーマ ファイルのパス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • INPUT_FORMAT: 入力ファイルの形式。[csv, avro, parquet] のいずれかにする必要があります。
  • OUTPUT_FORMAT: 出力ファイルの形式。[avro, parquet] のいずれかにする必要があります。
  • INPUT_FILES: 入力ファイルのパスパターン
  • OUTPUT_FOLDER: 出力ファイルを格納する Cloud Storage フォルダ
  • SCHEMA: Avro スキーマ ファイルのパス

Bulk Compress Cloud Storage Files

Bulk Compress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定した場所に圧縮するバッチ パイプラインです。このテンプレートは、定期的なアーカイブ プロセスの一環として大量のファイルを圧縮する必要がある場合に役立ちます。サポートされている圧縮モードは、BZIP2DEFLATEGZIP です。出力先の場所に出力されるファイルは、元のファイル名の命名スキーマに従って命名され、ファイル名の末尾に圧縮モードの拡張子が付加されます。付加される拡張子は、.bzip2.deflate.gz のいずれかです。

圧縮処理中に発生したエラーは、CSV 形式(ファイル名, エラー メッセージ)のエラーファイルに出力されます。パイプラインの実行中にエラーが発生しなくてもエラーファイルは作成されますが、ファイル内にエラーレコードはありません。

このパイプラインの要件:

  • 圧縮形式は、BZIP2DEFLATEGZIP のいずれかにすること。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/uncompressed/*.txt
outputDirectory 出力を書き込む場所。例: gs://bucket-name/compressed/
outputFailureFile 圧縮処理中に発生したエラーの書き込みに使用されるエラーログ出力ファイル。たとえば、gs://bucket-name/compressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、圧縮に失敗したファイルごとに 1 行が使用されます。
compression 一致するファイルを圧縮するために使用する圧縮アルゴリズム。BZIP2DEFLATEGZIP のいずれかにする必要があります。

Bulk Compress Cloud Storage Files テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Bulk Compress Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • BUCKET_NAME: Cloud Storage バケットの名前
  • COMPRESSION: 任意の圧縮アルゴリズム

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • BUCKET_NAME: Cloud Storage バケットの名前
  • COMPRESSION: 任意の圧縮アルゴリズム

Bulk Decompress Cloud Storage Files

Bulk Decompress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定された場所に解凍するバッチ パイプラインです。移行中はネットワーク帯域幅のコストを最小限に抑えるために圧縮データを使用する一方、移行が完了したら、分析処理速度を最大限にするために非圧縮データを処理する場合に、この機能が役立ちます。このパイプラインは、1 回の実行時に自動的に複数の圧縮モードを同時に処理し、ファイル拡張子(.bzip2.deflate.gz.zip)に基づいて使用する解凍モードを判断します。

このパイプラインの要件:

  • 解凍するファイルの形式は、Bzip2DeflateGzipZip のいずれかでなければなりません。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/compressed/*.gz
outputDirectory 出力を書き込む場所。例: gs://bucket-name/decompressed
outputFailureFile 解凍処理中に発生したエラーを書き込むために使用するエラーログ出力ファイル。たとえば、gs://bucket-name/decompressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、解凍に失敗したファイルごとに 1 行が使用されます。

Bulk Decompress Cloud Storage Files テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Bulk Decompress Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • BUCKET_NAME: Cloud Storage バケットの名前
  • OUTPUT_FAILURE_FILE_PATH: エラー情報を含むファイルへの任意のパス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • BUCKET_NAME: Cloud Storage バケットの名前
  • OUTPUT_FAILURE_FILE_PATH: エラー情報を含むファイルへの任意のパス

Datastore Bulk Delete

Datastore Bulk Delete テンプレートは、指定の GQL クエリを使用して Datastore からエンティティを読み込み、選択したターゲット プロジェクト内のすべての一致エンティティを削除するパイプラインです。このパイプラインはオプションで JSON でエンコードされた Datastore エンティティを JavaScript UDF に渡すことができます。これを使用すると、null 値を返すことでエンティティを除外できます。

このパイプラインの要件:

  • テンプレートを実行する前に、Datastore をプロジェクトで設定する必要があります。
  • 読み取る Datastore インスタンスと削除する Datastore インスタンスが異なる場合は、Dataflow コントローラ サービス アカウントに、あるインスタンスから読み取り、別のインスタンスから削除する権限が必要です。

テンプレートのパラメータ

パラメータ 説明
datastoreReadGqlQuery 削除対象としてマッチするエンティティを指定する GQL クエリ。キーのみのクエリを使用すると、パフォーマンスが向上する可能性があります。たとえば、「SELECT __key__ FROM MyKind」です。
datastoreReadProjectId GQL クエリで一致するエンティティを読み取る Datastore インスタンスの GCP プロジェクト ID。
datastoreDeleteProjectId 一致するエンティティを削除する Datastore インスタンスの GCP プロジェクト ID。Datastore インスタンス内で読み取りと削除を行う場合は、datastoreReadProjectId と同じでもかまいません。
datastoreReadNamespace (省略可)リクエストされるエンティティの名前空間。デフォルトの名前空間には「""」を設定します。
datastoreHintNumWorkers (省略可)Datastore のランプアップ スロットリング ステップで予想されるワーカー数のヒント。デフォルトは、500 です。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合は、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。 この関数で特定の Datastore エンティティに関して未定義の値や null が返される場合、そのエンティティは削除されません。

Datastore Bulk Delete テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Bulk Delete Entities in Datastore template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • GQL_QUERY: 削除するエンティティを照合するために使用するクエリ
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: Datastore インスタンスのプロジェクト ID。この例では、同じ Datastore インスタンスからの読み取りと削除の両方を行います。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • JOB_NAME: 任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • GQL_QUERY: 削除するエンティティを照合するために使用するクエリ
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: Datastore インスタンスのプロジェクト ID。この例では、同じ Datastore インスタンスからの読み取りと削除の両方を行います。

Pub/Sub、BigQuery、Cloud Storage へのストリーミング データ生成ツール

ストリーミング データ生成ツール テンプレートは、ユーザーが指定したスキーマに基づいて、指定されたレートで無限または固定数の合成レコードまたはメッセージを生成するために使用されます。対応している宛先には、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットがあります。

次のようなユースケースが考えられます。

  • Pub/Sub トピックへの大規模でリアルタイムのイベント公開をシミュレーションし、公開されたイベントを処理するために必要な受信者の数と規模を測定して判断します。
  • パフォーマンス ベンチマークを評価する、または概念実証として機能するには、BigQuery テーブルまたは Cloud Storage バケットに合成データを生成します。

サポートされているシンクとエンコード形式

次の表は、このテンプレートでサポートされるシンクおよびエンコード形式を示したものです。
JSON Avro Parquet
Pub/Sub ×
BigQuery × ×
Cloud Storage

パイプラインで使用される JSON Data Generator ライブラリを使用すると、各スキーマ フィールドでさまざまな faker 関数を使用できます。faker 関数とスキーマ形式の詳細については、json-data-generator のドキュメントをご覧ください。

このパイプラインの要件:

  • メッセージ スキーマ ファイルを作成し、このファイルを Cloud Storage の場所に保存します。
  • 実行する前に出力ターゲットが存在している必要があります。ターゲットは、シンクタイプに応じて、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットのいずれかである必要があります。
  • 出力エンコードが Avro または Parquet の場合は、Avro スキーマ ファイルを作成し、Cloud Storage の場所に保存します。

テンプレートのパラメータ

パラメータ 説明
schemaLocation スキーマ ファイルの場所。例: gs://mybucket/filename.json
qps 1 秒あたりにパブリッシュされるメッセージ数。例: 100
sinkType (省略可)出力シンクのタイプ。指定可能な値は PUBSUBBIGQUERYGCS です。デフォルトは PUBSUB です。
outputType (省略可)出力エンコード タイプ。指定可能な値は JSONAVROPARQUET です。デフォルトは JSON です。
avroSchemaLocation (省略可)AVRO スキーマ ファイルの場所。outputType が AVRO または PARQUET の場合は必須。例: gs://mybucket/filename.avsc
topic (省略可)パイプラインがデータを公開する Pub/Sub トピックの名前。sinkType が Pub/Sub の場合は必須。例: projects/my-project-ID/topics/my-topic-ID
outputTableSpec (省略可)出力 BigQuery テーブルの名前。sinkType が BigQuery の場合は必須。例: my-project-ID:my_dataset_name.my-table-name
writeDisposition (省略可)BigQuery の書き込み処理。指定可能な値は WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE です。デフォルトは WRITE_APPEND です。
outputDeadletterTable (省略可)失敗したレコードを格納する出力 BigQuery テーブルの名前。指定されていない場合、パイプラインは実行中に {output_table_name}_error_records という名前のテーブルを作成します。例: my-project-ID:my_dataset_name.my-table-name
outputDirectory (省略可)出力される Cloud Storage の場所のパス。sinkType が Cloud Storage の場合は必須。例: gs://mybucket/pathprefix/
outputFilenamePrefix (省略可)Cloud Storage に書き込まれる出力ファイルのファイル名の接頭辞。デフォルトは output- です。
windowDuration (省略可)出力が Cloud Storage に書き込まれる時間間隔。デフォルトは 1m(つまり 1 分)です。
numShards (省略可)出力シャードの最大数。sinkType が Cloud Storage の場合に必須で、1 以上の数値に設定する必要があります。
messagesLimit (省略可)出力メッセージの最大数。デフォルトは 0 で、制限がないことを示します。
autoscalingAlgorithm (省略可)ワーカーの自動スケーリングに使用されるアルゴリズム。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。
maxNumWorkers (省略可)ワーカーマシンの最大数。例: 10

ストリーミング データ生成ツール テンプレートの実行

Console

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Streaming Data Generator template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • JOB_NAME: 任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒あたりにパブリッシュされるメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/my-project-ID/topics/my-topic-ID

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行するクラウド プロジェクト ID
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイントus-central1 など)
  • JOB_NAME: 任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    次の値を使用できます。

    • latest: 最新バージョンのテンプレートを使用します。バケットは、日付のない親フォルダ(gs://dataflow-templates/latest/にあります。
    • 2021-09-20-00_RC00 などバージョン名を使用します。このテンプレートは、バケット内の日付付きの親フォルダ内にネストしてあります。 gs://dataflow-templates/
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒あたりにパブリッシュされるメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/my-project-ID/topics/my-topic-ID