Modelo do Spanner para o BigQuery

O modelo do Spanner para BigQuery é um pipeline em lote que lê dados de uma tabela do Spanner e os grava no BigQuery.

Requisitos de pipeline

  • A tabela de entrada do Spanner precisa existir antes da execução do pipeline.
  • O conjunto de dados do BigQuery precisa existir antes da execução do pipeline.
  • Um arquivo JSON que descreve o esquema do BigQuery.

    O arquivo precisa conter uma matriz JSON de nível superior chamada fields. O conteúdo da matriz fields precisa usar o seguinte padrão:
    {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    Veja no JSON a seguir um exemplo de esquema do BigQuery:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }

    O modelo de lote do Spanner para BigQuery não dá suporte à importação de dados para os campos STRUCT (registro) na tabela de destino do BigQuery.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerInstanceId: o ID da instância do banco de dados do Spanner que será usado para leitura.
  • spannerDatabaseId: o ID do banco de dados do Spanner a ser exportado.
  • outputTableSpec: o local da tabela de saída do BigQuery em que a saída será gravada. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Dependendo do createDisposition especificado, a tabela de saída pode ser criada automaticamente usando o esquema do Avro fornecido pelo usuário.

Parâmetros opcionais

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Spanner to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTableId=SPANNER_TABLE_ID,\
       sqlQuery=SQL_QUERY,\
       outputTableSpec=OUTPUT_TABLE_SPEC,\

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: o ID da instância do Spanner
  • SPANNER_DATABASE_ID: o ID do banco de dados do Spanner
  • SPANNER_TABLE_ID: o nome da tabela do Spanner
  • SQL_QUERY: a consulta SQL
  • OUTPUT_TABLE_SPEC: o local da tabela do BigQuery

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "spannerInstanceId": "SPANNER_INSTANCE_ID",
       "spannerDatabaseId": "SPANNER_DATABASE_ID",
       "spannerTableId": "SPANNER_TABLE_ID",
       "sqlQuery": "SQL_QUERY",
       "outputTableSpec": "OUTPUT_TABLE_SPEC",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: o ID da instância do Spanner
  • SPANNER_DATABASE_ID: o ID do banco de dados do Spanner
  • SPANNER_TABLE_ID: o nome da tabela do Spanner
  • SQL_QUERY: a consulta SQL
  • OUTPUT_TABLE_SPEC: o local da tabela do BigQuery
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.SpannerToBigQueryTransform.StructToJson;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Template to read data from a Spanner table and write into a BigQuery table. */
@Template(
    name = "Cloud_Spanner_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Spanner to BigQuery",
    description =
        "The Spanner to BigQuery template is a batch pipeline that reads data from a Spanner table, and writes them to a BigQuery table.",
    optionsClass = SpannerToBigQueryOptions.class,
    flexContainerName = "spanner-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/spanner-to-bigquery",
    contactInformation = "https://cloud.google.com/support")
public final class SpannerToBigQuery {

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PipelineOptionsFactory.register(SpannerToBigQueryOptions.class);
    SpannerToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToBigQueryOptions.class);

    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);

    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(
                options.getSpannerProjectId().isEmpty()
                    ? options.getProject()
                    : options.getSpannerProjectId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withInstanceId(options.getSpannerInstanceId())
            .withRpcPriority(options.getSpannerRpcPriority());

    SpannerIO.Read read = SpannerIO.read().withSpannerConfig(spannerConfig);

    if (!Strings.isNullOrEmpty(options.getSqlQuery())) {
      read = read.withQuery(options.getSqlQuery());
    } else if (!Strings.isNullOrEmpty(options.getSpannerTableId())) {
      read = read.withTable(options.getSpannerTableId());
    } else {
      throw new IllegalArgumentException("either sqlQuery or spannerTableId required");
    }
    if (Strings.isNullOrEmpty(options.getBigQuerySchemaPath())
        && CreateDisposition.valueOf(options.getCreateDisposition()) != CREATE_NEVER) {
      throw new IllegalArgumentException(
          "bigQuerySchemaPath is required if CreateDisposition is not CREATE_NEVER");
    }
    pipeline
        .apply(read)
        .apply(new StructToJson())
        .apply("Write To BigQuery", writeToBigQuery(options));

    pipeline.run();
  }

  private static Write<String> writeToBigQuery(SpannerToBigQueryOptions options) {
    if (CreateDisposition.valueOf(options.getCreateDisposition()) == CREATE_NEVER) {
      return BigQueryIO.<String>write()
          .to(options.getOutputTableSpec())
          .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
          .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
          .withExtendedErrorInfo()
          .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    }
    return BigQueryIO.<String>write()
        .to(options.getOutputTableSpec())
        .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
        .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
        .withExtendedErrorInfo()
        .withFormatFunction(BigQueryConverters::convertJsonToTableRow)
        .withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
  }
}

A seguir