Template Pub/Sub ke MongoDB

Template Pub/Sub ke MongoDB adalah pipeline streaming yang membaca pesan berenkode JSON dari langganan Pub/Sub dan menulisnya ke MongoDB sebagai dokumen. Jika diperlukan, pipeline ini mendukung transformasi tambahan yang dapat disertakan menggunakan fungsi yang ditentukan pengguna (UDF) JavaScript. Setiap error terjadi karena ketidakcocokan skema, format JSON yang salah, atau saat menjalankan transformasi, dicatat dalam tabel BigQuery untuk pesan yang belum diproses bersama dengan pesan input. Jika tabel untuk kumpulan data yang belum diproses tidak ada sebelum eksekusi, pipeline akan otomatis membuat tabel ini.

Persyaratan pipeline

  • Langganan Pub/Sub harus ada dan pesan harus dienkode dalam format JSON yang valid.
  • Cluster MongoDB harus ada dan harus dapat diakses dari mesin pekerja Dataflow.

Parameter template

Parameter Deskripsi
inputSubscription Nama langganan Pub/Sub. Contoh: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Daftar server MongoDB yang dipisahkan koma. Contoh: 192.285.234.12:27017,192.287.123.11:27017
database Database di MongoDB untuk menyimpan koleksi. Misalnya: my-db.
collection Nama koleksi di dalam database MongoDB. Misalnya: my-collection.
deadletterTable Tabel BigQuery yang menyimpan pesan karena kegagalan (skema yang tidak cocok, format JSON salah, dll.). Misalnya: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath Opsional: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Opsional: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsi adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.
javascriptTextTransformReloadIntervalMinutes Opsional: Menentukan seberapa sering harus memuat ulang UDF, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow secara berkala akan memeriksa file UDF di Cloud Storage, dan memuat ulang UDF jika file diubah. Parameter ini memungkinkan Anda memperbarui UDF saat pipeline berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.
batchSize Opsional: Ukuran batch yang digunakan untuk penyisipan batch dokumen ke MongoDB. Default: 1000.
batchSizeBytes Opsional: Ukuran tumpukan dalam byte. Default: 5242880.
maxConnectionIdleTime Opsional: Waktu tidak ada aktivitas maksimum yang diizinkan dalam detik sebelum waktu koneksi habis. Default: 60000.
sslEnabled Opsional: Nilai Boolean yang menunjukkan apakah koneksi ke MongoDB telah diaktifkan atau tidak. Default: true.
ignoreSSLCertificate Opsional: Nilai Boolean yang menunjukkan apakah sertifikat SSL harus diabaikan. Default: true.
withOrdered Opsional: Nilai Boolean yang memungkinkan penyisipan massal yang dipesan ke MongoDB. Default: true.
withSSLInvalidHostNameAllowed Opsional: Nilai Boolean yang menunjukkan apakah nama host yang tidak valid diizinkan untuk koneksi SSL. Default: true.

Fungsi yang ditentukan pengguna

Anda juga dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: satu baris dari file CSV input.
  • Output: dokumen JSON string yang akan disisipkan ke MongoDB.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama pekerjaan yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Pub/Sub to MongoDB template.
  6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • INPUT_SUBSCRIPTION: langganan Pub/Sub (misalnya, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: alamat server MongoDB (misalnya, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: nama database MongoDB (misalnya, users)
  • COLLECTION: nama koleksi MongoDB (misalnya, profiles)
  • UNPROCESSED_TABLE: nama tabel BigQuery (misalnya your-project:your-dataset.your-table-name)

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • INPUT_SUBSCRIPTION: langganan Pub/Sub (misalnya, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: alamat server MongoDB (misalnya, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: nama database MongoDB (misalnya, users)
  • COLLECTION: nama koleksi MongoDB (misalnya, profiles)
  • UNPROCESSED_TABLE: nama tabel BigQuery (misalnya your-project:your-dataset.your-table-name)

Langkah selanjutnya