Template streaming perubahan Spanner ke Pub/Sub

Streaming perubahan Spanner ke template Pub/Sub adalah pipeline streaming yang melakukan streaming data perubahan Spanner dan menulisnya ke topik Pub/Sub menggunakan Dataflow Runner V2.

Untuk menampilkan data ke topik Pub/Sub baru, Anda harus membuat topik terlebih dahulu. Setelah pembuatan, Pub/Sub akan otomatis membuat dan melampirkan langganan ke topik baru. Jika Anda mencoba menghasilkan output data ke topik Pub/Sub yang tidak ada, pipeline dataflow akan menampilkan pengecualian, dan pipeline akan macet karena terus mencoba membuat koneksi.

Jika topik Pub/Sub yang diperlukan sudah ada, Anda dapat menghasilkan output data ke topik tersebut.

Untuk mengetahui informasi selengkapnya, lihat Tentang aliran perubahan, Mem-build koneksi aliran perubahan dengan Dataflow, dan Praktik terbaik aliran perubahan.

Persyaratan pipeline

  • Instance Spanner harus ada sebelum menjalankan pipeline.
  • Database Spanner harus ada sebelum menjalankan pipeline.
  • Instance metadata Spanner harus ada sebelum menjalankan pipeline.
  • Database metadata Spanner harus ada sebelum menjalankan pipeline.
  • Aliran perubahan Spanner harus ada sebelum menjalankan pipeline.
  • Topik Pub/Sub harus ada sebelum menjalankan pipeline.

Parameter template

Parameter yang diperlukan

  • spannerInstanceId: Instance Spanner tempat membaca aliran data perubahan.
  • spannerDatabase: Database Spanner tempat membaca aliran data perubahan.
  • spannerMetadataInstanceId: Instance Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerMetadataDatabase: Database Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerChangeStreamName: Nama aliran data perubahan Spanner yang akan dibaca.
  • pubsubTopic: Topik Pub/Sub untuk output aliran perubahan.

