Streaming pesan Pub/Sub melalui WebSockets


Tutorial ini menggambarkan cara bagi aplikasi frontend—dalam hal ini, halaman web—untuk menangani data yang masuk dalam volume tinggi saat Anda menggunakan Google Cloud. Tutorial ini menjelaskan beberapa tantangan streaming bervolume tinggi. Contoh aplikasi disediakan bersama tutorial ini yang menggambarkan cara menggunakan WebSockets untuk memvisualisasikan aliran pesan padat yang dipublikasikan ke topik Pub/Sub, memprosesnya secara tepat waktu guna mempertahankan frontend yang berperforma tinggi.

Tutorial ini ditujukan bagi developer yang terbiasa dengan komunikasi browser ke server melalui HTTP dan menulis aplikasi frontend menggunakan HTML, CSS, dan JavaScript. Tutorial ini mengasumsikan bahwa Anda memiliki pengalaman dengan Google Cloud dan sudah memahami alat command line Linux.

Tujuan

  • Buat dan konfigurasi instance mesin virtual (VM) dengan komponen yang diperlukan untuk menstreaming payload langganan Pub/Sub ke klien browser.
  • Konfigurasikan proses di VM untuk berlangganan topik Pub/Sub dan menampilkan setiap pesan ke log.
  • Instal server web untuk menyajikan konten statis dan untuk melakukan streaming output perintah shell ke klien WebSocket.
  • Visualisasikan agregasi aliran WebSocket dan contoh pesan individual di browser menggunakan HTML, CSS, dan JavaScript.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Buka Cloud Shell untuk menjalankan perintah yang tercantum dalam tutorial ini.

    BUKA Cloud Shell

    Anda menjalankan semua perintah terminal dalam tutorial ini dari Cloud Shell.

  7. Aktifkan Compute Engine API dan Pub/Sub API:
    gcloud services enable compute pubsub

Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Lihat Pembersihan untuk mengetahui detail selengkapnya.

Pengantar

Karena semakin banyak aplikasi yang mengadopsi model berbasis peristiwa, aplikasi frontend harus dapat membuat koneksi yang sederhana dan lancar dengan layanan pesan yang menjadi landasan arsitektur ini.

Ada beberapa opsi untuk streaming data ke klien browser web; yang paling umum adalah WebSockets. Tutorial ini akan memandu Anda menginstal proses yang berlangganan ke aliran pesan yang dipublikasikan ke topik Pub/Sub, dan mengarahkan pesan tersebut melalui server web yang dirutekan ke klien yang terhubung melalui WebSockets.

Untuk tutorial ini, Anda akan menggunakan topik Pub/Sub yang tersedia untuk umum dan digunakan dalam NYC Taxi Tycoon Google Dataflow CodeLab. Topik ini memberi Anda aliran real time simulasi telemetri taksi berdasarkan data historis perjalanan yang diambil di New York City dari set data perjalanan Taxi & Limousine Commission.

Arsitektur

Diagram berikut menunjukkan arsitektur tutorial yang Anda buat dalam tutorial ini.

Arsitektur tutorial

Diagram menunjukkan penayang pesan yang berada di luar project yang berisi resource Compute Engine; penayang mengirim pesan ke topik Pub/Sub. Instance Compute Engine membuat pesan tersedia melalui WebSockets ke browser yang menjalankan dasbor berdasarkan HTML5 dan JavaScript.

Tutorial ini menggunakan kombinasi alat untuk menjembatani Pub/Sub dan Websockets:

  • pulltop adalah program Node.js yang Anda instal sebagai bagian dari tutorial ini. Alat ini berlangganan topik Pub/Sub dan mengalirkan pesan yang diterima ke output standar.
  • websocketd adalah alat command line kecil yang menggabungkan program antarmuka command line yang ada dan memungkinkannya diakses menggunakan WebSocket.

