Melakukan streaming pesan Pub/Sub melalui WebSockets


Tutorial ini menggambarkan cara aplikasi frontend—dalam hal ini, halaman web—untuk menangani data masuk dalam volume tinggi saat Anda menggunakan Google Cloud. Tutorial ini menjelaskan beberapa tantangan streaming bervolume tinggi. Contoh aplikasi disediakan dengan tutorial ini yang mengilustrasikan cara menggunakan WebSockets untuk memvisualisasikan aliran pesan yang padat yang dipublikasikan ke topik Pub/Sub, memprosesnya secara tepat waktu yang mempertahankan frontend berperforma tinggi.

Tutorial ini ditujukan untuk developer yang sudah terbiasa dengan komunikasi browser ke server melalui HTTP dan menulis aplikasi frontend menggunakan HTML, CSS, dan JavaScript. Tutorial ini mengasumsikan bahwa Anda memiliki pengalaman dengan Google Cloud dan memahami alat command line Linux.

Tujuan

  • Buat dan konfigurasikan instance virtual machine (VM) dengan komponen yang diperlukan untuk melakukan streaming payload langganan Pub/Sub ke klien browser.
  • Konfigurasikan proses di VM untuk berlangganan topik Pub/Sub dan menghasilkan output setiap pesan ke log.
  • Instal server web untuk menayangkan konten statis dan melakukan streaming output perintah shell ke klien WebSocket.
  • Visualisasikan agregasi streaming WebSocket dan setiap contoh pesan di browser menggunakan HTML, CSS, dan JavaScript.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloud yang dapat ditagih berikut: Google Cloud:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Buka Cloud Shell untuk menjalankan perintah yang tercantum dalam tutorial ini.

    BUKA Cloud Shell

    Anda menjalankan semua perintah terminal dalam tutorial ini dari Cloud Shell.

  7. Aktifkan Compute Engine API dan Pub/Sub API:
    gcloud services enable compute pubsub

Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Lihat Pembersihan untuk mengetahui detail selengkapnya.

Pengantar

Seiring semakin banyak aplikasi yang menggunakan model berbasis peristiwa, aplikasi frontend harus dapat membuat koneksi sederhana dan lancar ke layanan pesan yang membentuk dasar arsitektur ini.

Ada beberapa opsi untuk melakukan streaming data ke klien browser web; yang paling umum adalah WebSocket. Tutorial ini akan memandu Anda menginstal proses yang berlangganan streaming pesan yang dipublikasikan ke topik Pub/Sub, dan merutekan pesan tersebut melalui server web yang sedang dirutekan ke klien yang terhubung melalui WebSockets.

Untuk tutorial ini, Anda akan menggunakan topik Pub/Sub yang tersedia secara publik yang digunakan dalam NYC Taxi Tycoon Google Dataflow CodeLab. Topik ini memberi Anda aliran real-time simulasi telemetri taksi berdasarkan data perjalanan historis yang diambil di New York City dari set data catatan perjalanan Komisi Taksi & Limousine.

Arsitektur

Diagram berikut menunjukkan arsitektur tutorial yang Anda buat dalam tutorial ini.

Arsitektur tutorial

Diagram menunjukkan penayang pesan yang berada di luar project yang berisi resource Compute Engine; penayang mengirimkan pesan ke topik Pub/Sub. Instance Compute Engine menyediakan pesan melalui WebSocket ke browser yang menjalankan dasbor berdasarkan HTML5 dan JavaScript.

Tutorial ini menggunakan kombinasi alat untuk menjembatani Pub/Sub dan Websocket:

  • pulltop adalah program Node.js yang Anda instal sebagai bagian dari tutorial ini. Alat ini berlangganan topik Pub/Sub dan melakukan streaming pesan yang diterima ke output standar.
  • websocketd adalah alat command line kecil yang menggabungkan program antarmuka command line yang ada dan memungkinkannya diakses menggunakan WebSocket.