Parameter opsional

  • spannerProjectId: Project tempat aliran data perubahan akan dibaca. Project ini juga merupakan tempat tabel metadata konektor aliran perubahan dibuat. Default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • spannerDatabaseRole: Peran database Spanner yang akan digunakan saat menjalankan template. Parameter ini hanya diperlukan jika akun utama IAM yang menjalankan template adalah pengguna kontrol akses terperinci. Peran database harus memiliki hak istimewa SELECT di aliran perubahan dan hak istimewa EXECUTE di fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Nama tabel metadata konektor aliran perubahan Spanner yang akan digunakan. Jika tidak disediakan, Spanner akan otomatis membuat tabel metadata konektor aliran data selama perubahan alur pipeline. Anda harus memberikan parameter ini saat memperbarui pipeline yang ada. Jangan gunakan parameter ini untuk kasus lainnya.
  • startTimestamp: DateTime awal (https://tools.ietf.org/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan. Misalnya, contoh- 2021-10-12T07:20:50.52Z. Secara default, stempel waktu saat pipeline dimulai, yaitu waktu saat ini.
  • endTimestamp: DateTime akhir (https://tools.ietf.org/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan. Misalnya, contoh- 2021-10-12T07:20:50.52Z. Defaultnya adalah waktu tak terbatas di masa mendatang.
  • spannerHost: Endpoint Cloud Spanner yang akan dipanggil dalam template. Hanya digunakan untuk pengujian. Contoh, https://spanner.googleapis.com. Secara default: https://spanner.googleapis.com.
  • outputDataFormat: Format output. Output digabungkan dalam banyak PubsubMessages dan dikirim ke topik Pub/Sub. Format yang diizinkan adalah JSON dan AVRO. Default-nya adalah JSON.
  • pubsubAPI: Pub/Sub API yang digunakan untuk menerapkan pipeline. API yang diizinkan adalah pubsubio dan native_client. Untuk sejumlah kecil kueri per detik (QPS), native_client memiliki latensi yang lebih rendah. Untuk QPS dalam jumlah besar, pubsubio memberikan performa yang lebih baik dan lebih stabil. Defaultnya adalah pubsubio.
  • pubsubProjectId: Project topik Pub/Sub. Default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • rpcPriority: Prioritas permintaan untuk panggilan Spanner. Nilai yang diizinkan adalah TINGGI, SEDANG, dan RENDAH. Defaultnya adalah: TINGGI).
  • includeSpannerSource: Apakah akan menyertakan ID database dan ID instance spanner untuk membaca aliran perubahan dalam data pesan output atau tidak. Defaultnya adalah: false.
  • outputMessageMetadata: Nilai string untuk kolom kustom outputMessageMetadata dalam pesan pub/sub output. Nilai defaultnya kosong dan kolom outputMessageMetadata hanya diisi jika nilai ini tidak kosong. Escape karakter khusus apa pun saat memasukkan nilai di sini(yaitu: tanda kutip ganda).

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Spanner change streams to Pub/Sub template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Spanner
  • PUBSUB_TOPIC: Topik Pub/Sub untuk output aliran perubahan

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Spanner
  • PUBSUB_TOPIC: Topik Pub/Sub untuk output aliran perubahan
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToPubSub} pipeline streams change stream record(s) and stores to
 * pubsub topic in user specified format. The sink data can be stored in a JSON Text or Avro data
 * format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Pub/Sub",
    description = {
      "The Cloud Spanner change streams to the Pub/Sub template is a streaming pipeline that streams Cloud Spanner data change records and writes them into Pub/Sub topics using Dataflow Runner V2.\n",
      "To output your data to a new Pub/Sub topic, you need to first create the topic. After creation, Pub/Sub automatically generates and attaches a subscription to the new topic. "
          + "If you try to output data to a Pub/Sub topic that doesn't exist, the dataflow pipeline throws an exception, and the pipeline gets stuck as it continuously tries to make a connection.\n",
      "If the necessary Pub/Sub topic already exists, you can output data to that topic.",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToPubSubOptions.class,
    flexContainerName = "spanner-changestreams-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist before running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Pub/Sub topic must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToPubSub {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToPubSub.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Messages to Pub/Sub");

    SpannerChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static String getSpannerProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  private static String getPubsubProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getPubsubProjectId().isEmpty()
        ? options.getProject()
        : options.getPubsubProjectId();
  }

  public static boolean isValidAsciiString(String outputMessageMetadata) {
    if (outputMessageMetadata != null
        && !StandardCharsets.US_ASCII.newEncoder().canEncode(outputMessageMetadata)) {
      return false;
    }
    return true;
  }

  public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
    LOG.info("Requested Message Format is " + options.getOutputDataFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);
    // Get the Spanner project, instance, database, metadata instance, metadata database
    // change stream, pubsub topic, and pubsub api parameters.
    String spannerProjectId = getSpannerProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();
    String pubsubProjectId = getPubsubProjectId(options);
    String pubsubTopicName = options.getPubsubTopic();
    String pubsubAPI = options.getPubsubAPI();
    Boolean includeSpannerSource = options.getIncludeSpannerSource();
    String outputMessageMetadata = options.getOutputMessageMetadata();

    // Ensure outputMessageMetadata only contains valid ascii characters
    if (!isValidAsciiString(outputMessageMetadata)) {
      throw new RuntimeException("outputMessageMetadata contains non ascii characters.");
    }

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Convert each record to a PubsubMessage",
            FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder()
                .setOutputDataFormat(options.getOutputDataFormat())
                .setProjectId(pubsubProjectId)
                .setPubsubAPI(pubsubAPI)
                .setPubsubTopicName(pubsubTopicName)
                .setIncludeSpannerSource(includeSpannerSource)
                .setSpannerDatabaseId(databaseId)
                .setSpannerInstanceId(instanceId)
                .setOutputMessageMetadata(outputMessageMetadata)
                .build());
    return pipeline.run();
  }
}

Langkah berikutnya