Dokumen ini menjelaskan cara menulis data dari Dataflow ke BigQuery menggunakan konektor I/O BigQuery Apache Beam.
Konektor I/O BigQuery tersedia di Apache Beam SDK. Sebaiknya gunakan SDK versi terbaru. Untuk informasi selengkapnya, lihat Apache Beam 2.x SDK.
Dukungan lintas bahasa untuk Python juga tersedia.
Ringkasan
Konektor I/O BigQuery mendukung metode berikut untuk menulis ke BigQuery:
STORAGE_WRITE_API
. Dalam mode ini, konektor melakukan penulisan langsung ke penyimpanan BigQuery, menggunakan BigQuery Storage Write API. Storage Write API menggabungkan penyerapan streaming dan pemuatan batch ke dalam satu API berperforma tinggi. Mode ini menjamin semantik yang tepat satu kali.STORAGE_API_AT_LEAST_ONCE
. Mode ini juga menggunakan Storage Write API, tetapi menyediakan semantik setidaknya satu kali. Mode ini menghasilkan latensi yang lebih rendah untuk sebagian besar pipeline. Namun, penulisan duplikat dapat terjadi.FILE_LOADS
. Dalam mode ini, konektor menulis data input ke file staging di Cloud Storage. Kemudian, skrip akan menjalankan tugas pemuatan BigQuery untuk memuat data ke BigQuery. Mode ini merupakan default untukPCollections
terbatas, yang paling umum ditemukan dalam pipeline batch.STREAMING_INSERTS
. Dalam mode ini, konektor menggunakan API streaming lama. Mode ini merupakan setelan default untukPCollections
yang tidak dibatasi, tetapi tidak direkomendasikan untuk project baru.
Saat memilih metode tulis, pertimbangkan poin-poin berikut:
- Untuk tugas streaming, pertimbangkan untuk menggunakan
STORAGE_WRITE_API
atauSTORAGE_API_AT_LEAST_ONCE
, karena mode ini menulis langsung ke penyimpanan BigQuery, tanpa menggunakan file staging perantara. - Jika Anda menjalankan pipeline menggunakan mode streaming minimal satu kali, tetapkan mode tulis ke
STORAGE_API_AT_LEAST_ONCE
. Setelan ini lebih efisien dan cocok dengan semantik mode streaming minimal satu kali. - Pemuatan file dan Storage Write API memiliki kuota dan batas yang berbeda.
- Tugas pemuatan menggunakan gabungan slot BigQuery bersama atau slot yang dipesan. Untuk menggunakan slot yang dipesan, jalankan tugas pemuatan dalam project dengan penetapan reservasi jenis
PIPELINE
. Tugas pemuatan bersifat gratis jika Anda menggunakan gabungan slot BigQuery bersama. Namun, BigQuery tidak memberikan jaminan terkait kapasitas yang tersedia dari penyimpanan bersama. Untuk mengetahui informasi selengkapnya, lihat Pengantar reservasi.
Keparalelan
Untuk
FILE_LOADS
danSTORAGE_WRITE_API
dalam pipeline streaming, konektor melakukan sharding data ke sejumlah file atau aliran. Secara umum, sebaiknya panggilwithAutoSharding
untuk mengaktifkan sharding otomatis.Untuk
FILE_LOADS
dalam pipeline batch, konektor menulis data ke file yang dipartisi, yang kemudian dimuat ke BigQuery secara paralel.Untuk
STORAGE_WRITE_API
dalam pipeline batch, setiap pekerja membuat satu atau beberapa aliran data untuk ditulis ke BigQuery, yang ditentukan oleh jumlah total shard.Untuk
STORAGE_API_AT_LEAST_ONCE
, ada satu aliran tulis default. Beberapa pekerja ditambahkan ke aliran data ini.
Performa
Tabel berikut menampilkan metrik performa untuk berbagai opsi baca BigQuery I/O. Workload dijalankan pada satu pekerja e2-standard2
, menggunakan Apache Beam SDK 2.49.0 untuk Java. Mereka tidak
menggunakan Runner v2.
100 M data | 1 kB | 1 kolom | Throughput (byte) | Throughput (elemen) |
---|---|---|
Penulisan Penyimpanan | 55 MBps | 54.000 elemen per detik |
Pemuatan Avro | 78 MBps | 77.000 elemen per detik |
Pemuatan JSON | 54 MBps | 53.000 elemen per detik |
Metrik ini didasarkan pada pipeline batch sederhana. Keduanya ditujukan untuk membandingkan performa antara konektor I/O, dan tidak selalu merepresentasikan pipeline di dunia nyata. Performa pipeline Dataflow bersifat kompleks, dan merupakan fungsi dari jenis VM, data yang sedang diproses, performa sumber dan sink eksternal, serta kode pengguna. Metrik didasarkan pada menjalankan Java SDK dan tidak mewakili karakteristik performa SDK bahasa lainnya. Untuk mengetahui informasi selengkapnya, lihat Performa Beam IO.
Praktik terbaik
Bagian ini menjelaskan praktik terbaik untuk menulis ke BigQuery dari Dataflow.
Pertimbangan umum
Storage Write API memiliki batas kuota. Konektor menangani batas ini untuk sebagian besar pipeline. Namun, beberapa skenario dapat menghabiskan aliran Storage Write API yang tersedia. Misalnya, masalah ini mungkin terjadi pada pipeline yang menggunakan sharding otomatis dan penskalaan otomatis dengan sejumlah besar tujuan, terutama dalam tugas yang berjalan lama dengan workload yang sangat bervariasi. Jika masalah ini terjadi, pertimbangkan untuk menggunakan
STORAGE_WRITE_API_AT_LEAST_ONCE
, untuk menghindari masalah ini.Gunakan metrik Google Cloud untuk memantau penggunaan kuota Storage Write API Anda.
Saat menggunakan pemuatan file, Avro biasanya mengungguli JSON. Untuk menggunakan Avro, panggil
withAvroFormatFunction
.Secara default, tugas pemuatan berjalan dalam project yang sama dengan tugas Dataflow. Untuk menentukan project yang berbeda, panggil
withLoadJobProjectId
.Saat menggunakan Java SDK, pertimbangkan untuk membuat class yang mewakili skema tabel BigQuery. Lalu, panggil
useBeamSchema
di pipeline Anda untuk otomatis melakukan konversi antara jenisRow
Apache Beam danTableRow
BigQuery. Untuk contoh class skema, lihatExampleModel.java
.Jika Anda memuat tabel dengan skema kompleks yang berisi ribuan kolom, sebaiknya panggil
withMaxBytesPerPartition
untuk menetapkan ukuran maksimum yang lebih kecil bagi setiap tugas pemuatan.
Pipeline streaming
Rekomendasi berikut berlaku untuk pipeline streaming.
Untuk pipeline streaming, sebaiknya gunakan Storage Write API (
STORAGE_WRITE_API
atauSTORAGE_API_AT_LEAST_ONCE
).Pipeline streaming dapat menggunakan pemuatan file, tetapi pendekatan ini memiliki kekurangan:
- Diperlukan windowing untuk menulis file. Anda tidak dapat menggunakan jendela global.
- BigQuery memuat file berdasarkan upaya terbaik saat menggunakan kumpulan slot bersama. Mungkin terdapat penundaan yang signifikan antara saat record ditulis dan saat data tersedia di BigQuery.
- Jika tugas pemuatan gagal — misalnya, karena data yang buruk atau ketidakcocokan skema — seluruh pipeline akan gagal.
Sebaiknya gunakan
STORAGE_WRITE_API_AT_LEAST_ONCE
jika memungkinkan. Hal ini dapat mengakibatkan data duplikat ditulis ke BigQuery, tetapi lebih murah dan lebih skalabel daripadaSTORAGE_WRITE_API
.Secara umum, hindari penggunaan
STREAMING_INSERTS
. Penyisipan streaming lebih mahal daripada Storage Write API, dan performanya juga tidak bagus.Sharding data dapat meningkatkan performa di pipeline streaming. Untuk sebagian besar pipeline, sharding otomatis adalah titik awal yang baik. Namun, Anda dapat melakukan penyesuaian sharding seperti berikut:
- Untuk
STORAGE_WRITE_API
, panggilwithNumStorageWriteApiStreams
guna menetapkan jumlah aliran operasi tulis. - Untuk
FILE_LOADS
, panggilwithNumFileShards
guna menetapkan jumlah shard file.
- Untuk
Jika Anda menggunakan streaming insert, sebaiknya setel
retryTransientErrors
sebagai kebijakan coba lagi.
Pipeline batch
Rekomendasi berikut berlaku untuk pipeline batch.
Untuk sebagian besar pipeline batch besar, sebaiknya coba
FILE_LOADS
terlebih dahulu. Pipeline batch dapat menggunakanSTORAGE_WRITE_API
, tetapi kemungkinan akan melebihi batas kuota dalam skala besar (1.000+ vCPU) atau jika pipeline serentak sedang berjalan. Apache Beam tidak membatasi jumlah maksimum aliran tulis untuk tugasSTORAGE_WRITE_API
batch, sehingga tugas tersebut pada akhirnya mencapai batas BigQuery Storage API.Saat menggunakan
FILE_LOADS
, Anda mungkin menghabiskan kumpulan slot BigQuery bersama atau kumpulan slot yang dipesan. Jika Anda mengalami kegagalan seperti ini, coba pendekatan berikut:- Kurangi jumlah maksimum pekerja atau ukuran pekerja untuk tugas.
- Beli lebih banyak slot yang direservasi.
- Sebaiknya gunakan
STORAGE_WRITE_API
.
Pipeline kecil hingga sedang (<1.000 vCPU) mungkin dapat memanfaatkan penggunaan
STORAGE_WRITE_API
. Untuk tugas yang lebih kecil ini, pertimbangkan untuk menggunakanSTORAGE_WRITE_API
jika Anda menginginkan antrean surat mati atau saat kumpulan slot bersamaFILE_LOADS
tidak cukup.Jika Anda dapat menoleransi data duplikat, pertimbangkan untuk menggunakan
STORAGE_WRITE_API_AT_LEAST_ONCE
. Mode ini dapat menyebabkan data duplikat ditulis ke BigQuery, tetapi mungkin lebih murah daripada opsiSTORAGE_WRITE_API
.Mode tulis yang berbeda dapat berfungsi secara berbeda berdasarkan karakteristik pipeline Anda. Lakukan eksperimen untuk menemukan mode tulis terbaik bagi beban kerja Anda.
Menangani error tingkat baris
Bagian ini menjelaskan cara menangani error yang mungkin terjadi di tingkat baris, misalnya karena format data input atau ketidakcocokan skema yang salah.
Untuk Storage Write API, setiap baris yang tidak dapat ditulis ditempatkan ke dalam PCollection
terpisah. Untuk mendapatkan koleksi ini, panggil
getFailedStorageApiInserts
pada objek
WriteResult
. Untuk contoh pendekatan ini, lihat Streaming data ke BigQuery.
Sebaiknya kirim error ke tabel atau antrean yang mati untuk diproses nanti. Untuk mengetahui informasi
selengkapnya tentang pola ini, lihat
pola penghentian pengiriman BigQueryIO
.
Untuk FILE_LOADS
, jika terjadi error saat memuat data, tugas pemuatan akan gagal dan pipeline akan menampilkan pengecualian runtime. Anda dapat melihat error tersebut di log Dataflow atau melihat histori tugas BigQuery.
Konektor I/O tidak menampilkan informasi tentang setiap baris yang gagal.
Untuk informasi lebih lanjut tentang cara memecahkan masalah error, lihat error konektor BigQuery.
Contoh
Contoh berikut menunjukkan cara menggunakan Dataflow untuk menulis ke BigQuery.
Menulis ke tabel yang sudah ada
Contoh berikut membuat pipeline batch yang menulis
PCollection<MyData>
ke BigQuery, dengan MyData
sebagai jenis data
kustom.
Metode BigQueryIO.write()
menampilkan
jenis BigQueryIO.Write<T>
, yang digunakan untuk mengonfigurasi operasi
tulis. Untuk informasi selengkapnya, lihat
Menulis ke tabel
di dokumentasi Apache Beam. Contoh kode ini menulis ke tabel yang ada (CREATE_NEVER
) dan menambahkan baris baru ke tabel (WRITE_APPEND
).
Java
Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Menulis ke tabel baru atau yang sudah ada
Contoh berikut membuat tabel baru jika tabel tujuan tidak ada, dengan menetapkan
create disposition
ke CREATE_IF_NEEDED
. Saat menggunakan opsi ini, Anda harus memberikan skema
tabel. Konektor menggunakan skema ini jika membuat tabel baru.
Java
Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Mengalirkan data ke BigQuery
Contoh berikut menunjukkan cara melakukan streaming data menggunakan semantik yang tepat satu kali, dengan menetapkan mode tulis ke STORAGE_WRITE_API
Tidak semua pipeline streaming memerlukan semantik tepat satu kali. Misalnya, Anda
mungkin dapat
menghapus duplikat secara manual
dari tabel tujuan. Jika kemungkinan pencatatan duplikat dapat diterima untuk skenario Anda, pertimbangkan untuk menggunakan semantik minimal satu kali dengan menetapkan metode tulis ke STORAGE_API_AT_LEAST_ONCE
. Metode ini
umumnya lebih efisien dan menghasilkan latensi yang lebih rendah untuk sebagian besar pipeline.
Java
Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Langkah selanjutnya
- Pelajari lebih lanjut konektor I/O BigQuery di dokumentasi Apache Beam.
- Baca tentang Streaming data ke BigQuery menggunakan Storage Write API (postingan blog).