Proto Pub/Sub ke BigQuery dengan template Python UDF

Template proto Pub/Sub ke BigQuery adalah pipeline streaming yang menyerap data proto dari langganan Pub/Sub ke tabel BigQuery. Setiap error yang terjadi saat menulis ke tabel BigQuery akan di-streaming ke topik Pub/Sub yang belum diproses.

Fungsi yang ditentukan pengguna (UDF) Python dapat disediakan untuk mengubah data. Error saat mengeksekusi UDF dapat dikirim ke topik Pub/Sub terpisah atau topik yang belum diproses yang sama dengan error BigQuery.

Persyaratan pipeline

  • Langganan Pub/Sub input harus ada.
  • File skema untuk data Proto harus ada di Cloud Storage.
  • Topik Pub/Sub output harus ada.
  • Set data BigQuery output harus ada.
  • Jika ada, tabel BigQuery harus memiliki skema yang cocok dengan data proto, terlepas dari nilai createDisposition.

Parameter template

Parameter Deskripsi
protoSchemaPath Lokasi Cloud Storage untuk file skema proto mandiri. Misalnya, gs://path/to/my/file.pb. File ini dapat dibuat dengan flag --descriptor_set_out dari perintah protoc. Flag --include_imports menjamin bahwa file tersebut bersifat mandiri.
fullMessageName Nama pesan proto lengkap. Misalnya, package.name.MessageName, dengan package.name sebagai nilai yang diberikan untuk pernyataan package, bukan pernyataan java_package.
inputSubscription Langganan input Pub/Sub untuk dibaca. Misalnya, projects/<project>/subscriptions/<subscription>.
outputTopic Topik Pub/Sub yang akan digunakan untuk kumpulan data yang belum diproses. Misalnya, projects/<project-id>/topics/<topic-name>.
outputTableSpec Lokasi tabel output BigQuery. Misalnya, my-project:my_dataset.my_table. Bergantung pada createDisposition yang ditentukan, tabel output mungkin akan otomatis dibuat menggunakan file skema input.
preserveProtoFieldNames Opsional: true untuk mempertahankan nama kolom Proto asli di JSON. false untuk menggunakan nama JSON yang lebih standar. Misalnya, false akan mengubah field_name menjadi fieldName. (Default: false)
bigQueryTableSchemaPath Opsional: Jalur Cloud Storage ke jalur skema BigQuery. Misalnya, gs://path/to/my/schema.json. Jika tidak diberikan, skema akan disimpulkan dari skema Proto.
pythonExternalTextTransformGcsPath Opsional: URI Cloud Storage dari file kode Python yang menentukan fungsi yang ditentukan pengguna (UDF) yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opsional: Nama fungsi yang ditentukan pengguna (UDF) Python yang ingin Anda gunakan.
udfOutputTopic Opsional: Topik Pub/Sub yang menyimpan error UDF. Misalnya: projects/<project-id>/topics/<topic-name>. Jika tidak diberikan, error UDF akan dikirim ke topik yang sama dengan outputTopic.
writeDisposition Opsional: WriteDisposition BigQuery. Misalnya, WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Default: WRITE_APPEND.
createDisposition Opsional: CreateDisposition BigQuery. Misalnya, CREATE_IF_NEEDED, CREATE_NEVER. Default: CREATE_IF_NEEDED.
useStorageWriteApi Opsional: Jika true, pipeline akan menggunakan BigQuery Storage Write API. Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API.
useStorageWriteApiAtLeastOnce Opsional: Saat menggunakan Storage Write API, menentukan semantik penulisan. Untuk menggunakan semantik minimal satu kali, tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
numStorageWriteApiStreams Opsional: Menentukan jumlah aliran operasi tulis saat menggunakan Storage Write API. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
storageWriteApiTriggeringFrequencySec Opsional: Saat menggunakan Storage Write API, menentukan frekuensi pemicu, dalam hitungan detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Fungsi yang ditentukan pengguna

Anda juga dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: kolom data pesan Pub/Sub, yang diserialisasi sebagai string JSON.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.
  • Menjalankan template

    Konsol

    1. Buka halaman Create job from template Dataflow.
    2. Buka Buat tugas dari template
    3. Di kolom Job name, masukkan nama pekerjaan yang unik.
    4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

      Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

    5. Dari menu drop-down Dataflow template, pilih the Pub/Sub Proto to BigQuery with Python UDF template.
    6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
    7. Klik Run job.

    gcloud

    Di shell atau terminal Anda, jalankan template:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Ganti kode berikut:

    • JOB_NAME: nama pekerjaan unik pilihan Anda
    • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
    • VERSION: versi template yang ingin Anda gunakan

      Anda dapat menggunakan nilai berikut:

    • SCHEMA_PATH: jalur Cloud Storage ke file skema Proto (misalnya, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: nama pesan Proto (misalnya, package.name.MessageName)
    • SUBSCRIPTION_NAME: nama langganan input Pub/Sub
    • BIGQUERY_TABLE: nama tabel output BigQuery
    • UNPROCESSED_TOPIC: topik Pub/Sub yang akan digunakan untuk antrean yang belum diproses

    API

    Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama pekerjaan unik pilihan Anda
    • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
    • VERSION: versi template yang ingin Anda gunakan

      Anda dapat menggunakan nilai berikut:

    • SCHEMA_PATH: jalur Cloud Storage ke file skema Proto (misalnya, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: nama pesan Proto (misalnya, package.name.MessageName)
    • SUBSCRIPTION_NAME: nama langganan input Pub/Sub
    • BIGQUERY_TABLE: nama tabel output BigQuery
    • UNPROCESSED_TOPIC: topik Pub/Sub yang akan digunakan untuk antrean yang belum diproses

    Langkah selanjutnya