Google 提供のストリーミング テンプレート

Google はオープンソースの Cloud Dataflow テンプレートを提供しています。テンプレートに関する一般的な情報については、概要ページをご覧ください。Google が提供するすべてのテンプレートのリストについては、Google 提供のテンプレートの概要ページをご覧ください。

このページではストリーミング テンプレートについて説明しています。

Cloud Pub/Sub Subscription to BigQuery

Cloud Pub/Sub Subscription to BigQuery テンプレートは、Cloud Pub/Sub サブスクリプションから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Cloud Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Cloud Pub/Sub メッセージは JSON 形式でなければなりません。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 という名前の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 読み込まれる Cloud Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Cloud Pub/Sub Subscription to BigQuery テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub Subscription to BigQuery テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME を Cloud Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_DATASET を BigQuery データセットに置き換え、YOUR_TABLE_NAME を BigQuery テーブル名に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME を Cloud Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_DATASET を BigQuery データセットに置き換え、YOUR_TABLE_NAME を BigQuery テーブル名に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub Topic to BigQuery

Cloud Pub/Sub Topic to BigQuery テンプレートは、Cloud Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。このテンプレートは、Cloud Pub/Sub データを BigQuery に移動する簡単なソリューションとして使用できます。このテンプレートは Cloud Pub/Sub から JSON 形式のメッセージを読み込み、BigQuery 要素に変換します。

