Streaming perubahan Spanner ke template BigQuery

Template aliran perubahan Spanner ke BigQuery adalah pipeline streaming yang melakukan streaming data perubahan Spanner dan menulisnya ke dalam tabel BigQuery menggunakan Dataflow Runner V2.

Semua kolom yang dipantau oleh aliran perubahan disertakan dalam setiap baris tabel BigQuery, terlepas dari apakah kolom tersebut diubah oleh transaksi Spanner atau tidak. Kolom yang tidak ditonton tidak disertakan dalam baris BigQuery. Setiap perubahan Spanner yang kurang dari stempel waktu Dataflow berhasil diterapkan ke tabel BigQuery atau disimpan dalam antrean dead letter untuk dicoba ulang. Baris BigQuery disisipkan secara tidak berurutan dibandingkan dengan urutan stempel waktu commit Spanner asli.

Jika tabel BigQuery yang diperlukan tidak ada, pipeline akan membuatnya. Jika tidak, tabel BigQuery yang ada akan digunakan. Skema tabel BigQuery yang ada harus berisi kolom pelacakan yang sesuai dari tabel Spanner dan kolom metadata tambahan yang tidak diabaikan secara eksplisit oleh opsi ignoreFields. Lihat deskripsi kolom metadata dalam daftar berikut. Setiap baris BigQuery baru menyertakan semua kolom yang dipantau oleh aliran perubahan dari baris yang sesuai di tabel Spanner Anda pada stempel waktu data perubahan.

Kolom metadata berikut ditambahkan ke tabel BigQuery. Untuk mengetahui detail selengkapnya tentang kolom ini, lihat Kumpulan data perubahan data di "Mengubah partisi, kumpulan data, dan kueri streaming data".

  • _metadata_spanner_mod_type: Jenis modifikasi (sisipkan, update, atau hapus) transaksi Spanner. Diekstrak dari data perubahan stream perubahan.
  • _metadata_spanner_table_name: Nama tabel Spanner. Kolom ini bukan nama tabel metadata konektor.
  • _metadata_spanner_commit_timestamp: Stempel waktu commit Spanner, yang merupakan waktu saat perubahan di-commit. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_server_transaction_id: String unik secara global yang mewakili transaksi Spanner tempat perubahan dilakukan. Hanya gunakan nilai ini dalam konteks pemrosesan data aliran perubahan. ID ini tidak berkorelasi dengan ID transaksi di API Spanner. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_record_sequence: Nomor urutan untuk data dalam transaksi Spanner. Nomor urutan dijamin unik dan meningkat secara monoton, tetapi tidak harus berurutan, dalam transaksi. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: Menunjukkan apakah data adalah data terakhir untuk transaksi Spanner di partisi saat ini. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_number_of_records_in_transaction: Jumlah kumpulan data perubahan yang merupakan bagian dari transaksi Spanner di seluruh partisi aliran data perubahan. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_number_of_partitions_in_transaction: Jumlah partisi yang menampilkan kumpulan data perubahan untuk transaksi Spanner. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_big_query_commit_timestamp: Stempel waktu commit saat baris disisipkan ke BigQuery. Jika useStorageWriteApi adalah true, kolom ini tidak otomatis dibuat di tabel log perubahan oleh pipeline. Dalam hal ini, Anda harus menambahkan kolom ini secara manual di tabel log perubahan jika diperlukan.