Dengan menggabungkan pulltop dan websocketd, Anda dapat memiliki pesan yang diterima dari topik Pub/Sub yang di-streaming ke browser menggunakan WebSockets.

Menyesuaikan throughput topik Pub/Sub

Topik Pub/Sub publik NYC Taxi Tycoon menghasilkan 2.000 hingga 2.500 update simulasi naik taksi per detik—hingga 8 Mb atau lebih per detik. Kontrol alur bawaan di Pub/Sub akan otomatis memperlambat rasio pesan pelanggan jika Pub/Sub mendeteksi bertambahnya antrean pesan yang tidak dikonfirmasi. Oleh karena itu, Anda mungkin melihat variabilitas tingkat pesan yang tinggi di berbagai workstation, koneksi jaringan, dan kode pemrosesan front-end.

Pemrosesan pesan browser yang efektif

Mengingat tingginya volume pesan yang masuk melalui aliran data WebSocket, Anda harus berhati-hati dalam menulis kode frontend yang memproses aliran data ini. Misalnya, Anda dapat membuat elemen HTML secara dinamis untuk setiap pesan. Namun, dengan kecepatan pesan yang diharapkan, mengupdate halaman untuk setiap pesan dapat mengunci jendela browser. Alokasi memori yang sering sebagai akibat dari pembuatan elemen HTML secara dinamis juga memperpanjang durasi pembersihan sampah memori, sehingga menurunkan pengalaman pengguna. Singkatnya, Anda tidak perlu memanggil document.createElement() untuk setiap dari sekitar 2.000 pesan yang masuk setiap detik.

Pendekatan yang diambil oleh tutorial ini untuk mengelola aliran pesan yang padat ini adalah sebagai berikut:

  • Hitung dan perbarui kumpulan metrik streaming secara real time, yang menampilkan sebagian besar informasi tentang pesan yang diamati sebagai nilai agregat.
  • Gunakan dasbor berbasis browser untuk memvisualisasikan sampel kecil setiap pesan pada jadwal yang telah ditentukan, dan hanya menampilkan peristiwa pengantaran dan pengambilan secara real time.

Gambar berikut menampilkan dasbor yang dibuat sebagai bagian dari tutorial ini.

Dasbor yang dibuat di halaman web oleh kode dalam tutorial ini

Gambar ini menunjukkan latensi pesan terakhir selama 24 milidetik dengan kecepatan hampir 2.100 pesan per detik. Jika jalur kode penting untuk memproses setiap pesan tidak selesai tepat waktu, jumlah pesan yang diamati per detik akan berkurang seiring meningkatnya latensi pesan terakhir. Pengambilan sampel perjalanan dilakukan menggunakan JavaScript setInterval API yang disetel untuk siklus sekali setiap tiga detik, yang mencegah frontend membuat sejumlah besar elemen DOM selama masa aktifnya. (Mayoritasnya hampir tidak dapat diamati dengan kecepatan lebih dari 10 per detik.)

Dasbor mulai memproses peristiwa di tengah streaming, sehingga perjalanan yang sedang berlangsung akan dikenali sebagai peristiwa baru oleh dasbor, kecuali jika telah dilihat sebelumnya. Kode ini menggunakan array asosiatif untuk menyimpan setiap perjalanan yang diamati, yang diindeks oleh nilai ride_id, dan menghapus referensi ke perjalanan tertentu saat penumpang diturunkan. Perjalanan dalam status "dalam perjalanan" atau "pengambilan" menambahkan referensi ke array tersebut kecuali jika (untuk kasus "dalam perjalanan") perjalanan telah diamati sebelumnya.

Menginstal dan mengonfigurasi server WebSocket