Dengan menggabungkan pulltop dan websocketd, Anda dapat memiliki pesan yang diterima dari topik Pub/Sub yang di-streaming ke browser menggunakan WebSockets.

Menyesuaikan throughput topik Pub/Sub

Topik Pub/Sub publik NYC Taxi Tycoon menghasilkan 2.000 hingga 2.500 simulasi update perjalanan taksi per detik—hingga 8 Mb atau lebih per detik. Kontrol aliran bawaan di Pub/Sub akan memperlambat kecepatan pesan pelanggan secara otomatis jika Pub/Sub mendeteksi antrean pesan yang tidak dikonfirmasi yang terus bertambah. Oleh karena itu, Anda mungkin melihat variabilitas kecepatan pesan yang tinggi di berbagai workstation, koneksi jaringan, dan kode pemrosesan front-end.

Pemrosesan pesan browser yang efektif

Mengingat volume pesan yang tinggi yang masuk melalui streaming WebSocket, Anda harus cermat dalam menulis kode frontend yang memproses streaming ini. Misalnya, Anda dapat membuat elemen HTML secara dinamis untuk setiap pesan. Namun, pada tingkat pesan yang diharapkan, memperbarui halaman untuk setiap pesan dapat mengunci jendela browser. Alokasi memori yang sering terjadi akibat pembuatan elemen HTML secara dinamis juga memperpanjang durasi pembersihan sampah memori, sehingga mengurangi pengalaman pengguna. Singkatnya, Anda tidak ingin memanggil document.createElement() untuk setiap pesan dari sekitar 2.000 pesan yang masuk setiap detik.

Pendekatan yang diambil oleh tutorial ini untuk mengelola aliran pesan yang padat ini adalah sebagai berikut:

  • Menghitung dan terus memperbarui serangkaian metrik streaming secara real time, yang menampilkan sebagian besar informasi tentang pesan yang diamati sebagai nilai gabungan.
  • Gunakan dasbor berbasis browser untuk memvisualisasikan sampel kecil dari setiap pesan sesuai jadwal yang telah ditentukan, yang hanya menampilkan peristiwa pengantaran dan pengambilan secara real time.

Gambar berikut menunjukkan dasbor yang dibuat sebagai bagian dari tutorial ini.

Dasbor yang dibuat di halaman web oleh kode dalam tutorial ini

Gambar ini menggambarkan latensi pesan terakhir sebesar 24 milidetik dengan kecepatan hampir 2.100 pesan per detik. Jika jalur kode penting untuk memproses setiap pesan tidak selesai tepat waktu, jumlah pesan yang diamati per detik akan menurun seiring meningkatnya latensi pesan terakhir. Pengambilan sampel perjalanan dilakukan menggunakan setInterval API JavaScript yang disetel untuk berputar sekali setiap tiga detik, yang mencegah frontend membuat elemen DOM dalam jumlah yang sangat besar selama masa aktifnya. (Sebagian besar dari peristiwa tersebut praktis tidak dapat diamati pada kecepatan lebih tinggi dari 10 per detik.)

Dasbor mulai memproses peristiwa di tengah streaming, sehingga perjalanan yang sedang berlangsung akan dikenali sebagai baru oleh dasbor, kecuali jika telah dilihat sebelumnya. Kode ini menggunakan array asosiatif untuk menyimpan setiap perjalanan yang diamati, yang diindeks oleh nilai ride_id, dan menghapus referensi ke perjalanan tertentu saat penumpang telah diturunkan. Perjalanan dalam status "enroute" atau "pickup" menambahkan referensi ke array tersebut, kecuali (untuk kasus "enroute") perjalanan telah diamati sebelumnya.

Menginstal dan mengonfigurasi server WebSocket

