Template Pub/Sub Avro ke BigQuery

Template Pub/Sub Avro to BigQuery adalah pipeline streaming yang menyerap data Avro dari langganan Pub/Sub ke tabel BigQuery. Setiap error yang terjadi saat menulis ke tabel BigQuery akan di-streaming ke topik Pub/Sub yang belum diproses.

Persyaratan pipeline

  • Langganan Pub/Sub input harus ada.
  • File skema untuk data Avro harus ada di Cloud Storage.
  • Topik Pub/Sub yang belum diproses harus ada.
  • Set data BigQuery output harus ada.

Parameter template

Parameter Deskripsi
schemaPath Lokasi Cloud Storage file skema Avro. Misalnya, gs://path/to/my/schema.avsc.
inputSubscription Langganan input Pub/Sub untuk dibaca. Misalnya, projects/<project>/subscriptions/<subscription>.
outputTopic Topik Pub/Sub yang akan digunakan untuk kumpulan data yang belum diproses. Misalnya, projects/<project-id>/topics/<topic-name>.
outputTableSpec Lokasi tabel output BigQuery. Misalnya, <my-project>:<my-dataset>.<my-table>. Bergantung pada createDisposition yang ditentukan, tabel output dapat otomatis dibuat menggunakan skema Avro yang disediakan pengguna.
writeDisposition Opsional: BigQuery WriteDisposition. Misalnya, WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Default: WRITE_APPEND
createDisposition Opsional: BigQuery CreateDisposition. Misalnya, CREATE_IF_NEEDED, CREATE_NEVER. Default: CREATE_IF_NEEDED
useStorageWriteApi Opsional: Jika true, pipeline akan menggunakan BigQuery Storage Write API. Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API.
useStorageWriteApiAtLeastOnce Opsional: Saat menggunakan Storage Write API, menentukan semantik penulisan. Untuk menggunakan semantik minimal satu kali, tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
numStorageWriteApiStreams Opsional: Menentukan jumlah aliran operasi tulis saat menggunakan Storage Write API. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
storageWriteApiTriggeringFrequencySec Opsional: Saat menggunakan Storage Write API, menentukan frekuensi pemicu, dalam hitungan detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama pekerjaan yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Pub/Sub Avro to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Ganti kode berikut:

  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_PATH: jalur Cloud Storage ke file skema Avro (misalnya, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: nama langganan input Pub/Sub
  • BIGQUERY_TABLE: nama tabel output BigQuery
  • DEADLETTER_TOPIC: topik Pub/Sub yang akan digunakan untuk antrean yang belum diproses

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Ganti kode berikut:

  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_PATH: jalur Cloud Storage ke file skema Avro (misalnya, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: nama langganan input Pub/Sub
  • BIGQUERY_TABLE: nama tabel output BigQuery
  • DEADLETTER_TOPIC: topik Pub/Sub yang akan digunakan untuk antrean yang belum diproses

Langkah selanjutnya