Google 提供のユーティリティ テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページでは、ユーティリティ テンプレートについて説明します。

Bulk Compress Cloud Storage Files

Bulk Compress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定した場所に圧縮するバッチ パイプラインです。このテンプレートは、定期的なアーカイブ プロセスの一環として大量のファイルを圧縮する必要がある場合に役立ちます。サポートされている圧縮モードは、BZIP2DEFLATEGZIP です。出力先の場所に出力されるファイルは、元のファイル名の命名スキーマに従って命名され、ファイル名の末尾に圧縮モードの拡張子が付加されます。付加される拡張子は、.bzip2.deflate.gz のいずれかです。

圧縮処理中に発生したエラーは、CSV 形式(ファイル名, エラー メッセージ)のエラーファイルに出力されます。パイプラインの実行中にエラーが発生しなくてもエラーファイルは作成されますが、ファイル内にエラーレコードはありません。

このパイプラインの要件:

  • 圧縮形式は、BZIP2DEFLATEGZIP のいずれかにすること。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/uncompressed/*.txt
outputDirectory 出力を書き込む場所。例: gs://bucket-name/compressed/
outputFailureFile 圧縮処理中に発生したエラーの書き込みに使用されるエラーログ出力ファイル。たとえば、gs://bucket-name/compressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、圧縮に失敗したファイルごとに 1 行が使用されます。
compression 一致するファイルを圧縮するために使用する圧縮アルゴリズム。BZIP2DEFLATEGZIP のいずれかにする必要があります。

Bulk Compress Cloud Storage Files テンプレートの実行

Console

Google Cloud Console から実行する
  1. Cloud Console の [Dataflow] ページに移動します。
  2. Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Bulk Compress Cloud Storage Files template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールから実行する

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • COMPRESSION: 任意の圧縮アルゴリズム

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • COMPRESSION: 任意の圧縮アルゴリズム

Bulk Decompress Cloud Storage Files

Bulk Decompress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定された場所に解凍するバッチ パイプラインです。移行中はネットワーク帯域幅のコストを最小限に抑えるために圧縮データを使用する一方、移行が完了したら、分析処理速度を最大限にするために非圧縮データを処理する場合に、この機能が役立ちます。このパイプラインは、1 回の実行時に自動的に複数の圧縮モードを同時に処理し、ファイル拡張子(.bzip2.deflate.gz.zip)に基づいて使用する解凍モードを判断します。

このパイプラインの要件:

  • 解凍するファイルの形式は、Bzip2DeflateGzipZip のいずれかでなければなりません。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/compressed/*.gz
outputDirectory 出力を書き込む場所。例: gs://bucket-name/decompressed
outputFailureFile 解凍処理中に発生したエラーを書き込むために使用するエラーログ出力ファイル。たとえば、gs://bucket-name/decompressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、解凍に失敗したファイルごとに 1 行が使用されます。

Bulk Decompress Cloud Storage Files テンプレートの実行

Console

Google Cloud Console から実行する
  1. Cloud Console の [Dataflow] ページに移動します。
  2. Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Bulk Decompress Cloud Storage Files template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールから実行する

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • OUTPUT_FAILURE_FILE_PATH: エラー情報を含むファイルへの任意のパス

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • OUTPUT_FAILURE_FILE_PATH: エラー情報を含むファイルへの任意のパス

Datastore Bulk Delete

Datastore Bulk Delete テンプレートは、指定の GQL クエリを使用して Datastore からエンティティを読み込み、選択したターゲット プロジェクト内のすべての一致エンティティを削除するパイプラインです。このパイプラインはオプションで JSON でエンコードされた Datastore エンティティを JavaScript UDF に渡すことができます。これを使用すると、null 値を返すことでエンティティを除外できます。

このパイプラインの要件:

  • テンプレートを実行する前に、Datastore をプロジェクトで設定する必要があります。
  • 読み取る Datastore インスタンスと削除する Datastore インスタンスが異なる場合は、Dataflow コントローラ サービス アカウントに、あるインスタンスから読み取り、別のインスタンスから削除する権限が必要です。

テンプレートのパラメータ

パラメータ 説明
datastoreReadGqlQuery 削除対象としてマッチするエンティティを指定する GQL クエリ。たとえば、"SELECT * FROM MyKind" とします。
datastoreReadProjectId GQL クエリで一致するエンティティを読み取る Datastore インスタンスの GCP プロジェクト ID。
datastoreDeleteProjectId 一致するエンティティを削除する Datastore インスタンスの GCP プロジェクト ID。Datastore インスタンス内で読み取りと削除を行う場合は、datastoreReadProjectId と同じでもかまいません。
datastoreReadNamespace (省略可)リクエストされるエンティティの名前空間。デフォルトの名前空間には「""」を設定します。
javascriptTextTransformGcsPath (省略可)すべての JavaScript コードが格納された Cloud Storage のパス。たとえば、"gs://mybucket/mytransforms/*.js" とします。UDF を使用しない場合は、このフィールドを空白にします。
javascriptTextTransformFunctionName (省略可)呼び出される関数の名前。この関数で特定の Datastore エンティティに関して未定義の値や null が返される場合、そのエンティティは削除されません。"function myTransform(inJson) { ...dostuff...}" という JavaScript コードが存在する場合、関数名は "myTransform" です。UDF を使用しない場合は、このフィールドを空白にします。

Datastore Bulk Delete テンプレートの実行

Console

Google Cloud Console から実行する
  1. Cloud Console の [Dataflow] ページに移動します。
  2. Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Datastore Bulk Delete template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールから実行する

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。
  • GQL_QUERY: 削除するエンティティを照合するために使用するクエリ。
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: Datastore インスタンスのプロジェクト ID。この例では、同じ Datastore インスタンスからの読み取りと削除の両方を行います。

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • JOB_NAME: 任意のジョブ名。
  • GQL_QUERY: 削除するエンティティを照合するために使用するクエリ。
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: Datastore インスタンスのプロジェクト ID。この例では、同じ Datastore インスタンスからの読み取りと削除の両方を行います。

Pub/Sub / BigQuery / Cloud Storage へのストリーミング データ生成ツール

ストリーミング データ生成ツール テンプレートは、ユーザーが指定したスキーマに基づいて、指定されたレートで無限または固定数の合成レコードまたはメッセージを生成するために使用されます。対応している宛先には、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットがあります。

次のようなユースケースが考えられます。

  • Pub/Sub トピックへの大規模でリアルタイムのイベント公開をシミュレーションし、公開されたイベントを処理するために必要な受信者の数と規模を測定して判断します。
  • パフォーマンス ベンチマークを評価するため、または概念実証として機能するには、BigQuery テーブルまたは Cloud Storage バケットに合成データを生成します。

サポートされているシンクとエンコード形式

次の表は、このテンプレートでサポートされるシンクとエンコード形式を示したものです。
JSON Avro Parquet
Pub/Sub ×
BigQuery × ×
Cloud Storage

パイプラインで使用される JSON Data Generator ライブラリを使用すると、各スキーマ フィールドでさまざまな faker 関数を使用できます。faker 関数とスキーマ形式の詳細については、json-data-generator のドキュメントをご覧ください。

このパイプラインの要件:

  • メッセージ スキーマ ファイルを作成し、このファイルを Cloud Storage の場所に保存します。
  • 実行前に出力ターゲットが存在する必要があります。ターゲットは、シンクタイプに応じて、Pub/Sub トピック、BigQuery テーブル、または Cloud Storage バケットである必要があります。
  • 出力エンコードが Avro または Parquet の場合は、Avro スキーマ ファイルを作成し、Cloud Storage の場所に保存します。

テンプレートのパラメータ

パラメータ 説明
schemaLocation スキーマ ファイルの場所。例: gs://mybucket/filename.json
qps 1 秒あたりにパブリッシュされるメッセージ数。例: 100
sinkType (省略可)出力シンクのタイプ。指定可能な値は、PUBSUBBIGQUERYGCS です。デフォルトは PUBSUB です。
outputType (省略可)出力エンコード タイプ。指定可能な値は、JSONAVROPARQUET です。デフォルトは JSON です。
avroSchemaLocation (省略可)AVRO スキーマ ファイルの場所。outputType が AVRO または PARQUET の場合は必須です。例: gs://mybucket/filename.avsc
topic (省略可)パイプラインがデータを公開する Pub/Sub トピックの名前。sinkType が Pub/Sub の場合は必須。例: projects/<project-id>/topics/<topic-name>
outputTableSpec (省略可)出力 BigQuery テーブルの名前。sinkType が BigQuery の場合は必須です。例: your-project:your-dataset.your-table-name
writeDisposition (省略可)BigQuery の書き込み処理。指定可能な値は WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE です。デフォルトは WRITE_APPEND です。
outputDeadletterTable (省略可)失敗したレコードを格納する出力 BigQuery テーブルの名前。指定されていない場合、パイプラインは実行中に {output_table_name}_error_records という名前のテーブルを作成します。例: your-project:your-dataset.your-table-name
outputDirectory (省略可)出力される Cloud Storage の場所のパス。sinkType が Cloud Storage の場合は必須です。例: gs://mybucket/pathprefix/
outputFilenamePrefix (省略可)Cloud Storage に書き込まれる出力ファイルのファイル名の接頭辞。デフォルトは output- です。
windowDuration (省略可)出力が Cloud Storage に書き込まれる時間間隔。デフォルトは 1m(つまり 1 分)です。
numShards (省略可)出力シャードの最大数。sinkType が Cloud Storage では必須です。1 以上の数値に設定する必要があります。
messagesLimit (省略可)出力メッセージの最大数。デフォルトは 0(無制限)です。
autoscalingAlgorithm (省略可)ワーカーの自動スケーリングに使用されるアルゴリズム。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。
maxNumWorkers (省略可)ワーカーマシンの最大数。例: 10

ストリーミング データ生成ツール テンプレートの実行

Console

Google Cloud Console からの実行
  1. Cloud Console の [Dataflow] ページに移動します。
  2. Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Streaming Data Generator template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 284.0.0 以降が必要です。

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

以下を置き換えます。

  • JOB_NAME: 任意のジョブ名。
  • PROJECT_ID: プロジェクト ID
  • REGION_NAME: Dataflow リージョン名(例: us-central1
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒間にパブリッシュするメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>

API

REST API からの実行

このテンプレートを実行するときは、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには認可が必要です。

POST  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}
  

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • LOCATION: Dataflow リージョン名(例: us-central1
  • JOB_NAME: 任意のジョブ名。
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒間にパブリッシュするメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>