Menggabungkan data streaming dengan Dataflow SQL


Tutorial ini menunjukkan cara menggunakan Dataflow SQL untuk menggabungkan aliran data dari Pub/Sub dengan data dari tabel BigQuery.

Tujuan

Dalam tutorial ini, Anda telah:

  • Menulis kueri Dataflow SQL yang menggabungkan data streaming Pub/Sub dengan data tabel BigQuery.
  • Deploy tugas Dataflow dari UI Dataflow SQL.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Aktifkan API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, dan Data Catalog. .

    Mengaktifkan API

  5. Buat akun layanan:

    1. Di konsol Google Cloud, buka halaman Buat akun layanan.

      Buka Create service account
    2. Pilih project Anda.
    3. Di kolom Nama akun layanan, masukkan nama. Konsol Google Cloud akan mengisi kolom ID akun layanan berdasarkan nama ini.

      Di kolom Deskripsi akun layanan, masukkan sebuah deskripsi. Sebagai contoh, Service account for quickstart.

    4. Klik Buat dan lanjutkan.
    5. Berikan peran Project > Owner ke akun layanan.

      Untuk memberikan peran, temukan daftar Pilih peran, lalu pilih Project > Owner.

    6. Klik Lanjutkan.
    7. Klik Selesai untuk menyelesaikan pembuatan akun layanan.

      Jangan tutup jendela browser Anda. Anda akan menggunakannya pada langkah berikutnya.

  6. Membuat kunci akun layanan:

    1. Di konsol Google Cloud, klik alamat email untuk akun layanan yang telah dibuat.
    2. Klik Kunci.
    3. Klik Tambahkan kunci, lalu klik Buat kunci baru.
    4. Klik Create. File kunci JSON akan didownload ke komputer Anda.
    5. Klik Close.
  7. Tetapkan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS ke jalur file JSON yang berisi kredensial Anda. Variabel ini hanya berlaku untuk sesi shell Anda saat ini. Jadi, jika Anda membuka sesi baru, tetapkan variabel kembali.

  8. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  9. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  10. Aktifkan API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, dan Data Catalog. .

    Mengaktifkan API

  11. Buat akun layanan:

    1. Di konsol Google Cloud, buka halaman Buat akun layanan.

      Buka Create service account
    2. Pilih project Anda.
    3. Di kolom Nama akun layanan, masukkan nama. Konsol Google Cloud akan mengisi kolom ID akun layanan berdasarkan nama ini.

      Di kolom Deskripsi akun layanan, masukkan sebuah deskripsi. Sebagai contoh, Service account for quickstart.

    4. Klik Buat dan lanjutkan.
    5. Berikan peran Project > Owner ke akun layanan.

      Untuk memberikan peran, temukan daftar Pilih peran, lalu pilih Project > Owner.

    6. Klik Lanjutkan.
    7. Klik Selesai untuk menyelesaikan pembuatan akun layanan.

      Jangan tutup jendela browser Anda. Anda akan menggunakannya pada langkah berikutnya.

  12. Membuat kunci akun layanan:

    1. Di konsol Google Cloud, klik alamat email untuk akun layanan yang telah dibuat.
    2. Klik Kunci.
    3. Klik Tambahkan kunci, lalu klik Buat kunci baru.
    4. Klik Create. File kunci JSON akan didownload ke komputer Anda.
    5. Klik Close.
  13. Tetapkan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS ke jalur file JSON yang berisi kredensial Anda. Variabel ini hanya berlaku untuk sesi shell Anda saat ini. Jadi, jika Anda membuka sesi baru, tetapkan variabel kembali.

  14. Menginstal dan melakukan inisialisasi pada gcloud CLI. Pilih salah satu opsi penginstalan. Anda mungkin perlu menetapkan properti project ke project yang digunakan untuk panduan ini.
  15. Buka UI web Dataflow SQL di Konsol Google Cloud. Tindakan ini akan membuka project yang terakhir Anda akses. Untuk beralih ke project lain, klik nama project di bagian atas UI web Dataflow SQL, lalu telusuri project yang ingin Anda gunakan.
    Buka UI web Dataflow SQL

Membuat sumber contoh

Jika Anda ingin mengikuti contoh yang diberikan dalam tutorial ini, buat sumber berikut dan gunakan dalam langkah-langkah tutorial.

  • Topik Pub/Sub bernama transactions - Aliran data transaksi yang masuk melalui langganan ke topik Pub/Sub. Data untuk setiap transaksi mencakup informasi seperti produk yang dibeli, harga promo, serta kota dan negara bagian tempat pembelian terjadi. Setelah membuat topik Pub/Sub, buat skrip yang memublikasikan pesan ke topik Anda. Anda akan menjalankan skrip ini di bagian selanjutnya dari tutorial ini.
  • Tabel BigQuery bernama us_state_salesregions - Tabel yang menyediakan pemetaan status ke wilayah penjualan. Sebelum membuat tabel ini, Anda perlu membuat {i>dataset<i} BigQuery.

