Menggabungkan data streaming dengan Dataflow SQL


Tutorial ini menunjukkan cara menggunakan Dataflow SQL untuk menggabungkan aliran data dari Pub/Sub dengan data dari tabel BigQuery.

Tujuan

Dalam tutorial ini, Anda telah:

  • Tulis kueri Dataflow SQL yang menggabungkan data streaming Pub/Sub dengan data tabel BigQuery.
  • Men-deploy tugas Dataflow dari UI Dataflow SQL.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  14. Menginstal dan melakukan inisialisasi gcloud CLI. Pilih salah satu opsi penginstalan. Anda mungkin perlu menetapkan properti project ke project yang Anda gunakan untuk panduan ini.
  15. Buka UI web Dataflow SQL di konsol Google Cloud. Tindakan ini akan membuka project yang terakhir Anda akses. Untuk beralih ke project yang berbeda, klik nama project di bagian atas UI web Dataflow SQL, lalu telusuri project yang ingin Anda gunakan.
    Buka UI web Dataflow SQL

Membuat contoh sumber

Jika Anda ingin mengikuti contoh yang diberikan dalam tutorial ini, buat sumber berikut dan gunakan dalam langkah-langkah tutorial.

  • Topik Pub/Sub yang disebut transactions - Aliran data transaksi yang masuk melalui langganan ke topik Pub/Sub. Data untuk setiap transaksi mencakup informasi seperti produk yang dibeli, harga promo, serta kota dan negara bagian tempat pembelian terjadi. Setelah membuat topik Pub/Sub, Anda membuat skrip yang memublikasikan pesan ke topik. Anda akan menjalankan skrip ini di bagian berikutnya dalam tutorial ini.
  • Tabel BigQuery bernama us_state_salesregions - Tabel yang menyediakan pemetaan negara bagian ke wilayah penjualan. Sebelum membuat tabel ini, Anda harus membuat set data BigQuery.

Menetapkan skema ke topik Pub/Sub

Dengan menetapkan skema, Anda dapat menjalankan kueri SQL pada data topik Pub/Sub. Saat ini, Dataflow SQL mengharapkan pesan dalam topik Pub/Sub diserialisasi dalam format JSON.

Untuk menetapkan skema ke contoh topik Pub/Sub transactions:

  1. Buat file teks dan beri nama transactions_schema.yaml. Salin dan tempel teks skema berikut ke transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Tetapkan skema menggunakan Google Cloud CLI.

    a. Update gcloud CLI dengan perintah berikut. Pastikan versi gcloud CLI adalah 242.0.0 atau yang lebih tinggi.

      gcloud components update

    b. Jalankan perintah berikut di jendela command line. Ganti project-id dengan project ID Anda, dan path-to-file dengan jalur ke file transactions_schema.yaml Anda.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml

    Untuk mengetahui informasi selengkapnya tentang parameter perintah dan format file skema yang diizinkan, lihat halaman dokumentasi untuk gcloud data-catalog entries update.

    c. Pastikan skema Anda berhasil ditetapkan ke topik Pub/Sub transactions. Ganti project-id dengan project ID Anda.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'

Menemukan sumber Pub/Sub

UI Dataflow SQL menyediakan cara untuk menemukan objek sumber data Pub/Sub untuk project apa pun yang dapat Anda akses, sehingga Anda tidak perlu mengingat nama lengkapnya.

Untuk contoh dalam tutorial ini, buka editor Dataflow SQL dan telusuri topik Pub/Sub transactions yang Anda buat:

  1. Buka Ruang Kerja SQL.

  2. Di panel Dataflow SQL Editor, telusuri projectid=project-id transactions di kotak penelusuran. Ganti project-id dengan project ID Anda.

    Panel penelusuran Data Catalog di ruang kerja Dataflow SQL.

Melihat skema

  1. Di panel Dataflow SQL Editor pada UI Dataflow SQL, klik transactions atau telusuri topik Pub/Sub dengan mengetik projectid=project-id system=cloud_pubsub, lalu pilih topik.
  2. Di bagian Schema, Anda dapat melihat skema yang ditetapkan ke topik Pub/Sub.

    Skema yang ditetapkan ke topik termasuk daftar nama kolom dan deskripsinya.

Membuat kueri SQL

UI Dataflow SQL memungkinkan Anda membuat kueri SQL untuk menjalankan tugas Dataflow.

Kueri SQL berikut adalah kueri pengayaan data. Fungsi ini menambahkan kolom tambahan, sales_region, ke aliran peristiwa Pub/Sub (transactions), menggunakan tabel BigQuery (us_state_salesregions) yang memetakan status ke wilayah penjualan.

Salin dan tempel kueri SQL berikut ke Editor kueri. Ganti project-id dengan project ID Anda.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Saat Anda memasukkan kueri di UI Dataflow SQL, validator kueri akan memverifikasi sintaksis kueri. Ikon tanda centang hijau akan ditampilkan jika kueri valid. Jika kueri tidak valid, ikon tanda seru berwarna merah akan ditampilkan. Jika sintaksis kueri Anda tidak valid, mengklik ikon validator akan memberikan informasi tentang hal-hal yang perlu Anda perbaiki.

Screenshot berikut menunjukkan kueri yang valid di Query editor. Validator akan menampilkan tanda centang hijau.

Ruang kerja Dataflow SQL dengan kueri dari tutorial yang terlihat di editor.

Membuat tugas Dataflow untuk menjalankan kueri SQL

