Spanner to BigQuery template

The Spanner to BigQuery template is a batch pipeline that reads data from a Spanner table and writes the data to BigQuery.

Pipeline requirements

  • The source Spanner table must exist before running the pipeline.
  • The BigQuery dataset must exist before running the pipeline.
  • A JSON file that describes your BigQuery schema.

    The file must contain a top-level JSON array titled fields. The contents of the fields array must use the following pattern:
    {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    The following JSON describes an example BigQuery schema:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }

    The Spanner to BigQuery batch template doesn't support importing data into STRUCT (Record) columns in the target BigQuery table.
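
    For example, a field like the following, which declares a RECORD (STRUCT) column in standard BigQuery JSON schema syntax, is not supported by this template. The address field here is hypothetical:

    {
      "name": "address",
      "type": "RECORD",
      "fields": [
        {"name": "city", "type": "STRING"},
        {"name": "zip_code", "type": "STRING"}
      ]
    }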

Template parameters

Required parameters

  • spannerInstanceId: the instance ID of the Spanner database to read from.
  • spannerDatabaseId: the database ID of the Spanner database to export.
  • outputTableSpec: the BigQuery output table location to write the output to. For example, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Depending on the createDisposition specified, the output table might be created automatically using the user-provided Avro schema.

Optional parameters

The optional parameters correspond to the options in the template source code below, and include spannerProjectId, spannerTableId, sqlQuery, bigQuerySchemaPath, createDisposition, writeDisposition, and spannerRpcPriority. As the source shows, either sqlQuery or spannerTableId must be provided, and bigQuerySchemaPath is required unless createDisposition is CREATE_NEVER.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Spanner to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTableId=SPANNER_TABLE_ID,\
       sqlQuery=SQL_QUERY,\
       outputTableSpec=OUTPUT_TABLE_SPEC

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • a version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • SPANNER_INSTANCE_ID: the Spanner instance ID
  • SPANNER_DATABASE_ID: the Spanner database ID
  • SPANNER_TABLE_ID: the Spanner table name
  • SQL_QUERY: the SQL query; either sqlQuery or spannerTableId must be provided, and the query takes precedence when both are set
  • OUTPUT_TABLE_SPEC: the BigQuery table location
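
For example, a filled-in invocation might look like the following. All of the resource names here (project, instance, database, table, and dataset) are hypothetical, and a table read is used instead of a SQL query:

gcloud dataflow flex-template run spanner-to-bigquery-job \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Cloud_Spanner_to_BigQuery_Flex \
    --project=my-project \
    --region=us-central1 \
    --parameters spannerInstanceId=my-instance,spannerDatabaseId=my-database,spannerTableId=people,outputTableSpec=my-project:my_dataset.people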

REST

To run the template using the REST API, send an HTTP POST request. For more information about the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "spannerInstanceId": "SPANNER_INSTANCE_ID",
       "spannerDatabaseId": "SPANNER_DATABASE_ID",
       "spannerTableId": "SPANNER_TABLE_ID",
       "sqlQuery": "SQL_QUERY",
       "outputTableSpec": "OUTPUT_TABLE_SPEC",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • a version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • SPANNER_INSTANCE_ID: the Spanner instance ID
  • SPANNER_DATABASE_ID: the Spanner database ID
  • SPANNER_TABLE_ID: the Spanner table name
  • SQL_QUERY: the SQL query; either sqlQuery or spannerTableId must be provided
  • OUTPUT_TABLE_SPEC: the BigQuery table location
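
One way to send this request is with curl, authorizing with your gcloud credentials. This is a sketch that assumes the JSON body above has been saved to a local file named request.json:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"
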
Template source code

Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.SpannerToBigQueryTransform.StructToJson;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Template to read data from a Spanner table and write into a BigQuery table. */
@Template(
    name = "Cloud_Spanner_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Spanner to BigQuery",
    description =
        "The Spanner to BigQuery template is a batch pipeline that reads data from a Spanner table, and writes them to a BigQuery table.",
    optionsClass = SpannerToBigQueryOptions.class,
    flexContainerName = "spanner-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/spanner-to-bigquery",
    contactInformation = "https://cloud.google.com/support")
public final class SpannerToBigQuery {

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PipelineOptionsFactory.register(SpannerToBigQueryOptions.class);
    SpannerToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToBigQueryOptions.class);

    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);

    Pipeline pipeline = Pipeline.create(options);

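    // Build the Spanner connection settings, falling back to the job's project when spannerProjectId is not set.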
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(
                options.getSpannerProjectId().isEmpty()
                    ? options.getProject()
                    : options.getSpannerProjectId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withInstanceId(options.getSpannerInstanceId())
            .withRpcPriority(options.getSpannerRpcPriority());

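    // Start a Spanner read with the shared connection settings; the source (query or table) is chosen below.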
    SpannerIO.Read read = SpannerIO.read().withSpannerConfig(spannerConfig);

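    // Exactly one of sqlQuery or spannerTableId is required; the query takes precedence when both are set.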
    if (!Strings.isNullOrEmpty(options.getSqlQuery())) {
      read = read.withQuery(options.getSqlQuery());
    } else if (!Strings.isNullOrEmpty(options.getSpannerTableId())) {
      read = read.withTable(options.getSpannerTableId());
    } else {
      throw new IllegalArgumentException("either sqlQuery or spannerTableId required");
    }
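    // A BigQuery schema file is only optional when the destination table already exists (CREATE_NEVER).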
    if (Strings.isNullOrEmpty(options.getBigQuerySchemaPath())
        && CreateDisposition.valueOf(options.getCreateDisposition()) != CREATE_NEVER) {
      throw new IllegalArgumentException(
          "bigQuerySchemaPath is required if CreateDisposition is not CREATE_NEVER");
    }
    pipeline
        .apply(read)
        .apply(new StructToJson())
        .apply("Write To BigQuery", writeToBigQuery(options));

    pipeline.run();
  }

  private static Write<String> writeToBigQuery(SpannerToBigQueryOptions options) {
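    // With CREATE_NEVER, the destination table must already exist, so no JSON schema is attached.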
    if (CreateDisposition.valueOf(options.getCreateDisposition()) == CREATE_NEVER) {
      return BigQueryIO.<String>write()
          .to(options.getOutputTableSpec())
          .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
          .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
          .withExtendedErrorInfo()
          .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    }
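    // Otherwise, attach the user-provided JSON schema from Cloud Storage so BigQuery can create the table.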
    return BigQueryIO.<String>write()
        .to(options.getOutputTableSpec())
        .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
        .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
        .withExtendedErrorInfo()
        .withFormatFunction(BigQueryConverters::convertJsonToTableRow)
        .withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
  }
}

What's next