Modello Avro di Cloud Storage a Spanner

Il modello File Avro di Cloud Storage in Spanner è una pipeline batch che legge i file Avro esportati da Spanner archiviati in Cloud Storage e li importa in un database Spanner.

Requisiti della pipeline

  • Il database Spanner di destinazione deve esistere e deve essere vuoto.
  • Devi disporre delle autorizzazioni di lettura per il bucket Cloud Storage e delle autorizzazioni di scrittura per il database Spanner di destinazione.
  • Il percorso Cloud Storage di input deve esistere e deve includere un file spanner-export.json contenente una descrizione JSON dei file da importare.
  • Se il file Avro di origine non contiene una chiave primaria, devi creare una tabella Spanner vuota con una chiave primaria prima di eseguire il modello. Questo passaggio non è necessario se il file Avro definisce la chiave primaria.

Parametri del modello

Parametri obbligatori

  • instanceId: l'ID istanza del database Spanner.
  • databaseId: l'ID del database Spanner.
  • inputDir: il percorso Cloud Storage da cui vengono importati i file Avro.

Parametri facoltativi

  • spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Ad esempio, Valore predefinito:
  • waitForIndexes: se true, la pipeline attende la creazione degli indici. Se è false, il job potrebbe essere completato mentre gli indici vengono ancora creati in background. Il valore predefinito è false.
  • waitForForeignKeys: se true, la pipeline attende la creazione delle chiavi esterne. Se è false, il job potrebbe essere completato mentre le chiavi esterne vengono ancora create in background. Il valore predefinito è false.
  • waitForChangeStreams: se true, la pipeline attende la creazione degli stream di modifiche. Se è false, il job potrebbe essere completato mentre i flussi di modifiche vengono ancora creati in background. Il valore predefinito è true.
  • waitForSequences: per impostazione predefinita, la pipeline di importazione è bloccata al momento della creazione della sequenza. Se è false, la pipeline di importazione potrebbe essere completata con sequenze ancora in fase di creazione in background.
  • earlyIndexCreateFlag: specifica se la creazione anticipata dell'indice è abilitata. Se il modello esegue un numero elevato di istruzioni DDL, è più efficiente creare gli indici prima di caricare i dati. Pertanto, il comportamento predefinito è creare prima gli indici quando il numero di istruzioni DDL supera una soglia. Per disattivare questa funzionalità, imposta earlyIndexCreateFlag su false. Il valore predefinito è true.
  • spannerProjectId: l'ID del progetto Google Cloud che contiene il database Spanner. Se non viene impostato, viene utilizzato il progetto Google Cloud predefinito.
  • ddlCreationTimeoutInMinutes: il timeout in minuti per le istruzioni DDL eseguite dal modello. Il valore predefinito è 30 minuti.
  • spannerPriority: la priorità della richiesta per le chiamate Spanner. I valori possibili sono HIGH, MEDIUM e LOW. Il valore predefinito è MEDIUM.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.

    Affinché il job venga visualizzato nella pagina Istanze di Spanner della console Google Cloud, il nome del job deve corrispondere al seguente formato:


    Sostituisci quanto segue:

    • SPANNER_INSTANCE_ID: ID dell'istanza Spanner
    • SPANNER_DATABASE_NAME: il nome del database Spanner
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Avro Files on Cloud Storage to Cloud Spanner template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Avro_to_Cloud_Spanner \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INSTANCE_ID: l'ID dell'istanza Spanner contenente il database
  • DATABASE_ID: l'ID del database Spanner in cui eseguire l'importazione
  • GCS_DIRECTORY: il percorso di Cloud Storage da cui vengono importati i file Avro, ad esempio gs://mybucket/somefolder

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "inputDir": "gs://GCS_DIRECTORY"
   "environment": {
       "machineType": "n1-standard-2"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INSTANCE_ID: l'ID dell'istanza Spanner che contiene il database
  • DATABASE_ID: l'ID del database Spanner in cui eseguire l'importazione
  • GCS_DIRECTORY: il percorso di Cloud Storage da cui vengono importati i file Avro, ad esempio gs://mybucket/somefolder
 * Copyright (C) 2018 Google LLC
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

 * Avro to Cloud Spanner Import pipeline.
 * <p>Check out <a
 * href="">README</a>
 * for instructions on how to use or modify this template.
    name = "GCS_Avro_to_Cloud_Spanner",
    category = TemplateCategory.BATCH,
    displayName = "Avro Files on Cloud Storage to Cloud Spanner",
    description =
        "The Cloud Storage Avro files to Cloud Spanner template is a batch pipeline that reads Avro files exported from "
            + "Cloud Spanner stored in Cloud Storage and imports them to a Cloud Spanner database.",
    optionsClass = Options.class,
    documentation =
    contactInformation = "",
    requirements = {
      "The target Cloud Spanner database must exist and must be empty.",
      "You must have read permissions for the Cloud Storage bucket and write permissions for the target Cloud Spanner database.",
      "The input Cloud Storage path must exist, and it must include a <a href=\"\">spanner-export.json</a> file that contains a JSON description of files to import."
public class ImportPipeline {

  /** Options for {@link ImportPipeline}. */
  public interface Options extends PipelineOptions {

        order = 1,
        groupName = "Target",
        regexes = {"^[a-z0-9\\-]+$"},
        description = "Cloud Spanner instance ID",
        helpText = "The instance ID of the Spanner database.")
    ValueProvider<String> getInstanceId();

    void setInstanceId(ValueProvider<String> value);

        order = 2,
        groupName = "Target",
        regexes = {"^[a-z_0-9\\-]+$"},
        description = "Cloud Spanner database ID",
        helpText = "The database ID of the Spanner database.")
    ValueProvider<String> getDatabaseId();

    void setDatabaseId(ValueProvider<String> value);

        order = 3,
        groupName = "Source",
        description = "Cloud storage input directory",
        helpText = "The Cloud Storage path where the Avro files are imported from.")
    ValueProvider<String> getInputDir();

    void setInputDir(ValueProvider<String> value);

        order = 4,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText = "The Cloud Spanner endpoint to call in the template. Only used for testing.",
        example = "")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

        order = 5,
        optional = true,
        description = "Wait for Indexes",
        helpText =
            "If `true`, the pipeline waits for indexes to be created. If `false`, the job might complete while indexes are still being created in the background. The default value is `false`.")
    ValueProvider<Boolean> getWaitForIndexes();

    void setWaitForIndexes(ValueProvider<Boolean> value);

        order = 6,
        optional = true,
        description = "Wait for Foreign Keys",
        helpText =
            "If `true`, the pipeline waits for foreign keys to be created. If `false`, the job might complete while foreign keys are still being created in the background. The default value is `false`.")
    ValueProvider<Boolean> getWaitForForeignKeys();

    void setWaitForForeignKeys(ValueProvider<Boolean> value);

        order = 7,
        optional = true,
        description = "Wait for Change Streams",
        helpText =
            "If `true`, the pipeline waits for change streams to be created. If `false`, the job might complete while change streams are still being created in the background. The default value is `true`.")
    ValueProvider<Boolean> getWaitForChangeStreams();

    void setWaitForChangeStreams(ValueProvider<Boolean> value);

        order = 7,
        optional = true,
        description = "Wait for Sequences",
        helpText =
            "By default, the import pipeline is blocked on sequence creation. If `false`, the import pipeline might"
                + " complete with sequences still being created in the background.")
    ValueProvider<Boolean> getWaitForSequences();

    void setWaitForSequences(ValueProvider<Boolean> value);

        order = 8,
        optional = true,
        description = "Create Indexes early",
        helpText =
            "Specifies whether early index creation is enabled. If the template runs a large number of DDL statements, it's more efficient to create indexes before loading data. Therefore, the default behavior is to create the indexes first when the number of DDL statements exceeds a threshold. To disable this feature, set `earlyIndexCreateFlag` to `false`. The default value is `true`.")
    ValueProvider<Boolean> getEarlyIndexCreateFlag();

    void setEarlyIndexCreateFlag(ValueProvider<Boolean> value);

    @TemplateCreationParameter(value = "false")
    @Description("If true, wait for job finish")
    boolean getWaitUntilFinish();

        order = 9,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Project Id",
        helpText =
            "The ID of the Google Cloud project that contains the Spanner database. If not set, the default Google Cloud project is used.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    void setWaitUntilFinish(boolean value);

        order = 10,
        optional = true,
        description = "DDL Creation timeout in minutes",
        helpText =
            "The timeout in minutes for DDL statements performed by the template. The default value is 30 minutes.")
    ValueProvider<Integer> getDdlCreationTimeoutInMinutes();

    void setDdlCreationTimeoutInMinutes(ValueProvider<Integer> value);

        order = 11,
        groupName = "Target",
        enumOptions = {
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. Possible values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);

  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    Pipeline p = Pipeline.create(options);

    SpannerConfig spannerConfig =
            // Temporary fix explicitly setting SpannerConfig.projectId to the default project
            // if spannerProjectId is not provided as a parameter. Required as of Beam 2.38,
            // which no longer accepts null label values on metrics, and SpannerIO#setup() has
            // a bug resulting in the label value being set to the original parameter value,
            // with no fallback to the default project.
            // TODO: remove NestedValueProvider when this is fixed in Beam.
                    (SerializableFunction<String, String>)
                        input -> input != null ? input : SpannerOptions.getDefaultProjectId()))

        new ImportTransform(

    PipelineResult result =;

    if (options.getWaitUntilFinish()
        /* Only if template location is null, there is a dataflow job to wait for. Else it's
         * template generation which doesn't start a dataflow job.
         */ == null) {