Menetapkan skema ke topik Pub/Sub

Dengan menetapkan skema, Anda dapat menjalankan kueri SQL pada data topik Pub/Sub. Saat ini, Dataflow SQL mengharapkan pesan dalam topik Pub/Sub diserialisasi dalam format JSON.

Untuk menetapkan skema ke contoh topik Pub/Sub transactions:

  1. Buat file teks dan beri nama transactions_schema.yaml. Salin dan tempel teks skema berikut ke dalam transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Tetapkan skema menggunakan Google Cloud CLI.

    a. Update gcloud CLI dengan perintah berikut. Pastikan versi CLI gcloud adalah 242.0.0 atau yang lebih tinggi.

      gcloud components update
    

    b. Jalankan perintah berikut di jendela command line. Ganti project-id dengan project ID Anda, dan path-to-file dengan jalur ke file transactions_schema.yaml Anda.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Untuk mengetahui informasi selengkapnya tentang parameter perintah dan format file skema yang diizinkan, lihat halaman dokumentasi untuk pembaruan entri data-katalog gcloud.

    c. Konfirmasi bahwa skema Anda berhasil ditetapkan ke topik Pub/Sub transactions. Ganti project-id dengan project ID Anda.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Menemukan sumber Pub/Sub

UI Dataflow SQL menyediakan cara untuk menemukan objek sumber data Pub/Sub untuk setiap project yang dapat Anda akses, sehingga Anda tidak perlu mengingat nama lengkapnya.

Untuk contoh dalam tutorial ini, buka editor Dataflow SQL dan telusuri topik Pub/Sub transactions yang Anda buat:

  1. Buka SQL Workspace.

  2. Di panel Dataflow SQL Editor, di kotak penelusuran, telusuri projectid=project-id transactions. Ganti project-id dengan project ID Anda.

    Panel penelusuran Data Catalog di ruang kerja Dataflow SQL.

Lihat skemanya

  1. Di panel Dataflow SQL Editor pada UI Dataflow SQL, klik transactions atau telusuri topik Pub/Sub dengan mengetik projectid=project-id system=cloud_pubsub, lalu pilih topik.
  2. Di bagian Schema, Anda dapat melihat skema yang ditetapkan ke topik Pub/Sub.

    Skema yang ditetapkan ke topik, termasuk daftar nama kolom dan deskripsinya.

Membuat kueri SQL

UI Dataflow SQL memungkinkan Anda membuat kueri SQL untuk menjalankan tugas Dataflow.

Kueri SQL berikut adalah kueri pengayaan data. Langkah ini menambahkan kolom lain, sales_region, ke aliran peristiwa Pub/Sub (transactions), menggunakan tabel BigQuery (us_state_salesregions) yang memetakan status ke wilayah penjualan.

Salin dan tempel kueri SQL berikut ke Editor kueri. Ganti project-id dengan project ID Anda.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Saat Anda memasukkan kueri di UI Dataflow SQL, validator kueri akan memverifikasi sintaksis kueri. Ikon tanda centang hijau akan ditampilkan jika kueri valid. Jika kueri tidak valid, ikon tanda seru berwarna merah akan ditampilkan. Jika sintaksis kueri Anda tidak valid, mengklik ikon validator akan memberikan informasi tentang apa yang perlu Anda perbaiki.

Screenshot berikut menampilkan kueri yang valid di Editor kueri. Validator menampilkan tanda centang hijau.

Ruang kerja Dataflow SQL dengan kueri dari tutorial yang terlihat di editor.

Membuat tugas Dataflow untuk menjalankan kueri SQL Anda

Untuk menjalankan kueri SQL Anda, buat tugas Dataflow dari UI Dataflow SQL.

  1. Di Query editor, klik Create job.

  2. Di panel Buat tugas Dataflow yang terbuka:

    • Untuk Tujuan, pilih BigQuery.
    • Untuk Dataset ID, pilih dataflow_sql_tutorial.
    • Untuk Table name, masukkan sales.
    Membuat formulir Tugas Dataflow SQL.
  3. Opsional: Dataflow secara otomatis memilih setelan yang optimal untuk tugas Dataflow SQL, tetapi Anda dapat memperluas menu Parameter opsional untuk menentukan opsi pipeline berikut secara manual:

    • Jumlah maksimum pekerja
    • Zone
    • Email akun layanan
    • Machine type
    • Eksperimen tambahan
    • Konfigurasi alamat IP pekerja
    • Jaringan
    • Subnetwork
  4. Klik Create. Tugas Dataflow Anda memerlukan waktu beberapa menit untuk mulai berjalan.

Melihat tugas Dataflow

Dataflow mengubah kueri SQL Anda menjadi pipeline Apache Beam. Klik Lihat tugas untuk membuka UI web Dataflow, tempat Anda dapat melihat representasi grafis pipeline Anda.

