Template Datastream ke BigQuery (Stream)

Template Datastream ke BigQuery adalah pipeline streaming yang membaca data Datastream dan mereplikasinya ke BigQuery. Template tersebut membaca data dari Cloud Storage menggunakan notifikasi Pub/Sub dan mereplikasinya ke dalam tabel staging BigQuery yang dipartisi berdasarkan waktu. Setelah replikasi, template akan mengeksekusi MERGE di BigQuery untuk memperbarui dan menyisipkan semua perubahan pengambilan data perubahan (CDC) ke dalam replika tabel sumber.

Template ini menangani pembuatan dan pembaruan tabel BigQuery yang dikelola oleh replikasi. Jika bahasa definisi data (DDL) diperlukan, callback ke Datastream akan mengekstrak skema tabel sumber dan menerjemahkannya menjadi jenis data BigQuery. Operasi yang didukung meliputi:

  • Tabel baru dibuat saat data disisipkan.
  • Kolom baru ditambahkan ke tabel BigQuery dengan nilai awal null.
  • Kolom yang dihapus akan diabaikan di BigQuery dan nilai mendatang akan bernilai null.
  • Kolom dengan nama yang baru ditambahkan ke BigQuery sebagai kolom baru.
  • Perubahan jenis tidak disebarkan ke BigQuery.

Sebaiknya jalankan pipeline ini menggunakan mode streaming minimal satu kali, karena template melakukan de-duplikasi saat menggabungkan data dari tabel BigQuery sementara ke tabel BigQuery utama. Langkah dalam pipeline ini berarti tidak ada manfaat tambahan jika menggunakan mode streaming tepat satu kali.

Persyaratan pipeline

  • Aliran Datastream yang siap atau sudah mereplikasi data.
  • Notifikasi Pub/Sub Cloud Storage diaktifkan untuk data Datastream.
  • Set data tujuan BigQuery dibuat dan Akun Layanan Compute Engine telah diberi akses admin ke set data tersebut.
  • Kunci utama diperlukan dalam tabel sumber agar tabel replika tujuan dapat dibuat.
  • Database sumber MySQL atau Oracle. Database PostgreSQL tidak didukung.

Parameter template

Parameter Deskripsi
inputFilePattern Lokasi file yang akan direplikasi oleh file Datastream di Cloud Storage. Lokasi file ini biasanya merupakan jalur root untuk streaming.
gcsPubSubSubscription Langganan Pub/Sub dengan notifikasi file Datastream. Misalnya, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Format file output yang dihasilkan oleh Datastream. Misalnya avro,json. Default, avro.
outputStagingDatasetTemplate Nama set data yang ada untuk berisi tabel staging. Anda dapat menyertakan template {_metadata_dataset} sebagai placeholder yang diganti dengan nama set data/skema sumber (misalnya, {_metadata_dataset}_log).
outputDatasetTemplate Nama set data yang ada untuk memuat tabel replika. Anda dapat menyertakan template {_metadata_dataset} sebagai placeholder yang diganti dengan nama set data/skema sumber (misalnya, {_metadata_dataset}).
deadLetterQueueDirectory Jalur file untuk menyimpan pesan yang belum diproses dengan alasan kegagalan pemrosesan. Defaultnya adalah direktori di bawah lokasi sementara tugas Dataflow. Nilai defaultnya cukup dalam sebagian besar kondisi.
outputStagingTableNameTemplate Opsional: Template untuk nama tabel staging. Defaultnya adalah {_metadata_table}_log. Jika Anda mereplikasi beberapa skema, {_metadata_schema}_{_metadata_table}_log yang disarankan.
outputTableNameTemplate Opsional: Template untuk nama tabel replika. Default, {_metadata_table}. Jika Anda mereplikasi beberapa skema, {_metadata_schema}_{_metadata_table} yang disarankan.
outputProjectId Opsional: Project untuk set data BigQuery yang menjadi tujuan output data. Default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
streamName Opsional: Nama atau template aliran data yang akan di-polling untuk mengetahui informasi skema. Default, {_metadata_stream}.
mergeFrequencyMinutes Opsional: Jumlah menit di antara penggabungan untuk tabel tertentu. Default, 5.
dlqRetryMinutes Opsional: Jumlah menit antara percobaan ulang antrean surat mati (DLQ). Default, 10.
javascriptTextTransformGcsPath Opsional: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Opsional: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsi adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.
useStorageWriteApi Opsional: Jika true, pipeline akan menggunakan BigQuery Storage Write API. Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API.
useStorageWriteApiAtLeastOnce Opsional: Saat menggunakan Storage Write API, menentukan semantik penulisan. Untuk menggunakan semantik minimal satu kali, tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
numStorageWriteApiStreams Opsional: Menentukan jumlah aliran operasi tulis saat menggunakan Storage Write API. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
storageWriteApiTriggeringFrequencySec Opsional: Saat menggunakan Storage Write API, menentukan frekuensi pemicu, dalam hitungan detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
applyMerge Opsional: Menentukan apakah template mengeksekusi pernyataan MERGE di BigQuery setelah mereplikasi data ke tabel staging. Default: true.
fileReadConcurrency Opsional: Jumlah file Datastream yang akan dibaca secara serentak. Default: 10.
mergeConcurrency Opsional: Jumlah pernyataan MERGE BigQuery serentak. Default: 30.
partitionRetentionDays Opsional: Jumlah hari yang digunakan untuk retensi partisi saat menjalankan pernyataan MERGE BigQuery. Default: 1.
rfcStartDateTime Opsional: Waktu mulai untuk membaca file dari Cloud Storage, sebagai nilai tanggal-waktu RFC 3339. Default: 1970-01-01T00:00:00.00Z.

Fungsi yang ditentukan pengguna

Anda juga dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: data CDC, diserialisasi sebagai string JSON.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.
  • Menjalankan template

    Konsol

    1. Buka halaman Create job from template Dataflow.
    2. Buka Buat tugas dari template
    3. Di kolom Job name, masukkan nama pekerjaan yang unik.
    4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

      Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

    5. Dari menu drop-down Dataflow template, pilih the Datastream to BigQuery template.
    6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
    7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal satu kali, pilih Minimal Sekali.
    8. Klik Run job.

    gcloud

    Di shell atau terminal Anda, jalankan template:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama pekerjaan unik pilihan Anda
    • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nama set data BigQuery Anda.
    • BIGQUERY_TABLE: template tabel BigQuery Anda. Misalnya, {_metadata_schema}_{_metadata_table}_log

    API

    Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama pekerjaan unik pilihan Anda
    • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nama set data BigQuery Anda.
    • BIGQUERY_TABLE: template tabel BigQuery Anda. Misalnya, {_metadata_schema}_{_metadata_table}_log

    Langkah selanjutnya