Template Datastream ke BigQuery (Streaming)

Template Datastream ke BigQuery adalah pipeline streaming yang membaca data Datastream dan mereplikasinya ke BigQuery. Template ini membaca data dari Cloud Storage menggunakan notifikasi Pub/Sub dan mereplikasinya ke dalam tabel staging BigQuery yang dipartisi menurut waktu. Setelah replikasi, template akan menjalankan MERGE di BigQuery untuk memperbarui dan memasukkan semua perubahan data perubahan (CDC) ke dalam replika tabel sumber.

Template menangani pembuatan dan pembaruan tabel BigQuery yang dikelola oleh replikasi. Jika bahasa definisi data (DDL) diperlukan, callback ke Datastream akan mengekstrak skema tabel sumber dan menerjemahkannya ke dalam jenis data BigQuery. Operasi yang didukung mencakup:

  • Tabel baru dibuat saat data disisipkan.
  • Kolom baru ditambahkan ke tabel BigQuery dengan nilai awal null.
  • Kolom yang dihapus akan diabaikan di BigQuery dan nilai mendatang akan menjadi null.
  • Kolom yang diganti namanya akan ditambahkan ke BigQuery sebagai kolom baru.
  • Perubahan jenis tidak diterapkan ke BigQuery.

Sebaiknya jalankan pipeline ini menggunakan mode streaming setidaknya sekali, karena template ini melakukan penghapusan duplikat saat menggabungkan data dari tabel BigQuery sementara ke tabel BigQuery utama. Langkah ini dalam pipeline berarti tidak ada manfaat tambahan untuk menggunakan mode streaming exactly-once.

Persyaratan pipeline

  • Aliran Datastream yang siap atau sudah mereplikasi data.
  • Notifikasi Pub/Sub Cloud Storage diaktifkan untuk data Datastream.
  • Set data tujuan BigQuery dibuat dan Akun Layanan Compute Engine telah diberi akses administrator ke set data tersebut.
  • Kunci utama diperlukan di tabel sumber agar tabel replika tujuan dapat dibuat.
  • Database sumber MySQL atau Oracle. Database PostgreSQL dan SQL Server tidak didukung.

Parameter template

Parameter yang diperlukan

  • inputFilePattern : Lokasi file untuk output file Datastream di Cloud Storage, dalam format: gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat : Format file output yang dihasilkan oleh Datastream. Nilai dapat berupa 'avro' atau 'json'. Defaultnya adalah: avro.
  • gcsPubSubSubscription : Langganan Pub/Sub yang digunakan oleh Cloud Storage untuk memberi tahu Dataflow tentang file baru yang tersedia untuk diproses, dalam format: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate : Nama set data yang berisi tabel staging. Parameter ini mendukung template, misalnya {_metadata_dataset}_log atau my_dataset_log. Biasanya, parameter ini adalah nama set data. Default-nya adalah: {_metadata_dataset}.
  • outputDatasetTemplate : Nama set data yang berisi tabel replika. Parameter ini mendukung template, misalnya {_metadata_dataset} atau my_dataset. Biasanya, parameter ini adalah nama set data. Default-nya adalah: {_metadata_dataset}.
  • deadLetterQueueDirectory : Jalur yang digunakan Dataflow untuk menulis output antrean surat mati. Jalur ini tidak boleh berada di jalur yang sama dengan output file Datastream. Default-nya adalah kosong.