Untuk memulai, buat instance Compute Engine yang akan Anda gunakan sebagai server WebSocket. Setelah membuat instance, Anda menginstal alat pada instance yang Anda perlukan nanti.

  1. Di Cloud Shell, tetapkan zona Compute Engine default. Contoh berikut menunjukkan us-central1-a, tetapi Anda dapat menggunakan zona apa pun yang Anda inginkan.

    gcloud config set compute/zone us-central1-a
    
  2. Buat instance Compute Engine bernama websocket-server di zona default:

    gcloud compute instances create websocket-server --tags wss
    
  3. Tambahkan aturan firewall yang mengizinkan traffic TCP pada port 8000 ke instance mana pun yang diberi tag sebagai wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Jika Anda menggunakan project yang sudah ada, pastikan port TCP 22 terbuka untuk mengizinkan konektivitas SSH ke instance.

    Secara default, aturan firewall default-allow-ssh diaktifkan di jaringan default. Namun, jika Anda atau administrator menghapus aturan default dalam project yang ada, port TCP 22 mungkin tidak terbuka. (Jika Anda membuat project baru untuk tutorial ini, aturan tersebut diaktifkan secara default, dan Anda tidak perlu melakukan apa pun.)

    Tambahkan aturan firewall yang mengizinkan traffic TCP pada port 22 ke instance mana pun yang diberi tag sebagai wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Hubungkan ke instance menggunakan SSH:

    gcloud compute ssh websocket-server
    
  6. Dengan perintah terminal instance, ganti akun ke root agar Anda dapat menginstal software:

    sudo -s
    
  7. Instal alat git dan unzip:

    apt-get install -y unzip git
    
  8. Instal biner websocketd pada instance:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Menginstal Node.js dan kode tutorial

  1. Di terminal pada instance, instal Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Download repositori sumber tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Ubah izin di pulltop untuk mengizinkan eksekusi:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instal dependensi pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Menguji apakah pulltop dapat membaca pesan

  1. Pada instance, jalankan pulltop terhadap topik publik:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Jika pulltop berfungsi, Anda akan melihat aliran hasil seperti berikut:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Tekan Ctrl+C untuk menghentikan streaming.

Tetapkan alur pesan ke websocketd

Setelah memastikan bahwa pulltop dapat membaca topik Pub/Sub, Anda dapat memulai proses websocketd untuk mulai mengirim pesan ke browser.

Mengambil pesan topik ke file lokal

Untuk tutorial ini, Anda akan merekam aliran pesan yang didapatkan dari pulltop dan menulisnya ke file lokal. Merekam traffic pesan ke file lokal akan menambahkan persyaratan penyimpanan, tetapi juga memisahkan operasi proses websocketd dari pesan topik Pub/Sub streaming. Merekam informasi secara lokal memungkinkan skenario saat Anda mungkin ingin menghentikan streaming Pub/Sub untuk sementara (mungkin untuk menyesuaikan parameter kontrol alur), tetapi tidak memaksa reset klien WebSocket yang saat ini terhubung. Saat aliran pesan aktif kembali, websocketd akan otomatis melanjutkan streaming pesan ke klien.

  1. Pada instance, jalankan pulltop terhadap topik publik, dan alihkan output pesan ke file taxi.json lokal. Perintah nohup menginstruksikan OS agar proses pulltop tetap berjalan jika Anda logout atau menutup terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Pastikan bahwa pesan JSON sedang ditulis ke file:

    tail /var/tmp/taxi.json
    

    Jika pesan ditulis ke file taxi.json, output-nya akan mirip dengan berikut ini:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Ubah ke folder web aplikasi Anda:

    cd ../web
    
  4. Mulai websocketd untuk mulai menstreaming konten file lokal menggunakan WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Tindakan ini menjalankan perintah websocketd di latar belakang. Alat websocketd menggunakan output perintah tail dan mengalirkan setiap elemen sebagai pesan WebSocket.

  5. Periksa konten nohup.out untuk memverifikasi bahwa server dimulai dengan benar:

    tail nohup.out
    

    Jika semuanya berfungsi dengan baik, output-nya akan terlihat seperti berikut:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Memvisualisasikan pesan