このパイプラインの要件:

  • Cloud Pub/Sub メッセージは JSON 形式でなければなりません。JSON 形式については、こちらをご覧ください。たとえば、{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1k2 という名前の 2 列に文字列データ型で挿入されます。
  • パイプラインの実行前に出力テーブルが存在する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み込まれる Cloud Pub/Sub 入力トピック。projects/<project>/topics/<topic> の形式で指定します。
outputTableSpec BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。

Cloud Pub/Sub Topic to BigQuery テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub Topic to BigQuery テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR_DATASET を BigQuery データセットに置き換え、YOUR_TABLE_NAME を BigQuery テーブル名に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR_DATASET を BigQuery データセットに置き換え、YOUR_TABLE_NAME を BigQuery テーブル名に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Pub/Sub

Cloud Pub/Sub to Cloud Pub/Sub テンプレートは、Cloud Pub/Sub サブスクリプションからメッセージを読み取り、Cloud Pub/Sub トピックに書き込むストリーミング パイプラインです。Cloud Pub/Sub トピックに書き込むメッセージをフィルタリングするためのメッセージ属性キーと値をオプションとして指定することもできます。このテンプレートを使用すると、必要に応じてメッセージ フィルタを適用しつつ、Cloud Pub/Sub サブスクリプションから別の Cloud Pub/Sub トピックにメッセージをコピーすることができます。

このパイプラインの要件:

  • 実行前にコピー元の Cloud Pub/Sub サブスクリプションが存在している必要があります。
  • 実行前にコピー先の Cloud Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputSubscription 入力の読み取り元の Cloud Pub/Sub サブスクリプション。例: projects/<project-id>/subscriptions/<subscription-name>
outputTopic 出力の書き込み先の Cloud Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
filterKey (省略可)属性キーに基づいてイベントをフィルタします。filterKey が指定されていない場合、フィルタは適用されません。
filterValue (省略可)filterKey が指定されている場合に使用するフィルタ属性値。デフォルトでは null の filterValue が使用されます。

Cloud Pub/Sub to Cloud Pub/Sub テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub to Cloud Pub/Sub テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME を Cloud Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • FILTER_KEY を、メッセージをフィルタする属性キーの名前に置き換えます。
  • FILTER_VALUE を属性値に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_SUBSCRIPTION_NAME を Cloud Pub/Sub サブスクリプション名に置き換えます。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • FILTER_KEY を、メッセージをフィルタする属性キーの名前に置き換えます。
  • FILTER_VALUE を属性値に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Storage Avro

Cloud Pub/Sub to Cloud Storage Avro テンプレートは、Cloud Pub/Sub トピックからデータを読み取り、指定された Cloud Storage バケットに Avro ファイルを書き込むストリーミング パイプラインです。このパイプラインは、ウィンドウ処理された書き込みを実行するために使用されるオプションのユーザー指定ウィンドウ期間をサポートしています。

このパイプラインの要件:

  • パイプラインの実行前に入力 Cloud Pub/Sub トピックが存在すること。

テンプレートのパラメータ

パラメータ 説明
inputTopic メッセージを消費するために購読する Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力の Avro ファイルがアーカイブされる出力ディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
avroTempDirectory 一時的な Avro ファイルのためのディレクトリ。末尾に / を付加してください。例: gs://example-bucket/example-directory/
outputFilenamePrefix (省略可)Avro ファイルの出力ファイル名接頭辞。
outputFilenameSuffix (省略可)Avro ファイルの出力ファイル名接尾辞。
outputShardTemplate (省略可)出力ファイルのシャード テンプレート。文字「S」または「N」の繰り返しシーケンスで指定します(例: SSS-NNN)。これらはそれぞれシャード番号またはシャード数に置き換えられます。このパラメータを省略した場合のデフォルトのテンプレート フォーマットは「W-P-SS-of-NN」です。
numShards (省略可)書き込み時に生成される出力シャードの最大数。シャードのデフォルトの最大数は 1 です。
windowDuration (省略可)データの書き込みが行われるウィンドウ期間。デフォルトは 5m です。使用できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時間が Nh(例: 2h)です。

Cloud Pub/Sub to Cloud Storage Avro テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub to Cloud Storage Avro テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
  • FILENAME_PREFIX を、使用する出力ファイル名の接頭辞に置き換えます。
  • FILENAME_SUFFIX を、使用する出力ファイル名の接尾辞に置き換えます。
  • SHARD_TEMPLATE を、使用する出力シャード テンプレートに置き換えます。
  • NUM_SHARDS を出力シャードの数で置き換えます。
  • WINDOW_DURATION を出力ウィンドウ期間に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Avro \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
avroTempDirectory=gs://YOUR_BUCKET_NAME/temp/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
windowDuration=WINDOW_DURATION

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
  • FILENAME_PREFIX を、使用する出力ファイル名の接頭辞に置き換えます。
  • FILENAME_SUFFIX を、使用する出力ファイル名の接尾辞に置き換えます。
  • SHARD_TEMPLATE を、使用する出力シャード テンプレートに置き換えます。
  • NUM_SHARDS を出力シャードの数で置き換えます。
  • WINDOW_DURATION を出力ウィンドウ期間に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "avroTempDirectory": "gs://YOUR_BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
       "windowDuration": "WINDOW_DURATION"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Pub/Sub to Cloud Storage Text

Cloud Pub/Sub to Cloud Storage Text テンプレートは、Cloud Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、後で使用するデータを Cloud Pub/Sub にすばやく保存できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

このパイプラインの要件:

  • 実行前に Cloud Pub/Sub トピックが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式でなければなりません。
  • トピックに公開するメッセージに改行が含まれていてはなりません。出力ファイルでは、1 つの Cloud Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

パラメータ 説明
inputTopic 読み取り元の Cloud Pub/Sub トピック。トピック名は projects/<project-id>/topics/<topic-name> の形式にします。
outputDirectory 出力ファイルを書き込むパスとファイル名の接頭辞。例: gs://bucket-name/path/この値はスラッシュで終わる必要があります。
outputFilenamePrefix ウィンドウ処理されたファイルの接頭辞。例: output-
outputFilenameSuffix ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などの拡張子です。
outputShardTemplate シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに保存されます。outputShardTemplateW-P-SS-of-NN 形式のデフォルト値。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。

Cloud Pub/Sub to Cloud Storage Text テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Pub/Sub to Cloud Storage Text テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text \
    --parameters \
inputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME,\
outputDirectory=gs://YOUR_BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
       "outputDirectory": "gs://YOUR_BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to BigQuery (Stream)

Cloud Storage Text to BigQuery (Stream) パイプラインは、Cloud Storage に格納されているテキスト ファイルをストリーミングし、指定された JavaScript ユーザー定義関数(UDF)を使用して変換し、結果を BigQuery に出力するストリーミング パイプラインです。

このパイプラインの要件:

  • 出力テーブルを記述する JSON 形式の BigQuery スキーマ ファイルを作成します。
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。この関数は、JSON 文字列を返さなければならないことに注意してください。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformGcsPath JavaScript UDF の Cloud Storage の場所。例: gs://my_bucket/my_function.js
JSONPath BigQuery スキーマ ファイルの Cloud Storage の場所。JSON 形式で記述されます。例: gs://path/to/my/schema.json
javascriptTextTransformFunctionName UDF として呼び出す JavaScript 関数の名前。例: transform
outputTable 完全修飾された BigQuery テーブル。例: my-project:dataset.table
inputFilePattern 処理するテキストの Cloud Storage の場所。例: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスで使用する一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir
outputDeadletterTable 出力テーブルに到達できなかったメッセージが記載されたテーブル(「Deadletter」テーブル)。例: my-project:dataset.my-deadletter-table。存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は、<outputTableSpec>_error_records が代わりに使用されます。

Cloud Storage Text to BigQuery (Stream) テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Storage Text to BigQuery テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_JAVASCRIPT_FUNCTION を UDF の名前に置き換えます。
  • PATH_TO_BIGQUERY_SCHEMA_JSON を、スキーマ定義を含む JSON ファイルの Cloud Storage パスに置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE を、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_YOUR_TEXT_DATA を、テキスト データセットの Cloud Storage パスに置き換えます。
  • BIGQUERY_TABLE を BigQuery テーブル名に置き換えます。
  • BIGQUERY_DEADLETTER_TABLE を BigQuery の deadletter テーブル名に置き換えます。
  • PATH_TO_TEMP_DIR_ON_GCS を、一時ディレクトリへの Cloud Storage パスに置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_DEADLETTER_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_JAVASCRIPT_FUNCTION を UDF の名前に置き換えます。
  • PATH_TO_BIGQUERY_SCHEMA_JSON を、スキーマ定義を含む JSON ファイルの Cloud Storage パスに置き換えます。
  • PATH_TO_JAVASCRIPT_UDF_FILE を、JavaScript コードを含む .js ファイルへの Cloud Storage パスに置き換えます。
  • PATH_TO_YOUR_TEXT_DATA を、テキスト データセットの Cloud Storage パスに置き換えます。
  • BIGQUERY_TABLE を BigQuery テーブル名に置き換えます。
  • BIGQUERY_DEADLETTER_TABLE を BigQuery の deadletter テーブル名に置き換えます。
  • PATH_TO_TEMP_DIR_ON_GCS を、一時ディレクトリへの Cloud Storage パスに置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_YOUR_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_DEADLETTER_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Cloud Pub/Sub (Stream)

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Cloud Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Cloud Pub/Sub トピックに公開します。また、Cloud Pub/Sub でデータを再生することもできます。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Cloud Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Cloud Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/files/*.json
outputTopic 書き込み先の Cloud Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

Cloud Storage Text to Cloud Pub/Sub(Stream)テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Cloud Storage Text to Cloud Pub/Sub (Stream) テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR-PROJECT-ID をプロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_TOPIC_NAME を Cloud Pub/Sub のトピック名に置き換えます。
  • YOUR-BUCKET-NAME を Cloud Storage バケットの名前に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream)

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートは、Cloud Storage バケットから csv ファイルを読み取り、匿名化のために Cloud Data Loss Prevention(Cloud DLP)API を呼び出し、指定された BigQuery テーブルに匿名化されたデータを書き込むパイプラインです。このテンプレートでは、Cloud DLP 検査テンプレートと Cloud DLP 匿名化テンプレートの両方の使用がサポートされます。これにより、潜在的な機密情報を検査して匿名化したり、列が匿名化されるように指定されていて、検査を必要としない構造化データを匿名化したりできます。

このパイプラインの要件:

  • トークン化する入力データが存在している必要があります
  • Cloud DLP テンプレートが存在している必要があります(たとえば、DeidentifyTemplate や InspectTemplate)。詳細については、Cloud DLP テンプレートをご覧ください。
  • BigQuery データセットが存在している必要があります

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 入力データレコードを読み込む CSV ファイル。ワイルドカードも使用できます。例: gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId Cloud DLP API リソースを所有する Cloud DLP プロジェクト ID。 この Cloud DLP プロジェクトは、Cloud DLP テンプレートを所有するプロジェクトと同じプロジェクトにすることも、別のプロジェクトにすることもできます。 例: my_dlp_api_project
deidentifyTemplateName API リクエストに使用する Cloud DLP 匿名化テンプレート。パターン projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} で指定します。例: projects/my_project/deidentifyTemplates/100
datasetName トークン化された結果を送信するための BigQuery データセット。
batchSize 検索やトークン化解除するデータを送信するためのチャンク / バッチサイズ。CSV ファイルの場合、batchSize は全体の行数です。ユーザーは、レコードのサイズとファイルのサイズに基づいてバッチサイズを決定する必要があります。Cloud DLP API では、ペイロードのサイズが API 呼び出しごとに 524 KB に制限されます。
inspectTemplateName (省略可)API リクエストに使用する Cloud DLP 検査テンプレート。 projects/{template_project_id}/identifyTemplates/{idTemplateId} の形で指定します。例: projects/my_project/identifyTemplates/100

Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery テンプレートの実行

Console

Google Cloud Platform Console からの実行
  1. GCP Console の Cloud Dataflow ページに移動します。
  2. Cloud Dataflow ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Cloud Platform Console の [テンプレートからジョブを作成] ボタン
  5. [Cloud Dataflow テンプレート] プルダウン メニューから Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) テンプレート を選択します。
  6. [ジョブ名] フィールドにジョブ名を入力します。 ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  7. 表示されるパラメータ フィールドにパラメータ値を入力します。
  8. [ジョブを実行] をクリックします。

GCLOUD

gcloud コマンドライン ツールからの実行

注: テンプレートを実行する gcloud コマンドライン ツールを使用するには、Cloud SDK のバージョン 138.0.0 以降が必要です。

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID をテンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID を Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_INPUT_DATA を入力ファイルのパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE を Cloud DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME を BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE を Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE をバッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)に置き換えます。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

REST API からの実行

このテンプレートを実行する場合、テンプレートへの Cloud Storage パスが必要です。

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API リクエストでこのテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

このサンプルの次の値は置き換える必要があります。

  • YOUR_TEMPLATE_PROJECT_ID をテンプレート プロジェクト ID に置き換えます。
  • YOUR_DLP_API_PROJECT_ID を Cloud DLP API プロジェクト ID に置き換えます。
  • JOB_NAME を、選択したジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? と一致する必要があります。
  • YOUR_INPUT_DATA を入力ファイルのパスに置き換えます。
  • YOUR_DEIDENTIFY_TEMPLATE を Cloud DLPDeidentify テンプレート番号に置き換えます。
  • YOUR_DATASET_NAME を BigQuery データセット名に置き換えます。
  • YOUR_INSPECT_TEMPLATE を Cloud DLPInspect テンプレート番号に置き換えます。
  • BATCH_SIZE_VALUE をバッチサイズ(csv の場合は 1 回の API 呼び出しに対する行数)に置き換えます。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}