Untuk memulai, Anda membuat instance Compute Engine yang akan digunakan sebagai server WebSocket. Setelah membuat instance, Anda akan menginstal alat yang diperlukan nanti.

  1. Di Cloud Shell, tetapkan zona Compute Engine default. Contoh berikut menunjukkan us-central1-a, tetapi Anda dapat menggunakan zona apa pun yang Anda inginkan.

    gcloud config set compute/zone us-central1-a
    
  2. Buat instance Compute Engine bernama websocket-server di zona default:

    gcloud compute instances create websocket-server --tags wss
    
  3. Tambahkan aturan firewall yang mengizinkan traffic TCP di port 8000 ke instance mana pun yang diberi tag sebagai wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Jika Anda menggunakan project yang ada, pastikan port TCP 22 terbuka untuk mengizinkan konektivitas SSH ke instance.

    Secara default, aturan firewall default-allow-ssh diaktifkan di jaringan default. Namun, jika Anda atau administrator menghapus aturan default dalam project yang ada, port TCP 22 mungkin tidak terbuka. (Jika Anda membuat project baru untuk tutorial ini, aturan akan diaktifkan secara default, dan Anda tidak perlu melakukan apa pun.)

    Tambahkan aturan firewall yang mengizinkan traffic TCP di port 22 ke instance mana pun yang diberi tag sebagai wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Hubungkan ke instance menggunakan SSH:

    gcloud compute ssh websocket-server
    
  6. Pada perintah terminal instance, alihkan akun ke root agar Anda dapat menginstal software:

    sudo -s
    
  7. Instal alat git dan unzip:

    apt-get install -y unzip git
    
  8. Instal biner websocketd di instance:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Menginstal Node.js dan kode tutorial

  1. Di terminal pada instance, instal Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Download repositori sumber tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Ubah izin di pulltop untuk mengizinkan eksekusi:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instal dependensi pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Menguji apakah pulltop dapat membaca pesan

  1. Pada instance, jalankan pulltop terhadap topik publik:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Jika pulltop berfungsi, Anda akan melihat aliran hasil seperti berikut:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Tekan Ctrl+C untuk menghentikan streaming.

Menetapkan alur pesan ke websocketd

Setelah memastikan bahwa pulltop dapat membaca topik Pub/Sub, Anda dapat memulai proses websocketd untuk mulai mengirim pesan ke browser.

Merekam pesan topik ke file lokal

Untuk tutorial ini, Anda akan mengambil aliran pesan yang Anda dapatkan dari pulltop dan menulisnya ke file lokal. Merekam traffic pesan ke file lokal akan menambahkan persyaratan penyimpanan, tetapi juga memisahkan operasi proses websocketd dari pesan topik Pub/Sub streaming. Dengan mengambil informasi secara lokal, Anda dapat menghentikan sementara streaming Pub/Sub (mungkin untuk menyesuaikan parameter kontrol alur) tetapi tidak memaksa reset klien WebSocket yang saat ini terhubung. Saat aliran pesan dibuat ulang, websocketd akan otomatis melanjutkan streaming pesan ke klien.

  1. Pada instance, jalankan pulltop terhadap topik publik, dan alihkan output pesan ke file taxi.json lokal. Perintah nohup menginstruksikan OS untuk terus menjalankan proses pulltop jika Anda logout atau menutup terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifikasi bahwa pesan JSON sedang ditulis ke file:

    tail /var/tmp/taxi.json
    

    Jika pesan ditulis ke file taxi.json, output-nya akan mirip dengan yang berikut ini:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Ubah ke folder web aplikasi Anda:

    cd ../web
    
  4. Mulai websocketd untuk mulai melakukan streaming konten file lokal menggunakan WebSocket:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Tindakan ini akan menjalankan perintah websocketd di latar belakang. Alat websocketd menggunakan output perintah tail dan melakukan streaming setiap elemen sebagai pesan WebSocket.

  5. Periksa konten nohup.out untuk memverifikasi bahwa server dimulai dengan benar:

    tail nohup.out
    

    Jika semuanya berfungsi dengan benar, output-nya akan mirip dengan berikut ini:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Memvisualisasikan pesan

