Pub/Sub ke MongoDB dengan template Python UDFs

Template Pub/Sub to MongoDB dengan Python UDF adalah pipeline streaming yang membaca pesan berenkode JSON dari langganan Pub/Sub dan menulisnya ke MongoDB sebagai dokumen. Jika diperlukan, pipeline ini mendukung transformasi tambahan yang dapat disertakan menggunakan fungsi yang ditentukan pengguna (UDF) Python.

Jika terjadi error saat memproses kumpulan data, template akan menuliskannya ke tabel BigQuery, bersama dengan pesan input. Misalnya, error mungkin terjadi karena ketidakcocokan skema, format JSON salah, atau saat mengeksekusi transform. Tentukan nama tabel di deadletterTable . Jika tabel tidak ada, pipeline akan otomatis membuatnya.

Persyaratan pipeline

  • Langganan Pub/Sub harus ada dan pesan harus dienkode dalam format JSON yang valid.
  • Cluster MongoDB harus ada dan harus dapat diakses dari mesin pekerja Dataflow.

Parameter template

Parameter Deskripsi
inputSubscription Nama langganan Pub/Sub. Contoh: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Daftar server MongoDB yang dipisahkan koma. Contoh: 192.285.234.12:27017,192.287.123.11:27017
database Database di MongoDB untuk menyimpan koleksi. Misalnya: my-db.
collection Nama koleksi dalam database MongoDB. Misalnya: my-collection.
deadletterTable Tabel BigQuery yang menyimpan pesan karena kegagalan (skema yang tidak cocok, format JSON salah, dll.). Misalnya: project-id:dataset-name.table-name.
pythonExternalTextTransformGcsPath Opsional: URI Cloud Storage dari file kode Python yang menentukan fungsi yang ditentukan pengguna (UDF) yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opsional: Nama fungsi yang ditentukan pengguna (UDF) Python yang ingin Anda gunakan.
batchSize Opsional: Ukuran batch yang digunakan untuk penyisipan batch dokumen ke dalam MongoDB. Default: 1000.
batchSizeBytes Opsional: Ukuran batch dalam byte. Default: 5242880.
maxConnectionIdleTime Opsional: Waktu tidak ada aktivitas maksimum yang diizinkan dalam detik sebelum waktu tunggu koneksi terjadi. Default: 60000.
sslEnabled Opsional: Nilai Boolean yang menunjukkan apakah koneksi ke MongoDB sudah mengaktifkan SSL. Default: true.
ignoreSSLCertificate Opsional: Nilai Boolean yang menunjukkan apakah sertifikat SSL harus diabaikan. Default: true.
withOrdered Opsional: Nilai Boolean yang memungkinkan penyisipan massal yang dipesan ke dalam MongoDB. Default: true.
withSSLInvalidHostNameAllowed Opsional: Nilai Boolean yang menunjukkan apakah nama host yang tidak valid diizinkan untuk koneksi SSL. Default: true.

Fungsi yang ditentukan pengguna

Atau, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen adalah diserialisasi sebagai string JSON. Untuk informasi selengkapnya, lihat Buat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: satu baris dari file CSV input.
  • Output: dokumen JSON string yang akan disisipkan ke dalam MongoDB.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama tugas yang unik.
  4. Opsional: Untuk Regional endpoint, pilih nilai dari menu drop-down. Default region adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Pub/Sub to MongoDB with Python UDFs template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • REGION_NAME: wilayah tempat Anda ingin men-deploy tugas Dataflow Anda—misalnya, us-central1
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi {i>template<i} yang ingin Anda gunakan.

    Anda dapat menggunakan nilai berikut:

  • INPUT_SUBSCRIPTION: langganan Pub/Sub (misalnya, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: alamat server MongoDB (misalnya, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: nama database MongoDB (misalnya, users)
  • COLLECTION: nama koleksi MongoDB (misalnya, profiles)
  • UNPROCESSED_TABLE: nama tabel BigQuery (misalnya, your-project:your-dataset.your-table-name)

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • LOCATION: wilayah tempat Anda ingin men-deploy tugas Dataflow Anda—misalnya, us-central1
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi {i>template<i} yang ingin Anda gunakan.

    Anda dapat menggunakan nilai berikut:

  • INPUT_SUBSCRIPTION: langganan Pub/Sub (misalnya, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: alamat server MongoDB (misalnya, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: nama database MongoDB (misalnya, users)
  • COLLECTION: nama koleksi MongoDB (misalnya, profiles)
  • UNPROCESSED_TABLE: nama tabel BigQuery (misalnya, your-project:your-dataset.your-table-name)

Langkah selanjutnya