Google 提供の Dataflow バッチ テンプレート

Google はオープンソースの Dataflow テンプレートを提供しています。

これらの Dataflow テンプレートは、データのインポート、エクスポート、バックアップ、復元、API の一括オペレーションなど、大規模なデータタスクの解決に役立ちます。専用の開発環境を使用しなくても、これらの処理を実行できます。テンプレートは Apache Beam 上に構築され、Dataflow を使用してデータを変換します。

テンプレートに関する一般的な情報については、Dataflow テンプレートをご覧ください。Google が提供するテンプレートのリストについては、Google 提供のテンプレートの概要をご覧ください。

このガイドでは、以下のバッチ テンプレートについて説明します。

BigQuery to Cloud Storage TFRecords

BigQuery to Cloud Storage TFRecords テンプレートは、BigQuery クエリからデータを読み取り、Cloud Storage バケットに TFRecord 形式で書き込むパイプラインです。トレーニング、テスト、検証の分割パーセンテージを指定できます。デフォルトでは、トレーニング セットの分割パーセンテージは 1 または 100%、テストセットと検証セットは 0 または 0% です。データセットの分割を設定する場合は、トレーニング、テスト、検証の合計が 1 または 100% になるようにする必要があります(たとえば、0.6 + 0.2 + 0.2)。Dataflow では、各出力データセットに最適なシャード数が自動的に設定されます。

このパイプラインの要件:

  • BigQuery のデータセットとテーブルが存在すること。
  • パイプラインの実行前に出力先の Cloud Storage バケットが存在すること。トレーニング、テスト、検証のサブディレクトリは事前に作成する必要はありません。自動的に生成されます。

テンプレートのパラメータ

パラメータ 説明
readQuery ソースからデータを抽出する BigQuery SQL クエリ。例: select * from dataset1.sample_table
outputDirectory トレーニング、テスト、検証の TFRecord ファイルを書き込む最上位の Cloud Storage パスの接頭辞。例: gs://mybucket/outputトレーニング、テスト、検証の TFRecord ファイルのサブディレクトリは、outputDirectory から自動的に生成されます。例: gs://mybucket/output/train
trainingPercentage (省略可)トレーニングの TFRecord ファイルに割り当てられるクエリデータの割合。デフォルト値は 1 または 100% です。
testingPercentage (省略可)テストの TFRecord ファイルに割り当てられるクエリデータの割合。デフォルト値は 0 または 0% です。
validationPercentage (省略可)検証の TFRecord ファイルに割り当てられるクエリデータの割合。デフォルト値は 0 または 0% です。
outputSuffix (省略可)トレーニング、テスト、検証で書き込まれる TFRecord ファイルの接頭辞。デフォルト値は .tfrecord です。

BigQuery to Cloud Storage TFRecord ファイル テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the BigQuery to TFRecords template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records \
    --region REGION_NAME \
    --parameters \