Setiap pesan perjalanan yang dipublikasikan ke topik Pub/Sub memiliki struktur seperti ini:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Berdasarkan nilai ini, Anda menghitung beberapa metrik untuk header dasbor. Penghitungan dijalankan satu kali per peristiwa perjalanan masuk. Nilainya mencakup hal berikut:

  • Latensi pesan terakhir. Jumlah detik antara stempel waktu peristiwa perjalanan yang terakhir diamati dan waktu saat ini (berasal dari jam di sistem yang menghosting browser web).
  • Perjalanan aktif. Jumlah perjalanan yang sedang berlangsung. Jumlah ini dapat tumbuh dengan cepat, dan jumlahnya menurun saat nilai ride_status dropoff diamati.
  • Rasio pesan. Jumlah rata-rata peristiwa perjalanan yang diproses per detik.
  • Total jumlah terukur. Jumlah meter dari semua perjalanan aktif. Jumlah ini menurun seiring penumpang turun dari kendaraan.
  • Jumlah total penumpang. Jumlah penumpang di semua perjalanan. Jumlah ini akan berkurang seiring perjalanan yang selesai.
  • Jumlah rata-rata penumpang per perjalanan. Jumlah total perjalanan, dibagi dengan jumlah total penumpang.
  • Jumlah rata-rata yang dihitung meter per penumpang. Jumlah total yang diukur dibagi dengan jumlah total penumpang.

Selain metrik dan setiap sampel perjalanan, saat penumpang dijemput atau diturunkan, dasbor akan menampilkan notifikasi pemberitahuan di atas petak sampel perjalanan.

  1. Dapatkan alamat IP eksternal instance saat ini:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Salin alamat IP.

  3. Di komputer lokal, buka browser web baru dan masukkan URL:

    http://$ip-address:8000.

    Anda akan melihat halaman yang menampilkan dasbor untuk tutorial ini:

    Dasbor yang dibuat oleh kode dalam tutorial ini, dengan pesan selamat datang dan sebelum data ditampilkan.

  4. Klik ikon taksi di bagian atas untuk membuka koneksi ke streaming dan mulai memproses pesan.

    Setiap perjalanan divisualisasi dengan sampel sembilan perjalanan aktif yang dirender setiap tiga detik:

    Dasbor yang menampilkan perjalanan aktif.

    Anda dapat mengklik ikon taksi kapan saja untuk memulai atau menghentikan streaming WebSocket. Jika koneksi WebSocket terputus, ikon akan berubah menjadi merah, dan update pada metrik dan setiap perjalanan akan dihentikan. Untuk terhubung kembali, klik ikon taksi lagi.

Performa

Screenshot berikut menunjukkan monitor performa Alat Developer Chrome saat tab browser memproses sekitar 2.100 pesan per detik.

Panel pemantauan performa browser yang menampilkan penggunaan CPU, ukuran heap, node DOM, dan penghitungan ulang gaya per detik. Nilainya relatif datar.

Dengan pengiriman pesan yang terjadi pada latensi sekitar 30 md, penggunaan CPU rata-rata sekitar 80%. Penggunaan memori ditampilkan minimal 29 MB, dengan total 57 MB dialokasikan, dan bertambah dan berkurang secara bebas.

Pembersihan

Menghapus aturan firewall

Jika menggunakan project yang ada untuk tutorial ini, Anda dapat menghapus aturan firewall yang dibuat. Sebaiknya minimalkan port yang terbuka.

  1. Hapus aturan firewall yang Anda buat untuk mengizinkan TCP di port 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Jika Anda juga membuat aturan firewall untuk mengizinkan konektivitas SSH, hapus aturan firewall untuk mengizinkan TCP di port 22:

    gcloud compute firewall-rules delete wss-ssh
    

Menghapus project

Jika tidak ingin menggunakan project ini lagi, Anda dapat menghapus project tersebut.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Langkah selanjutnya