Template Streaming Data Generator ke Pub/Sub, BigQuery, dan Cloud Storage

Template Generator Data Streaming digunakan untuk menghasilkan data atau pesan sintetis dalam jumlah tidak terbatas atau tetap berdasarkan skema yang disediakan pengguna dengan kecepatan yang ditentukan. Tujuan yang kompatibel mencakup topik Pub/Sub, tabel BigQuery, dan bucket Cloud Storage.

Berikut adalah serangkaian beberapa kemungkinan kasus penggunaan:

  • Simulasikan publikasi peristiwa real-time skala besar ke topik Pub/Sub untuk mengukur dan menentukan jumlah serta ukuran konsumen yang diperlukan untuk memproses peristiwa yang dipublikasikan.
  • Buat data sintetis ke tabel BigQuery atau bucket Cloud Storage untuk mengevaluasi tolok ukur performa atau berfungsi sebagai bukti konsep.

Format sink dan encoding yang didukung

Tabel berikut menjelaskan sink dan format encoding yang didukung oleh template ini:
JSON Avro Parquet
Pub/Sub Ya Ya Tidak
BigQuery Ya Tidak Tidak
Cloud Storage Ya Ya Ya

Persyaratan pipeline

  • Akun layanan pekerja memerlukan peran yang ditetapkan Pekerja Dataflow (roles/dataflow.worker). Untuk mengetahui informasi selengkapnya, lihat Pengantar IAM.
  • Buat file skema yang berisi template JSON untuk data yang dihasilkan. Template ini menggunakan library JSON Data Generator, sehingga Anda dapat menyediakan berbagai fungsi faker untuk setiap kolom dalam skema. Untuk informasi selengkapnya, lihat dokumentasi json-data-generator.

    Contoh:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Upload file skema ke bucket Cloud Storage.
  • Target output harus ada sebelum eksekusi. Target harus berupa topik Pub/Sub, tabel BigQuery, atau bucket Cloud Storage, bergantung pada jenis sink.
  • Jika encoding output adalah Avro atau Parquet, buat file skema Avro dan simpan di lokasi Cloud Storage.
  • Tetapkan peran IAM tambahan ke akun layanan pekerja, bergantung pada tujuan yang diinginkan.
    Tujuan Peran IAM yang juga diperlukan Terapkan ke resource mana
    Pub/Sub Pub/Sub Publisher (roles/pubsub.publisher)
    (Untuk informasi selengkapnya, lihat Kontrol akses Pub/Sub dengan IAM)
    Topik Pub/Sub
    BigQuery BigQuery Data Editor (roles/bigquery.dataEditor)
    (Untuk informasi selengkapnya, lihat Kontrol akses BigQuery dengan IAM)
    Set data BigQuery
    Cloud Storage Cloud Storage Object Admin (roles/storage.objectAdmin)
    (Untuk informasi selengkapnya, lihat Kontrol akses Cloud Storage dengan IAM)
    Bucket Cloud Storage

Parameter template

Parameter Deskripsi
schemaLocation Lokasi file skema. Misalnya: gs://mybucket/filename.json.
qps Jumlah pesan yang akan dipublikasikan per detik. Misalnya: 100.
sinkType (Opsional) Jenis sink output. Nilai yang mungkin adalah PUBSUB, BIGQUERY, GCS. Default-nya adalah PUBSUB.
outputType (Opsional) Jenis encoding output. Nilai yang mungkin adalah JSON, AVRO, PARQUET. Default-nya adalah JSON.
avroSchemaLocation (Opsional) Lokasi file Skema AVRO. Wajib jika outputType adalah AVRO atau PARQUET. Contoh: gs://mybucket/filename.avsc.
topic (Opsional) Nama topik Pub/Sub yang akan dipublikasikan datanya oleh pipeline. Wajib jika sinkType adalah Pub/Sub. Contoh: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opsional) Nama tabel BigQuery output. Wajib diisi jika sinkType adalah BigQuery. Contoh: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Opsional) BigQuery Write Disposition. Kemungkinan nilainya adalah WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Default-nya adalah WRITE_APPEND.
outputDeadletterTable (Opsional) Nama tabel BigQuery output untuk menyimpan data yang gagal. Jika tidak disediakan, pipeline akan membuat tabel selama eksekusi dengan nama {output_table_name}_error_records. Misalnya: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Opsional) Jalur lokasi Cloud Storage output. Wajib jika sinkType adalah Cloud Storage. Contoh: gs://mybucket/pathprefix/.
outputFilenamePrefix (Opsional) Awalan nama file dari file output yang ditulis ke Cloud Storage. Defaultnya adalah output-.
windowDuration (Opsional) Interval periode saat output ditulis ke Cloud Storage. Defaultnya adalah 1m (dengan kata lain, 1 menit).
numShards (Opsional) Jumlah maksimum shard output. Wajib jika sinkType adalah Cloud Storage dan harus disetel ke 1 atau angka yang lebih tinggi.
messagesLimit (Opsional) Jumlah maksimum pesan output. Defaultnya adalah 0 yang menunjukkan tidak terbatas.
autoscalingAlgorithm (Opsional) Algoritma yang digunakan untuk menskalakan otomatis pekerja. Nilai yang mungkin adalah THROUGHPUT_BASED untuk mengaktifkan penskalaan otomatis atau NONE untuk menonaktifkannya.
maxNumWorkers (Opsional) Jumlah maksimum mesin pekerja. Misalnya: 10.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Streaming Data Generator template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_LOCATION: jalur ke file skema di Cloud Storage. Contoh: gs://mybucket/filename.json.
  • QPS: jumlah pesan yang akan dipublikasikan per detik
  • PUBSUB_TOPIC: topik Pub/Sub output. Contoh: projects/my-project-id/topics/my-topic-id.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_LOCATION: jalur ke file skema di Cloud Storage. Contoh: gs://mybucket/filename.json.
  • QPS: jumlah pesan yang akan dipublikasikan per detik
  • PUBSUB_TOPIC: topik Pub/Sub output. Contoh: projects/my-project-id/topics/my-topic-id.

Langkah selanjutnya