Pipeline dari kueri SQL yang ditampilkan di UI web Dataflow.

Untuk melihat perincian transformasi yang terjadi di pipeline, klik kotaknya. Misalnya, jika Anda mengklik kotak pertama dalam representasi grafis, berlabel Jalankan Kueri SQL, grafik akan muncul yang menunjukkan operasi yang berlangsung di belakang layar.

Dua kotak pertama mewakili dua input yang Anda gabungkan: topik Pub/Sub, transactions, dan tabel BigQuery, us_state_salesregions.

Menulis output gabungan dari dua input yang selesai dalam 25 detik.

Untuk melihat tabel output yang berisi hasil tugas, buka UI BigQuery. Di panel Explorer, di project Anda, klik set data dataflow_sql_tutorial yang Anda buat. Kemudian, klik tabel output, sales. Tab Preview menampilkan konten tabel output.

Tabel pratinjau penjualan berisi kolom untuk tr_time_str, first_name, last_name, city, state, product, amount, dan sales_region.

Lihat pekerjaan sebelumnya dan edit kueri Anda

UI Dataflow menyimpan tugas dan kueri sebelumnya di halaman Jobs Dataflow.

Anda dapat menggunakan daftar histori tugas untuk melihat kueri SQL sebelumnya. Misalnya, Anda ingin mengubah kueri untuk menggabungkan penjualan berdasarkan wilayah penjualan setiap 15 detik. Gunakan halaman Tugas untuk mengakses tugas yang sedang berjalan yang Anda mulai sebelumnya dalam tutorial, menyalin kueri SQL, dan menjalankan tugas lain dengan kueri yang diubah.

  1. Dari halaman Jobs Dataflow, klik tugas yang ingin Anda edit.

  2. Pada halaman Job details, di panel Job info, di bagian Pipeline options, cari kueri SQL. Cari baris untuk queryString.

    Opsi pipeline tugas bernama queryString.
  3. Salin dan tempel kueri SQL berikut ke Dataflow SQL Editor di SQL Workspace untuk menambahkan tumming window. Ganti project-id dengan project ID Anda.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Klik Create job untuk membuat tugas baru dengan kueri yang diubah.

Pembersihan

Agar tidak menimbulkan biaya pada akun Penagihan Cloud Anda untuk resource yang digunakan dalam tutorial ini:

  1. Hentikan skrip publikasi transactions_injector.py Anda jika masih berjalan.

  2. Hentikan tugas Dataflow yang sedang berjalan. Buka UI web Dataflow di Konsol Google Cloud.

    Buka UI web Dataflow

    Untuk setiap tugas yang Anda buat dengan mengikuti panduan ini, lakukan langkah-langkah berikut:

    1. Klik nama pekerjaan.

    2. Di halaman Detail tugas, klik Berhenti. Dialog Stop Job akan muncul dengan opsi Anda untuk cara menghentikan tugas.

    3. Pilih Batal.

    4. Klik Stop job. Layanan akan menghentikan semua penyerapan dan pemrosesan data sesegera mungkin. Karena Cancel segera menghentikan pemrosesan, Anda mungkin kehilangan semua data "sedang berlangsung". Menghentikan tugas mungkin memerlukan waktu beberapa menit.

  3. Hapus set data BigQuery Anda. Buka UI web BigQuery di Konsol Google Cloud.

    Buka UI web BigQuery

    1. Pada panel Explorer, di bagian Resources, klik set data dataflow_sql_tutorial yang Anda buat.

    2. Di panel detail, klik Hapus. Dialog konfirmasi akan terbuka.

    3. Di kotak dialog Delete dataset, konfirmasi perintah delete dengan mengetik delete, lalu klik Delete.

  4. Menghapus topik Pub/Sub Anda. Buka halaman topik Pub/Sub di Konsol Google Cloud.

    Buka halaman topik Pub/Sub

    1. Pilih topik transactions.

    2. Klik Hapus untuk menghapus topik secara permanen. Dialog konfirmasi akan terbuka.

    3. Di kotak dialog Delete topic, konfirmasi perintah delete dengan mengetik delete, lalu klik Delete.

    4. Buka halaman langganan Pub/Sub.

    5. Pilih langganan yang tersisa untuk transactions. Jika tugas Anda tidak berjalan lagi, mungkin tidak ada langganan.

    6. Klik Hapus untuk menghapus langganan secara permanen. Pada dialog konfirmasi, klik Delete.

  5. Hapus bucket staging Dataflow di Cloud Storage. Buka halaman Buckets Cloud Storage di Konsol Google Cloud.

    Buka Buckets

    1. Pilih bucket staging Dataflow.

    2. Klik Hapus untuk menghapus bucket secara permanen. Dialog konfirmasi akan terbuka.

    3. Di kotak dialog Delete bucket, konfirmasi perintah hapus dengan mengetik DELETE, lalu klik Delete.

Langkah selanjutnya