Modello SourceDB to Spanner

Il modello SourceDB to Spanner è una pipeline batch che copia i dati da un database relazionale in un database Spanner esistente. Questa pipeline utilizza JDBC per connettersi al database relazionale. Puoi utilizzare questo modello per copiare i dati da qualsiasi database relazionale con i driver JDBC disponibili in Spanner. Supporta solo un numero limitato di tipi di MySQL

Per un ulteriore livello di protezione, puoi anche passare una chiave Cloud KMS insieme a parametri di stringa di connessione, nome utente e password codificati in Base64 criptati con la chiave Cloud KMS. Consulta l'endpoint di crittografia dell'API Cloud KMS per ulteriori dettagli sulla crittografia del nome utente, della password e dei parametri della stringa di connessione.

Requisiti della pipeline

  • I driver JDBC per il database relazionale devono essere disponibili.
  • Le tabelle Spanner devono esistere prima dell'esecuzione della pipeline.
  • Le tabelle Spanner devono avere uno schema compatibile.
  • Il database relazionale deve essere accessibile dalla subnet in cui viene eseguito Dataflow.

Parametri del modello

Parametro Descrizione
sourceConfigURL La stringa dell'URL di connessione JDBC. Ad esempio, jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8 o la configurazione del frammento.
instanceId L'istanza Cloud Spanner di destinazione.
databaseId Il database Cloud Spanner di destinazione.
projectId Questo è il nome del progetto Cloud Spanner.
outputDirectory Questa directory viene utilizzata per eseguire il dump dei record non riusciti/saltati/filtrati in una migrazione.
jdbcDriverJars (Facoltativo) L'elenco separato da virgole dei file JAR del driver. Ad esempio: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar. Il valore predefinito è vuoto.
jdbcDriverClassName (Facoltativo) Il nome della classe del driver JDBC. Ad esempio: com.mysql.jdbc.Driver. Il valore predefinito è: com.mysql.jdbc.Driver.
username (Facoltativo) Il nome utente da utilizzare per la connessione JDBC. Il valore predefinito è vuoto.
password (Facoltativo) La password da utilizzare per la connessione JDBC. Il valore predefinito è vuoto.
tables (Facoltativo) Tabelle di cui eseguire la migrazione dall'origine. Il valore predefinito è vuoto.
numPartitions (Facoltativo) Il numero di partizioni. Questo, insieme al limite inferiore e superiore, forma gli intervalli di partizione per le espressioni della clausola WHERE generate utilizzate per suddividere uniformemente la colonna della partizione. Quando l'input è inferiore a 1, il numero viene impostato su 1. Il valore predefinito è 0.
spannerHost (Facoltativo) L'endpoint Cloud Spanner da chiamare nel modello. Ad esempio: https://batch-spanner.googleapis.com. Valore predefinito: https://batch-spanner.googleapis.com.
maxConnections (Facoltativo) Configura il pool di connessioni JDBC su ogni worker con il numero massimo di connessioni. Utilizza un numero negativo per indicare che non c'è limite. Ad esempio: -1. Il valore predefinito è 0.
sessionFilePath (Facoltativo) Percorso della sessione in Cloud Storage contenente le informazioni di mappatura dello strumento di migrazione Spanner. Il valore predefinito è vuoto.
transformationJarPath (Facoltativo) Posizione del file jar personalizzato in Cloud Storage contenente la logica di trasformazione personalizzata per l'elaborazione dei record. Il valore predefinito è vuoto.
transformationClassName (Facoltativo) Nome di classe completo con la logica di trasformazione personalizzata. È un campo obbligatorio se viene specificato transformationJarPath. Il valore predefinito è vuoto.
transformationCustomParameters (Facoltativo) Stringa contenente eventuali parametri personalizzati da passare alla classe di trasformazione personalizzata. Il valore predefinito è vuoto.
disabledAlgorithms (Facoltativo) Algoritmi da disattivare separati da virgole. Se questo valore è impostato su none, nessun algoritmo è disattivato. Utilizza questo parametro con cautela, poiché gli algoritmi disattivati per impostazione predefinita potrebbero presentare vulnerabilità o problemi di prestazioni. Ad esempio: SSLv3, RC4.
extraFilesToStage (Facoltativo) Percorsi Cloud Storage separati da virgole o secret di Secret Manager per i file da mettere in fase nel worker. Questi file vengono salvati nella directory /extra_files in ogni worker. Ad esempio: gs://<BUCKET>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Sourcedb to Spanner template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Sourcedb_to_Spanner_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       sourceConfigURL=SOURCE_CONFIG_URL,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       projectId=PROJECT_ID,\
       outputDirectory=OUTPUT_DIRECTORY,\

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SOURCE_CONFIG_URL: l'URL per connettersi all'host del database di origine. Può essere uno dei due. L'URL di connessione JDBC, che deve contenere il nome dell'host, della porta e del database di origine e può facoltativamente contenere proprietà come autoReconnect, maxReconnects e così via. Formato: "jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2. Il percorso della configurazione del frammento
  • INSTANCE_ID: l'ID istanza Cloud Spanner.
  • DATABASE_ID: l'ID del database Cloud Spanner.
  • PROJECT_ID: l'ID progetto Cloud Spanner.
  • OUTPUT_DIRECTORY: la directory di output per gli eventi non riusciti/saltati/filtrati

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "sourceConfigURL": "SOURCE_CONFIG_URL",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "projectId": "PROJECT_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Sourcedb_to_Spanner_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SOURCE_CONFIG_URL: l'URL per connettersi all'host del database di origine. Può essere uno dei due. L'URL di connessione JDBC, che deve contenere il nome dell'host, della porta e del database di origine e può facoltativamente contenere proprietà come autoReconnect, maxReconnects e così via. Formato: "jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2. Il percorso della configurazione del frammento
  • INSTANCE_ID: l'ID istanza Cloud Spanner.
  • DATABASE_ID: l'ID del database Cloud Spanner.
  • PROJECT_ID: l'ID progetto Cloud Spanner.
  • OUTPUT_DIRECTORY: la directory di output per gli eventi non riusciti/saltati/filtrati
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies data from a relational database using JDBC to an existing Spanner
 * database.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Sourcedb_to_Spanner_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Sourcedb to Spanner",
    description = {
      "The SourceDB to Spanner template is a batch pipeline that copies data from a relational"
          + " database into an existing Spanner database. This pipeline uses JDBC to connect to"
          + " the relational database. You can use this template to copy data from any relational"
          + " database with available JDBC drivers into Spanner. This currently only supports a limited set of types of MySQL",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a"
          + " Base64-encoded username, password, and connection string parameters encrypted with"
          + " the Cloud KMS key. See the <a"
          + " href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud"
          + " KMS API encryption endpoint</a> for additional details on encrypting your username,"
          + " password, and connection string parameters."
    },
    optionsClass = SourceDbToSpannerOptions.class,
    flexContainerName = "source-db-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/sourcedb-to-spanner",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "The Spanner tables must exist before pipeline execution.",
      "The Spanner tables must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class SourceDbToSpanner {

  private static final Logger LOG = LoggerFactory.getLogger(SourceDbToSpanner.class);

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link SourceDbToSpanner#run} method to start the
   * pipeline and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    SourceDbToSpannerOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SourceDbToSpannerOptions.class);
    run(options);
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(SourceDbToSpannerOptions options) {
    // TODO - Validate if options are as expected
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig = createSpannerConfig(options);

    // Decide type and source of migration
    // TODO(vardhanvthigle): Move this within pipelineController.
    switch (options.getSourceDbDialect()) {
      case SourceDbToSpannerOptions.CASSANDRA_SOURCE_DIALECT:
        return PipelineController.executeCassandraMigration(options, pipeline, spannerConfig);
      default:
        /* Implementation detail, not having a default leads to failure in compile time checks enforced here */
        /* Making jdbc as default case which includes MYSQL and PG. */
        return executeJdbcMigration(options, pipeline, spannerConfig);
    }
  }

  // TODO(vardhanvthigle): Move this within pipelineController.
  private static PipelineResult executeJdbcMigration(
      SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
    if (options.getSourceConfigURL().startsWith("gs://")) {
      List<Shard> shards =
          new ShardFileReader(new SecretManagerAccessorImpl())
              .readForwardMigrationShardingConfig(options.getSourceConfigURL());
      return PipelineController.executeJdbcShardedMigration(
          options, pipeline, shards, spannerConfig);
    } else {
      return PipelineController.executeJdbcSingleInstanceMigration(
          options, pipeline, spannerConfig);
    }
  }

  @VisibleForTesting
  static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) {
    return SpannerConfig.create()
        .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
        .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
        .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
        .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
  }
}