Generator Data Streaming ke template Pub/Sub, BigQuery, dan Cloud Storage

Template Generator Data Streaming digunakan untuk membuat pesan atau data sintetis dalam jumlah tidak terbatas atau tetap berdasarkan skema yang disediakan pengguna pada kecepatan yang ditentukan. Tujuan yang kompatibel mencakup topik Pub/Sub, tabel BigQuery, dan bucket Cloud Storage.

Berikut ini adalah serangkaian beberapa kasus penggunaan yang mungkin terjadi:

  • Simulasikan publikasi peristiwa real-time berskala besar ke topik Pub/Sub untuk mengukur dan menentukan jumlah serta ukuran konsumen yang diperlukan untuk memproses peristiwa yang dipublikasikan.
  • Membuat data sintetis ke tabel BigQuery atau bucket Cloud Storage untuk mengevaluasi tolok ukur performa atau berfungsi sebagai bukti konsep.

Sink dan format encoding yang didukung

Tabel berikut menjelaskan format sink dan encoding mana yang didukung oleh template ini:
JSON Avro Parquet
Pub/Sub Ya Ya Tidak
BigQuery Ya Tidak Tidak
Cloud Storage Ya Ya Ya

Persyaratan pipeline

  • Akun layanan pekerja memerlukan peran Dataflow Worker (roles/dataflow.worker). Untuk mengetahui informasi selengkapnya, lihat Pengantar IAM.
  • Buat file skema yang berisi template JSON untuk data yang dihasilkan. Template ini menggunakan library Generator Data JSON, sehingga Anda dapat menyediakan berbagai fungsi palsu untuk setiap kolom dalam skema. Untuk mengetahui informasi selengkapnya, lihat dokumentasi json-data-generator.

    Contoh:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Upload file skema ke bucket Cloud Storage.
  • Target output harus ada sebelum eksekusi. Target tersebut harus berupa topik Pub/Sub, tabel BigQuery, atau bucket Cloud Storage, bergantung pada jenis sink.
  • Jika encoding output adalah Avro atau Parquet, buat file skema Avro dan simpan di lokasi Cloud Storage.
  • Tetapkan peran IAM tambahan untuk akun layanan pekerja, bergantung pada tujuan yang diinginkan.
    Tujuan Selain itu, peran IAM yang diperlukan Terapkan ke resource mana
    Pub/Sub Penerbit Pub/Sub (roles/pubsub.publisher)
    (Untuk mengetahui informasi selengkapnya, lihat Kontrol akses Pub/Sub dengan IAM)
    Topik Pub/Sub
    BigQuery Editor Data BigQuery (roles/bigquery.dataEditor)
    (Untuk mengetahui informasi selengkapnya, lihat Kontrol akses BigQuery dengan IAM)
    Set data BigQuery
    Cloud Storage Admin Objek Cloud Storage (roles/storage.objectAdmin)
    (Untuk mengetahui informasi selengkapnya, lihat Kontrol akses Cloud Storage dengan IAM)
    Bucket Cloud Storage

Parameter template

Parameter Deskripsi
schemaLocation Lokasi file skema. Misalnya: gs://mybucket/filename.json.
qps Jumlah pesan yang akan dipublikasikan per detik. Misalnya: 100.
sinkType (Opsional) Jenis sink output. Nilai yang mungkin adalah PUBSUB, BIGQUERY, GCS. Defaultnya adalah PUBSUB.
outputType (Opsional) Jenis encoding output. Nilai yang mungkin adalah JSON, AVRO, PARQUET. Defaultnya adalah JSON.
avroSchemaLocation (Opsional) Lokasi file AVRO Schema. Wajib jika outputType adalah AVRO atau PARQUET. Contoh: gs://mybucket/filename.avsc.
topic (Opsional) Nama topik Pub/Sub tempat pipeline harus memublikasikan data. Wajib jika sinkType adalah Pub/Sub. Contoh: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opsional) Nama tabel BigQuery output. Wajib jika sinkType adalah BigQuery. Contoh: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Opsional) Disposisi Tulis BigQuery. Nilai yang mungkin adalah WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Defaultnya adalah WRITE_APPEND.
outputDeadletterTable (Opsional) Nama tabel BigQuery output untuk menyimpan data yang gagal. Jika tidak disediakan, pipeline akan membuat tabel selama eksekusi dengan nama {output_table_name}_error_records. Misalnya: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Opsional) Jalur lokasi Cloud Storage output. Wajib jika sinkType adalah Cloud Storage. Contoh: gs://mybucket/pathprefix/.
outputFilenamePrefix (Opsional) Awalan nama file dari file output yang ditulis ke Cloud Storage. Defaultnya adalah output-.
windowDuration (Opsional) Interval jendela saat output ditulis ke Cloud Storage. Defaultnya adalah 1m (dengan kata lain, 1 menit).
numShards (Opsional) Jumlah maksimum shard output. Wajib jika sinkType adalah Cloud Storage dan harus ditetapkan ke 1 angka atau lebih tinggi.
messagesLimit (Opsional) Jumlah maksimum pesan output. Defaultnya adalah 0 yang menunjukkan tak terbatas.
autoscalingAlgorithm (Opsional) Algoritme yang digunakan untuk menskalakan pekerja. Nilai yang memungkinkan adalah THROUGHPUT_BASED untuk mengaktifkan penskalaan otomatis atau NONE untuk menonaktifkan.
maxNumWorkers (Opsional) Jumlah maksimum mesin pekerja. Misalnya: 10.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama pekerjaan yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Streaming Data Generator template.
  6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_LOCATION: jalur ke file skema di Cloud Storage. Contoh: gs://mybucket/filename.json.
  • QPS: jumlah pesan yang akan dipublikasikan per detik
  • PUBSUB_TOPIC: topik Pub/Sub output. Contoh: projects/my-project-id/topics/my-topic-id.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_LOCATION: jalur ke file skema di Cloud Storage. Contoh: gs://mybucket/filename.json.
  • QPS: jumlah pesan yang akan dipublikasikan per detik
  • PUBSUB_TOPIC: topik Pub/Sub output. Contoh: projects/my-project-id/topics/my-topic-id.

Langkah selanjutnya