Google 提供のストリーミング テンプレート

Google はオープンソースの Cloud Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページではストリーミング テンプレートについて説明しています。

Cloud Pub/Sub Subscription to BigQuery

Cloud Pub/Sub Subscription to BigQuery テンプレートは、Cloud Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Cloud Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Cloud Pub/Sub メッセージは JSON 形式でなければなりません。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、データ型が文字列に指定された k1k2 という名前の 2 つの列からなる BigQuery テーブルに挿入できます。
  • パイプラインの実行前に出力テーブルが存在する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み取り元の Cloud Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Cloud Pub/Sub Subscription to BigQuery テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub Subscription to BigQuery テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_SUBSCRIPTION_NAME は Cloud Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_SUBSCRIPTION_NAME は Cloud Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub Topic to BigQuery

Cloud Pub/Sub Topic to BigQuery テンプレートは、Cloud Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Cloud Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Cloud Pub/Sub メッセージは JSON 形式でなければなりません。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、データ型が文字列に指定された k1k2 という名前の 2 つの列からなる BigQuery テーブルに挿入できます。
  • パイプラインの実行前に出力テーブルが存在する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Cloud Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Cloud Pub/Sub Topic to BigQuery テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub Topic to BigQuery テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、使用する Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、使用する Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Pub/Sub

Cloud Pub/Sub to Cloud Pub/Sub テンプレートは、Cloud Pub/Sub サブスクリプションからメッセージを読み取り、Cloud Pub/Sub トピックに書き込むストリーミング パイプラインです。Cloud Pub/Sub トピックに書き込むメッセージをフィルタリングするためのメッセージ属性キーと値をオプションとして指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しつつ、Cloud Pub/Sub サブスクリプションから別の Cloud Pub/Sub トピックにメッセージをコピーすることができます。

このパイプラインの要件:

  • 実行前にコピー元の Cloud Pub/Sub サブスクリプションが存在している必要があります。
  • 実行前にコピー先の Cloud Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Cloud Pub/Sub サブスクリプション。たとえば、projects/<project-id>/subscriptions/<subscription-name> などです。
outputTopic 出力の書き込み先の Cloud Pub/Sub トピック。たとえば、projects/<project-id>/topics/<topic-name> などです。
filterKey (オプション)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (オプション)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトではヌルの filterValue が使用されます。

Cloud Pub/Sub to Cloud Pub/Sub テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub to Cloud Pub/Sub テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_SUBSCRIPTION_NAME は、Cloud Pub/Sub サブスクリプションの名前に置き換えます。
  • YOUR_TOPIC_NAME は、Cloud Pub/Sub トピックの名前に置き換えます。
  • FILTER_KEY は、メッセージのフィルタリングに使用する属性キーの名前に置き換えます。
  • FILTER_VALUE は、属性値に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_SUBSCRIPTION_NAME は、Cloud Pub/Sub サブスクリプションの名前に置き換えます。
  • YOUR_TOPIC_NAME は、Cloud Pub/Sub トピックの名前に置き換えます。
  • FILTER_KEY は、メッセージのフィルタリングに使用する属性キーの名前に置き換えます。
  • FILTER_VALUE は、属性値に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Storage Avro

Cloud Pub/Sub to Cloud Storage Avro テンプレートは、Cloud Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。このパイプラインは、ウィンドウ処理された書き込みを実行するために使用されるオプションのユーザー指定ウィンドウ期間をサポートしています。

このパイプラインの要件:

  • パイプラインの実行前に入力 Cloud Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを消費するために購読する Cloud Pub/Sub トピック。トピックの名前は、projects/<project-id>/topics/<topic-name> という形式にしてください。
outputDirectory 出力の Avro ファイルがアーカイブされる出力ディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイルのためのディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
outputFilenamePrefix (オプション)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (オプション)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (オプション)出力ファイルのシャード テンプレート。文字「S」または「N」の繰り返しシーケンスで指定します(例: SSS-NNN)。これらはそれぞれシャード番号またはシャード数に置き換えられます。このパラメータを省略した場合のデフォルトのテンプレート フォーマットは「W-P-SS-of-NN」です。
numShards (オプション)書き込み時に生成される出力シャードの最大数。シャードのデフォルトの最大数は 1 です。
windowDuration (オプション)データの書き込みが行われるウィンドウ期間。デフォルトは 5m です。許容されるフォーマットは、Ns(秒の場合、5s など)、Nm(分の場合、12m など)、Nh(時間の場合、2h など)です。

Cloud Pub/Sub to Cloud Storage Avro テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub to Cloud Storage Avro テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。
  • FILENAME_PREFIX は、使用する出力ファイル名接頭辞に置き換えます。
  • FILENAME_SUFFIX は、使用する出力ファイル名接尾辞に置き換えます。
  • SHARD_TEMPLATE は、使用する出力シャード テンプレートに置き換えます。
  • NUM_SHARDS は、出力シャードの数に置き換えます。
  • WINDOW_DURATION は、出力ウィンドウ期間に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Avro \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
avroTempDirectory=gs://YOUR_BUCKET_NAME/temp/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
windowDuration=WINDOW_DURATION

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。
  • FILENAME_PREFIX は、使用する出力ファイル名接頭辞に置き換えます。
  • FILENAME_SUFFIX は、使用する出力ファイル名接尾辞に置き換えます。
  • SHARD_TEMPLATE は、使用する出力シャード テンプレートに置き換えます。
  • NUM_SHARDS は、出力シャードの数に置き換えます。
  • WINDOW_DURATION は、出力ウィンドウ期間に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "avroTempDirectory": "gs://YOUR_BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
       "windowDuration": "WINDOW_DURATION"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Storage Text

