Dataflow adalah layanan terkelola untuk mentransformasi dan memperkaya data. Konektor Dataflow untuk Spanner memungkinkan Anda membaca data dari dan menulis data ke Spanner di pipeline Dataflow, yang secara opsional mengubah atau memodifikasi data. Anda juga dapat membuat pipeline yang mentransfer data antara Spanner dan produk Google Cloud lainnya.
Konektor Dataflow adalah metode yang direkomendasikan untuk memindahkan data ke dalam dan keluar dari Spanner secara efisien secara massal, dan untuk melakukan transformasi besar ke database yang tidak didukung oleh DML Terpartisi, seperti pemindahan tabel, penghapusan massal yang memerlukan JOIN, dan sebagainya. Ketika menggunakan database individual, ada metode lain yang dapat Anda gunakan untuk mengimpor dan mengekspor data:
- Gunakan konsol Google Cloud untuk mengekspor database individual dari Spanner ke Cloud Storage dalam format Avro.
- Gunakan konsol Google Cloud untuk import database kembali ke Spanner dari file yang Anda ekspor ke Cloud Storage.
- Gunakan REST API atau Google Cloud CLI untuk menjalankan tugas ekspor atau import dari Spanner ke Cloud Storage dan kembali (menggunakan format Avro).
Konektor Dataflow untuk Spanner adalah bagian dari Apache Beam Java SDK, dan menyediakan API untuk melakukan tindakan di atas. Untuk informasi selengkapnya tentang beberapa konsep yang dibahas di bawah ini, seperti objek dan transformasi PCollection, lihat Panduan pemrograman Apache Beam.
Tambahkan konektor ke project Maven Anda
Untuk menambahkan konektor Google Cloud Dataflow ke project Maven, tambahkan artefak Maven beam-sdks-java-io-google-cloud-platform
ke file pom.xml
Anda sebagai dependensi.
Misalnya, dengan asumsi bahwa file pom.xml
menetapkan beam.version
ke
nomor versi yang sesuai, Anda akan menambahkan dependensi berikut:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Membaca data dari Spanner
Untuk membaca dari Spanner, terapkan transformasi SpannerIO.read().
Konfigurasi operasi baca menggunakan metode dalam class SpannerIO.Read
.
Menerapkan transformasi akan menampilkan PCollection<Struct>
, dengan
setiap elemen dalam koleksi mewakili satu baris yang ditampilkan oleh operasi
baca. Anda dapat membaca dari Spanner dengan dan tanpa kueri SQL
tertentu, bergantung pada output yang diinginkan.
Menerapkan transformasi SpannerIO.read()
akan menampilkan tampilan data yang konsisten dengan
melakukan pembacaan yang kuat. Kecuali jika Anda menentukan sebaliknya, hasil pembacaan akan di-snapshot pada saat Anda memulai pembacaan. Lihat baca untuk
mengetahui informasi selengkapnya tentang berbagai jenis pembacaan yang dapat dilakukan Spanner.
Membaca data menggunakan kueri
Untuk membaca kumpulan data tertentu dari Spanner, konfigurasikan transformasi
menggunakan metode SpannerIO.Read.withQuery()
untuk menentukan kueri
SQL. Contoh:
Membaca data tanpa menentukan kueri
Untuk membaca dari database tanpa menggunakan kueri, Anda dapat menentukan nama tabel menggunakan metode SpannerIO.Read.withTable(), dan menentukan daftar kolom yang akan dibaca dengan metode SpannerIO.Read.withColumns(). Contoh:
GoogleSQL
PostgreSQL
Untuk membatasi baris yang dibaca, Anda dapat menentukan sekumpulan kunci utama yang akan dibaca menggunakan metode SpannerIO.Read.withKeySet().
Anda juga dapat membaca tabel menggunakan indeks sekunder yang ditentukan. Seperti pada panggilan API readUsingIndex(), indeks harus berisi semua data yang muncul di hasil kueri.
Untuk melakukannya, tentukan tabel seperti yang ditunjukkan pada contoh sebelumnya, dan tentukan
indeks yang berisi nilai kolom yang diinginkan menggunakan
metode SpannerIO.Read.withIndex()
. Indeks harus menyimpan semua kolom yang perlu dibaca oleh transformasi. {i>Primary key<i} tabel dasar
disimpan secara implisit. Misalnya, untuk membaca tabel Songs
menggunakan indeks
SongsBySongName
, Anda menggunakan
kode berikut:
GoogleSQL
PostgreSQL
Mengontrol penghentian data transaksi
Transformasi dijamin akan dieksekusi pada snapshot data yang konsisten. Untuk
mengontrol status tidak berlaku data, gunakan
metode SpannerIO.Read.withTimestampBound()
. Lihat transaksi untuk mengetahui informasi selengkapnya.
Membaca dari beberapa tabel dalam transaksi yang sama
Jika Anda ingin membaca data dari beberapa tabel secara bersamaan untuk memastikan konsistensi data, lakukan semua pembacaan dalam satu transaksi. Untuk melakukannya, terapkan transformasi createTransaction()
, dengan membuat objek PCollectionView<Transaction>
yang kemudian membuat transaksi. Tampilan
yang dihasilkan dapat diteruskan ke operasi baca
menggunakan SpannerIO.Read.withTransaction()
.
GoogleSQL
PostgreSQL
Membaca data dari semua tabel yang tersedia
Anda dapat membaca data dari semua tabel yang tersedia di database Spanner.
GoogleSQL
PostgreSQL
Memecahkan masalah kueri yang tidak didukung
Konektor Dataflow hanya mendukung kueri SQL Spanner dengan operator pertama dalam rencana eksekusi kueri adalah Distributed Union. Jika Anda mencoba membaca data dari Spanner menggunakan kueri dan mendapatkan pengecualian yang menyatakan bahwa kueri does not have a DistributedUnion at
the root
, ikuti langkah-langkah dalam Memahami cara Spanner menjalankan kueri guna mengambil rencana eksekusi untuk kueri Anda menggunakan Konsol Google Cloud.
Jika kueri SQL Anda tidak didukung, sederhanakan kueri yang memiliki gabungan
terdistribusi sebagai operator pertama dalam rencana eksekusi kueri. Hapus fungsi agregat, gabungan tabel, serta operator DISTINCT
, GROUP BY
, dan ORDER
, karena mereka adalah operator yang paling mungkin mencegah kueri berjalan.
Membuat mutasi untuk operasi tulis
Gunakan metode newInsertOrUpdateBuilder()
class Mutation
, bukan metode newInsertBuilder()
, kecuali jika benar-benar diperlukan untuk pipeline Java. Untuk pipeline Python, gunakan
SpannerInsertOrUpdate()
, bukan
SpannerInsert()
. Dataflow memberikan jaminan setidaknya satu kali, yang berarti mutasi dapat ditulis beberapa kali. Akibatnya, hanya mutasi INSERT
yang mungkin menghasilkan
error com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
yang menyebabkan
pipeline gagal. Untuk mencegah error ini, gunakan mutasi INSERT_OR_UPDATE
, yang menambahkan baris baru atau memperbarui nilai kolom jika baris sudah ada. Mutasi INSERT_OR_UPDATE
dapat diterapkan lebih dari sekali.
Menulis ke Spanner dan mentransformasi data
Anda dapat menulis data ke Spanner dengan konektor Dataflow menggunakan transformasi SpannerIO.write()
untuk menjalankan kumpulan mutasi baris input. Konektor Dataflow mengelompokkan mutasi ke dalam beberapa batch untuk efisiensi.
Contoh berikut menunjukkan cara menerapkan transformasi tulis ke PCollection
mutasi:
GoogleSQL
PostgreSQL
Jika transformasi tiba-tiba berhenti sebelum selesai, mutasi yang telah diterapkan tidak akan di-roll back.
Menerapkan kelompok mutasi secara atomik
Anda dapat menggunakan class MutationGroup
untuk memastikan
sekelompok mutasi diterapkan secara atomik. Mutasi dalam MutationGroup
dijamin akan dikirimkan dalam transaksi yang sama, tetapi transaksi tersebut mungkin akan dicoba lagi.
Grup mutasi akan berfungsi optimal jika digunakan untuk mengelompokkan mutasi yang memengaruhi data yang disimpan berdekatan dalam ruang kunci. Karena Spanner menyisipkan data tabel induk dan turunan bersama-sama di tabel induk, data tersebut selalu berdekatan di ruang kunci. Sebaiknya buat struktur grup mutasi agar berisi satu mutasi yang diterapkan ke tabel induk dan mutasi tambahan yang diterapkan ke tabel turunan, atau sehingga semua mutasinya memodifikasi data yang berdekatan dalam ruang kunci. Untuk mengetahui informasi selengkapnya tentang cara Spanner menyimpan data tabel induk dan turunan, lihat Skema dan model data. Jika Anda tidak mengatur grup mutasi di seputar hierarki tabel yang direkomendasikan, atau jika data yang diakses tidak berdekatan dalam ruang kunci, Spanner mungkin perlu melakukan commit dua fase, yang akan menghasilkan performa yang lebih lambat. Untuk mengetahui informasi selengkapnya, lihat Keseimbangan lokalitas.
Untuk menggunakan MutationGroup
, build transformasi SpannerIO.write()
dan panggil metode
SpannerIO.Write.grouped()
, yang menampilkan
transformasi yang kemudian dapat Anda terapkan ke PCollection
objek MutationGroup
.
Saat membuat MutationGroup
, mutasi pertama yang tercantum menjadi
mutasi primer. Jika grup mutasi Anda memengaruhi tabel induk dan turunan, mutasi utama harus berupa mutasi ke tabel induk. Jika tidak,
Anda dapat menggunakan mutasi apa pun sebagai mutasi utama. Konektor Dataflow
menggunakan mutasi utama untuk menentukan batas partisi agar
dapat mengelompokkan mutasi secara efisien bersama-sama.
Misalnya, bayangkan aplikasi Anda memantau perilaku dan menandai perilaku pengguna yang bermasalah untuk ditinjau. Untuk setiap perilaku yang ditandai, Anda ingin memperbarui tabel Users
untuk memblokir akses pengguna ke aplikasi Anda, dan Anda juga perlu mencatat insiden di tabel PendingReviews
. Untuk memastikan kedua tabel diupdate secara atomik, gunakan MutationGroup
:
GoogleSQL
PostgreSQL
Saat membuat grup mutasi, mutasi pertama yang diberikan sebagai argumen akan menjadi mutasi utama. Dalam hal ini, kedua tabel tersebut tidak terkait, sehingga
tidak ada mutasi primer yang jelas. Kita telah memilih userMutation
sebagai utama dengan
menempatkannya terlebih dahulu. Menerapkan dua mutasi secara terpisah akan lebih cepat, tetapi tidak akan menjamin atomicity, sehingga grup mutasi adalah pilihan terbaik dalam situasi ini.
Langkah selanjutnya
- Pelajari lebih lanjut cara mendesain pipeline data Apache Beam.
- Ekspor dan import database Spanner di Google Cloud Console menggunakan Dataflow.