Untuk menjalankan kueri SQL, buat tugas Dataflow dari UI Dataflow SQL.

  1. Di Editor kueri, klik Buat tugas.

  2. Di panel Create Dataflow job yang terbuka:

    • Untuk Destination, pilih BigQuery.
    • Untuk ID Set Data, pilih dataflow_sql_tutorial.
    • Untuk Table name, masukkan sales.
    Buat formulir Tugas Dataflow SQL.
  3. Opsional: Dataflow secara otomatis memilih setelan yang optimal untuk tugas Dataflow SQL Anda, tetapi Anda dapat meluaskan menu Optional parameters untuk menentukan opsi pipeline berikut secara manual:

    • Jumlah maksimum pekerja
    • Zona
    • Email akun layanan
    • Jenis mesin
    • Eksperimen tambahan
    • Konfigurasi alamat IP pekerja
    • Jaringan
    • Subnetwork
  4. Klik Buat. Tugas Dataflow Anda memerlukan waktu beberapa menit untuk mulai berjalan.

Melihat tugas Dataflow

Dataflow mengubah kueri SQL Anda menjadi pipeline Apache Beam. Klik Lihat tugas untuk membuka UI web Dataflow, tempat Anda dapat melihat representasi grafis pipeline.

Pipeline dari kueri SQL yang ditampilkan di UI web Dataflow.

Untuk melihat perincian transformasi yang terjadi di pipeline, klik kotak. Misalnya, jika Anda mengklik kotak pertama dalam representasi grafis, berlabel Run SQL Query, grafik akan muncul yang menunjukkan operasi yang terjadi di balik layar.

Dua kotak pertama mewakili dua input yang Anda gabungkan: topik Pub/Sub, transactions, dan tabel BigQuery, us_state_salesregions.

Menulis output penggabungan dua input selesai dalam 25 detik.

Untuk melihat tabel output yang berisi hasil tugas, buka UI BigQuery. Di panel Explorer, di project Anda, klik set data dataflow_sql_tutorial yang telah Anda buat. Kemudian, klik tabel output, sales. Tab Pratinjau menampilkan konten tabel output.

Tabel pratinjau penjualan berisi kolom untuk tr_time_str, first_name, last_name, city, state, product, amount, dan sales_region.

Melihat tugas sebelumnya dan mengedit kueri

UI Dataflow menyimpan tugas dan kueri sebelumnya di halaman Tugas Dataflow.

Anda dapat menggunakan daftar histori tugas untuk melihat kueri SQL sebelumnya. Misalnya, Anda ingin mengubah kueri untuk menggabungkan penjualan menurut wilayah penjualan setiap 15 detik. Gunakan halaman Tugas untuk mengakses tugas yang sedang berjalan yang Anda mulai sebelumnya dalam tutorial, menyalin kueri SQL, dan menjalankan tugas lain dengan kueri yang diubah.

  1. Dari halaman Tugas Dataflow, klik tugas yang ingin Anda edit.

  2. Di halaman Job details, di panel Job info, pada bagian Pipeline options, cari kueri SQL. Temukan baris untuk queryString.

    Opsi pipeline tugas bernama queryString.
  3. Salin dan tempel kueri SQL berikut ke Dataflow SQL Editor di SQL Workspace untuk menambahkan tumbling window. Ganti project-id dengan project ID Anda.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
  4. Klik Create job untuk membuat tugas baru dengan kueri yang diubah.

Pembersihan

Agar resource yang digunakan dalam tutorial ini tidak ditagihkan ke akun Penagihan Cloud Anda:

  1. Hentikan skrip publikasi transactions_injector.py jika masih berjalan.

  2. Hentikan tugas Dataflow yang sedang berjalan. Buka UI web Dataflow di konsol Google Cloud.

    Buka UI web Dataflow

    Untuk setiap tugas yang Anda buat dengan mengikuti panduan ini, lakukan langkah-langkah berikut:

    1. Klik nama tugas.

    2. Di halaman Job details, klik Stop. Dialog Stop Job akan muncul dengan opsi cara menghentikan tugas.

    3. Pilih Batal.

    4. Klik Hentikan tugas. Layanan akan menghentikan semua penyerapan dan pemrosesan data sesegera mungkin. Karena Cancel segera menghentikan pemrosesan, Anda mungkin kehilangan data "in-flight". Menghentikan tugas mungkin memerlukan waktu beberapa menit.

  3. Hapus set data BigQuery Anda. Buka UI web BigQuery di konsol Google Cloud.

    Buka UI web BigQuery

    1. Di panel Penjelajah, di bagian Resource, klik set data dataflow_sql_tutorial yang telah Anda buat.

    2. Di panel detail, klik Delete. Dialog konfirmasi akan terbuka.

    3. Di kotak dialog Delete dataset, konfirmasi perintah hapus dengan mengetik delete, lalu klik Delete.

  4. Hapus topik Pub/Sub Anda. Buka halaman topik Pub/Sub di konsol Google Cloud.

    Buka halaman topik Pub/Sub

    1. Pilih topik transactions.

    2. Klik Hapus untuk menghapus topik secara permanen. Dialog konfirmasi akan terbuka.

    3. Di kotak dialog Delete topic, konfirmasi perintah hapus dengan mengetik delete, lalu klik Delete.

    4. Buka halaman langganan Pub/Sub.

    5. Pilih langganan yang tersisa untuk transactions. Jika tugas Anda tidak lagi berjalan, mungkin tidak ada langganan.

    6. Klik Hapus untuk menghapus langganan secara permanen. Pada dialog konfirmasi, klik Hapus.

  5. Hapus bucket staging Dataflow di Cloud Storage. Buka halaman Bucket Cloud Storage di konsol Google Cloud.

    Buka Buckets

    1. Pilih bucket staging Dataflow.

    2. Klik Hapus untuk menghapus bucket secara permanen. Dialog konfirmasi akan terbuka.

    3. Di kotak dialog Delete bucket, konfirmasi perintah hapus dengan mengetik DELETE, lalu klik Delete.

Langkah berikutnya