Aliran data perubahan Spanner ke template Cloud Storage

Template streaming perubahan Spanner ke Cloud Storage adalah pipeline streaming yang melakukan streaming data perubahan Spanner dan menulisnya ke bucket Cloud Storage menggunakan Dataflow Runner v2.

Pipeline mengelompokkan data aliran perubahan Spanner ke dalam periode berdasarkan stempel waktunya, dengan setiap periode mewakili durasi waktu yang panjangnya dapat Anda konfigurasikan dengan template ini. Semua data dengan stempel waktu yang termasuk dalam periode waktu dijamin berada dalam periode waktu tersebut; tidak boleh ada data yang terlambat. Anda juga dapat menentukan jumlah shard output; pipeline akan membuat satu file output Cloud Storage per periode per shard. Dalam file output, data tidak diurutkan. File output dapat ditulis dalam format JSON atau AVRO, bergantung pada konfigurasi pengguna.

Perhatikan bahwa Anda dapat meminimalkan latensi jaringan dan biaya transportasi jaringan dengan menjalankan tugas Dataflow dari region yang sama dengan instance Spanner atau bucket Cloud Storage Anda. Jika Anda menggunakan sumber, sink, lokasi file staging, atau lokasi file sementara yang berada di luar region tugas Anda, data Anda mungkin dikirim ke berbagai region. Lihat selengkapnya tentang region Dataflow.

Pelajari lebih lanjut aliran perubahan, cara mem-build pipeline Dataflow aliran perubahan, dan praktik terbaik.

Persyaratan pipeline

  • Instance Spanner harus ada sebelum menjalankan pipeline.
  • Database Spanner harus ada sebelum menjalankan pipeline.
  • Instance metadata Spanner harus ada sebelum menjalankan pipeline.
  • Database metadata Spanner harus ada sebelum menjalankan pipeline.
  • Aliran perubahan Spanner harus ada sebelum menjalankan pipeline.
  • Bucket output Cloud Storage harus sudah ada sebelum menjalankan pipeline.

Parameter template

Parameter yang diperlukan

  • spannerInstanceId: ID instance Spanner tempat data aliran perubahan dibaca.
  • spannerDatabase: Database Spanner tempat data aliran perubahan dibaca.
  • spannerMetadataInstanceId: ID instance Spanner yang akan digunakan untuk tabel metadata konektor aliran perubahan.
  • spannerMetadataDatabase: Database Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerChangeStreamName: Nama aliran data perubahan Spanner yang akan dibaca.
  • gcsOutputDirectory: Awalan jalur dan nama file untuk menulis file output. Harus diakhiri dengan garis miring. Format DateTime digunakan untuk mengurai jalur direktori untuk pemformat tanggal & waktu. Contoh, gs://your-bucket/your-path.

