Modelo do Sourcedb para o Spanner

O modelo do SourceDB para Spanner é um pipeline em lote que copia dados de um banco de dados relacional para um banco de dados do Spanner. Esse pipeline usa o JDBC para se conectar ao banco de dados relacional. É possível usar esse modelo para copiar dados de qualquer banco de dados relacional com drivers JDBC disponíveis no Spanner. Isso só é compatível com um conjunto limitado de tipos do MySQL

Para ter uma camada extra de proteção, é possível transmitir uma chave do Cloud KMS com um nome de usuário, senha e parâmetros da string de conexão criptografados em Base64 com a chave do Cloud KMS. Consulte o endpoint de criptografia da API Cloud KMS para saber mais detalhes sobre como criptografar o nome de usuário, senha e parâmetros da string de conexão.

Requisitos de pipeline

  • É necessário que os drivers do JDBC para o banco de dados relacional estejam disponíveis.
  • As tabelas do Spanner precisam existir antes da execução do pipeline.
  • As tabelas do Spanner precisam ter um esquema compatível.
  • O banco de dados relacional precisa estar acessível na sub-rede em que o Dataflow é executado.

Parâmetros do modelo

Parâmetro Descrição
sourceConfigURL A string do URL de conexão do JDBC. Por exemplo, jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8 ou a configuração do fragmento.
instanceId A instância de destino do Cloud Spanner.
databaseId O banco de dados de destino do Cloud Spanner.
projectId É o nome do projeto do Cloud Spanner.
outputDirectory Esse diretório é usado para despejar os registros com falha/ignorados/filtrados em uma migração.
jdbcDriverJars Opcional: a lista separada por vírgulas de arquivos JAR do driver. Exemplo: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar. O padrão é vazio.
jdbcDriverClassName Opcional: o nome da classe do driver do JDBC. Por exemplo, com.mysql.jdbc.Driver. O padrão é: com.mysql.jdbc.Driver.
username Opcional: o nome do usuário a ser usado para a conexão JDBC. O padrão é vazio.
password Opcional: a senha a ser usada para a conexão JDBC. O padrão é vazio.
tables Opcional: tabelas para migrar da origem. O padrão é vazio.
numPartitions Opcional: o número de partições. Isso, junto com os limites inferior e superior, cria avanços de partições para expressões da cláusula WHERE geradas, usadas para dividir a coluna da partição de maneira uniforme. Quando a entrada for menor que 1, o número será definido como 1. O padrão é 0.
spannerHost Opcional: o endpoint do Cloud Spanner para chamar no modelo. Exemplo: https://batch-spanner.googleapis.com. O padrão é https://batch-spanner.googleapis.com.
maxConnections Opcional: configura o pool de conexões JDBC em cada worker com o número máximo de conexões. Use um número negativo para não haver limite. Por exemplo, -1. O padrão é 0.
sessionFilePath Opcional: caminho da sessão no Cloud Storage que contém informações de mapeamento da ferramenta de migração do Spanner. O padrão é vazio.
transformationJarPath Opcional: local do jar personalizado no Cloud Storage que contém a lógica de transformação personalizada para processar registros. O padrão é vazio.
transformationClassName Opcional: nome de classe totalmente qualificado com lógica de transformação personalizada. É um campo obrigatório no caso de transformationJarPath é especificado. O padrão é vazio.
transformationCustomParameters Opcional: string contendo os parâmetros personalizados que serão passados para a classe de transformação personalizada. O padrão é vazio.
disabledAlgorithms Opcional: algoritmos separados por vírgulas para desativar. Se esse valor for definido como nenhum, nenhum algoritmo será desativado. Use esse parâmetro com cuidado, porque os algoritmos desativados por padrão podem ter vulnerabilidades ou problemas de desempenho. Exemplo: SSLv3, RC4.
extraFilesToStage Opcional: caminhos do Cloud Storage separados por vírgulas ou secrets do Secret Manager para que os arquivos sejam organizados no worker. Esses arquivos são salvos no diretório /extra_files em cada worker. Exemplo: gs://<BUCKET>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Sourcedb to Spanner template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Sourcedb_to_Spanner_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       sourceConfigURL=SOURCE_CONFIG_URL,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       projectId=PROJECT_ID,\
       outputDirectory=OUTPUT_DIRECTORY,\

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SOURCE_CONFIG_URL: o URL para se conectar ao host do banco de dados de origem. Pode ser um destes valores: O URL de conexão JDBC, que precisa conter o host, a porta e o nome do banco de dados de origem e pode conter, opcionalmente, propriedades como autoReconnect, maxReconnects etc. Formato: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2. O caminho da configuração do fragmento
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID do banco de dados do Cloud Spanner.
  • PROJECT_ID: o ID do projeto do Cloud Spanner.
  • OUTPUT_DIRECTORY: o diretório de saída para eventos com falha/ignorados/filtrados

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "sourceConfigURL": "SOURCE_CONFIG_URL",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "projectId": "PROJECT_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Sourcedb_to_Spanner_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SOURCE_CONFIG_URL: o URL para se conectar ao host do banco de dados de origem. Pode ser um destes valores: O URL de conexão JDBC, que precisa conter o host, a porta e o nome do banco de dados de origem e pode conter, opcionalmente, propriedades como autoReconnect, maxReconnects etc. Formato: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2. O caminho da configuração do fragmento
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID do banco de dados do Cloud Spanner.
  • PROJECT_ID: o ID do projeto do Cloud Spanner.
  • OUTPUT_DIRECTORY: o diretório de saída para eventos com falha/ignorados/filtrados
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies data from a relational database using JDBC to an existing Spanner
 * database.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Sourcedb_to_Spanner_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Sourcedb to Spanner",
    description = {
      "The SourceDB to Spanner template is a batch pipeline that copies data from a relational"
          + " database into an existing Spanner database. This pipeline uses JDBC to connect to"
          + " the relational database. You can use this template to copy data from any relational"
          + " database with available JDBC drivers into Spanner. This currently only supports a limited set of types of MySQL",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a"
          + " Base64-encoded username, password, and connection string parameters encrypted with"
          + " the Cloud KMS key. See the <a"
          + " href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud"
          + " KMS API encryption endpoint</a> for additional details on encrypting your username,"
          + " password, and connection string parameters."
    },
    optionsClass = SourceDbToSpannerOptions.class,
    flexContainerName = "source-db-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/sourcedb-to-spanner",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "The Spanner tables must exist before pipeline execution.",
      "The Spanner tables must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class SourceDbToSpanner {

  private static final Logger LOG = LoggerFactory.getLogger(SourceDbToSpanner.class);

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link SourceDbToSpanner#run} method to start the
   * pipeline and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    SourceDbToSpannerOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SourceDbToSpannerOptions.class);
    run(options);
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(SourceDbToSpannerOptions options) {
    // TODO - Validate if options are as expected
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig = createSpannerConfig(options);

    // Decide type and source of migration
    // TODO(vardhanvthigle): Move this within pipelineController.
    switch (options.getSourceDbDialect()) {
      case SourceDbToSpannerOptions.CASSANDRA_SOURCE_DIALECT:
        return PipelineController.executeCassandraMigration(options, pipeline, spannerConfig);
      default:
        /* Implementation detail, not having a default leads to failure in compile time checks enforced here */
        /* Making jdbc as default case which includes MYSQL and PG. */
        return executeJdbcMigration(options, pipeline, spannerConfig);
    }
  }

  // TODO(vardhanvthigle): Move this within pipelineController.
  private static PipelineResult executeJdbcMigration(
      SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
    if (options.getSourceConfigURL().startsWith("gs://")) {
      List<Shard> shards =
          new ShardFileReader(new SecretManagerAccessorImpl())
              .readForwardMigrationShardingConfig(options.getSourceConfigURL());
      return PipelineController.executeJdbcShardedMigration(
          options, pipeline, shards, spannerConfig);
    } else {
      return PipelineController.executeJdbcSingleInstanceMigration(
          options, pipeline, spannerConfig);
    }
  }

  @VisibleForTesting
  static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) {
    return SpannerConfig.create()
        .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
        .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
        .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
        .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
  }
}