Template Sourcedb ke Spanner

Template SourceDB ke Spanner adalah pipeline batch yang menyalin data dari database relasional ke database Spanner yang ada. Pipeline ini menggunakan JDBC untuk terhubung ke database relasional. Anda dapat menggunakan template ini untuk menyalin data dari database relasional apa pun dengan driver JDBC yang tersedia ke Spanner. Ini hanya mendukung serangkaian jenis MySQL yang terbatas

Untuk lapisan perlindungan tambahan, Anda juga dapat meneruskan kunci Cloud KMS beserta parameter string koneksi, nama pengguna, dan sandi yang dienkode Base64 yang dienkripsi dengan kunci Cloud KMS. Lihat endpoint enkripsi API Cloud KMS untuk mengetahui detail tambahan tentang mengenkripsi parameter string koneksi, nama pengguna, dan sandi Anda.

Persyaratan pipeline

  • Driver JDBC untuk database relasional harus tersedia.
  • Tabel Spanner harus ada sebelum eksekusi pipeline.
  • Tabel Spanner harus memiliki skema yang kompatibel.
  • Database relasional harus dapat diakses dari subnet tempat Dataflow berjalan.

Parameter template

Parameter Deskripsi
sourceConfigURL String URL koneksi JDBC. Misalnya, jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8 atau konfigurasi shard.
instanceId Instance Cloud Spanner tujuan.
databaseId Database Cloud Spanner tujuan.
projectId Ini adalah nama project Cloud Spanner.
outputDirectory Direktori ini digunakan untuk membuang kumpulan data yang gagal/dilewati/difilter dalam migrasi.
jdbcDriverJars Opsional: Daftar file JAR driver yang dipisahkan koma. Contoh: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar. Default-nya adalah kosong.
jdbcDriverClassName Opsional: Nama class driver JDBC. Misalnya: com.mysql.jdbc.Driver. Defaultnya adalah: com.mysql.jdbc.Driver.
username Opsional: Nama pengguna yang akan digunakan untuk koneksi JDBC. Default-nya adalah kosong.
password Opsional: Sandi yang akan digunakan untuk koneksi JDBC. Default-nya adalah kosong.
tables Opsional: Tabel yang akan dimigrasikan dari sumber. Default-nya adalah kosong.
numPartitions Opsional: Jumlah partisi. Hal ini, bersama dengan batas bawah dan atas, membentuk langkah partisi untuk ekspresi klausa WHERE yang dihasilkan yang digunakan untuk membagi kolom partisi secara merata. Jika input kurang dari 1, angka akan ditetapkan ke 1. Default-nya adalah: 0.
spannerHost Opsional: Endpoint Cloud Spanner yang akan dipanggil dalam template. Contoh: https://batch-spanner.googleapis.com. Defaultnya adalah: https://batch-spanner.googleapis.com.
maxConnections Opsional: Mengonfigurasi kumpulan koneksi JDBC di setiap pekerja dengan jumlah koneksi maksimum. Gunakan angka negatif untuk tidak ada batas. Misalnya: -1. Default-nya adalah: 0.
sessionFilePath Opsional: Jalur sesi di Cloud Storage yang berisi informasi pemetaan dari Alat Migrasi Spanner. Default-nya adalah kosong.
transformationJarPath Opsional: Lokasi jar kustom di Cloud Storage yang berisi logika transformasi kustom untuk memproses data. Default-nya adalah kosong.
transformationClassName Opsional: Nama class yang sepenuhnya memenuhi syarat dan memiliki logika transformasi kustom. Ini adalah kolom wajib jika transformationJarPath ditentukan. Default-nya adalah kosong.
transformationCustomParameters Opsional: String yang berisi parameter kustom yang akan diteruskan ke class transformasi kustom. Default-nya adalah kosong.
disabledAlgorithms Opsional: Algoritme yang dipisahkan koma untuk dinonaktifkan. Jika nilai ini disetel ke none, tidak ada algoritma yang dinonaktifkan. Gunakan parameter ini dengan hati-hati, karena algoritma yang dinonaktifkan secara default mungkin memiliki kerentanan atau masalah performa. Contoh: SSLv3, RC4.
extraFilesToStage Opsional: Jalur Cloud Storage yang dipisahkan koma atau secret Secret Manager untuk file yang akan di-stage di pekerja. File ini disimpan di direktori /extra_files di setiap pekerja. Contoh: gs://<BUCKET>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Sourcedb to Spanner template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Sourcedb_to_Spanner_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       sourceConfigURL=SOURCE_CONFIG_URL,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       projectId=PROJECT_ID,\
       outputDirectory=OUTPUT_DIRECTORY,\

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SOURCE_CONFIG_URL: URL untuk terhubung ke host database sumber. Nilai ini dapat berupa 1. URL koneksi JDBC - yang harus berisi nama host, port, dan db sumber, serta dapat berisi properti seperti autoReconnect, maxReconnects, dll. Format: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2. Jalur konfigurasi shard
  • INSTANCE_ID: ID Instance Cloud Spanner.
  • DATABASE_ID: ID Database Cloud Spanner.
  • PROJECT_ID: Project ID Cloud Spanner.
  • OUTPUT_DIRECTORY: Direktori output untuk peristiwa yang gagal/dilewati/difilter

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "sourceConfigURL": "SOURCE_CONFIG_URL",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "projectId": "PROJECT_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Sourcedb_to_Spanner_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SOURCE_CONFIG_URL: URL untuk terhubung ke host database sumber. Nilai ini dapat berupa 1. URL koneksi JDBC - yang harus berisi nama host, port, dan db sumber, serta dapat berisi properti seperti autoReconnect, maxReconnects, dll. Format: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2. Jalur konfigurasi shard
  • INSTANCE_ID: ID Instance Cloud Spanner.
  • DATABASE_ID: ID Database Cloud Spanner.
  • PROJECT_ID: Project ID Cloud Spanner.
  • OUTPUT_DIRECTORY: Direktori output untuk peristiwa yang gagal/dilewati/difilter
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies data from a relational database using JDBC to an existing Spanner
 * database.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Sourcedb_to_Spanner_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Sourcedb to Spanner",
    description = {
      "The SourceDB to Spanner template is a batch pipeline that copies data from a relational"
          + " database into an existing Spanner database. This pipeline uses JDBC to connect to"
          + " the relational database. You can use this template to copy data from any relational"
          + " database with available JDBC drivers into Spanner. This currently only supports a limited set of types of MySQL",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a"
          + " Base64-encoded username, password, and connection string parameters encrypted with"
          + " the Cloud KMS key. See the <a"
          + " href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud"
          + " KMS API encryption endpoint</a> for additional details on encrypting your username,"
          + " password, and connection string parameters."
    },
    optionsClass = SourceDbToSpannerOptions.class,
    flexContainerName = "source-db-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/sourcedb-to-spanner",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "The Spanner tables must exist before pipeline execution.",
      "The Spanner tables must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class SourceDbToSpanner {

  private static final Logger LOG = LoggerFactory.getLogger(SourceDbToSpanner.class);

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link SourceDbToSpanner#run} method to start the
   * pipeline and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    SourceDbToSpannerOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SourceDbToSpannerOptions.class);
    run(options);
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(SourceDbToSpannerOptions options) {
    // TODO - Validate if options are as expected
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig = createSpannerConfig(options);

    // Decide type and source of migration
    // TODO(vardhanvthigle): Move this within pipelineController.
    switch (options.getSourceDbDialect()) {
      case SourceDbToSpannerOptions.CASSANDRA_SOURCE_DIALECT:
        return PipelineController.executeCassandraMigration(options, pipeline, spannerConfig);
      default:
        /* Implementation detail, not having a default leads to failure in compile time checks enforced here */
        /* Making jdbc as default case which includes MYSQL and PG. */
        return executeJdbcMigration(options, pipeline, spannerConfig);
    }
  }

  // TODO(vardhanvthigle): Move this within pipelineController.
  private static PipelineResult executeJdbcMigration(
      SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
    if (options.getSourceConfigURL().startsWith("gs://")) {
      List<Shard> shards =
          new ShardFileReader(new SecretManagerAccessorImpl())
              .readForwardMigrationShardingConfig(options.getSourceConfigURL());
      return PipelineController.executeJdbcShardedMigration(
          options, pipeline, shards, spannerConfig);
    } else {
      return PipelineController.executeJdbcSingleInstanceMigration(
          options, pipeline, spannerConfig);
    }
  }

  @VisibleForTesting
  static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) {
    return SpannerConfig.create()
        .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
        .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
        .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
        .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
  }
}