Parameter opsional

  • streamName : Nama atau template untuk aliran data yang akan melakukan polling untuk informasi skema. Default-nya adalah: {_metadata_stream}. Nilai default biasanya sudah cukup.
  • rfcStartDateTime : DateTime awal yang akan digunakan untuk mengambil data dari Cloud Storage (https://tools.ietf.org/html/rfc3339). Setelan defaultnya adalah: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : Jumlah file DataStream serentak yang akan dibaca. Default-nya adalah 10.
  • outputProjectId : ID project Google Cloud yang berisi set data BigQuery untuk menghasilkan data. Nilai default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • outputStagingTableNameTemplate : Template yang akan digunakan untuk memberi nama tabel staging. Misalnya, {_metadata_table}). Default-nya adalah: {_metadata_table}_log.
  • outputTableNameTemplate : Template yang akan digunakan untuk nama tabel replika, misalnya {_metadata_table}. Default-nya adalah: {_metadata_table}.
  • ignoreFields : Kolom yang dipisahkan koma yang akan diabaikan di BigQuery. Default-nya adalah: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. (Contoh: _metadata_stream,_metadata_schema).
  • mergeFrequencyMinutes : Jumlah menit antara penggabungan untuk tabel tertentu. Defaultnya adalah: 5.
  • dlqRetryMinutes : Jumlah menit antara Percobaan Ulang DLQ. Setelan defaultnya adalah: 10.
  • dataStreamRootUrl : URL root Datastream API. Secara default ditetapkan ke: https://datastream.googleapis.com/.
  • applyMerge : Apakah akan menonaktifkan kueri MERGE untuk tugas. Defaultnya adalah: true.
  • mergeConcurrency : Jumlah kueri MERGE BigQuery serentak. Hanya efektif jika applyMerge disetel ke benar (true). Setelan defaultnya adalah: 30.
  • partitionRetentionDays : Jumlah hari yang akan digunakan untuk retensi partisi saat menjalankan penggabungan BigQuery. Setelan defaultnya adalah: 1.
  • useStorageWriteApiAtLeastOnce : Parameter ini hanya berlaku jika "Gunakan BigQuery Storage Write API" diaktifkan. Jika benar, semantik setidaknya satu kali digunakan untuk Storage Write API. Jika tidak, semantik tepat satu kali akan digunakan. Defaultnya adalah: false.
  • javascriptTextTransformGcsPath : URI Cloud Storage file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. (Contoh: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file diubah. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline berjalan, tanpa perlu memulai ulang tugas. Jika nilainya 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.
  • pythonTextTransformGcsPath : Pola jalur Cloud Storage untuk kode Python yang berisi fungsi yang ditentukan pengguna. (Contoh: gs://bucket-anda/transformasi-anda/*.py).
  • pythonRuntimeVersion : Versi runtime yang akan digunakan untuk UDF Python ini.
  • pythonTextTransformFunctionName : Nama fungsi yang akan dipanggil dari file JavaScript Anda. Hanya gunakan huruf, angka, dan garis bawah. (Contoh: transform_udf1).
  • runtimeRetries : Frekuensi runtime akan dicoba ulang sebelum gagal. Defaultnya adalah: 5.
  • useStorageWriteApi : Jika benar, pipeline akan menggunakan BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : Saat menggunakan Storage Write API, menentukan jumlah aliran tulis. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini. Setelan defaultnya adalah: 0.
  • storageWriteApiTriggeringFrequencySec : Saat menggunakan Storage Write API, menentukan frekuensi pemicuan, dalam detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Fungsi yang ditentukan pengguna (UDF)

Secara opsional, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: data CDC, yang diserialisasi sebagai string JSON.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.
  • Menjalankan template

    Konsol

    1. Buka halaman Create job from template Dataflow.
    2. Buka Buat tugas dari template
    3. Di kolom Nama tugas, masukkan nama tugas yang unik.
    4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

      Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

    5. Dari menu drop-down Dataflow template, pilih the Datastream to BigQuery template.
    6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
    7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal sekali, pilih Minimal Sekali.
    8. Klik Run job.

    gcloud

    Di shell atau terminal, jalankan template:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama tugas unik pilihan Anda
    • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nama set data BigQuery Anda.
    • BIGQUERY_TABLE: template tabel BigQuery Anda. Misalnya, {_metadata_schema}_{_metadata_table}_log

    API

    Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama tugas unik pilihan Anda
    • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nama set data BigQuery Anda.
    • BIGQUERY_TABLE: template tabel BigQuery Anda. Misalnya, {_metadata_schema}_{_metadata_table}_log

    Langkah selanjutnya