Streaming perubahan Spanner ke template BigQuery

Template aliran perubahan Spanner ke BigQuery adalah pipeline streaming yang melakukan streaming data perubahan Spanner dan menulisnya ke tabel BigQuery menggunakan Dataflow Runner V2.

Semua kolom yang dipantau oleh aliran perubahan disertakan dalam setiap baris tabel BigQuery, terlepas dari apakah kolom tersebut diubah oleh transaksi Spanner atau tidak. Kolom yang tidak ditonton tidak disertakan dalam baris BigQuery. Setiap perubahan Spanner yang kurang dari stempel waktu Dataflow berhasil diterapkan ke tabel BigQuery atau disimpan dalam antrean dead letter untuk dicoba ulang. Baris BigQuery disisipkan secara tidak berurutan dibandingkan dengan urutan stempel waktu commit Spanner asli.

Jika tabel BigQuery yang diperlukan tidak ada, pipeline akan membuatnya. Jika tidak, tabel BigQuery yang ada akan digunakan. Skema tabel BigQuery yang ada harus berisi kolom yang dilacak yang sesuai dari tabel Spanner dan kolom metadata tambahan yang tidak diabaikan secara eksplisit oleh opsi ignoreFields. Lihat deskripsi kolom metadata dalam daftar berikut. Setiap baris BigQuery baru menyertakan semua kolom yang dipantau oleh aliran perubahan dari baris yang sesuai di tabel Spanner Anda pada stempel waktu data perubahan.

Kolom metadata berikut ditambahkan ke tabel BigQuery. Untuk mengetahui detail selengkapnya tentang kolom ini, lihat Kumpulan data perubahan data di "Mengubah partisi, kumpulan data, dan kueri streaming data".

  • _metadata_spanner_mod_type: Jenis modifikasi (sisipkan, update, atau hapus) transaksi Spanner. Diekstrak dari data perubahan stream perubahan.
  • _metadata_spanner_table_name: Nama tabel Spanner. Kolom ini bukan nama tabel metadata konektor.
  • _metadata_spanner_commit_timestamp: Stempel waktu commit Spanner, yang merupakan waktu saat perubahan di-commit. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_server_transaction_id: String unik secara global yang mewakili transaksi Spanner tempat perubahan dilakukan. Hanya gunakan nilai ini dalam konteks pemrosesan data aliran perubahan. ID ini tidak berkorelasi dengan ID transaksi di API Spanner. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_record_sequence: Nomor urutan untuk data dalam transaksi Spanner. Nomor urutan dijamin unik dan meningkat secara monoton, tetapi tidak harus berurutan, dalam transaksi. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: Menunjukkan apakah data adalah data terakhir untuk transaksi Spanner di partisi saat ini. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_number_of_records_in_transaction: Jumlah kumpulan data perubahan yang merupakan bagian dari transaksi Spanner di seluruh partisi aliran data perubahan. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_spanner_number_of_partitions_in_transaction: Jumlah partisi yang menampilkan kumpulan data perubahan untuk transaksi Spanner. Nilai ini diekstrak dari data perubahan data aliran perubahan.
  • _metadata_big_query_commit_timestamp: Stempel waktu commit saat baris disisipkan ke BigQuery. Jika useStorageWriteApi adalah true, kolom ini tidak otomatis dibuat di tabel log perubahan oleh pipeline. Dalam hal ini, Anda harus menambahkan kolom ini secara manual di tabel log perubahan jika diperlukan.

