Gunakan Datastream dan Dataflow untuk mengalirkan data ke BigQuery

Di halaman ini, Anda akan menemukan praktik terbaik penggunaan Datastream dan Dataflow untuk streaming data ke BigQuery.

Set data replika partisi pada kunci yang ditetapkan pengguna

Set data staging di BigQuery dipartisi secara otomatis. Namun, secara default, set data replika tidak dipartisi karena kunci partisi pada tabel replika harus ditentukan berdasarkan logika bisnis tertentu, bukan diterapkan oleh Datastream dan Dataflow.

Untuk setiap tabel dalam set data replika yang memerlukan partisi:

  1. Hentikan dan selesaikan tugas Dataflow.

  2. Gunakan editor SQL di BigQuery untuk menjalankan skrip SQL berikut bagi setiap tabel dalam set data replika. Untuk contoh ini, tabel actor dalam set data datastream_cdc memiliki kolom last_update yang ingin kita tetapkan sebagai kunci partisi. Dengan menjalankan skrip, Anda membuat ulang tabel dengan kunci partisi yang benar.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Gunakan template Datastream ke BigQuery untuk membuat ulang tugas Dataflow.

Menjalankan fungsi yang ditentukan pengguna untuk memanipulasi data peristiwa

Anda dapat menggunakan template Dataflow ke BigQuery untuk menjalankan fungsi JavaScript yang ditentukan pengguna. Untuk melakukannya, pertama-tama, tempatkan file yang berisi fungsi di lokasi tertentu dalam Cloud Storage. Kemudian, lakukan hal berikut:

  • Gunakan parameter javascriptTextTransformGcsPath dalam template untuk menentukan lokasi file di Cloud Storage yang berisi fungsi yang ditentukan pengguna.
  • Gunakan parameter javascriptTextTransformFunctionName untuk menentukan nama fungsi JavaScript yang ingin dipanggil sebagai fungsi yang ditentukan pengguna.

Misalnya, Anda dapat menjalankan fungsi yang ditentukan pengguna untuk mempertahankan kumpulan data yang telah dihapus dalam tabel set data replika dalam BigQuery. Proses ini dikenal sebagai penghapusan untuk sementara.

Untuk melakukannya, buat fungsi yang menyalin nilai kolom _metadata_deleted ke kolom baru bernama is_deleted, lalu reset nilai kolom _metadata_deleted ke false. Hal ini menyebabkan tugas Dataflow mengabaikan peristiwa penghapusan dan mempertahankan kumpulan data yang dihapus saat memperbarui set data replika di BigQuery.

Berikut adalah kode contoh untuk fungsi yang ditentukan pengguna ini:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Menetapkan frekuensi penggabungan

Gunakan parameter mergeFrequencyMinutes dari template Aliran Data ke BigQuery untuk menetapkan frekuensi penggabungan. Ini adalah jumlah menit di antara penggabungan untuk tabel tertentu dalam set data replika di BigQuery. Saat data historis diisi ulang, sebaiknya Anda tetap mengurangi frekuensi penggabungan (12 atau 24 jam) agar biaya tetap terkendali.

Misalnya, jika Anda menetapkan nilai parameter ini ke 10 menit, maka Dataflow akan menjalankan tugas yang menggunakan template setiap 10 menit. Namun, akan ada penundaan 5 menit saat tugas pertama kali dijalankan. Untuk contoh ini, jika tugas berjalan pada pukul 09.14, penggabungan pertama akan terjadi pada pukul 09.29 (10 menit untuk penggabungan dan 5 menit untuk penundaan). Penggabungan kedua akan terjadi pada pukul 09.39, dan semua penggabungan berikutnya akan terjadi dalam interval 10 menit (09.49, 09.59, 10.09, dan seterusnya).

Jika Anda menetapkan frekuensi penggabungan ke 60 menit, tugas akan berjalan pada jam tersebut, setelah penundaan 5 menit untuk menjalankan tugas yang pertama. Jika tugas dijadwalkan untuk berjalan pada pukul 10.00, maka tugas akan berjalan pada pukul 10.05 karena penundaan 5 menit. Semua penggabungan berikutnya akan terjadi dalam interval 60 menit (11.05, 12.05, 13.05, dan sebagainya).

Baik sebagai akibat dari pengendalian biaya atau alasan lain, Anda mungkin tidak dapat melakukan penggabungan pada frekuensi yang memenuhi kebutuhan bisnis Anda. Anda mungkin tidak memiliki data terbaru. Untuk mengakses data terbaru, buat tampilan di atas tabel set data staging dan replika di BigQuery, dengan tampilan yang meniru penggabungan. Tampilan ini dibuat sebagai satu tabel logis (untuk set data staging dan replika). Jika frekuensi penggabungan rendah, dan Anda memerlukan akses yang lebih cepat ke data, gunakan tampilan.