Saat menggunakan template ini, perhatikan detail berikut:

  • Anda dapat menggunakan template ini untuk menyebarkan kolom baru di tabel yang ada atau tabel baru dari Spanner ke BigQuery. Untuk informasi selengkapnya, lihat Menangani penambahan tabel atau kolom pelacakan.
  • Untuk jenis pengambilan nilai OLD_AND_NEW_VALUES dan NEW_VALUES, saat kumpulan data perubahan data berisi perubahan UPDATE, template harus melakukan pembacaan yang tidak berlaku lagi ke Spanner pada stempel waktu commit kumpulan data perubahan data untuk mengambil kolom yang tidak berubah tetapi diamati. Pastikan untuk mengonfigurasi 'version_retention_period' database Anda dengan benar untuk pembacaan yang sudah tidak berlaku. Untuk jenis pengambilan nilai NEW_ROW, template lebih efisien, karena kumpulan data perubahan menangkap baris baru lengkap termasuk kolom yang tidak diperbarui dalam permintaan UPDATE, dan template tidak perlu melakukan pembacaan yang sudah tidak berlaku.
  • Untuk meminimalkan latensi jaringan dan biaya transportasi jaringan, jalankan tugas Dataflow dari region yang sama dengan instance Spanner atau tabel BigQuery Anda. Jika Anda menggunakan sumber, sink, lokasi file staging, atau lokasi file sementara yang berada di luar region tugas Anda, data Anda mungkin dikirim ke berbagai region. Untuk informasi selengkapnya, lihat Region Dataflow.
  • Template ini mendukung semua jenis data Spanner yang valid. Jika jenis BigQuery lebih presisi daripada jenis Spanner, kehilangan presisi dapat terjadi selama transformasi. Secara khusus:
    • Untuk jenis JSON Spanner, urutan anggota objek diurutkan secara leksikografis, tetapi tidak ada jaminan seperti itu untuk jenis JSON BigQuery.
    • Spanner mendukung jenis TIMESTAMP nanodetik, tetapi BigQuery hanya mendukung jenis TIMESTAMP mikrodetik.

Pelajari lebih lanjut aliran perubahan, cara mem-build pipeline Dataflow aliran perubahan, dan praktik terbaik.

Persyaratan pipeline

  • Instance Spanner harus ada sebelum menjalankan pipeline.
  • Database Spanner harus ada sebelum menjalankan pipeline.
  • Instance metadata Spanner harus ada sebelum menjalankan pipeline.
  • Database metadata Spanner harus ada sebelum menjalankan pipeline.
  • Aliran perubahan Spanner harus ada sebelum menjalankan pipeline.
  • Set data BigQuery harus ada sebelum menjalankan pipeline.

Menangani penambahan tabel atau kolom pelacakan

Bagian ini menjelaskan praktik terbaik untuk menangani penambahan tabel dan kolom Spanner pelacakan saat pipeline berjalan. Versi template terlama yang didukung untuk fitur ini adalah 2024-09-19-00_RC00.

  • Sebelum menambahkan kolom baru ke cakupan aliran perubahan Spanner, tambahkan kolom tersebut ke tabel log perubahan BigQuery terlebih dahulu. Kolom yang ditambahkan harus memiliki jenis data yang cocok dan berupa NULLABLE. Tunggu setidaknya 10 menit sebelum Anda melanjutkan untuk membuat kolom atau tabel baru di Spanner. Menulis ke kolom baru tanpa menunggu dapat mengakibatkan data yang tidak diproses dengan kode error tidak valid di direktori antrean email yang tidak terkirim.
  • Untuk menambahkan tabel baru, tambahkan tabel terlebih dahulu di database Spanner. Tabel dibuat secara otomatis di BigQuery saat pipeline menerima kumpulan data untuk tabel baru.
  • Setelah Anda menambahkan kolom atau tabel baru di database Spanner, pastikan untuk mengubah aliran perubahan untuk melacak kolom atau tabel baru yang Anda inginkan jika belum dilacak secara implisit.
  • Template tidak menghapus tabel atau kolom dari BigQuery. Jika kolom dihapus dari tabel Spanner, nilai null akan diisi ke kolom log perubahan BigQuery untuk data yang dihasilkan setelah kolom dihapus dari tabel Spanner, kecuali jika Anda menghapus kolom secara manual dari BigQuery.
  • Template tidak mendukung pembaruan jenis kolom. Meskipun Spanner mendukung perubahan kolom STRING menjadi BYTES atau kolom BYTES menjadi STRING, Anda tidak dapat mengubah jenis data kolom yang ada atau menggunakan nama kolom yang sama dengan jenis data yang berbeda di BigQuery. Jika Anda menghapus dan membuat ulang kolom dengan nama yang sama, tetapi jenisnya berbeda di Spanner, data mungkin akan ditulis ke kolom BigQuery yang ada, tetapi jenisnya tidak berubah.
  • Template ini tidak mendukung pembaruan mode kolom. Kolom metadata yang direplikasi ke BigQuery ditetapkan ke mode REQUIRED. Semua kolom lain yang direplikasi ke BigQuery ditetapkan ke NULLABLE, terlepas dari apakah kolom tersebut ditentukan sebagai NOT NULL di tabel Spanner. Anda tidak dapat memperbarui kolom NULLABLE ke mode REQUIRED di BigQuery.
  • Mengubah jenis pengambilan nilai stream perubahan tidak didukung untuk menjalankan pipeline.

