Google 提供のユーティリティ テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページでは、ユーティリティ テンプレートについて説明します。

Bulk Compress Cloud Storage Files

Bulk Compress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定した場所に圧縮するバッチ パイプラインです。このテンプレートは、定期的なアーカイブ プロセスの一環として大量のファイルを圧縮する必要がある場合に役立ちます。サポートされている圧縮モードは、BZIP2DEFLATEGZIP です。出力先の場所に出力されるファイルは、元のファイル名の命名スキーマに従って命名され、ファイル名の末尾に圧縮モードの拡張子が付加されます。付加される拡張子は、.bzip2.deflate.gz のいずれかです。

圧縮処理中に発生したエラーは、CSV 形式(ファイル名, エラー メッセージ)のエラーファイルに出力されます。パイプラインの実行中にエラーが発生しなくてもエラーファイルは作成されますが、ファイル内にエラーレコードはありません。

このパイプラインの要件:

  • 圧縮形式は、BZIP2DEFLATEGZIP のいずれかにすること。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/uncompressed/*.txt
outputDirectory 出力を書き込む場所。例: gs://bucket-name/compressed/
outputFailureFile 圧縮処理中に発生したエラーの書き込みに使用されるエラーログ出力ファイル。たとえば、gs://bucket-name/compressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、圧縮に失敗したファイルごとに 1 行が使用されます。
compression 一致するファイルを圧縮するために使用する圧縮アルゴリズム。BZIP2DEFLATEGZIP のいずれかにする必要があります。

Bulk Compress Cloud Storage Files テンプレートの実行

Console

Google Cloud Console から実行する
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Bulk Compress Cloud Storage Files template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールから実行する

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
  • COMPRESSION は、任意の圧縮アルゴリズムに置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://YOUR_BUCKET_NAME/compressed,\
outputFailureFile=gs://YOUR_BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
  • COMPRESSION は、任意の圧縮アルゴリズムに置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/compressed",
       "outputFailureFile": "gs://YOUR_BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Bulk Decompress Cloud Storage Files

Bulk Decompress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定された場所に解凍するバッチ パイプラインです。移行中はネットワーク帯域幅のコストを最小限に抑えるために圧縮データを使用する一方、移行が完了したら、分析処理速度を最大限にするために非圧縮データを処理する場合に、この機能が役立ちます。このパイプラインは、1 回の実行時に自動的に複数の圧縮モードを同時に処理し、ファイル拡張子(.bzip2.deflate.gz.zip)に基づいて使用する解凍モードを判断します。

このパイプラインの要件:

  • 解凍するファイルの形式は、Bzip2DeflateGzipZip のいずれかでなければなりません。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/compressed/*.gz
outputDirectory 出力を書き込む場所。例: gs://bucket-name/decompressed
outputFailureFile 解凍処理中に発生したエラーを書き込むために使用するエラーログ出力ファイル。たとえば、gs://bucket-name/decompressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、解凍に失敗したファイルごとに 1 行が使用されます。

Bulk Decompress Cloud Storage Files テンプレートの実行

Console

Google Cloud Console から実行する
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Bulk Decompress Cloud Storage Files template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールから実行する

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
  • OUTPUT_FAILURE_FILE_PATH は、エラー情報が記載されるファイルへの任意のパスに置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://YOUR_BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
  • OUTPUT_FAILURE_FILE_PATH は、エラー情報が記載されるファイルへの任意のパスに置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Datastore Bulk Delete

Datastore Bulk Delete テンプレートは、指定の GQL クエリを使用して Datastore からエンティティを読み込み、選択したターゲット プロジェクト内のすべての一致エンティティを削除するパイプラインです。このパイプラインはオプションで JSON でエンコードされた Datastore エンティティを JavaScript UDF に渡すことができます。これを使用すると、null 値を返すことでエンティティを除外できます。

このパイプラインの要件:

  • テンプレートを実行する前に、Datastore をプロジェクトで設定する必要があります。
  • 読み取る Datastore インスタンスと削除する Datastore インスタンスが異なる場合は、Dataflow コントローラ サービス アカウントに、あるインスタンスから読み取り、別のインスタンスから削除する権限が必要です。

テンプレートのパラメータ

パラメータ 説明
datastoreReadGqlQuery 削除対象としてマッチするエンティティを指定する GQL クエリ。たとえば、"SELECT * FROM MyKind" とします。
datastoreReadProjectId GQL クエリで一致するエンティティを読み取る Datastore インスタンスの GCP プロジェクト ID。
datastoreDeleteProjectId 一致するエンティティを削除する Datastore インスタンスの GCP プロジェクト ID。Datastore インスタンス内で読み取りと削除を行う場合は、datastoreReadProjectId と同じでもかまいません。
datastoreReadNamespace (省略可)リクエストされるエンティティの名前空間。デフォルトの名前空間には「""」を設定します。
javascriptTextTransformGcsPath (オプション)すべての JavaScript コードが格納された Cloud Storage のパス。たとえば、"gs://mybucket/mytransforms/*.js" とします。UDF を使用しない場合は、このフィールドを空白にします。
javascriptTextTransformFunctionName (オプション)呼び出される関数の名前。この関数で特定の Datastore エンティティに関して未定義の値や null が返される場合、そのエンティティは削除されません。"function myTransform(inJson) { ...dostuff...}" という JavaScript コードが存在する場合、関数名は "myTransform" です。UDF を使用しない場合は、このフィールドを空白にします。

Datastore Bulk Delete テンプレートの実行

Console

Google Cloud Console から実行する
  1. Cloud Console の [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから the Datastore Bulk Delete template を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

gcloud

gcloud コマンドライン ツールから実行する

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

このサンプルの次の値は置き換える必要があります。

  • JOB_NAME は、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • GQL_QUERY は、削除するエンティティの照合に使用するクエリで置き換えます。
  • DATASTORE_READ_AND_DELETE_PROJECT_ID は、Datastore インスタンスのプロジェクト ID に置き換えます。このサンプルでは、同じ Datastore インスタンスで読み取りと削除が行われます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • JOB_NAME は、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • GQL_QUERY は、削除するエンティティの照合に使用するクエリで置き換えます。
  • DATASTORE_READ_AND_DELETE_PROJECT_ID は、Datastore インスタンスのプロジェクト ID に置き換えます。このサンプルでは、同じ Datastore インスタンスで読み取りと削除が行われます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "READ_PROJECT_ID",
       "datastoreDeleteProjectId": "DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}