readQuery=READ_QUERY,\
outputDirectory=OUTPUT_DIRECTORY,\
trainingPercentage=TRAINING_PERCENTAGE,\
testingPercentage=TESTING_PERCENTAGE,\
validationPercentage=VALIDATION_PERCENTAGE,\
outputSuffix=OUTPUT_FILENAME_SUFFIX

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • READ_QUERY: 実行する BigQuery クエリ
  • OUTPUT_DIRECTORY: 出力データセットの Cloud Storage パス接頭辞
  • TRAINING_PERCENTAGE: トレーニング データセットの分割割合の小数値
  • TESTING_PERCENTAGE: テスト データセットの分割割合の小数値
  • VALIDATION_PERCENTAGE: 検証データセットの分割割合の小数値
  • OUTPUT_FILENAME_SUFFIX: 出力される TensorFlow レコードのファイル サフィックス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records
{
   "jobName": "JOB_NAME",
   "parameters": {
       "readQuery":"READ_QUERY",
       "outputDirectory":"OUTPUT_DIRECTORY",
       "trainingPercentage":"TRAINING_PERCENTAGE",
       "testingPercentage":"TESTING_PERCENTAGE",
       "validationPercentage":"VALIDATION_PERCENTAGE",
       "outputSuffix":"OUTPUT_FILENAME_SUFFIX"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • READ_QUERY: 実行する BigQuery クエリ
  • OUTPUT_DIRECTORY: 出力データセットの Cloud Storage パス接頭辞
  • TRAINING_PERCENTAGE: トレーニング データセットの分割割合の小数値
  • TESTING_PERCENTAGE: テスト データセットの分割割合の小数値
  • VALIDATION_PERCENTAGE: 検証データセットの分割割合の小数値
  • OUTPUT_FILENAME_SUFFIX: 出力される TensorFlow レコードのファイル サフィックス

BigQuery export to Parquet(Storage API 経由)

BigQuery export to Parquet テンプレートは、BigQuery テーブルからデータを読み取り、Parquet 形式で Cloud Storage バケットに書き込むバッチ パイプラインです。このテンプレートは、BigQuery Storage API を使用してデータをエクスポートします。

このパイプラインの要件:

  • パイプラインを実行する前に、入力 BigQuery テーブルが存在すること。
  • パイプラインを実行する前に、出力先の Cloud Storage バケットが存在すること。

テンプレートのパラメータ

パラメータ 説明
tableRef BigQuery 入力テーブルの場所。例: <my-project>:<my-dataset>.<my-table>
bucket Parquet ファイルを書き込む Cloud Storage フォルダ。例: gs://mybucket/exports
numShards (省略可)出力ファイルのシャード数。デフォルト値は 1 です。
fields (省略可)入力 BigQuery テーブルから選択するフィールドのカンマ区切りのリスト。

BigQuery to Cloud Storage Parquet テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the BigQuery export to Parquet (via Storage API) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet \
    --region=REGION_NAME \
    --parameters \
tableRef=BIGQUERY_TABLE,\
bucket=OUTPUT_DIRECTORY,\
numShards=NUM_SHARDS,\
fields=FIELDS

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGQUERY_TABLE: BigQuery テーブル名
  • OUTPUT_DIRECTORY: 出力ファイルを格納する Cloud Storage フォルダ
  • NUM_SHARDS: 目的の出力ファイル シャードの数
  • FIELDS: 入力 BigQuery テーブルから選択するフィールドのカンマ区切りリスト

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "tableRef": "BIGQUERY_TABLE",
          "bucket": "OUTPUT_DIRECTORY",
          "numShards": "NUM_SHARDS",
          "fields": "FIELDS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGQUERY_TABLE: BigQuery テーブル名
  • OUTPUT_DIRECTORY: 出力ファイルを格納する Cloud Storage フォルダ
  • NUM_SHARDS: 目的の出力ファイル シャードの数
  • FIELDS: 入力 BigQuery テーブルから選択するフィールドのカンマ区切りリスト

BigQuery to Elasticsearch

BigQuery to Elasticsearch テンプレートは、BigQuery テーブルから Elasticsearch にデータをドキュメントとして取り込むバッチ パイプラインです。テンプレートでは、テーブル全体を読み取ることも、クエリを使用して特定のレコードを読み取ることもできます。

このパイプラインの要件

  • ソース BigQuery テーブルが存在すること。
  • Google Cloud インスタンスまたは Elasticsearch バージョン 7.0 以降の Elastic Cloud 上に Elasticsearch ホストが存在し、このホストに Dataflow ワーカーマシンからアクセスできること。

テンプレートのパラメータ

パラメータ 説明
connectionUrl https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。
apiKey 認証に使用される Base64 エンコードの API キー。
index リクエストが発行される Elasticsearch インデックス。例: my-index
inputTableSpec (省略可)Elasticsearch に挿入するために読み取る BigQuery テーブル。テーブルまたはクエリを指定する必要があります。例: projectId:datasetId.tablename
query (省略可)BigQuery からデータを pull する SQL クエリ。テーブルまたはクエリを指定する必要があります。
useLegacySql (省略可)レガシー SQL を使用するには true に設定します(クエリを提供する場合のみ)。デフォルト: false
batchSize (省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト数)。デフォルト: 5242880(5 MB)。
maxRetryAttempts (省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: no retries
maxRetryDuration (省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: no retries
propertyAsIndex (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none
propertyAsId (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none
javaScriptIndexFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIndexFnName (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIdFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIdFnName (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptTypeFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptTypeFnName (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIsDeleteFnGcsPath (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
javaScriptIsDeleteFnName (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
usePartialUpdate (省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false
bulkInsertMethod (省略可)INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE

BigQuery to Elasticsearch テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the BigQuery to Elasticsearch template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_TABLE_SPEC: BigQuery テーブル名。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_TABLE_SPEC: BigQuery テーブル名。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。

BigQuery to MongoDB

BigQuery to MongoDB テンプレートは、BigQuery から行を読み取り、ドキュメントとして MongoDB に書き込むバッチ パイプラインです。現在、各行がドキュメントとして格納されています。

このパイプラインの要件

  • ソース BigQuery テーブルが存在すること。
  • Dataflow ワーカーマシンからターゲット MongoDB インスタンスにアクセスできること。

テンプレートのパラメータ

パラメータ 説明
mongoDbUri MongoDB 接続 URI。形式は mongodb+srv://:@
database コレクションを格納する MongoDB のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
inputTableSpec 読み取り元の BigQuery テーブル。例: bigquery-project:dataset.input_table

BigQuery to MongoDB テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the BigQuery to MongoDB template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

  gcloud beta dataflow flex-template run JOB_NAME \
      --project=PROJECT_ID \
      --region=REGION_NAME \
      --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_MongoDB \
      --parameters \
  inputTableSpec=INPUT_TABLE_SPEC,\
  mongoDbUri=MONGO_DB_URI,\
  database=DATABASE,\
  collection=COLLECTION
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_TABLE_SPEC: ソース BigQuery テーブル名。
  • MONGO_DB_URI: MongoDB URI。
  • DATABASE: MongoDB データベース。
  • COLLECTION: MongoDB コレクション。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
     "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputTableSpec": "INPUT_TABLE_SPEC",
            "mongoDbUri": "MONGO_DB_URI",
            "database": "DATABASE",
            "collection": "COLLECTION"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_MongoDB",
     }
  }

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • INPUT_TABLE_SPEC: ソース BigQuery テーブル名。
  • MONGO_DB_URI: MongoDB URI。
  • DATABASE: MongoDB データベース。
  • COLLECTION: MongoDB コレクション。

Bigtable to Cloud Storage Avro

Bigtable to Cloud Storage Avro テンプレートは、Bigtable テーブルからデータを読み取り、Cloud Storage バケットに Avro 形式で書き込むパイプラインです。このテンプレートは、Bigtable から Cloud Storage にデータを移動する場合に使用できます。

このパイプラインの要件:

  • Bigtable テーブルが存在していること。
  • パイプラインを実行する前に、出力先の Cloud Storage バケットが存在すること。

テンプレートのパラメータ

パラメータ 説明
bigtableProjectId データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID。
bigtableInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableTableId エクスポートする Bigtable テーブルの ID。
outputDirectory データが書き込まれる Cloud Storage のパス。例: gs://mybucket/somefolder
filenamePrefix Avro ファイル名の接頭辞。例: output-

Bigtable to Cloud Storage Avro file テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Bigtable to Avro Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • OUTPUT_DIRECTORY: データの書き込み先の Cloud Storage パス(例: gs://mybucket/somefolder
  • FILENAME_PREFIX: Avro ファイル名の接頭辞(例: output-

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • OUTPUT_DIRECTORY: データの書き込み先の Cloud Storage パス(例: gs://mybucket/somefolder
  • FILENAME_PREFIX: Avro ファイル名の接頭辞(例: output-

Bigtable to Cloud Storage Parquet

Bigtable to Cloud Storage Parquet テンプレートは、Bigtable テーブルからデータを読み取り、Cloud Storage バケットに Parquet 形式で書き込むパイプラインです。このテンプレートは、Bigtable から Cloud Storage にデータを移動する場合に使用できます。

このパイプラインの要件:

  • Bigtable テーブルが存在していること。
  • パイプラインを実行する前に、出力先の Cloud Storage バケットが存在すること。

テンプレートのパラメータ

パラメータ 説明
bigtableProjectId データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID。
bigtableInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableTableId エクスポートする Bigtable テーブルの ID。
outputDirectory データが書き込まれる Cloud Storage のパス。例: gs://mybucket/somefolder
filenamePrefix Parquet ファイル名の接頭辞。例: output-
numShards 出力ファイルのシャード数。例: 2

Bigtable to Cloud Storage Parquet ファイル テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Bigtable to Parquet Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX,\
numShards=NUM_SHARDS

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • OUTPUT_DIRECTORY: データの書き込み先の Cloud Storage パス(例: gs://mybucket/somefolder
  • FILENAME_PREFIX: Parquet ファイル名の接頭辞(例: output-
  • NUM_SHARDS: 出力する Parquet ファイルの数(例: 1

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
       "numShards": "NUM_SHARDS"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • OUTPUT_DIRECTORY: データの書き込み先の Cloud Storage パス(例: gs://mybucket/somefolder
  • FILENAME_PREFIX: Parquet ファイル名の接頭辞(例: output-
  • NUM_SHARDS: 出力する Parquet ファイルの数(例: 1

Bigtable to Cloud Storage SequenceFile

Bigtable to Cloud Storage to SequenceFile テンプレートは、Bigtable テーブルからデータを読み取り、SequenceFile 形式で Cloud Storage バケットに書き込むパイプラインです。このテンプレートは、Bigtable から Cloud Storage にデータをコピーする場合に使用できます。

このパイプラインの要件:

  • Bigtable テーブルが存在していること。
  • パイプラインを実行する前に、出力先の Cloud Storage バケットが存在すること。

テンプレートのパラメータ

パラメータ 説明
bigtableProject データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID。
bigtableInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableTableId エクスポートする Bigtable テーブルの ID。
bigtableAppProfileId エクスポートに使用される Bigtable アプリケーション プロファイルの ID。アプリ プロファイルを指定しないと、Bigtable はインスタンスのデフォルトのアプリ プロファイルを使用します。
destinationPath データが書き込まれる Cloud Storage のパス。例: gs://mybucket/somefolder
filenamePrefix SequenceFile ファイル名の接頭辞。例: output-

Bigtable to Cloud Storage SequenceFile テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Bigtable to SequenceFile Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile \
    --region REGION_NAME \
    --parameters \
bigtableProject=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
bigtableAppProfileId=APPLICATION_PROFILE_ID,\
destinationPath=DESTINATION_PATH,\
filenamePrefix=FILENAME_PREFIX

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • APPLICATION_PROFILE_ID: エクスポートに使用される Bigtable アプリケーション プロファイルの ID。
  • DESTINATION_PATH: データの書き込み先の Cloud Storage パス(例: gs://mybucket/somefolder
  • FILENAME_PREFIX: SequenceFile ファイル名の接頭辞(例: output-

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProject": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "bigtableAppProfileId": "APPLICATION_PROFILE_ID",
       "destinationPath": "DESTINATION_PATH",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • APPLICATION_PROFILE_ID: エクスポートに使用される Bigtable アプリケーション プロファイルの ID。
  • DESTINATION_PATH: データの書き込み先の Cloud Storage パス(例: gs://mybucket/somefolder
  • FILENAME_PREFIX: SequenceFile ファイル名の接頭辞(例: output-

Datastore to Cloud Storage Text [非推奨]

このテンプレートはサポートが終了しており、2022 年第 1 四半期に廃止されます。Firestore to Cloud Storage Text テンプレートに移行してください。

Datastore to Cloud Storage Text テンプレートは、Datastore エンティティを読み取り、Cloud Storage にテキスト ファイルとして書き込むバッチ パイプラインです。各エンティティを JSON 文字列として扱う関数を使用できます。このような関数を使用しない場合、出力ファイルの各行はシリアル化された JSON エンティティとなります。

このパイプラインの要件:

パイプラインを実行する前に、プロジェクトで Datastore を設定する必要があります。

テンプレートのパラメータ

パラメータ 説明
datastoreReadGqlQuery 取得するエンティティを指定する GQL クエリ。例: SELECT * FROM MyKind
datastoreReadProjectId データを読み取る Datastore インスタンスの Google Cloud プロジェクト ID。
datastoreReadNamespace 要求されたエンティティの名前空間。デフォルトの名前空間を使用するには、このパラメータを空白のままにします。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
textWritePrefix データの書き込み先を示す Cloud Storage パスの接頭辞。例: gs://mybucket/somefolder/

Datastore to Cloud Storage Text テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Datastore to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="SELECT * FROM DATASTORE_KIND",\
datastoreReadProjectId=DATASTORE_PROJECT_ID,\
datastoreReadNamespace=DATASTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://BUCKET_NAME/output/

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • DATASTORE_PROJECT_ID: Datastore インスタンスが存在する Cloud プロジェクトの ID
  • DATASTORE_KIND: Datastore エンティティのタイプ
  • DATASTORE_NAMESPACE: Datastore エンティティの名前空間
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "SELECT * FROM DATASTORE_KIND"
       "datastoreReadProjectId": "DATASTORE_PROJECT_ID",
       "datastoreReadNamespace": "DATASTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • DATASTORE_PROJECT_ID: Datastore インスタンスが存在する Cloud プロジェクトの ID
  • DATASTORE_KIND: Datastore エンティティのタイプ
  • DATASTORE_NAMESPACE: Datastore エンティティの名前空間
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js

Firestore to Cloud Storage Text

Firestore to Cloud Storage Text テンプレートは、Firestore エンティティを読み取り、Cloud Storage にテキスト ファイルとして書き込むバッチ パイプラインです。各エンティティを JSON 文字列として扱う関数を使用できます。このような関数を使用しない場合、出力ファイルの各行はシリアル化された JSON エンティティとなります。

このパイプラインの要件:

パイプラインを実行する前に、プロジェクトで Firestore を設定する必要があります。

テンプレートのパラメータ

パラメータ 説明
firestoreReadGqlQuery 取得するエンティティを指定する GQL クエリ。例: SELECT * FROM MyKind
firestoreReadProjectId データを読み取る Firestore インスタンスの Google Cloud プロジェクト ID。
firestoreReadNamespace 要求されたエンティティの名前空間。デフォルトの名前空間を使用するには、このパラメータを空白のままにします。
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
textWritePrefix データの書き込み先を示す Cloud Storage パスの接頭辞。例: gs://mybucket/somefolder/

Firestore to Cloud Storage Text テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Firestore to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="SELECT * FROM FIRESTORE_KIND",\
firestoreReadProjectId=FIRESTORE_PROJECT_ID,\
firestoreReadNamespace=FIRESTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://BUCKET_NAME/output/

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FIRESTORE_PROJECT_ID: Firestore インスタンスが存在する Cloud プロジェクトの ID
  • FIRESTORE_KIND: Firestore エンティティのタイプ
  • FIRESTORE_NAMESPACE: Firestore エンティティの名前空間
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "firestoreReadGqlQuery": "SELECT * FROM FIRESTORE_KIND"
       "firestoreReadProjectId": "FIRESTORE_PROJECT_ID",
       "firestoreReadNamespace": "FIRESTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FIRESTORE_PROJECT_ID: Firestore インスタンスが存在する Cloud プロジェクトの ID
  • FIRESTORE_KIND: Firestore エンティティのタイプ
  • FIRESTORE_NAMESPACE: Firestore エンティティの名前空間
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js

Cloud Spanner to Cloud Storage Avro

Cloud Spanner to Avro Files on Cloud Storage テンプレートは、Cloud Spanner データベース全体を Avro 形式で Cloud Storage にエクスポートするバッチ パイプラインです。Cloud Spanner データベースをエクスポートすると、選択したバケット内にフォルダが作成されます。フォルダには以下が含まれています。

  • spanner-export.json ファイル。
  • エクスポートしたデータベースの角テーブルの TableName-manifest.json ファイル。
  • 1 つ以上の TableName.avro-#####-of-##### ファイル。

たとえば、SingersAlbums の 2 つのテーブルを持つデータベースをエクスポートして、次のファイルセットを作成します。

  • Albums-manifest.json
  • Albums.avro-00000-of-00002
  • Albums.avro-00001-of-00002
  • Singers-manifest.json
  • Singers.avro-00000-of-00003
  • Singers.avro-00001-of-00003
  • Singers.avro-00002-of-00003
  • spanner-export.json

このパイプラインの要件:

  • Cloud Spanner データベースが存在すること。
  • 出力先の Cloud Storage バケットが存在すること。
  • Dataflow ジョブの実行に必要な IAM ロールに加えて、Cloud Spanner のデータの読み取りと Cloud Storage バケットへの書き込みに適切な IAM ロールも必要です。

テンプレートのパラメータ

パラメータ 説明
instanceId エクスポートする Cloud Spanner データベースのインスタンス ID。
databaseId エクスポートする Cloud Spanner データベースのデータベース ID。
outputDir Avro ファイルのエクスポート先にする Cloud Storage パス。エクスポート ジョブによって、このパスの下にディレクトリが新規作成されます。ここに、エクスポートされたファイルが格納されます。
snapshotTime (省略可)読み取る Cloud Spanner データベースのバージョンに対応するタイムスタンプ。タイムスタンプは RFC 3339 UTC Zulu 形式で指定する必要があります。例: 1990-12-31T23:59:60Zタイムスタンプは過去の日付でなければならず、タイムスタンプ ステイルネスの最大値が適用されます。
tableNames (省略可)エクスポートする Cloud Spanner データベースのサブセットを指定するテーブルのカンマ区切りリスト。このリストには、すべての関連テーブル(親テーブル、外部キーで参照されるテーブル)を含める必要があります。明示的に指定されていない場合は、エクスポートを正常に行うために shouldExportRelatedTables フラグを設定する必要があります。
shouldExportRelatedTables (省略可)エクスポートするすべてのテーブルを含めるために tableNames パラメータと組み合わせて使用するフラグ。
spannerProjectId (省略可)データを読み取る Cloud Spanner データベースの Google Cloud プロジェクト ID。

Cloud Spanner to Avro Files on Cloud Storage テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。

    Google Cloud コンソールの Spanner インスタンス ページにジョブを表示するには、ジョブ名が次の形式になっている必要があります。

    cloud-spanner-export-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    次のように置き換えます。

    • SPANNER_INSTANCE_ID: Spanner インスタンスの ID
    • SPANNER_DATABASE_NAME: Spanner データベースの名前
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner to Avro Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
outputDir=GCS_DIRECTORY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名

    ジョブを Google Cloud コンソールの Cloud Spanner の部分に表示するには、ジョブ名を cloud-spanner-export-INSTANCE_ID-DATABASE_ID という形式に一致させる必要があります。

  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • GCS_STAGING_LOCATION: 一時ファイルを書き込むパス。例: gs://mybucket/temp
  • INSTANCE_ID: Cloud Spanner インスタンス ID
  • DATABASE_ID: Cloud Spanner データベース ID
  • GCS_DIRECTORY: Avro ファイルのエクスポート先になる Cloud Storage のパス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "outputDir": "gs://GCS_DIRECTORY"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名

    ジョブを Google Cloud コンソールの Cloud Spanner の部分に表示するには、ジョブ名を cloud-spanner-export-INSTANCE_ID-DATABASE_ID という形式に一致させる必要があります。

  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • GCS_STAGING_LOCATION: 一時ファイルを書き込むパス。例: gs://mybucket/temp
  • INSTANCE_ID: Cloud Spanner インスタンス ID
  • DATABASE_ID: Cloud Spanner データベース ID
  • GCS_DIRECTORY: Avro ファイルのエクスポート先になる Cloud Storage のパス

Cloud Spanner to Cloud Storage Text

Cloud Spanner to Cloud Storage Text テンプレートは、Cloud Spanner テーブルからデータを読み取り、CSV テキスト ファイルとして Cloud Storage に書き込むバッチ パイプラインです。

このパイプラインの要件:

  • パイプラインを実行する前に、入力 Spanner テーブルが存在すること。

テンプレートのパラメータ

パラメータ 説明
spannerProjectId データを読み取る Cloud Spanner データベースの Google Cloud プロジェクト ID。
spannerDatabaseId リクエストされたテーブルのデータベース ID。
spannerInstanceId リクエストされたテーブルのインスタンス ID。
spannerTable データを読み取るテーブル。
textWritePrefix 出力テキスト ファイルを書き込むディレクトリ。末尾に / を付加してください。例: gs://mybucket/somefolder/
spannerSnapshotTime (省略可)読み取る Cloud Spanner データベースのバージョンに対応するタイムスタンプ。タイムスタンプは RFC 3339 UTC Zulu 形式で指定する必要があります。例: 1990-12-31T23:59:60Zタイムスタンプは過去の日付でなければならず、タイムスタンプ ステイルネスの最大値が適用されます。

Cloud Spanner to Cloud Storage Text テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner to Text Files on Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Spanner_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
spannerProjectId=SPANNER_PROJECT_ID,\
spannerDatabaseId=DATABASE_ID,\
spannerInstanceId=INSTANCE_ID,\
spannerTable=TABLE_ID,\
textWritePrefix=gs://BUCKET_NAME/output/

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_PROJECT_ID: データを読み取る Spanner データベースの Cloud プロジェクト ID
  • DATABASE_ID: Spanner データベース ID
  • BUCKET_NAME: Cloud Storage バケットの名前
  • INSTANCE_ID: Spanner インスタンス ID
  • TABLE_ID: Spanner テーブル ID

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "spannerDatabaseId": "DATABASE_ID",
       "spannerInstanceId": "INSTANCE_ID",
       "spannerTable": "TABLE_ID",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • SPANNER_PROJECT_ID: データを読み取る Spanner データベースの Cloud プロジェクト ID
  • DATABASE_ID: Spanner データベース ID
  • BUCKET_NAME: Cloud Storage バケットの名前
  • INSTANCE_ID: Spanner インスタンス ID
  • TABLE_ID: Spanner テーブル ID

Cloud Storage Avro to Bigtable

Cloud Storage Avro to Bigtable テンプレートは、Cloud Storage バケットの Avro ファイルからデータを読み取り、そのデータを Bigtable テーブルに書き込むパイプラインです。このテンプレートは、Cloud Storage から Bigtable にデータをコピーする場合に使用できます。

このパイプラインの要件:

  • Bigtable テーブルが存在し、Avro ファイルにエクスポートしたものと同じ列ファミリーがこのテーブルにあること。
  • パイプラインを実行する前に、入力 Avro ファイルが Cloud Storage バケット内に存在すること。
  • Bigtable が入力の Avro ファイルに特定のスキーマを想定していること。

テンプレートのパラメータ

パラメータ 説明
bigtableProjectId データを書き込む Bigtable インスタンスの Google Cloud プロジェクトの ID。
bigtableInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableTableId インポートする Bigtable テーブルの ID。
inputFilePattern データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*)。

Cloud Storage Avro file to Bigtable テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Avro Files on Cloud Storage to Cloud Bigtable template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • INPUT_FILE_PATTERN: データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • INPUT_FILE_PATTERN: データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*

Cloud Storage Avro to Cloud Spanner

Cloud Storage Avro files to Cloud Spanner テンプレートは、Cloud Storage に保存されている Cloud Spanner からエクスポートされた Avro ファイルを読み取り、そのファイルを Cloud Spanner データベースにインポートするバッチ パイプラインです。

このパイプラインの要件:

  • ターゲットの Cloud Spanner データベースが存在し、空であること。
  • Cloud Storage バケットの読み取り権限と、対象の Cloud Spanner データベースに対する書き込み権限が必要です。
  • 入力された Cloud Storage パスが存在する必要があります。また、インポートするファイルの JSON 記述を含む spanner-export.json ファイルがそこに格納されている必要があります。

テンプレートのパラメータ

パラメータ 説明
instanceId Cloud Spanner データベースのインスタンス ID。
databaseId Cloud Spanner データベースのデータベース ID。
inputDir Avro ファイルのインポート元となる Cloud Storage のパス。

Cloud Storage Avro to Cloud Spanner テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。

    Google Cloud コンソールの Spanner インスタンス ページにジョブを表示するには、ジョブ名が次の形式になっている必要があります。

    cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    次のように置き換えます。

    • SPANNER_INSTANCE_ID: Spanner インスタンスの ID
    • SPANNER_DATABASE_NAME: Spanner データベースの名前
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Avro Files on Cloud Storage to Cloud Spanner template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
inputDir=GCS_DIRECTORY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • INSTANCE_ID: データベースを含む Spanner インスタンスの ID
  • DATABASE_ID: インポート先の Spanner データベースの ID
  • GCS_DIRECTORY: Avro ファイルのインポート元となる Cloud Storage パス。例: gs://mybucket/somefolder

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "inputDir": "gs://GCS_DIRECTORY"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • INSTANCE_ID: データベースを含む Spanner インスタンスの ID
  • DATABASE_ID: インポート先の Spanner データベースの ID
  • GCS_DIRECTORY: Avro ファイルのインポート元となる Cloud Storage パス。例: gs://mybucket/somefolder

Cloud Storage Parquet to Bigtable

Cloud Storage Parquet to Bigtable テンプレートは、Cloud Storage バケットの Parquet ファイルからデータを読み取り、そのデータを Bigtable テーブルに書き込むパイプラインです。このテンプレートは、Cloud Storage から Bigtable にデータをコピーする場合に使用できます。

このパイプラインの要件:

  • Bigtable テーブルが存在し、Parquet ファイルにエクスポートしたものと同じ列ファミリーがこのテーブルにあること。
  • パイプラインを実行する前に、入力 Parquet ファイルが Cloud Storage バケット内に存在すること。
  • Bigtable が入力の Parquet ファイルに特定のスキーマを想定していること。

テンプレートのパラメータ

パラメータ 説明
bigtableProjectId データを書き込む Bigtable インスタンスの Google Cloud プロジェクトの ID。
bigtableInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableTableId インポートする Bigtable テーブルの ID。
inputFilePattern データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*)。

Cloud Storage Parquet file to Bigtable テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Parquet Files on Cloud Storage to Cloud Bigtable template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • INPUT_FILE_PATTERN: データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • INPUT_FILE_PATTERN: データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*

Cloud Storage SequenceFile to Bigtable

Cloud Storage SequenceFile to Bigtable テンプレートは、Cloud Storage バケット内の SequenceFile からデータを読み取り、そのデータを Bigtable テーブルに書き込むパイプラインです。このテンプレートは、Cloud Storage から Bigtable にデータをコピーする場合に使用できます。

このパイプラインの要件:

  • Bigtable テーブルが存在していること。
  • パイプラインを実行する前に、入力 SequenceFiles が Cloud Storage バケット内に存在すること。
  • 入力 SequenceFiles が Bigtable または HBase からエクスポートされていること。

テンプレートのパラメータ

パラメータ 説明
bigtableProject データを書き込む Bigtable インスタンスの Google Cloud プロジェクトの ID。
bigtableInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableTableId インポートする Bigtable テーブルの ID。
bigtableAppProfileId インポートに使用される Bigtable アプリケーション プロファイルの ID。アプリ プロファイルを指定しないと、Bigtable はインスタンスのデフォルトのアプリ プロファイルを使用します。
sourcePattern データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*)。

Cloud Storage SequenceFile to Bigtable テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the SequenceFile Files on Cloud Storage to Cloud Bigtable template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProject=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
bigtableAppProfileId=APPLICATION_PROFILE_ID,\
sourcePattern=SOURCE_PATTERN

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • APPLICATION_PROFILE_ID: エクスポートに使用される Bigtable アプリケーション プロファイルの ID。
  • SOURCE_PATTERN: データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProject": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "bigtableAppProfileId": "APPLICATION_PROFILE_ID",
       "sourcePattern": "SOURCE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: データを読み取る Bigtable インスタンスの Google Cloud プロジェクトの ID
  • INSTANCE_ID: テーブルが含まれている Bigtable インスタンスの ID
  • TABLE_ID: エクスポートする Bigtable テーブルの ID
  • APPLICATION_PROFILE_ID: エクスポートに使用される Bigtable アプリケーション プロファイルの ID。
  • SOURCE_PATTERN: データが存在する Cloud Storage パスのパターン(例: gs://mybucket/somefolder/prefix*

Cloud Storage Text to BigQuery

Cloud Storage Text to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルを読み取り、ユーザーが指定する JavaScript ユーザー定義関数(UDF)を使用してそれらのファイルを変換し、結果を BigQuery テーブルに追加するバッチ パイプラインです。

このパイプラインの要件:

  • BigQuery スキーマを記述する JSON ファイルを作成します。

    BigQuery Schema というタイトルのトップレベルの JSON 配列があり、その内容が {"name": "COLUMN_NAME", "type": "DATA_TYPE"} のパターンに従っていることを確認します。

    Cloud Storage Text to BigQuery バッチ テンプレートでは、ターゲットの BigQuery テーブルの STRUCT(レコード) フィールドへのデータのインポートはサポートされていません。

    次の JSON は、BigQuery スキーマの例を示しています。

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }
    
  • JavaScript(.js)ファイルを作成し、このファイル内に、テキスト行の変換ロジックを提供する UDF 関数を含めます。使用する関数は、JSON 文字列を返します。

    たとえば、次の関数は、CSV ファイルの各行を分割し、値を変換してから JSON 文字列を返します。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }

テンプレートのパラメータ

パラメータ 説明
javascriptTextTransformFunctionName 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
JSONPath Cloud Storage に格納された BigQuery スキーマを定義する JSON ファイルへの gs:// パス。例: gs://path/to/my/schema.json
javascriptTextTransformGcsPath : 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
inputFilePattern Cloud Storage 内で処理するテキストの gs:// パス。例: gs://path/to/my/text/data.txt
outputTable 処理されたデータを格納するために作成する BigQuery テーブル名。既存の BigQuery テーブルを再利用すると、データは宛先テーブルに追加されます。例: my-project-name:my-dataset.my-table
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスの一時ディレクトリ。例: gs://my-bucket/my-files/temp_dir

Cloud Storage Text to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to BigQuery (Batch) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --parameters \
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_BIGQUERY_SCHEMA_JSON: スキーマ定義を含む JSON ファイルへの Cloud Storage パス
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: テキスト データセットへの Cloud Storage パス
  • BIGQUERY_TABLE: BigQuery テーブル名
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス

Cloud Storage Text to Datastore [非推奨]

このテンプレートはサポートが終了しており、2022 年第 1 四半期に廃止されます。Cloud Storage Text to Firestore テンプレートに移行してください。

Cloud Storage Text to Datastore テンプレートは、Cloud Storage に保存されたテキスト ファイルを読み取り、JSON にエンコードされたエンティティを Datastore に書き込むバッチ パイプラインです。入力テキスト ファイルの各行は、指定された JSON 形式である必要があります。

このパイプラインの要件:

  • データストアが宛先プロジェクトで有効にされている必要があります。

テンプレートのパラメータ

パラメータ 説明
textReadPattern テキスト データファイルの場所を指定する Cloud Storage のパスパターン。例: gs://mybucket/somepath/*.json
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
datastoreWriteProjectId Datastore エンティティを書き込む Google Cloud プロジェクト ID
datastoreHintNumWorkers (省略可)Datastore のランプアップ スロットリング ステップで予想されるワーカー数のヒント。デフォルトは、500 です。
errorWritePath 処理中に発生したエラーを書き込むために使用するエラーログ出力ファイル。例: gs://bucket-name/errors.txt

Cloud Storage Text to Datastore テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Datastore template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Datastore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
datastoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • PATH_TO_INPUT_TEXT_FILES: Cloud Storage 上の入力ファイル パターン
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Cloud Storage 上のエラーファイルの目的のパス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Datastore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "datastoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • PATH_TO_INPUT_TEXT_FILES: Cloud Storage 上の入力ファイル パターン
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Cloud Storage 上のエラーファイルの目的のパス

Cloud Storage Text to Firestore

Cloud Storage Text to Firestore テンプレートは、Cloud Storage に保存されたテキスト ファイルを読み取り、JSON にエンコードされたエンティティを Firestore に書き込むバッチ パイプラインです。入力テキスト ファイルの各行は、指定された JSON 形式である必要があります。

このパイプラインの要件:

  • Firestore が宛先プロジェクトで有効にされている必要があります。

テンプレートのパラメータ

パラメータ 説明
textReadPattern テキスト データファイルの場所を指定する Cloud Storage のパスパターン。例: gs://mybucket/somepath/*.json
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
firestoreWriteProjectId Firestore エンティティを書き込む Google Cloud プロジェクト ID
firestoreHintNumWorkers (省略可)Firestore のランプアップ スロットリング ステップで予想されるワーカー数のヒント。デフォルトは、500 です。
errorWritePath 処理中に発生したエラーを書き込むために使用するエラーログ出力ファイル。例: gs://bucket-name/errors.txt

Cloud Storage Text to Firestore テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Firestore template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Firestore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
firestoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • PATH_TO_INPUT_TEXT_FILES: Cloud Storage 上の入力ファイル パターン
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Cloud Storage 上のエラーファイルの目的のパス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Firestore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "firestoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • PATH_TO_INPUT_TEXT_FILES: Cloud Storage 上の入力ファイル パターン
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Cloud Storage 上のエラーファイルの目的のパス

Cloud Storage Text to Pub/Sub(Batch)

このテンプレートは、Cloud Storage に保存されたテキスト ファイルからレコードを読み取り、Pub/Sub トピックに公開するバッチ パイプラインを作成します。このテンプレートは、JSON レコードまたは Pub/Sub トピックが改行区りで含まれるファイルまたは CSV ファイルのレコードを Pub/Sub トピックに公開し、リアルタイムで処理する場合に使用できます。また、Pub/Sub でデータを再生することもできます。

このテンプレートでは、個々のレコードにタイムスタンプを設定しません。実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

このパイプラインの要件:

  • 読み込むファイルは、改行区切りの JSON または CSV 形式でなければなりません。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • パイプラインを実行する前に、Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/files/*.json
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にする必要があります。

Cloud Storage Text to Pub/Sub(Batch)テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Pub/Sub (Batch) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/files/*.json,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/files/*.json",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前

Cloud Storage Text to Cloud Spanner

Cloud Storage Text to Cloud Spanner テンプレートは、Cloud Storage から CSV テキスト ファイルを読み取り、Cloud Spanner データベースにインポートするバッチ パイプラインです。

このパイプラインの要件:

  • ターゲットの Cloud Spanner データベースとテーブルが存在している必要があります。
  • Cloud Storage バケットの読み取り権限と、対象の Cloud Spanner データベースに対する書き込み権限が必要です。
  • CSV ファイルを含む入力 Cloud Storage パスが存在している必要があります。
  • CSV ファイルの JSON 記述を含むインポート マニフェスト ファイルを作成し、そのマニフェストを Cloud Storage に保存する必要があります。
  • ターゲットの Cloud Spanner データベースにすでにスキーマがある場合、マニフェスト ファイルで指定された列は、ターゲット データベースのスキーマ内の対応する列と同じデータ型である必要があります。
  • ASCII または UTF-8 でエンコードされたマニフェスト ファイルは、次の形式に一致する必要があります。

  • インポートするテキスト ファイルは、ASCII または UTF-8 エンコードの CSV 形式である必要があります。UTF-8 エンコード ファイルではバイト オーダー マーク(BOM)を使用しないことをおすすめします。
  • データは次のタイプのいずれかに一致する必要があります。

    GoogleSQL

        BOOL
        INT64
        FLOAT64
        NUMERIC
        STRING
        DATE
        TIMESTAMP
        BYPES
        JSON

    PostgreSQL

        boolean
        bigint
        double precision
        numeric
        character varying, text
        date
        timestamp with time zone
        bytea

テンプレートのパラメータ

パラメータ 説明
instanceId Cloud Spanner データベースのインスタンス ID。
databaseId Cloud Spanner データベースのデータベース ID。
importManifest Cloud Storage のインポート マニフェスト ファイルへのパス。
columnDelimiter ソースファイルが使用する列区切り文字。デフォルト値は , です。
fieldQualifier 文字は、columnDelimiter を含むソースファイル内の任意の値を囲む必要があります。デフォルト値は " です。
trailingDelimiter ソースファイルの行の末尾に区切り文字があるかどうかを指定します(つまり、columnDelimiter 文字が各行の最後の列の値の後に表示されるかどうか)。デフォルト値は true です。
escape ソースファイルが使用するエスケープ文字。デフォルトでは、このパラメータは設定されておらず、テンプレートではエスケープ文字は使用されません。
nullString NULL 値を表す文字列。デフォルトでは、このパラメータは設定されておらず、テンプレートでは null 文字列は使用されません。
dateFormat 日付列を解析するために使用される形式。デフォルトでは、パイプラインは日付列を yyyy-M-d[' 00:00:00'] として解析します。たとえば 2019-01-31 または 2019-1-1 00:00:00 とします。日付形式が異なる場合は、java.time.format.DateTimeFormatter のパターンを使って指定します。
timestampFormat timestamp 列を解析するために使用される形式。タイムスタンプが長整数の場合、Unix エポック時間として解析されます。それ以外の場合は、java.time.format.DateTimeFormatter.ISO_INSTANT の形式を使用して、文字列として解析されます。その他の場合は、独自のパターン文字列を指定します。たとえば、MMM dd yyyy HH:mm:ss.SSSVV タイムスタンプの形式は "Jan 21 1998 01:02:03.456+08:00" です。

日付形式やタイムスタンプ形式をカスタマイズする必要がある場合は、有効な java.time.format.DateTimeFormatter パターンであることを確認してください。次の表に、カスタマイズされた date 列と timestamp 列の形式の例を示します。

入力値 フォーマット 備考
DATE 2011-3-31 デフォルトでは、テンプレートはこの形式を解析できます。dateFormat パラメータを指定する必要はありません。
DATE 2011-3-31 00:00:00 デフォルトでは、テンプレートはこの形式を解析できます。形式を指定する必要はありません。必要に応じて yyyy-M-d' 00:00:00' が使用できます。
DATE 2018 年 4 月 1 日 dd MMM, yy
DATE 西暦 2019 年 4 月 3 日水曜日 EEEE, LLLL d, yyyy G
TIMESTAMP 2019-01-02T11:22:33Z
2019-01-02T11:22:33.123Z
2019-01-02T11:22:33.12356789Z
デフォルトの形式 ISO_INSTANT はこのタイプのタイムスタンプを解析できます。timestampFormat パラメータを指定する必要はありません。
TIMESTAMP 1568402363 デフォルトでは、テンプレートはこのタイプのタイムスタンプを解析し、Unix エポック時間として扱います。
TIMESTAMP 2008 年 6 月 3 日(火曜日)11:05:30 GMT EEE, d MMM yyyy HH:mm:ss VV
TIMESTAMP 2018 年 12 月 31 日 110530.123 太平洋標準時 yyyy/MM/dd HHmmss.SSSz
TIMESTAMP 2019-01-02T11:22:33Z または 2019-01-02T11:22:33.123Z yyyy-MM-dd'T'HH:mm:ss [.SSS]VV 入力列が 2019-01-02T11:22:33Z と 2019-01-02T11:22:33.123Z の場合、この形式のタイムスタンプはデフォルトの形式で解析できます。独自のフォーマット パラメータを指定する必要はありません。yyyy-MM-dd'T'HH:mm:ss[.SSS]VV を使用すると、両方のケースに対応できます。接尾辞「Z」は文字リテラルではなくタイムゾーン ID として解析する必要があるため、yyyy-MM-dd'T'HH:mm:ss[.SSS]'Z' は使用できません。内部的には、タイムスタンプ列は java.time.Instant に変換されます。そのため、UTC で指定するか、タイムゾーン情報を設定する必要があります。2019-01-02 11:22:33 のようなローカル日時は、有効な java.time.Instant として解析されません。

Text Files on Cloud Storage to Cloud Spanner テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Cloud Spanner template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner \
    --region REGION_NAME \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
importManifest=GCS_PATH_TO_IMPORT_MANIFEST

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • INSTANCE_ID: Cloud Spanner インスタンス ID
  • DATABASE_ID: Cloud Spanner データベース ID
  • GCS_PATH_TO_IMPORT_MANIFEST: インポート マニフェスト ファイルへの Cloud Storage パス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "importManifest": "GCS_PATH_TO_IMPORT_MANIFEST"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • INSTANCE_ID: Cloud Spanner インスタンス ID
  • DATABASE_ID: Cloud Spanner データベース ID
  • GCS_PATH_TO_IMPORT_MANIFEST: インポート マニフェスト ファイルへの Cloud Storage パス

Cloud Storage to Elasticsearch

Cloud Storage to Elasticsearch テンプレートは、Cloud Storage バケットに保存されている CSV ファイルからデータを読み取り、データを JSON ドキュメントとして Elasticsearch に書き込むバッチ パイプラインです。

このパイプラインの要件:

  • Cloud Storage バケットが存在している必要があります。
  • Dataflow からアクセス可能な Google Cloud インスタンスまたは Elasticsearch Cloud に Elasticsearch ホストが存在している必要があります。
  • エラー出力用の BigQuery テーブルが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFileSpec CSV ファイルを検索する Cloud Storage ファイル パターン。例: gs://mybucket/test-*.csv
connectionUrl https://hostname:[port] 形式の Elasticsearch URL。Elastic Cloud を使用する場合は CloudID を指定します。
apiKey 認証に使用される Base64 エンコードの API キー。
index リクエストが発行される Elasticsearch インデックス(my-index など)。
deadletterTable 挿入先の送信に失敗した BigQuery の Deadletter テーブル。例: <your-project>:<your-dataset>.<your-table-name>
containsHeaders (省略可)CSV にヘッダーが含まれているかどうかを示すブール値。デフォルトは true です。
delimiter (省略可)CSV ファイルの区切り文字。例: ,
csvFormat (省略可)Apache Commons CSV 形式に準拠する CSV 形式。デフォルト: Default
jsonSchemaPath (省略可)JSON スキーマのパス。デフォルト: null
largeNumFiles (省略可)ファイルの数が数万個の場合は、true に設定します。デフォルト: false
javascriptTextTransformGcsPath (省略可) 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (省略可) 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。
batchSize (省略可)バッチサイズ(ドキュメント数)。デフォルト: 1000
batchSizeBytes (省略可)バッチサイズ(バイト数)。デフォルト: 5242880(5 MB)。
maxRetryAttempts (省略可)最大再試行回数。0 より大きくする必要があります。デフォルト: 再試行なし
maxRetryDuration (省略可)最大再試行時間(ミリ秒)は 0 より大きくする必要があります。デフォルト: 再試行なし
csvFileEncoding (省略可)CSV ファイルのエンコード。
propertyAsIndex (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _index メタデータを指定し、一括リクエストではドキュメントに含まれます(_index UDF よりも優先適用されます)。デフォルト: none
propertyAsId (省略可)インデックスに登録されているドキュメント内のプロパティ。その値は _id メタデータを指定し、一括リクエストではドキュメントに含まれます(_id UDF よりも優先適用されます)。デフォルト: none
javaScriptIndexFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIndexFnName (省略可)一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIdFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptIdFnName (省略可)一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptTypeFnGcsPath (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルト: none
javaScriptTypeFnName (省略可)一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の UDF JavaScript 関数名。デフォルト: none
javaScriptIsDeleteFnGcsPath (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
javaScriptIsDeleteFnName (省略可)ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の UDF JavaScript 関数名。この関数は、文字列値 "true" または "false" を返す必要があります。デフォルト: none
usePartialUpdate (省略可)Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルト: false
bulkInsertMethod (省略可)INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルト: CREATE

Cloud Storage to Elasticsearch テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Storage to Elasticsearch template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • INPUT_FILE_SPEC: Cloud Storage ファイル パターン。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。
  • DEADLETTER_TABLE: BigQuery テーブル。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/GCS_to_Elasticsearch",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • INPUT_FILE_SPEC: Cloud Storage ファイル パターン。
  • CONNECTION_URL: Elasticsearch の URL。
  • APIKEY: 認証用に Base64 でエンコードされた API キー。
  • INDEX: Elasticsearch インデックス。
  • DEADLETTER_TABLE: BigQuery テーブル。

Java Database Connectivity(JDBC)to BigQuery

JDBC to BigQuery テンプレートは、リレーショナル データベース テーブルから既存の BigQuery テーブルにデータをコピーするバッチ パイプラインです。このパイプラインは、JDBC を使用してリレーショナル データベースに接続します。このテンプレートを使用すると、使用可能な JDBC ドライバがある任意のリレーショナル データベースから BigQuery にデータをコピーできます。保護をさらに強化するために、Cloud KMS 鍵で暗号化された Base64 でエンコードされたユーザー名、パスワード、接続文字列パラメータを渡すこともできます。詳しくは Cloud KMS API 暗号化エンドポイントで、ユーザー名、パスワード、接続文字列パラメータの暗号化の詳細をご覧ください。

このパイプラインの要件:

  • リレーショナル データベース用の JDBC ドライバが使用可能である必要があります。
  • パイプラインを実行する前に、BigQuery テーブルが存在する必要があります。
  • BigQuery テーブルに互換性のあるスキーマが必要です。
  • リレーショナル データベースは、Dataflow が実行されているサブネットからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
driverJars ドライバ JAR ファイルのカンマ区切りのリスト。例: gs://<my-bucket>/driver_jar1.jar,gs://<my-bucket>/driver_jar2.jar
driverClassName JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver
connectionURL JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledbBase64 でエンコードされ、Cloud KMS 鍵で暗号化される文字列として渡すことができます。
query ソースで実行されるクエリでデータを抽出します。例: select * from sampledb.sample_table
outputTable BigQuery 出力テーブルの場所。<my-project>:<my-dataset>.<my-table> の形式で指定します。
bigQueryLoadingTemporaryDirectory BigQuery 読み込みプロセスの一時ディレクトリ。例: gs://<my-bucket>/my-files/temp_dir
connectionProperties (省略可)JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8
username (省略可)JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
password (省略可)JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
KMSEncryptionKey (省略可)ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。
disabledAlgorithms (省略可)無効にするアルゴリズム。カンマ区切りで指定します。この値が none に設定されている場合、アルゴリズムは無効になりません。アルゴリズムはデフォルトで無効であり、脆弱性やパフォーマンス上の問題があるため、慎重に使用してください。例: SSLv3, RC4.
extraFilesToStage ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

JDBC to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the JDBC to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Jdbc_to_BigQuery \
    --region REGION_NAME \
    --parameters \
driverJars=DRIVER_PATHS,\
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • DRIVER_CLASS_NAME: ドライブクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • SOURCE_SQL_QUERY: ソース データベースで実行する SQL クエリ
  • DATASET: BigQuery データセット。TABLE_NAME は BigQuery テーブル名に置き換えます
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Jdbc_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverJars": "DRIVER_PATHS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "query": "SOURCE_SQL_QUERY",
       "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • DRIVER_CLASS_NAME: ドライブクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • SOURCE_SQL_QUERY: ソース データベースで実行する SQL クエリ
  • DATASET: BigQuery データセット。TABLE_NAME は BigQuery テーブル名に置き換えます
  • PATH_TO_TEMP_DIR_ON_GCS: 一時ディレクトリへの Cloud Storage パス
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

Java Database Connectivity(JDBC)to Pub/Sub

Java Database Connectivity(JDBC)to Pub/Sub テンプレートは、JDBC ソースからデータを取り込み、結果のレコードを JSON 文字列として既存の Pub/Sub トピックに書き込むバッチ パイプラインです。

このパイプラインの要件:

  • パイプラインを実行する前に JDBC ソースが存在している必要があります。
  • Cloud Pub/Sub 出力トピックは、パイプラインを実行する前に存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
driverClassName JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver
connectionUrl JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledbBase64 でエンコードされ、Cloud KMS 鍵で暗号化される文字列として渡すことができます。
driverJars JDBC ドライバのカンマ区切りの Cloud Storage パス。例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username (省略可)JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
password (省略可)JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
connectionProperties (省略可)JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8
query ソースで実行されるクエリでデータを抽出します。例: select * from sampledb.sample_table
outputTopic 公開先の Pub/Sub トピック。projects/<project>/topics/<topic> の形式で指定します。
KMSEncryptionKey (省略可)ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。
extraFilesToStage ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

Java Database Connectivity(JDBC)to Pub/Sub テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the JDBC to Pub/Sub template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/flex/Jdbc_to_PubSub \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
query=SOURCE_SQL_QUERY,\
outputTopic=OUTPUT_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SOURCE_SQL_QUERY: ソース データベースで実行する SQL クエリ
  • OUTPUT_TOPIC: 公開先の Pub/Sub
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "query": "SOURCE_SQL_QUERY",
       "outputTopic": "OUTPUT_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SOURCE_SQL_QUERY: ソース データベースで実行する SQL クエリ
  • OUTPUT_TOPIC: 公開先の Pub/Sub
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

Apache Cassandra to Cloud Bigtable

Apache Cassandra to Cloud Bigtable テンプレートは、Apache Cassandra から Cloud Bigtable にテーブルをコピーします。このテンプレートに対して行う必要のある構成は最小限に抑えられており、Cassandra のテーブル構造を Cloud Bigtable で可能な限り再現します。

Apache Cassandra to Cloud Bigtable テンプレートは次の場合に役立ちます。

  • 短いダウンタイムしか許容されない状況で Apache Cassandra データベースを移行する。
  • グローバルなサービス提供を目的として、Cassandra のテーブルを Cloud Bigtable に定期的に複製する。

このパイプラインの要件:

  • パイプラインを実行する前に、複製先の Bigtable テーブルが存在していること。
  • Dataflow ワーカーと Apache Cassandra ノードの間のネットワーク接続。

型変換

Apache Cassandra to Cloud Bigtable テンプレートでは、Apache Cassandra のデータ型が Cloud Bigtable のデータ型に自動的に変換されます。

ほとんどのプリミティブは Cloud Bigtable と Apache Cassandra で同じように表現されますが、次のプリミティブは異なる方法で表現されます。

  • DateTimestampDateTime オブジェクトに変換されます。
  • UUIDString に変換されます。
  • VarintBigDecimal に変換されます。

Apache Cassandra は、TupleListSetMap などの複雑な型もネイティブにサポートしています。Apache Beam にはタプルに対応する型がないため、このパイプラインではタプルはサポートされません。

たとえば、Apache Cassandra では「mylist」という名前の List 型の列を使用し、次の表のような値を格納できます。

row mylist
1 (a,b,c)

このリスト列はパイプラインによって 3 つの異なる列に展開されます(Cloud Bigtable ではこれを列修飾子といいます)。列の名前は「mylist」ですが、「mylist[0]」のようにリスト内のアイテムのインデックスがパイプラインによって追加されます。

row mylist[0] mylist[1] mylist[2]
1 a b c

このパイプラインでは、セットもリストと同じように処理されますが、セルがキーか値かを示す接尾辞が追加されます。

row mymap
1 {"first_key":"first_value","another_key":"different_value"}

変換後、テーブルは次のようになります。

row mymap[0].key mymap[0].value mymap[1].key mymap[1].value
1 first_key first_value another_key different_value

主キー変換

Apache Cassandra では、主キーはデータ定義言語を使用して定義されます。主キーは、単純、複合、クラスタ化された列の複合のいずれかです。Cloud Bigtable では、バイト配列を辞書順に並べ替える行キーの手動作成がサポートされています。このパイプラインは、キーの型の情報を自動で収集し、複数の値に基づいて行キーを作成するためのベスト プラクティスに基づいてキーを作成します。

テンプレートのパラメータ

パラメータ 説明
cassandraHosts Apache Cassandra ノードのホストをカンマ区切りのリストで表したもの。
cassandraPort (省略可)ノード上の Apache Cassandra に到達する TCP ポート(デフォルトは 9042)。
cassandraKeyspace テーブルが配置されている Apache Cassandra キースペース。
cassandraTable コピーする Apache Cassandra テーブル。
bigtableProjectId Apache Cassandra テーブルがコピーされる Bigtable インスタンスの Google Cloud プロジェクト ID。
bigtableInstanceId Apache Cassandra テーブルをコピーする Cloud Bigtable インスタンス ID。
bigtableTableId Apache Cassandra テーブルをコピーする Bigtable テーブルの名前。
defaultColumnFamily (省略可)Bigtable テーブルの列ファミリーの名前(デフォルトは default)。
rowKeySeparator (省略可)行キーの作成に使用される区切り文字(デフォルトは #)。

Apache Cassandra to Cloud Bigtable テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cassandra to Cloud Bigtable template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableTableId=BIGTABLE_TABLE_ID,\
cassandraHosts=CASSANDRA_HOSTS,\
cassandraKeyspace=CASSANDRA_KEYSPACE,\
cassandraTable=CASSANDRA_TABLE

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: Cloud Bigtable が配置されているプロジェクト ID
  • BIGTABLE_INSTANCE_ID: Cloud Bigtable インスタンス ID
  • BIGTABLE_TABLE_ID: Cloud Bigtable テーブル名
  • CASSANDRA_HOSTS: Apache Cassandra のホストリスト。複数のホストがある場合は、カンマをエスケープするための手順を行ってください
  • CASSANDRA_KEYSPACE: テーブルが配置されている Apache Cassandra キースペース
  • CASSANDRA_TABLE: 移行する必要がある Apache Cassandra テーブル

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
       "bigtableTableId": "BIGTABLE_TABLE_ID",
       "cassandraHosts": "CASSANDRA_HOSTS",
       "cassandraKeyspace": "CASSANDRA_KEYSPACE",
       "cassandraTable": "CASSANDRA_TABLE"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJET_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • BIGTABLE_PROJECT_ID: Cloud Bigtable が配置されているプロジェクト ID
  • BIGTABLE_INSTANCE_ID: Cloud Bigtable インスタンス ID
  • BIGTABLE_TABLE_ID: Cloud Bigtable テーブル名
  • CASSANDRA_HOSTS: Apache Cassandra のホストリスト。複数のホストがある場合は、カンマをエスケープするための手順を行ってください
  • CASSANDRA_KEYSPACE: テーブルが配置されている Apache Cassandra キースペース
  • CASSANDRA_TABLE: 移行する必要がある Apache Cassandra テーブル

MongoDB to BigQuery

MongoDB to BigQuery テンプレートは、MongoDB からドキュメントを読み取り、userOption パラメータで指定されたとおりに BigQuery に書き込むバッチ パイプラインです。

このパイプラインの要件

  • ターゲット BigQuery データセットが存在すること。
  • ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。

テンプレートのパラメータ

パラメータ 説明
mongoDbUri MongoDB 接続 URI。形式は mongodb+srv://:@
database コレクションを読み取る MongoDB 内のデータベース。例: my-db
collection MongoDB データベース内のコレクションの名前。例: my-collection
outputTableSpec 書き込み先の BigQuery テーブル。例: bigquery-project:dataset.output_table
userOption FLATTEN または NONEFLATTEN: ドキュメントを第 1 レベルでフラット化します。NONE は、ドキュメント全体を JSON 文字列として格納します。

MongoDB to BigQuery テンプレートの実行

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョン エンドポイントは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the MongoDB to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
  • MONGO_DB_URI: MongoDB URI。
  • DATABASE: MongoDB データベース。
  • COLLECTION: MongoDB コレクション。
  • USER_OPTION: FLATTEN または NONE。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン エンドポイント。例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates/latest/)にあります。
    • バージョン名(例: 2021-09-20-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット gs://dataflow-templates/ 内の、名前に日付が入った親フォルダに格納されています。
  • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
  • MONGO_DB_URI: MongoDB URI。
  • DATABASE: MongoDB データベース。
  • COLLECTION: MongoDB コレクション。
  • USER_OPTION: FLATTEN または NONE。