Parameter template

Parameter yang diperlukan

  • spannerInstanceId: Instance Spanner tempat membaca aliran data perubahan.
  • spannerDatabase: Database Spanner tempat membaca aliran data perubahan.
  • spannerMetadataInstanceId: Instance Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerMetadataDatabase: Database Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerChangeStreamName: Nama aliran data perubahan Spanner yang akan dibaca.
  • bigQueryDataset: Set data BigQuery untuk output aliran perubahan.

Parameter opsional

  • spannerProjectId: Project tempat aliran data perubahan akan dibaca. Nilai ini juga merupakan project tempat tabel metadata konektor aliran perubahan dibuat. Nilai default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • spannerDatabaseRole: Peran database Spanner yang akan digunakan saat menjalankan template. Parameter ini hanya diperlukan jika akun utama IAM yang menjalankan template adalah pengguna kontrol akses terperinci. Peran database harus memiliki hak istimewa SELECT di aliran perubahan dan hak istimewa EXECUTE di fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Nama tabel metadata konektor aliran perubahan Spanner yang akan digunakan. Jika tidak diberikan, tabel metadata konektor aliran perubahan Spanner akan otomatis dibuat selama alur pipeline. Anda harus memberikan parameter ini saat memperbarui pipeline yang ada. Jika tidak, jangan berikan parameter ini.
  • rpcPriority: Prioritas permintaan untuk panggilan Spanner. Nilainya harus berupa salah satu nilai berikut: HIGH, MEDIUM, atau LOW. Nilai defaultnya adalah HIGH.
  • spannerHost: Endpoint Cloud Spanner yang akan dipanggil dalam template. Hanya digunakan untuk pengujian. Contoh, https://batch-spanner.googleapis.com.
  • startTimestamp: DateTime awal (https://datatracker.ietf.org/doc/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan. Ex-2021-10-12T07:20:50.52Z. Secara default, stempel waktu saat pipeline dimulai, yaitu waktu saat ini.
  • endTimestamp: DateTime akhir (https://datatracker.ietf.org/doc/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan.Contoh: 2021-10-12T07:20:50.52Z. Defaultnya adalah waktu tak terbatas di masa mendatang.
  • bigQueryProjectId: Project BigQuery. Nilai defaultnya adalah project untuk tugas Dataflow.
  • bigQueryChangelogTableNameTemplate: Template untuk nama tabel BigQuery yang berisi log perubahan. Setelan defaultnya adalah: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: Jalur untuk menyimpan data yang belum diproses. Jalur default adalah direktori di lokasi sementara tugas Dataflow. Nilai default biasanya sudah cukup.
  • dlqRetryMinutes: Jumlah menit antara percobaan ulang antrean pesan yang tidak terkirim. Nilai defaultnya adalah 10.
  • ignoreFields: Daftar kolom yang dipisahkan koma (peka huruf besar/kecil) yang akan diabaikan. Kolom ini mungkin berupa kolom tabel yang ditonton, atau kolom metadata yang ditambahkan oleh pipeline. Kolom yang diabaikan tidak disisipkan ke BigQuery. Jika Anda mengabaikan kolom _metadata_spanner_table_name, parameter bigQueryChangelogTableNameTemplate juga akan diabaikan. Default-nya adalah kosong.
  • disableDlqRetries: Apakah akan menonaktifkan percobaan ulang untuk DLQ atau tidak. Defaultnya adalah: false.
  • useStorageWriteApi: Jika benar, pipeline akan menggunakan BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Saat menggunakan Storage Write API, menentukan semantik tulis. Untuk menggunakan semantik minimal satu kali (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
  • numStorageWriteApiStreams: Saat menggunakan Storage Write API, menentukan jumlah aliran tulis. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini. Setelan defaultnya adalah: 0.
  • storageWriteApiTriggeringFrequencySec: Saat menggunakan Storage Write API, menentukan frekuensi pemicuan, dalam detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Spanner change streams to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Spanner
  • BIGQUERY_DATASET: Set data BigQuery untuk output aliran perubahan

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Spanner
  • BIGQUERY_DATASET: Set data BigQuery untuk output aliran perubahan
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.ModColumnType;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.OptionsUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO(haikuo-google): Add integration test.
// TODO(haikuo-google): Add README.
// TODO(haikuo-google): Add stackdriver metrics.
// TODO(haikuo-google): Ideally side input should be used to store schema information and shared
// accross DoFns, but since side input fix is not yet deployed at the moment, we read schema
// information in the beginning of the DoFn as a work around. We should use side input instead when
// it's available.
// TODO(haikuo-google): Test the case where tables or columns are added while the pipeline is
// running.
/**
 * This pipeline ingests {@link DataChangeRecord} from Spanner change stream. The {@link
 * DataChangeRecord} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Spanner_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to BigQuery",
    description = {
      "The Cloud Spanner change streams to BigQuery template is a streaming pipeline that streams"
          + " Cloud Spanner data change records and writes them into BigQuery tables using Dataflow"
          + " Runner V2.\n",
      "All change stream watched columns are included in each BigQuery table row, regardless of"
          + " whether they are modified by a Cloud Spanner transaction. Columns not watched are not"
          + " included in the BigQuery row. Any Cloud Spanner change less than the Dataflow"
          + " watermark are either successfully applied to the BigQuery tables or are stored in the"
          + " dead-letter queue for retry. BigQuery rows are inserted out of order compared to the"
          + " original Cloud Spanner commit timestamp ordering.\n",
      "If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing"
          + " BigQuery tables are used. The schema of existing BigQuery tables must contain the"
          + " corresponding tracked columns of the Cloud Spanner tables and any additional metadata"
          + " columns that are not ignored explicitly by the ignoreFields option. See the"
          + " description of the metadata fields in the following list. Each new BigQuery row"
          + " includes all columns watched by the change stream from its corresponding row in your"
          + " Cloud Spanner table at the change record's timestamp.\n",
      "The following metadata fields are added to BigQuery tables. For more details about these"
          + " fields, see Data change records in \"Change streams partitions, records, and"
          + " queries.\"\n"
          + "- _metadata_spanner_mod_type: The modification type (insert, update, or delete) of the"
          + " Cloud Spanner transaction. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_table_name: The Cloud Spanner table name. Note this field is not"
          + " the metadata table name of the connector.\n"
          + "- _metadata_spanner_commit_timestamp: The Spanner commit timestamp, which is the time"
          + " when a change is committed. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_server_transaction_id: A globally unique string that represents"
          + " the Spanner transaction in which the change was committed. Only use this value in the"
          + " context of processing change stream records. It isn't correlated with the transaction"
          + " ID in Spanner's API. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_record_sequence: The sequence number for the record within the"
          + " Spanner transaction. Sequence numbers are guaranteed to be unique and monotonically"
          + " increasing (but not necessarily contiguous) within a transaction. Extracted from"
          + " change stream data change record.\n"
          + "- _metadata_spanner_is_last_record_in_transaction_in_partition: Indicates whether the"
          + " record is the last record for a Spanner transaction in the current partition."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_records_in_transaction: The number of data change"
          + " records that are part of the Spanner transaction across all change stream partitions."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_partitions_in_transaction: The number of partitions that"
          + " return data change records for the Spanner transaction. Extracted from change stream"
          + " data change record.\n"
          + "- _metadata_big_query_commit_timestamp: The commit timestamp of when the row is"
          + " inserted into BigQuery.\n",
      "Notes:\n"
          + "- This template does not propagate schema changes from Cloud Spanner to BigQuery."
          + " Because performing a schema change in Cloud Spanner is likely going to break the"
          + " pipeline, you might need to recreate the pipeline after the schema change.\n"
          + "- For OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change"
          + " record contains an UPDATE change, the template needs to do a stale read to Cloud"
          + " Spanner at the commit timestamp of the data change record to retrieve the unchanged"
          + " but watched columns. Make sure to configure your database 'version_retention_period'"
          + " properly for the stale read. For the NEW_ROW value capture type, the template is more"
          + " efficient, because the data change record captures the full new row including columns"
          + " that are not updated in UPDATEs, and the template does not need to do a stale read.\n"
          + "- You can minimize network latency and network transport costs by running the Dataflow"
          + " job from the same region as your Cloud Spanner instance or BigQuery tables. If you"
          + " use sources, sinks, staging file locations, or temporary file locations that are"
          + " located outside of your job's region, your data might be sent across regions. See"
          + " more about Dataflow regional endpoints.\n"
          + "- This template supports all valid Cloud Spanner data types, but if the BigQuery type"
          + " is more precise than the Cloud Spanner type, precision loss might occur during the"
          + " transformation. Specifically:\n"
          + "  - For Cloud Spanner JSON type, the order of the members of an object is"
          + " lexicographically ordered, but there is no such guarantee for BigQuery JSON type.\n"
          + "  - Cloud Spanner supports nanoseconds TIMESTAMP type, BigQuery only supports"
          + " microseconds TIMESTAMP type.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change"
          + " streams</a>, <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to"
          + " build change streams Dataflow pipelines</a>, and <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best"
          + " practices</a>."
    },
    optionsClass = SpannerChangeStreamsToBigQueryOptions.class,
    flexContainerName = "spanner-changestreams-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The BigQuery dataset must exist prior to running the pipeline."
    },
    streaming = true,
    supportsExactlyOnce = true,
    supportsAtLeastOnce = true)
public final class SpannerChangeStreamsToBigQuery {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToBigQuery.class);

  // Max number of deadletter queue retries.
  private static final int DLQ_MAX_RETRIES = 5;

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting to replicate change records from Spanner change streams to BigQuery");

    SpannerChangeStreamsToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerChangeStreamsToBigQueryOptions.class);

    run(options);
  }

  private static void validateOptions(SpannerChangeStreamsToBigQueryOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options
        .getBigQueryChangelogTableNameTemplate()
        .equals(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME)) {
      throw new IllegalArgumentException(
          String.format(
              "bigQueryChangelogTableNameTemplate cannot be set to '{%s}'. This value is reserved"
                  + " for the Cloud Spanner table name.",
              BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME));
    }

    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
  }

  private static void setOptions(SpannerChangeStreamsToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) {
    setOptions(options);
    validateOptions(options);

    /**
     * Stages: 1) Read {@link DataChangeRecord} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link DataChangeRecord}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into {@link TableRow} by reading from Spanner at
     * commit timestamp. 4) Append {@link TableRow} to BigQuery. 5) Write Failures from 2), 3) and
     * 4) to GCS dead letter queue.
     */
    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    String spannerProjectId = OptionsUtils.getSpannerProjectId(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    /**
     * There are two types of errors that can occur in this pipeline:
     *
     * <p>1) Error originating from modJsonStringToTableRow. Errors here are either due to pk values
     * missing, a spanner table / column missing in the in-memory map, or some Spanner read error
     * happening in readSpannerRow. We already retry the Spanner read error inline 3 times. Th other
     * types of errors are more likely to be un-retriable.
     *
     * <p>2) Error originating from BigQueryIO.write. BigQuery storage write API already retries all
     * transient errors and outputs more permanent errors.
     *
     * <p>As a result, it is reasonable to write all errors happening in the pipeline directly into
     * the permanent DLQ, since most of the errors are likely to be non-transient.
     */
    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    // Retrieve and parse the startTimestamp and endTimestamp.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabase())
            .withRpcPriority(options.getRpcPriority());
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }

    SpannerIO.ReadChangeStream readChangeStream =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withMetadataInstance(options.getSpannerMetadataInstanceId())
            .withMetadataDatabase(options.getSpannerMetadataDatabase())
            .withChangeStreamName(options.getSpannerChangeStreamName())
            .withInclusiveStartAt(startTimestamp)
            .withInclusiveEndAt(endTimestamp)
            .withRpcPriority(options.getRpcPriority());

    String spannerMetadataTableName = options.getSpannerMetadataTableName();
    if (spannerMetadataTableName != null) {
      readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
    }

    PCollection<DataChangeRecord> dataChangeRecord =
        pipeline
            .apply("Read from Spanner Change Streams", readChangeStream)
            .apply("Reshuffle DataChangeRecord", Reshuffle.viaRandomKey());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply("DataChangeRecord To Mod JSON", ParDo.of(new DataChangeRecordToModJsonFn()))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson =
        dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    ImmutableSet.Builder<String> ignoreFieldsBuilder = ImmutableSet.builder();
    for (String ignoreField : options.getIgnoreFields().split(",")) {
      ignoreFieldsBuilder.add(ignoreField);
    }
    ImmutableSet<String> ignoreFields = ignoreFieldsBuilder.build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions
        failsafeModJsonToTableRowOptions =
            FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setSpannerChangeStream(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setCoder(FAILSAFE_ELEMENT_CODER)
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow failsafeModJsonToTableRow =
        new FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow(
            failsafeModJsonToTableRowOptions);

    PCollectionTuple tableRowTuple =
        failsafeModJson.apply("Mod JSON To TableRow", failsafeModJsonToTableRow);
    // If users pass in the full BigQuery dataset ID (projectId.datasetName), extract the dataset
    // name for the setBigQueryDataset parameter.
    List<String> results = OptionsUtils.processBigQueryProjectAndDataset(options);
    String bigqueryProject = results.get(0);
    String bigqueryDataset = results.get(1);

    BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions
        bigQueryDynamicDestinationsOptions =
            BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setChangeStreamName(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setBigQueryProject(bigqueryProject)
                .setBigQueryDataset(bigqueryDataset)
                .setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    WriteResult writeResult;
    if (!options.getUseStorageWriteApi()) {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    } else {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .ignoreUnknownValues()
                      .withAutoSchemaUpdate(true) // only supported when using STORAGE_WRITE_API or
                      // STORAGE_API_AT_LEAST_ONCE.
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    }

    PCollection<String> transformDlqJson =
        tableRowTuple
            .get(failsafeModJsonToTableRow.transformDeadLetterOut)
            .apply(
                "Failed Mod JSON During Table Row Transformation",
                MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollection<String> bqWriteDlqJson =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Failed Mod JSON During BigQuery Writes",
                MapElements.via(new BigQueryDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        // Generally BigQueryIO storage write retries transient errors, and only more
        // persistent errors make it into DLQ.
        .and(bqWriteDlqJson)
        .apply("Merge Failed Mod JSON From Transform And BigQuery", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retryable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static DeadLetterQueueManager buildDlqManager(
      SpannerChangeStreamsToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
  }

  /**
   * Remove the following intermediate metadata fields that are not user data from {@link TableRow}:
   * _metadata_error, _metadata_retry_count, _metadata_spanner_original_payload_json.
   */
  private static TableRow removeIntermediateMetadataFields(TableRow tableRow) {
    TableRow cleanTableRow = tableRow.clone();
    Set<String> rowKeys = tableRow.keySet();
    Set<String> metadataFields = BigQueryUtils.getBigQueryIntermediateMetadataFieldNames();

    for (String rowKey : rowKeys) {
      if (metadataFields.contains(rowKey)) {
        cleanTableRow.remove(rowKey);
      } else if (rowKeys.contains("_type_" + rowKey)) {
        cleanTableRow.remove("_type_" + rowKey);
      }
    }

    return cleanTableRow;
  }

  /**
   * DoFn that converts a {@link DataChangeRecord} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class DataChangeRecordToModJsonFn extends DoFn<DataChangeRecord, String> {

    @ProcessElement
    public void process(@Element DataChangeRecord input, OutputReceiver<String> receiver) {
      for (org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod changeStreamsMod :
          input.getMods()) {
        Mod mod =
            new Mod(
                changeStreamsMod.getKeysJson(),
                changeStreamsMod.getNewValuesJson(),
                input.getCommitTimestamp(),
                input.getServerTransactionId(),
                input.isLastRecordInTransactionInPartition(),
                input.getRecordSequence(),
                input.getTableName(),
                input.getRowType().stream().map(ModColumnType::new).collect(Collectors.toList()),
                input.getModType(),
                input.getValueCaptureType(),
                input.getNumberOfRecordsInTransaction(),
                input.getNumberOfPartitionsInTransaction());

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }
  }
}

Langkah berikutnya