Spanner to BigQuery テンプレート

Spanner to BigQuery テンプレートは、Spanner テーブルからデータを読み取り、BigQuery に書き込むバッチ パイプラインです。

パイプラインの要件

  • パイプラインを実行する前に、ソースの Spanner テーブルが存在する必要があります。
  • パイプラインの実行前に BigQuery データセットが存在している必要があります。
  • BigQuery スキーマを記述する JSON ファイル。

    このファイルには、fields というタイトルの最上位の JSON 配列が含まれている必要があります。fields 配列の内容は、次のパターンを使用する必要があります。
    {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    次の JSON は、BigQuery スキーマの例を示しています。

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }

    Spanner to BigQuery バッチ テンプレートでは、ターゲットの BigQuery テーブルの STRUCT(レコード) フィールドへのデータのインポートはサポートされていません。

テンプレートのパラメータ

必須パラメータ

  • spannerInstanceId: 読み取り元の Spanner データベースのインスタンス ID。
  • spannerDatabaseId: エクスポートする Spanner データベースのデータベース ID。
  • outputTableSpec: 出力を書き込む BigQuery 出力テーブルの場所。たとえば、<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME> です。指定された createDisposition によっては、ユーザーが指定した Avro スキーマを使用して出力テーブルが自動的に作成される場合があります。

オプション パラメータ

  • spannerProjectId: 省略可: Spanner データベースが存在するプロジェクトの ID。このパラメータのデフォルト値は、Dataflow パイプラインが動作しているプロジェクトです。
  • spannerTableId: エクスポートする Spanner データベースのテーブル名。sqlQuery が設定されている場合は無視されます。
  • spannerRpcPriority: Spanner 呼び出しのリクエストの優先度(https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions)。有効な値は HIGHMEDIUMLOW です。デフォルト値は HIGH です。
  • sqlQuery: Spanner データベースからデータを読み取るために使用する SQL クエリ。spannerTableId が空の場合は必須です。
  • bigQuerySchemaPath: BigQuery スキーマを定義する JSON ファイルの Cloud Storage パス(gs://)(例: gs://your-bucket/your-schema.json)。
  • writeDisposition : BigQuery WriteDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)の値。例: WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。デフォルトは WRITE_APPEND です。
  • createDisposition: BigQuery CreateDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)。たとえば、CREATE_IF_NEEDEDCREATE_NEVER です。デフォルトは CREATE_IF_NEEDED です。
  • useStorageWriteApi: true の場合、パイプラインでは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Spanner to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTableId=SPANNER_TABLE_ID,\
       sqlQuery=SQL_QUERY,\
       outputTableSpec=OUTPUT_TABLE_SPEC,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE_ID: Spanner データベース ID
  • SPANNER_TABLE_ID: Spanner テーブル名
  • SQL_QUERY: SQL クエリ
  • OUTPUT_TABLE_SPEC: BigQuery テーブルのロケーション

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "spannerInstanceId": "SPANNER_INSTANCE_ID",
       "spannerDatabaseId": "SPANNER_DATABASE_ID",
       "spannerTableId": "SPANNER_TABLE_ID",
       "sqlQuery": "SQL_QUERY",
       "outputTableSpec": "OUTPUT_TABLE_SPEC",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE_ID: Spanner データベース ID
  • SPANNER_TABLE_ID: Spanner テーブル名
  • SQL_QUERY: SQL クエリ
  • OUTPUT_TABLE_SPEC: BigQuery テーブルのロケーション
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.SpannerToBigQueryTransform.StructToJson;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Template to read data from a Spanner table and write into a BigQuery table. */
@Template(
    name = "Cloud_Spanner_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Spanner to BigQuery",
    description =
        "The Spanner to BigQuery template is a batch pipeline that reads data from a Spanner table, and writes them to a BigQuery table.",
    optionsClass = SpannerToBigQueryOptions.class,
    flexContainerName = "spanner-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/spanner-to-bigquery",
    contactInformation = "https://cloud.google.com/support")
public final class SpannerToBigQuery {

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PipelineOptionsFactory.register(SpannerToBigQueryOptions.class);
    SpannerToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToBigQueryOptions.class);

    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);

    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(
                options.getSpannerProjectId().isEmpty()
                    ? options.getProject()
                    : options.getSpannerProjectId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withInstanceId(options.getSpannerInstanceId())
            .withRpcPriority(options.getSpannerRpcPriority());

    SpannerIO.Read read = SpannerIO.read().withSpannerConfig(spannerConfig);

    if (!Strings.isNullOrEmpty(options.getSqlQuery())) {
      read = read.withQuery(options.getSqlQuery());
    } else if (!Strings.isNullOrEmpty(options.getSpannerTableId())) {
      read = read.withTable(options.getSpannerTableId());
    } else {
      throw new IllegalArgumentException("either sqlQuery or spannerTableId required");
    }
    pipeline
        .apply(read)
        .apply(new StructToJson())
        .apply("Write To BigQuery", writeToBigQuery(options));

    pipeline.run();
  }

  private static Write<String> writeToBigQuery(SpannerToBigQueryOptions options) {
    if (CreateDisposition.valueOf(options.getCreateDisposition()) == CREATE_NEVER) {
      return BigQueryIO.<String>write()
          .to(options.getOutputTableSpec())
          .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
          .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
          .withExtendedErrorInfo()
          .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    }
    return BigQueryIO.<String>write()
        .to(options.getOutputTableSpec())
        .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
        .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
        .withExtendedErrorInfo()
        .withFormatFunction(BigQueryConverters::convertJsonToTableRow)
        .withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
  }
}

次のステップ