Tutorial ini menunjukkan cara menggunakan Dataflow SQL untuk menggabungkan aliran data dari Pub/Sub dengan data dari tabel BigQuery.
Tujuan
Dalam tutorial ini, Anda telah:
- Tulis kueri Dataflow SQL yang menggabungkan data streaming Pub/Sub dengan data tabel BigQuery.
- Men-deploy tugas Dataflow dari UI Dataflow SQL.
Biaya
Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:
- Dataflow
- Cloud Storage
- Pub/Sub
- Data Catalog
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda,
gunakan kalkulator harga.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.Example: Linux or macOS
export GOOGLE_APPLICATION_CREDENTIALS="
"KEY_PATH Replace
KEY_PATH
with the path of the JSON file that contains your credentials.For example:
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"
Example: Windows
For PowerShell:
$env:GOOGLE_APPLICATION_CREDENTIALS="
"KEY_PATH Replace
KEY_PATH
with the path of the JSON file that contains your credentials.For example:
$env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\service-account-file.json"
For command prompt:
set GOOGLE_APPLICATION_CREDENTIALS=
KEY_PATH Replace
KEY_PATH
with the path of the JSON file that contains your credentials. - Menginstal dan melakukan inisialisasi gcloud CLI. Pilih salah satu
opsi penginstalan.
Anda mungkin perlu menetapkan properti
project
ke project yang Anda gunakan untuk panduan ini. - Buka UI web Dataflow SQL di konsol Google Cloud. Tindakan ini akan membuka project yang terakhir Anda akses. Untuk beralih ke project yang berbeda, klik nama project di bagian atas UI web Dataflow SQL, lalu telusuri project yang ingin Anda gunakan.
Buka UI web Dataflow SQL
Membuat contoh sumber
Jika Anda ingin mengikuti contoh yang diberikan dalam tutorial ini, buat sumber berikut dan gunakan dalam langkah-langkah tutorial.
- Topik Pub/Sub yang disebut
transactions
- Aliran data transaksi yang masuk melalui langganan ke topik Pub/Sub. Data untuk setiap transaksi mencakup informasi seperti produk yang dibeli, harga promo, serta kota dan negara bagian tempat pembelian terjadi. Setelah membuat topik Pub/Sub, Anda membuat skrip yang memublikasikan pesan ke topik. Anda akan menjalankan skrip ini di bagian berikutnya dalam tutorial ini. - Tabel BigQuery bernama
us_state_salesregions
- Tabel yang menyediakan pemetaan negara bagian ke wilayah penjualan. Sebelum membuat tabel ini, Anda harus membuat set data BigQuery.
Membuat topik Pub/Sub dan skrip penayang
- Gunakan Google Cloud CLI untuk membuat topik Pub/Sub. Beri nama topik
transactions
.gcloud pubsub topics create transactions
- Buat skrip Python yang memublikasikan pesan ke topik Pub/Sub Anda. Jika belum menginstal Python, Anda harus menginstal Python.
Anda akan menjalankan skrip ini di jendela command line tepat sebelum menjalankan kueri SQL.
- Buat file teks dan beri nama
transactions_injector.py
. - Salin dan tempel kode berikut ke dalam
transactions_injector.py
. Ganti project-id dengan project ID Anda.#!/usr/bin/env python import datetime, json, os, random, time # Set the `project` variable to a Google Cloud project ID. project = '
project-id ' FIRST_NAMES = ['Monet', 'Julia', 'Angelique', 'Stephane', 'Allan', 'Ulrike', 'Vella', 'Melia', 'Noel', 'Terrence', 'Leigh', 'Rubin', 'Tanja', 'Shirlene', 'Deidre', 'Dorthy', 'Leighann', 'Mamie', 'Gabriella', 'Tanika', 'Kennith', 'Merilyn', 'Tonda', 'Adolfo', 'Von', 'Agnus', 'Kieth', 'Lisette', 'Hui', 'Lilliana',] CITIES = ['Washington', 'Springfield', 'Franklin', 'Greenville', 'Bristol', 'Fairview', 'Salem', 'Madison', 'Georgetown', 'Arlington', 'Ashland',] STATES = ['MO','SC','IN','CA','IA','DE','ID','AK','NE','VA','PR','IL','ND','OK','VT','DC','CO','MS', 'CT','ME','MN','NV','HI','MT','PA','SD','WA','NJ','NC','WV','AL','AR','FL','NM','KY','GA','MA', 'KS','VI','MI','UT','AZ','WI','RI','NY','TN','OH','TX','AS','MD','OR','MP','LA','WY','GU','NH'] PRODUCTS = ['Product 2', 'Product 2 XL', 'Product 3', 'Product 3 XL', 'Product 4', 'Product 4 XL', 'Product 5', 'Product 5 XL',] while True: first_name, last_name = random.sample(FIRST_NAMES, 2) data = { 'tr_time_str': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'first_name': first_name, 'last_name': last_name, 'city': random.choice(CITIES), 'state':random.choice(STATES), 'product': random.choice(PRODUCTS), 'amount': float(random.randrange(50000, 70000)) / 100, } # For a more complete example on how to publish messages in Pub/Sub. # https://cloud.google.com/pubsub/docs/publisher message = json.dumps(data) command = "gcloud --project={} pubsub topics publish transactions --message='{}'".format(project, message) print(command) os.system(command) time.sleep(random.randrange(1, 5))
- Buat file teks dan beri nama
Membuat set data dan tabel BigQuery
- Di UI web BigQuery,
buat set data BigQuery. Set data BigQuery adalah
penampung tingkat teratas yang digunakan untuk menyimpan tabel Anda. Tabel BigQuery harus berasal dari set data.
- Di panel Penjelajah, buka tindakan untuk project Anda. Di menu, klik Buat set data. Dalam screenshot berikut, project ID-nya adalah
dataflow-sql
. - Di panel Create dataset yang terbuka, untuk Dataset ID, masukkan
dataflow_sql_tutorial
. - Untuk Data location, pilih opsi dari menu.
- Klik Create dataset.
- Di panel Penjelajah, buka tindakan untuk project Anda. Di menu, klik Buat set data. Dalam screenshot berikut, project ID-nya adalah
- Buat tabel BigQuery.
- Buat file teks dan beri nama
us_state_salesregions.csv
. - Salin dan tempel data berikut ke
us_state_salesregions.csv
. Pada langkah berikutnya, Anda akan memuat data ini ke tabel BigQuery.state_id,state_code,state_name,sales_region 1,MO,Missouri,Region_1 2,SC,South Carolina,Region_1 3,IN,Indiana,Region_1 6,DE,Delaware,Region_2 15,VT,Vermont,Region_2 16,DC,District of Columbia,Region_2 19,CT,Connecticut,Region_2 20,ME,Maine,Region_2 35,PA,Pennsylvania,Region_2 38,NJ,New Jersey,Region_2 47,MA,Massachusetts,Region_2 54,RI,Rhode Island,Region_2 55,NY,New York,Region_2 60,MD,Maryland,Region_2 66,NH,New Hampshire,Region_2 4,CA,California,Region_3 8,AK,Alaska,Region_3 37,WA,Washington,Region_3 61,OR,Oregon,Region_3 33,HI,Hawaii,Region_4 59,AS,American Samoa,Region_4 65,GU,Guam,Region_4 5,IA,Iowa,Region_5 32,NV,Nevada,Region_5 11,PR,Puerto Rico,Region_6 17,CO,Colorado,Region_6 18,MS,Mississippi,Region_6 41,AL,Alabama,Region_6 42,AR,Arkansas,Region_6 43,FL,Florida,Region_6 44,NM,New Mexico,Region_6 46,GA,Georgia,Region_6 48,KS,Kansas,Region_6 52,AZ,Arizona,Region_6 56,TN,Tennessee,Region_6 58,TX,Texas,Region_6 63,LA,Louisiana,Region_6 7,ID,Idaho,Region_7 12,IL,Illinois,Region_7 13,ND,North Dakota,Region_7 31,MN,Minnesota,Region_7 34,MT,Montana,Region_7 36,SD,South Dakota,Region_7 50,MI,Michigan,Region_7 51,UT,Utah,Region_7 64,WY,Wyoming,Region_7 9,NE,Nebraska,Region_8 10,VA,Virginia,Region_8 14,OK,Oklahoma,Region_8 39,NC,North Carolina,Region_8 40,WV,West Virginia,Region_8 45,KY,Kentucky,Region_8 53,WI,Wisconsin,Region_8 57,OH,Ohio,Region_8 49,VI,United States Virgin Islands,Region_9 62,MP,Commonwealth of the Northern Mariana Islands,Region_9
- Di panel Explorer pada UI BigQuery, luaskan project Anda untuk melihat set data
dataflow_sql_tutorial
. - Buka menu tindakan untuk set data
dataflow_sql_tutorial
, lalu klik Buka. - Klik Create table.
- Di panel Create table yang terbuka:
- Untuk Buat tabel dari, pilih Upload.
- Untuk Pilih file, klik Jelajahi, lalu pilih file
us_state_salesregions.csv
Anda. - Untuk Table, masukkan
us_state_salesregions
. - Di bagian Skema, pilih Deteksi otomatis.
- Klik Advanced options untuk meluaskan bagian Advanced options.
- Untuk Header rows to skip, masukkan
1
, lalu klik Create table.
- Untuk Buat tabel dari, pilih Upload.
- Di panel Explorer,
klik
us_state_salesregions
. Di bagian Skema, Anda dapat melihat skema yang dibuat secara otomatis. Di bagian Pratinjau, Anda dapat melihat data tabel.
- Buat file teks dan beri nama
Menetapkan skema ke topik Pub/Sub
Dengan menetapkan skema, Anda dapat menjalankan kueri SQL pada data topik Pub/Sub. Saat ini, Dataflow SQL mengharapkan pesan dalam topik Pub/Sub diserialisasi dalam format JSON.
Untuk menetapkan skema ke contoh topik Pub/Sub transactions
:
Buat file teks dan beri nama
transactions_schema.yaml
. Salin dan tempel teks skema berikut ketransactions_schema.yaml
.- column: event_timestamp description: Pub/Sub event timestamp mode: REQUIRED type: TIMESTAMP - column: tr_time_str description: Transaction time string mode: NULLABLE type: STRING - column: first_name description: First name mode: NULLABLE type: STRING - column: last_name description: Last name mode: NULLABLE type: STRING - column: city description: City mode: NULLABLE type: STRING - column: state description: State mode: NULLABLE type: STRING - column: product description: Product mode: NULLABLE type: STRING - column: amount description: Amount of transaction mode: NULLABLE type: FLOAT
Tetapkan skema menggunakan Google Cloud CLI.
a. Update gcloud CLI dengan perintah berikut. Pastikan versi gcloud CLI adalah 242.0.0 atau yang lebih tinggi.
gcloud components update
b. Jalankan perintah berikut di jendela command line. Ganti project-id dengan project ID Anda, dan path-to-file dengan jalur ke file
transactions_schema.yaml
Anda.gcloud data-catalog entries update \ --lookup-entry='pubsub.topic.`
project-id `.transactions' \ --schema-from-file=path-to-file /transactions_schema.yamlUntuk mengetahui informasi selengkapnya tentang parameter perintah dan format file skema yang diizinkan, lihat halaman dokumentasi untuk gcloud data-catalog entries update.
c. Pastikan skema Anda berhasil ditetapkan ke topik Pub/Sub
transactions
. Ganti project-id dengan project ID Anda.gcloud data-catalog entries lookup 'pubsub.topic.`
project-id `.transactions'
Menemukan sumber Pub/Sub
UI Dataflow SQL menyediakan cara untuk menemukan objek sumber data Pub/Sub untuk project apa pun yang dapat Anda akses, sehingga Anda tidak perlu mengingat nama lengkapnya.
Untuk contoh dalam tutorial ini, buka editor Dataflow SQL dan telusuri topik Pub/Sub transactions
yang Anda buat:
Buka Ruang Kerja SQL.
Di panel Dataflow SQL Editor, telusuri
projectid=project-id transactions
di kotak penelusuran. Ganti project-id dengan project ID Anda.
Melihat skema
- Di panel Dataflow SQL Editor pada UI Dataflow SQL, klik
transactions atau telusuri topik Pub/Sub dengan mengetik
projectid=project-id system=cloud_pubsub
, lalu pilih topik. Di bagian Schema, Anda dapat melihat skema yang ditetapkan ke topik Pub/Sub.
Membuat kueri SQL
UI Dataflow SQL memungkinkan Anda membuat kueri SQL untuk menjalankan tugas Dataflow.
Kueri SQL berikut adalah kueri pengayaan data. Fungsi ini menambahkan kolom tambahan, sales_region
, ke aliran peristiwa Pub/Sub (transactions
), menggunakan tabel BigQuery (us_state_salesregions
) yang memetakan status ke wilayah penjualan.
Salin dan tempel kueri SQL berikut ke Editor kueri. Ganti project-id dengan project ID Anda.
SELECT tr.*, sr.sales_region FROM pubsub.topic.`project-id `.transactions as tr INNER JOIN bigquery.table.`project-id `.dataflow_sql_tutorial.us_state_salesregions AS sr ON tr.state = sr.state_code
Saat Anda memasukkan kueri di UI Dataflow SQL, validator kueri akan memverifikasi sintaksis kueri. Ikon tanda centang hijau akan ditampilkan jika kueri valid. Jika kueri tidak valid, ikon tanda seru berwarna merah akan ditampilkan. Jika sintaksis kueri Anda tidak valid, mengklik ikon validator akan memberikan informasi tentang hal-hal yang perlu Anda perbaiki.
Screenshot berikut menunjukkan kueri yang valid di Query editor. Validator akan menampilkan tanda centang hijau.

Membuat tugas Dataflow untuk menjalankan kueri SQL
Untuk menjalankan kueri SQL, buat tugas Dataflow dari UI Dataflow SQL.
Di Editor kueri, klik Buat tugas.
Di panel Create Dataflow job yang terbuka:
- Untuk Destination, pilih BigQuery.
- Untuk ID Set Data, pilih
dataflow_sql_tutorial
. - Untuk Table name, masukkan
sales
.
Opsional: Dataflow secara otomatis memilih setelan yang optimal untuk tugas Dataflow SQL Anda, tetapi Anda dapat meluaskan menu Optional parameters untuk menentukan opsi pipeline berikut secara manual:
- Jumlah maksimum pekerja
- Zona
- Email akun layanan
- Jenis mesin
- Eksperimen tambahan
- Konfigurasi alamat IP pekerja
- Jaringan
- Subnetwork
Klik Buat. Tugas Dataflow Anda memerlukan waktu beberapa menit untuk mulai berjalan.
Melihat tugas Dataflow
Dataflow mengubah kueri SQL Anda menjadi pipeline Apache Beam. Klik Lihat tugas untuk membuka UI web Dataflow, tempat Anda dapat melihat representasi grafis pipeline.

Untuk melihat perincian transformasi yang terjadi di pipeline, klik kotak. Misalnya, jika Anda mengklik kotak pertama dalam representasi grafis, berlabel Run SQL Query, grafik akan muncul yang menunjukkan operasi yang terjadi di balik layar.
Dua kotak pertama mewakili dua input yang Anda gabungkan: topik Pub/Sub, transactions
, dan tabel BigQuery, us_state_salesregions
.

Untuk melihat tabel output yang berisi hasil tugas, buka UI BigQuery.
Di panel Explorer, di project Anda, klik set data dataflow_sql_tutorial
yang telah Anda buat. Kemudian, klik tabel output,
sales
. Tab Pratinjau menampilkan konten tabel output.

Melihat tugas sebelumnya dan mengedit kueri
UI Dataflow menyimpan tugas dan kueri sebelumnya di halaman Tugas Dataflow.
Anda dapat menggunakan daftar histori tugas untuk melihat kueri SQL sebelumnya. Misalnya, Anda ingin mengubah kueri untuk menggabungkan penjualan menurut wilayah penjualan setiap 15 detik. Gunakan halaman Tugas untuk mengakses tugas yang sedang berjalan yang Anda mulai sebelumnya dalam tutorial, menyalin kueri SQL, dan menjalankan tugas lain dengan kueri yang diubah.
Dari halaman Tugas Dataflow, klik tugas yang ingin Anda edit.
Di halaman Job details, di panel Job info, pada bagian Pipeline options, cari kueri SQL. Temukan baris untuk queryString.
Salin dan tempel kueri SQL berikut ke Dataflow SQL Editor di SQL Workspace untuk menambahkan tumbling window. Ganti project-id dengan project ID Anda.
SELECT sr.sales_region, TUMBLE_START("INTERVAL 15 SECOND") AS period_start, SUM(tr.amount) as amount FROM pubsub.topic.`
project-id `.transactions AS tr INNER JOIN bigquery.table.`project-id `.dataflow_sql_tutorial.us_state_salesregions AS sr ON tr.state = sr.state_code GROUP BY sr.sales_region, TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")Klik Create job untuk membuat tugas baru dengan kueri yang diubah.
Pembersihan
Agar resource yang digunakan dalam tutorial ini tidak ditagihkan ke akun Penagihan Cloud Anda:
Hentikan skrip publikasi
transactions_injector.py
jika masih berjalan.Hentikan tugas Dataflow yang sedang berjalan. Buka UI web Dataflow di konsol Google Cloud.
Untuk setiap tugas yang Anda buat dengan mengikuti panduan ini, lakukan langkah-langkah berikut:
Klik nama tugas.
Di halaman Job details, klik Stop. Dialog Stop Job akan muncul dengan opsi cara menghentikan tugas.
Pilih Batal.
Klik Hentikan tugas. Layanan akan menghentikan semua penyerapan dan pemrosesan data sesegera mungkin. Karena Cancel segera menghentikan pemrosesan, Anda mungkin kehilangan data "in-flight". Menghentikan tugas mungkin memerlukan waktu beberapa menit.
Hapus set data BigQuery Anda. Buka UI web BigQuery di konsol Google Cloud.
Di panel Penjelajah, di bagian Resource, klik set data dataflow_sql_tutorial yang telah Anda buat.
Di panel detail, klik Delete. Dialog konfirmasi akan terbuka.
Di kotak dialog Delete dataset, konfirmasi perintah hapus dengan mengetik
delete
, lalu klik Delete.
Hapus topik Pub/Sub Anda. Buka halaman topik Pub/Sub di konsol Google Cloud.
Pilih topik
transactions
.Klik Hapus untuk menghapus topik secara permanen. Dialog konfirmasi akan terbuka.
Di kotak dialog Delete topic, konfirmasi perintah hapus dengan mengetik
delete
, lalu klik Delete.Pilih langganan yang tersisa untuk
transactions
. Jika tugas Anda tidak lagi berjalan, mungkin tidak ada langganan.Klik Hapus untuk menghapus langganan secara permanen. Pada dialog konfirmasi, klik Hapus.
Hapus bucket staging Dataflow di Cloud Storage. Buka halaman Bucket Cloud Storage di konsol Google Cloud.
Pilih bucket staging Dataflow.
Klik Hapus untuk menghapus bucket secara permanen. Dialog konfirmasi akan terbuka.
Di kotak dialog Delete bucket, konfirmasi perintah hapus dengan mengetik
DELETE
, lalu klik Delete.
Langkah berikutnya
- Lihat pengantar Dataflow SQL.
- Pelajari dasar-dasar pipeline streaming.
- Jelajahi referensi Dataflow SQL.
- Tonton demo analisis streaming yang diberikan di Cloud Next 2019.