Modelo do Cloud Storage Avro para Spanner

O modelo de arquivos Avro do Cloud Storage para o Spanner é um pipeline em lote que lê arquivos Avro exportados do Spanner e armazenados em um banco de dados do Spanner.

Requisitos de pipeline

  • O banco de dados de destino do Spanner precisa existir e estar vazio.
  • É necessário ter permissões de leitura para o bucket do Cloud Storage e permissões de gravação para o banco de dados de destino do Spanner.
  • O caminho do Cloud Storage precisa existir e incluir um arquivo spanner-export.jsoncom uma descrição JSON dos arquivos a serem importados.
  • Se o arquivo Avro de origem não contiver uma chave primária, será necessário criar uma tabela vazia do Spanner com uma chave primária antes de executar o modelo. Esta etapa não será necessária se o arquivo Avro definir a chave primária.

Parâmetros do modelo

Parâmetros obrigatórios

  • instanceId: o ID da instância referente ao banco de dados do Spanner.
  • databaseId: o ID do banco de dados do Spanner.
  • inputDir: o caminho do Cloud Storage de onde os arquivos Avro são importados.

Parâmetros opcionais

  • spannerHost: o endpoint do Cloud Spanner a ser chamado no modelo. Usado apenas para testes. Por exemplo, https://batch-spanner.googleapis.com. O padrão é: https://batch-spanner.googleapis.com.
  • waitForIndexes: se for true, o pipeline aguardará a criação dos índices. Se for false, o job poderá ser concluído enquanto os índices ainda estiverem sendo criados em segundo plano. O valor padrão é false.
  • waitForForeignKeys: se for true, o pipeline aguardará a criação das chaves externas. Se for false, o job poderá ser concluído enquanto chaves externas ainda estiverem sendo criadas em segundo plano. O valor padrão é false.
  • waitForChangeStreams: se for true, o pipeline aguardará a criação dos fluxos de alterações. Se for false, o job poderá ser concluído enquanto os fluxos de alterações ainda estiverem sendo criados em segundo plano. O valor padrão é true.
  • waitForSequences: por padrão, o pipeline de importação é bloqueado na criação da sequência. Se for false, o pipeline de importação poderá ser concluído com sequências ainda sendo criadas em segundo plano.
  • earlyIndexCreateFlag: especifica se a criação antecipada de índices está ativada. Se o modelo executa um grande número de instruções DDL, é mais eficiente criar índices antes de carregar os dados. Portanto, o comportamento padrão é criar os índices primeiro quando o número de instruções DDL exceder um limite. Para desativar o recurso, defina earlyIndexCreateFlag como false. O valor padrão é true.
  • spannerProjectId: o ID do projeto do Google Cloud que contém o banco de dados do Spanner. Se não for definido, o projeto padrão do Google Cloud será usado.
  • ddlCreationTimeoutInMinutes: o tempo limite, em minutos, das instruções DDL executadas pelo modelo. O valor padrão é 30 minutos.
  • spannerPriority: a prioridade da solicitação para chamadas do Spanner. Os valores possíveis são HIGH, MEDIUM e LOW. O valor padrão é MEDIUM.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.

    Para que o job apareça na página de Instâncias do Spanner do console do Google Cloud, o nome do job precisa corresponder ao seguinte formato:

    cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    Substitua:

    • SPANNER_INSTANCE_ID: o ID da instância do Spanner
    • SPANNER_DATABASE_NAME: o nome do banco de dados do Spanner
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Avro Files on Cloud Storage to Cloud Spanner template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Avro_to_Cloud_Spanner \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
inputDir=GCS_DIRECTORY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • INSTANCE_ID: o ID da instância do Spanner que contém o banco de dados.
  • DATABASE_ID: o ID do banco de dados do Spanner a ser importado
  • GCS_DIRECTORY: o caminho do Cloud Storage de onde os arquivos Avro são importados. Por exemplo, gs://mybucket/somefolder.

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Avro_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "inputDir": "gs://GCS_DIRECTORY"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • INSTANCE_ID: o ID da instância do Spanner que contém o banco de dados.
  • DATABASE_ID: o ID do banco de dados do Spanner a ser importado
  • GCS_DIRECTORY: o caminho do Cloud Storage de onde os arquivos Avro são importados. Por exemplo, gs://mybucket/somefolder.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.spanner;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.ImportPipeline.Options;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
 * Avro to Cloud Spanner Import pipeline.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Avro_to_Cloud_Spanner.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Avro_to_Cloud_Spanner",
    category = TemplateCategory.BATCH,
    displayName = "Avro Files on Cloud Storage to Cloud Spanner",
    description =
        "The Cloud Storage Avro files to Cloud Spanner template is a batch pipeline that reads Avro files exported from "
            + "Cloud Spanner stored in Cloud Storage and imports them to a Cloud Spanner database.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/avro-to-cloud-spanner",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The target Cloud Spanner database must exist and must be empty.",
      "You must have read permissions for the Cloud Storage bucket and write permissions for the target Cloud Spanner database.",
      "The input Cloud Storage path must exist, and it must include a <a href=\"https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\">spanner-export.json</a> file that contains a JSON description of files to import."
    })
