Template Pub/Sub Proto to BigQuery dengan UDF Python

Template Pub/Sub proto to BigQuery adalah pipeline streaming yang menyerap data proto dari langganan Pub/Sub ke dalam tabel BigQuery. Setiap error yang terjadi saat menulis ke tabel BigQuery di-streaming ke topik Pub/Sub yang belum diproses.

Fungsi yang ditentukan pengguna (UDF) Python dapat disediakan untuk mengubah data. Error saat menjalankan UDF dapat dikirim ke topik Pub/Sub terpisah atau topik yang sama yang belum diproses seperti error BigQuery.

Persyaratan pipeline

  • Langganan Pub/Sub input harus ada.
  • File skema untuk data Proto harus ada di Cloud Storage.
  • Topik Pub/Sub output harus ada.
  • Set data BigQuery output harus ada.
  • Jika ada, tabel BigQuery harus memiliki skema yang cocok dengan data proto, terlepas dari nilai createDisposition.

Parameter template

Parameter Deskripsi
protoSchemaPath Lokasi Cloud Storage file skema proto mandiri. Misalnya, gs://path/to/my/file.pb. File ini dapat dibuat dengan flag --descriptor_set_out dari perintah protoc. Flag --include_imports menjamin bahwa file bersifat mandiri.
fullMessageName Nama pesan proto lengkap. Misalnya, package.name.MessageName, dengan package.name adalah nilai yang disediakan untuk pernyataan package, bukan pernyataan java_package.
inputSubscription Langganan input Pub/Sub yang akan dibaca. Misalnya, projects/<project>/subscriptions/<subscription>.
outputTopic Topik Pub/Sub yang akan digunakan untuk data yang belum diproses. Misalnya, projects/<project-id>/topics/<topic-name>.
outputTableSpec Lokasi tabel output BigQuery. Misalnya, my-project:my_dataset.my_table. Bergantung pada createDisposition yang ditentukan, tabel output dapat dibuat secara otomatis menggunakan file skema input.
preserveProtoFieldNames Opsional: true untuk mempertahankan nama kolom Proto asli dalam JSON. false untuk menggunakan nama JSON yang lebih standar. Misalnya, false akan mengubah field_name menjadi fieldName. (Default: false)
bigQueryTableSchemaPath Opsional: Jalur Cloud Storage ke jalur skema BigQuery. Misalnya, gs://path/to/my/schema.json. Jika tidak disediakan, skema akan disimpulkan dari skema Proto.
pythonExternalTextTransformGcsPath Opsional: URI Cloud Storage dari file kode Python yang menentukan fungsi yang ditentukan pengguna (UDF) yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opsional: Nama fungsi yang ditentukan pengguna (UDF) Python yang ingin Anda gunakan.
udfOutputTopic Opsional: Topik Pub/Sub yang menyimpan error UDF. Misalnya, projects/<project-id>/topics/<topic-name>. Jika tidak diberikan, error UDF akan dikirim ke topik yang sama dengan outputTopic.
writeDisposition Opsional: WriteDisposition BigQuery. Misalnya, WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition Opsional: CreateDisposition BigQuery. Misalnya, CREATE_IF_NEEDED, CREATE_NEVER. Default: CREATE_IF_NEEDED.
useStorageWriteApi Opsional: Jika true, pipeline akan menggunakan BigQuery Storage Write API. Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API.
useStorageWriteApiAtLeastOnce Opsional: Saat menggunakan Storage Write API, menentukan semantik tulis. Untuk menggunakan semantik setidaknya satu kali, tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
numStorageWriteApiStreams Opsional: Saat menggunakan Storage Write API, menentukan jumlah aliran tulis. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
storageWriteApiTriggeringFrequencySec Opsional: Saat menggunakan Storage Write API, tentukan frekuensi pemicuan, dalam detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Fungsi yang ditentukan pengguna (UDF)

Secara opsional, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: kolom data pesan Pub/Sub, yang diserialisasi sebagai string JSON.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.
  • Menjalankan template

    Konsol

    1. Buka halaman Create job from template Dataflow.
    2. Buka Buat tugas dari template
    3. Di kolom Nama tugas, masukkan nama tugas yang unik.
    4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

      Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

    5. Dari menu drop-down Dataflow template, pilih the Pub/Sub Proto to BigQuery with Python UDF template.
    6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
    7. Klik Run job.

    gcloud

    Di shell atau terminal, jalankan template:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Ganti kode berikut:

    • JOB_NAME: nama tugas unik pilihan Anda
    • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
    • VERSION: versi template yang ingin Anda gunakan

      Anda dapat menggunakan nilai berikut:

    • SCHEMA_PATH: jalur Cloud Storage ke file skema Proto (misalnya, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: nama pesan Proto (misalnya, package.name.MessageName)
    • SUBSCRIPTION_NAME: nama langganan input Pub/Sub
    • BIGQUERY_TABLE: nama tabel output BigQuery
    • UNPROCESSED_TOPIC: topik Pub/Sub yang akan digunakan untuk antrean yang belum diproses

    API

    Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama tugas unik pilihan Anda
    • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
    • VERSION: versi template yang ingin Anda gunakan

      Anda dapat menggunakan nilai berikut:

    • SCHEMA_PATH: jalur Cloud Storage ke file skema Proto (misalnya, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: nama pesan Proto (misalnya, package.name.MessageName)
    • SUBSCRIPTION_NAME: nama langganan input Pub/Sub
    • BIGQUERY_TABLE: nama tabel output BigQuery
    • UNPROCESSED_TOPIC: topik Pub/Sub yang akan digunakan untuk antrean yang belum diproses

    Langkah selanjutnya