Saat menggunakan template ini, perhatikan detail berikut:

  • Anda dapat menggunakan template ini untuk menyebarkan kolom baru di tabel yang ada atau tabel baru dari Spanner ke BigQuery. Untuk informasi selengkapnya, lihat Menangani penambahan tabel atau kolom pelacakan.
  • Untuk jenis pengambilan nilai OLD_AND_NEW_VALUES dan NEW_VALUES, saat kumpulan data perubahan data berisi perubahan UPDATE, template harus melakukan pembacaan yang tidak berlaku lagi ke Spanner pada stempel waktu commit kumpulan data perubahan data untuk mengambil kolom yang tidak berubah tetapi diamati. Pastikan untuk mengonfigurasi 'version_retention_period' database Anda dengan benar untuk pembacaan yang sudah tidak berlaku. Untuk jenis pengambilan nilai NEW_ROW, template lebih efisien, karena kumpulan data perubahan menangkap baris baru lengkap termasuk kolom yang tidak diperbarui dalam permintaan UPDATE, dan template tidak perlu melakukan pembacaan yang sudah tidak berlaku.
  • Untuk meminimalkan latensi jaringan dan biaya transportasi jaringan, jalankan tugas Dataflow dari region yang sama dengan instance Spanner atau tabel BigQuery Anda. Jika Anda menggunakan sumber, sink, lokasi file staging, atau lokasi file sementara yang berada di luar region tugas Anda, data Anda mungkin dikirim ke berbagai region. Untuk informasi selengkapnya, lihat Region Dataflow.
  • Template ini mendukung semua jenis data Spanner yang valid. Jika jenis BigQuery lebih presisi daripada jenis Spanner, penurunan presisi dapat terjadi selama transformasi. Secara khusus:
    • Untuk jenis JSON Spanner, urutan anggota objek diurutkan secara leksikografis, tetapi tidak ada jaminan seperti itu untuk jenis JSON BigQuery.
    • Spanner mendukung jenis TIMESTAMP nanodetik, tetapi BigQuery hanya mendukung jenis TIMESTAMP mikrodetik.
  • Template ini tidak mendukung penggunaan BigQuery Storage Write API dalam mode tepat satu kali.

Pelajari lebih lanjut aliran perubahan, cara mem-build pipeline Dataflow aliran perubahan, dan praktik terbaik.

Persyaratan pipeline

  • Instance Spanner harus ada sebelum menjalankan pipeline.
  • Database Spanner harus ada sebelum menjalankan pipeline.
  • Instance metadata Spanner harus ada sebelum menjalankan pipeline.
  • Database metadata Spanner harus ada sebelum menjalankan pipeline.
  • Aliran perubahan Spanner harus ada sebelum menjalankan pipeline.
  • Set data BigQuery harus ada sebelum menjalankan pipeline.

Menangani penambahan tabel atau kolom pelacakan

Bagian ini menjelaskan praktik terbaik untuk menangani penambahan tabel dan kolom Spanner pelacakan saat pipeline berjalan.

  • Sebelum menambahkan kolom baru ke cakupan aliran perubahan Spanner, tambahkan kolom tersebut ke tabel log perubahan BigQuery terlebih dahulu. Kolom yang ditambahkan harus memiliki jenis data yang cocok dan berupa NULLABLE. Tunggu setidaknya 10 menit sebelum Anda melanjutkan untuk membuat kolom atau tabel baru di Spanner. Menulis ke kolom baru tanpa menunggu dapat menyebabkan data yang tidak diproses dengan kode error tidak valid di direktori antrean email yang tidak terkirim.
  • Untuk menambahkan tabel baru, tambahkan tabel terlebih dahulu di database Spanner. Tabel dibuat secara otomatis di BigQuery saat pipeline menerima kumpulan data untuk tabel baru.
  • Setelah Anda menambahkan kolom atau tabel baru di database Spanner, pastikan untuk mengubah aliran perubahan untuk melacak kolom atau tabel baru yang Anda inginkan jika belum dilacak secara implisit.
  • Template tidak menghapus tabel atau kolom dari BigQuery. Jika kolom dihapus dari tabel Spanner, nilai null akan diisi ke kolom log perubahan BigQuery untuk data yang dihasilkan setelah kolom dihapus dari tabel Spanner, kecuali jika Anda menghapus kolom secara manual dari BigQuery.
  • Template tidak mendukung pembaruan jenis kolom. Meskipun Spanner mendukung perubahan kolom STRING menjadi BYTES atau kolom BYTES menjadi STRING, Anda tidak dapat mengubah jenis data kolom yang ada atau menggunakan nama kolom yang sama dengan jenis data yang berbeda di BigQuery. Jika Anda menghapus dan membuat ulang kolom dengan nama yang sama, tetapi jenisnya berbeda di Spanner, data mungkin akan ditulis ke kolom BigQuery yang ada, tetapi jenisnya tidak berubah.
  • Template ini tidak mendukung pembaruan mode kolom. Kolom metadata yang direplikasi ke BigQuery ditetapkan ke mode REQUIRED. Semua kolom lain yang direplikasi ke BigQuery ditetapkan ke NULLABLE, terlepas dari apakah kolom tersebut ditentukan sebagai NOT NULL di tabel Spanner. Anda tidak dapat memperbarui kolom NULLABLE ke mode REQUIRED di BigQuery.
  • Mengubah jenis pengambilan nilai stream perubahan tidak didukung untuk menjalankan pipeline.

Parameter template