Setiap pesan transportasi online yang dipublikasikan ke topik Pub/Sub memiliki struktur seperti ini:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Berdasarkan nilai tersebut, Anda menghitung beberapa metrik untuk header dasbor. Perhitungan dijalankan sekali per peristiwa perjalanan masuk. Nilainya mencakup hal berikut:

  • Latensi pesan terakhir. Jumlah detik antara stempel waktu stempel waktu peristiwa perjalanan terakhir yang diamati dan waktu saat ini (diambil dari jam di sistem yang menghosting browser web).
  • Perjalanan aktif. Jumlah perjalanan yang sedang berlangsung. Angka ini dapat bertambah dengan cepat, dan angka tersebut menurun saat nilai ride_status dropoff diamati.
  • Tarif pesan. Jumlah rata-rata peristiwa berkendara yang diproses per detik.
  • Total jumlah yang diukur. Jumlah meter dari semua perjalanan yang aktif. Jumlah ini menurun saat perjalanan dihentikan.
  • Jumlah total penumpang. Jumlah penumpang di semua perjalanan. Jumlah ini menurun saat perjalanan selesai.
  • Jumlah rata-rata penumpang per perjalanan. Total jumlah perjalanan, dibagi dengan total jumlah penumpang.
  • Jumlah berbayar rata-rata per penumpang. Total jumlah yang diukur dibagi dengan total jumlah penumpang.

Selain metrik dan masing-masing contoh perjalanan, saat penumpang dijemput atau diantar, dasbor akan menampilkan notifikasi peringatan di atas petak sampel perjalanan.

  1. Dapatkan alamat IP eksternal instance saat ini:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Salin alamat IP.

  3. Di komputer lokal, buka browser web baru dan masukkan URL:

    http://$ip-address:8000.

    Anda akan melihat halaman yang menampilkan dasbor untuk tutorial ini:

    Dasbor yang dibuat oleh kode dalam tutorial ini, dengan pesan selamat datang dan sebelum data apa pun ditampilkan.

  4. Klik ikon taksi di bagian atas untuk membuka koneksi ke streaming dan mulai memproses pesan.

    Setiap perjalanan divisualisasikan dengan sampel sembilan perjalanan aktif yang dirender setiap tiga detik:

    Dasbor menampilkan perjalanan aktif.

    Anda dapat mengklik ikon taksi kapan saja untuk memulai atau menghentikan aliran WebSocket. Jika koneksi WebSocket terputus, ikon akan berubah menjadi merah, dan pembaruan pada metrik serta masing-masing perjalanan akan dihentikan. Untuk menghubungkan kembali, klik ikon taksi lagi.

Performa

Screenshot berikut menunjukkan pemantauan performa Chrome Developer Tools saat tab browser memproses sekitar 2.100 pesan per detik.

Panel pemantauan performa browser yang menampilkan penggunaan CPU, ukuran heap, node DOM, dan rekalkulasi gaya per detik. Nilainya relatif datar.

Dengan pengiriman pesan yang terjadi pada latensi sekitar 30 md, penggunaan CPU rata-rata sekitar 80%. Pemakaian memori ditampilkan minimal 29 MB, dengan total 57 MB yang dialokasikan, serta tumbuh dan menyusut dengan bebas.

Pembersihan

Hapus aturan firewall

Jika menggunakan project yang ada untuk tutorial ini, Anda dapat menghapus aturan firewall yang dibuat. Praktik yang baik adalah meminimalkan porta terbuka.

  1. Hapus aturan firewall yang Anda buat untuk mengizinkan TCP pada port 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Jika Anda juga membuat aturan firewall untuk mengizinkan konektivitas SSH, hapus aturan firewall untuk mengizinkan TCP pada port 22:

    gcloud compute firewall-rules delete wss-ssh
    

Menghapus project

Jika tidak ingin menggunakan project ini lagi, Anda dapat menghapus project tersebut.

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Langkah selanjutnya