Parameter opsional

  • spannerProjectId: ID project Google Cloud yang berisi database Spanner untuk membaca aliran perubahan. Project ini juga merupakan tempat tabel metadata konektor aliran perubahan dibuat. Default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • spannerDatabaseRole: Peran database Spanner yang akan digunakan saat menjalankan template. Parameter ini hanya diperlukan jika akun utama IAM yang menjalankan template adalah pengguna kontrol akses terperinci. Peran database harus memiliki hak istimewa SELECT di aliran perubahan dan hak istimewa EXECUTE di fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Nama tabel metadata konektor aliran perubahan Spanner yang akan digunakan. Jika tidak disediakan, tabel metadata aliran perubahan Spanner akan otomatis dibuat selama eksekusi pipeline. Anda harus memberikan nilai untuk parameter ini saat memperbarui pipeline yang ada. Jika tidak, jangan gunakan parameter ini.
  • startTimestamp: DateTime awal, inklusif, yang akan digunakan untuk membaca aliran perubahan, dalam format Ex-2021-10-12T07:20:50.52Z. Secara default, stempel waktu saat pipeline dimulai, yaitu waktu saat ini.
  • endTimestamp: DateTime akhir, inklusif, yang akan digunakan untuk membaca aliran perubahan. Contoh, Ex-2021-10-12T07:20:50.52Z. Defaultnya adalah waktu tak terbatas di masa mendatang.
  • spannerHost: Endpoint Cloud Spanner yang akan dipanggil dalam template. Hanya digunakan untuk pengujian. Contoh, https://spanner.googleapis.com. Secara default: https://spanner.googleapis.com.
  • outputFileFormat: Format file Cloud Storage output. Format yang diizinkan adalah TEXT dan AVRO. Setelan defaultnya adalah AVRO.
  • windowDuration: Durasi periode adalah interval saat data ditulis ke direktori output. Konfigurasikan durasi berdasarkan throughput pipeline. Misalnya, throughput yang lebih tinggi mungkin memerlukan ukuran jendela yang lebih kecil agar data sesuai dengan memori. Default-nya adalah 5m (lima menit), dengan minimum 1s (satu detik). Format yang diizinkan adalah: [int]s (untuk detik, contoh: 5d), [int]m (untuk menit, contoh: 12m), [int]h (untuk jam, contoh: 2h). Contoh, 5m.
  • rpcPriority: Prioritas permintaan untuk panggilan Spanner. Nilainya harus berupa HIGH, MEDIUM, atau LOW. Setelan defaultnya adalah HIGH.
  • outputFilenamePrefix: Awalan yang akan ditempatkan pada setiap file dengan jendela. Contoh, output-. Setelan defaultnya adalah: output.
  • numShards: Jumlah maksimum shard output yang dihasilkan saat menulis. Jumlah shard yang lebih tinggi berarti throughput yang lebih tinggi untuk menulis ke Cloud Storage, tetapi berpotensi meningkatkan biaya agregasi data di seluruh shard saat memproses file Cloud Storage output. Setelan defaultnya adalah: 20.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Spanner change streams to Google Cloud Storage template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Cloud Spanner
  • SPANNER_DATABASE: Database Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Cloud Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Cloud Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: Lokasi file untuk output aliran perubahan

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Cloud Spanner
  • SPANNER_DATABASE: Database Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Cloud Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Cloud Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: Lokasi file untuk output aliran perubahan
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToGcsOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToGcs} pipeline streams change stream record(s) and stores to
 * Google Cloud Storage bucket in user specified format. The sink data can be stored in a Text or
 * Avro file format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_Google_Cloud_Storage.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_Google_Cloud_Storage",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Cloud Storage",
    description = {
      "The Cloud Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner V2.\n",
      "The pipeline groups Spanner change stream records into windows based on their timestamp, with each window representing a time duration whose length you can configure with this template. "
          + "All records with timestamps belonging to the window are guaranteed to be in the window; there can be no late arrivals. "
          + "You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. "
          + "Within an output file, records are unordered. Output files can be written in either JSON or AVRO format, depending on the user configuration.\n",
      "Note that you can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or Cloud Storage bucket. "
          + "If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. "
          + "See more about <a href=\"https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\">Dataflow regional endpoints</a>.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToGcsOptions.class,
    flexContainerName = "spanner-changestreams-to-gcs",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Cloud Storage output bucket must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToGcs {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToGcs.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Files to GCS");

    SpannerChangeStreamsToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToGcsOptions.class);

    run(options);
  }

  private static String getProjectId(SpannerChangeStreamsToGcsOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) {
    LOG.info("Requested File Format is " + options.getOutputFileFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);

    // Get the Spanner project, instance, database, and change stream parameters.
    String projectId = getProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      LOG.info("Setting database role on SpannerConfig: " + options.getSpannerDatabaseRole());
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    LOG.info("Created SpannerConfig: " + spannerConfig);
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Creating " + options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write To GCS",
            FileFormatFactorySpannerChangeStreams.newBuilder().setOptions(options).build());

    return pipeline.run();
  }
}

Langkah berikutnya