Parameter yang diperlukan

  • spannerInstanceId : Instance Spanner tempat membaca aliran data perubahan.
  • spannerDatabase : Database Spanner tempat membaca aliran data perubahan.
  • spannerMetadataInstanceId : Instance Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerMetadataDatabase : Database Spanner yang akan digunakan untuk tabel metadata konektor aliran data perubahan.
  • spannerChangeStreamName : Nama aliran data perubahan Spanner yang akan dibaca.
  • bigQueryDataset : Set data BigQuery untuk output aliran perubahan.

Parameter opsional

  • spannerProjectId : Project tempat aliran data perubahan dibaca. Nilai ini juga merupakan project tempat tabel metadata konektor aliran perubahan dibuat. Nilai default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • spannerDatabaseRole : Peran database Spanner yang akan digunakan saat menjalankan template. Parameter ini hanya diperlukan jika akun utama IAM yang menjalankan template adalah pengguna kontrol akses terperinci. Peran database harus memiliki hak istimewa SELECT pada aliran perubahan dan hak istimewa EXECUTE pada fungsi baca aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses terperinci untuk aliran perubahan (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : Nama tabel metadata konektor aliran perubahan Spanner yang akan digunakan. Jika tidak diberikan, tabel metadata konektor aliran perubahan Spanner akan otomatis dibuat selama alur pipeline. Anda harus memberikan parameter ini saat memperbarui pipeline yang ada. Jika tidak, jangan berikan parameter ini.
  • rpcPriority : Prioritas permintaan untuk panggilan Spanner. Nilainya harus berupa salah satu nilai berikut: HIGH, MEDIUM, atau LOW. Nilai defaultnya adalah HIGH.
  • spannerHost : Endpoint Cloud Spanner yang akan dipanggil dalam template. Hanya digunakan untuk pengujian. (Contoh: https://batch-spanner.googleapis.com).
  • startTimestamp : DateTime awal (https://datatracker.ietf.org/doc/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan. Ex-2021-10-12T07:20:50.52Z. Secara default, stempel waktu saat pipeline dimulai, yaitu waktu saat ini.
  • endTimestamp : DateTime akhir (https://datatracker.ietf.org/doc/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan.Contoh: 2021-10-12T07:20:50.52Z. Defaultnya adalah waktu tak terbatas di masa mendatang.
  • bigQueryProjectId : Project BigQuery. Nilai defaultnya adalah project untuk tugas Dataflow.
  • bigQueryChangelogTableNameTemplate : Template untuk nama tabel BigQuery yang berisi log perubahan. Default-nya adalah: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : Jalur untuk menyimpan data yang belum diproses. Jalur default adalah direktori di lokasi sementara tugas Dataflow. Nilai default biasanya sudah cukup.
  • dlqRetryMinutes : Jumlah menit antara percobaan ulang antrean pesan yang tidak terkirim. Nilai defaultnya adalah 10.
  • ignoreFields : Daftar kolom yang dipisahkan koma (peka huruf besar/kecil) yang akan diabaikan. Kolom ini dapat berupa kolom tabel yang ditonton, atau kolom metadata yang ditambahkan oleh pipeline. Kolom yang diabaikan tidak disisipkan ke BigQuery. Jika Anda mengabaikan kolom _metadata_spanner_table_name, parameter bigQueryChangelogTableNameTemplate juga akan diabaikan. Default-nya adalah kosong.
  • disableDlqRetries : Apakah akan menonaktifkan percobaan ulang untuk DLQ atau tidak. Defaultnya adalah: false.
  • useStorageWriteApi : Jika benar, pipeline akan menggunakan BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : Saat menggunakan Storage Write API, menentukan semantik tulis. Untuk menggunakan semantik minimal satu kali (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
  • numStorageWriteApiStreams : Saat menggunakan Storage Write API, menentukan jumlah aliran tulis. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini. Setelan defaultnya adalah: 0.
  • storageWriteApiTriggeringFrequencySec : Saat menggunakan Storage Write API, menentukan frekuensi pemicuan, dalam detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Spanner change streams to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Spanner
  • BIGQUERY_DATASET: Set data BigQuery untuk output aliran perubahan

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • SPANNER_INSTANCE_ID: ID instance Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID instance metadata Spanner
  • SPANNER_METADATA_DATABASE: Database metadata Spanner
  • SPANNER_CHANGE_STREAM: Aliran data perubahan Spanner
  • BIGQUERY_DATASET: Set data BigQuery untuk output aliran perubahan

Langkah selanjutnya