public class ImportPipeline {

  /** Options for {@link ImportPipeline}. */
  public interface Options extends PipelineOptions {

    @TemplateParameter.Text(
        order = 1,
        groupName = "Target",
        regexes = {"^[a-z0-9\\-]+$"},
        description = "Cloud Spanner instance ID",
        helpText = "The instance ID of the Spanner database.")
    ValueProvider<String> getInstanceId();

    void setInstanceId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        regexes = {"^[a-z_0-9\\-]+$"},
        description = "Cloud Spanner database ID",
        helpText = "The database ID of the Spanner database.")
    ValueProvider<String> getDatabaseId();

    void setDatabaseId(ValueProvider<String> value);

    @TemplateParameter.GcsReadFolder(
        order = 3,
        groupName = "Source",
        description = "Cloud storage input directory",
        helpText = "The Cloud Storage path where the Avro files are imported from.")
    ValueProvider<String> getInputDir();

    void setInputDir(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText = "The Cloud Spanner endpoint to call in the template. Only used for testing.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 5,
        optional = true,
        description = "Wait for Indexes",
        helpText =
            "If `true`, the pipeline waits for indexes to be created. If `false`, the job might complete while indexes are still being created in the background. The default value is `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getWaitForIndexes();

    void setWaitForIndexes(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 6,
        optional = true,
        description = "Wait for Foreign Keys",
        helpText =
            "If `true`, the pipeline waits for foreign keys to be created. If `false`, the job might complete while foreign keys are still being created in the background. The default value is `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getWaitForForeignKeys();

    void setWaitForForeignKeys(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 7,
        optional = true,
        description = "Wait for Change Streams",
        helpText =
            "If `true`, the pipeline waits for change streams to be created. If `false`, the job might complete while change streams are still being created in the background. The default value is `true`.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getWaitForChangeStreams();

    void setWaitForChangeStreams(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 7,
        optional = true,
        description = "Wait for Sequences",
        helpText =
            "By default, the import pipeline is blocked on sequence creation. If `false`, the import pipeline might"
                + " complete with sequences still being created in the background.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getWaitForSequences();

    void setWaitForSequences(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 8,
        optional = true,
        description = "Create Indexes early",
        helpText =
            "Specifies whether early index creation is enabled. If the template runs a large number of DDL statements, it's more efficient to create indexes before loading data. Therefore, the default behavior is to create the indexes first when the number of DDL statements exceeds a threshold. To disable this feature, set `earlyIndexCreateFlag` to `false`. The default value is `true`.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getEarlyIndexCreateFlag();

    void setEarlyIndexCreateFlag(ValueProvider<Boolean> value);

    @TemplateCreationParameter(value = "false")
    @Description("If true, wait for job finish")
    @Default.Boolean(true)
    boolean getWaitUntilFinish();

    @TemplateParameter.ProjectId(
        order = 9,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Project Id",
        helpText =
            "The ID of the Google Cloud project that contains the Spanner database. If not set, the default Google Cloud project is used.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    void setWaitUntilFinish(boolean value);

    @TemplateParameter.Integer(
        order = 10,
        optional = true,
        description = "DDL Creation timeout in minutes",
        helpText =
            "The timeout in minutes for DDL statements performed by the template. The default value is 30 minutes.")
    @Default.Integer(30)
    ValueProvider<Integer> getDdlCreationTimeoutInMinutes();

    void setDdlCreationTimeoutInMinutes(ValueProvider<Integer> value);

    @TemplateParameter.Enum(
        order = 11,
        groupName = "Target",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. Possible values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    Pipeline p = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            // Temporary fix explicitly setting SpannerConfig.projectId to the default project
            // if spannerProjectId is not provided as a parameter. Required as of Beam 2.38,
            // which no longer accepts null label values on metrics, and SpannerIO#setup() has
            // a bug resulting in the label value being set to the original parameter value,
            // with no fallback to the default project.
            // TODO: remove NestedValueProvider when this is fixed in Beam.
            .withProjectId(
                NestedValueProvider.of(
                    options.getSpannerProjectId(),
                    (SerializableFunction<String, String>)
                        input -> input != null ? input : SpannerOptions.getDefaultProjectId()))
            .withHost(options.getSpannerHost())
            .withInstanceId(options.getInstanceId())
            .withDatabaseId(options.getDatabaseId())
            .withRpcPriority(options.getSpannerPriority());

    p.apply(
        new ImportTransform(
            spannerConfig,
            options.getInputDir(),
            options.getWaitForIndexes(),
            options.getWaitForForeignKeys(),
            options.getWaitForChangeStreams(),
            options.getWaitForSequences(),
            options.getEarlyIndexCreateFlag(),
            options.getDdlCreationTimeoutInMinutes()));

    PipelineResult result = p.run();

    if (options.getWaitUntilFinish()
        &&
        /* Only if template location is null, there is a dataflow job to wait for. Else it's
         * template generation which doesn't start a dataflow job.
         */
        options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }
}

A seguir