Cloud Pub/Sub to Cloud Storage Text テンプレートは、Cloud Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、後で使用するデータを Cloud Pub/Sub にすばやく保存できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Cloud Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式でなければなりません。
  • トピックに公開するメッセージに改行が含まれていてはなりません。出力ファイルでは、1 つの Cloud Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にする必要があります。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。たとえば、gs://bucket-name/path/ とします。この値はスラッシュで終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。たとえば、output- とします。
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplate のデフォルト値は W-P-SS-of-NN です。W はウィンドウの日付範囲、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN の部分は 00-of-01 になります。

Cloud Pub/Sub to Cloud Storage Text テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub to Cloud Storage Text テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、使用する Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、使用する Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_BUCKET_NAME は、Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to BigQuery (Stream)

Cloud Storage Text to BigQuery (Stream) パイプラインは、Cloud Storage に格納されているテキスト ファイルをストリーミングし、指定された JavaScript ユーザー定義関数(UDF)を使用して変換し、結果を BigQuery に出力するストリーミング パイプラインです。

このパイプラインの要件:

  • 出力テーブルを記述する JSON 形式の BigQuery スキーマ ファイルを作成します。
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath JavaScriptt UDF の Cloud Storage の場所。たとえば、gs://my_bucket/my_function.js などです。
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。たとえば、gs://path/to/my/schema.json などです。
javascriptTextTransformFunctionName UDF として呼び出す JavaScript 関数の名前。たとえば、transform などです。
outputTable 完全修飾 BigQuery テーブル。たとえば、my-project:dataset.table などです。
inputFilePattern 処理するテキストの Cloud Storage の場所。たとえば、gs://my-bucket/my-files/text.txt などです。
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。たとえば、gs://my-bucket/my-files/temp_dir などです。
outputDeadletterTable メッセージ テーブルが出力テーブル(「Deadletter」テーブル)に到達できませんでした。たとえば、my-project:dataset.my-deadletter-table などです。存在しない場合は、パイプラインの実行中に作成されます。指定されていない場合は、代わりに <outputTableSpec>_error_records が使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Storage Text to BigQuery テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • PATH_TO_BIGQUERY_SCHEMA_JSON は、スキーマ定義が格納された JSON ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、使用する JavaScript コードが格納された .js ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_YOUR_TEXT_DATA は、使用するテキスト データセットへの Cloud Storage パスに置き換えます。
  • BIGQUERY_TABLE は、実際の BigQuery テーブルの名前に置き換えます。
  • BIGQUERY_DEADLETTER_TABLE は、BigQuery deadletter テーブル名に置き換えます。
  • PATH_TO_TEMP_DIR_ON_GCS は、一時ディレクトリへの Cloud Storage パスに置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_DEADLETTER_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_JAVASCRIPT_FUNCTION は、UDF の名前に置き換えます。
  • PATH_TO_BIGQUERY_SCHEMA_JSON は、スキーマ定義が格納された JSON ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE は、使用する JavaScript コードが格納された .js ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_YOUR_TEXT_DATA は、使用するテキスト データセットへの Cloud Storage パスに置き換えます。
  • BIGQUERY_TABLE は、実際の BigQuery テーブルの名前に置き換えます。
  • BIGQUERY_DEADLETTER_TABLE は、BigQuery deadletter テーブル名に置き換えます。
  • PATH_TO_TEMP_DIR_ON_GCS は、一時ディレクトリへの Cloud Storage パスに置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_YOUR_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_DEADLETTER_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Cloud Pub/Sub (Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Cloud Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Cloud Pub/Sub トピックに公開します。また、Cloud Pub/Sub でデータを再生することもできます。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Cloud Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Cloud Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。たとえば、gs://bucket-name/files/*.json などです。
outputTopic 書き込み先の Cloud Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にする必要があります。

Cloud Storage Text to Cloud Pub/Sub (Stream) テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Storage Text to Cloud Pub/Sub (Stream) テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、使用する Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_BUCKET_NAME は、使用する Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_TOPIC_NAME は、使用する Cloud Pub/Sub トピックの名前に置き換えます。
  • YOUR_BUCKET_NAME は、使用する Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream)

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、指定された BigQuery テーブルに匿名化されたデータを書き込むパイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて検査が不要な構造化データを匿名化したりできます。

このパイプラインの要件:

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む csv ファイル。ワイルドカードも使用できます。たとえば、gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv などです。
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。たとえば、my_dlp_api_project などです。
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。たとえば、projects/my_project/deidentifyTemplates/100 などです。
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。csv ファイルの場合、batchSize はバッチ内の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。パターン projects/{template_project_id}/identifyTemplates/{idTemplateId} で指定します。たとえば、projects/my_project/identifyTemplates/100 などです。

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートの実行

Console

Google Cloud Platform Console から実行する
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールから実行する

注: gcloud コマンドライン ツールを使用してテンプレートを実行するには、Cloud SDK バージョン 138.0.0 以降が必要です。

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID は、テンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID は、Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_INPUT_DATA は、入力ファイルパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE は、Cloud DLP DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME は、BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE は、Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE は、バッチサイズ(csv の場合は API ごとの行数)に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

REST API から実行する

このテンプレートの実行時は、テンプレートへの次のような Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID は、テンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID は、Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名を有効にするには、正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致させる必要があります。
  • YOUR_INPUT_DATA は、入力ファイルパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE は、Cloud DLP DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME は、BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE は、Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE は、バッチサイズ(csv の場合は API ごとの行数)に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。