Dalam tutorial ini, Anda akan membuat pipeline streaming Dataflow yang mengubah data e-commerce dari topik Pub/Sub serta langganan dan output data ke BigQuery dan Bigtable. Tutorial ini memerlukan Gradle.
Tutorial ini menyediakan aplikasi contoh e-commerce end-to-end yang mengalirkan data dari toko web ke BigQuery dan Bigtable. Aplikasi contoh tersebut menggambarkan kasus penggunaan umum dan praktik terbaik untuk menerapkan analisis data streaming dan kecerdasan buatan (AI) real-time. Gunakan tutorial ini untuk mempelajari cara merespons tindakan pelanggan secara dinamis agar dapat menganalisis dan bereaksi terhadap peristiwa secara real-time. Tutorial ini menjelaskan cara menyimpan, menganalisis, dan memvisualisasikan data peristiwa untuk mendapatkan lebih banyak insight tentang perilaku pelanggan.
Aplikasi contoh tersedia di GitHub. Untuk menjalankan tutorial ini menggunakan Terraform, ikuti langkah-langkah yang disediakan dalam contoh aplikasi di GitHub.
Tujuan
- Validasi data yang masuk dan perbaiki jika memungkinkan.
- Menganalisis data clickstream untuk mempertahankan jumlah penayangan per produk dalam jangka waktu tertentu. Simpan informasi ini di penyimpanan latensi rendah. Aplikasi kemudian dapat menggunakan data tersebut untuk memberikan pesan jumlah orang yang melihat produk ini kepada pelanggan di situs.
Gunakan data transaksi untuk menginformasikan pemesanan inventaris:
- Lakukan analisis data transaksi untuk menghitung total jumlah penjualan setiap item, baik menurut toko maupun secara global, selama periode tertentu.
- Analisis data inventaris untuk menghitung inventaris yang masuk untuk setiap item.
- Teruskan data ini ke sistem inventaris secara berkelanjutan sehingga dapat digunakan untuk keputusan pembelian inventaris.
Validasi data yang masuk dan perbaiki jika memungkinkan. Menuliskan data yang tidak dapat dikoreksi ke antrean mati untuk analisis dan pemrosesan tambahan. Buat metrik yang mewakili persentase data masuk yang dikirim ke antrean yang dihentikan pengirimannya dan tersedia untuk pemantauan dan pemberitahuan.
Proses semua data yang masuk ke dalam format standar dan simpan di data warehouse untuk digunakan di analisis dan visualisasi di masa mendatang.
Lakukan denormalisasi data transaksi untuk penjualan di toko agar dapat menyertakan informasi seperti lintang dan bujur lokasi toko. Berikan informasi toko melalui tabel yang berubah secara perlahan di BigQuery, menggunakan ID toko sebagai kunci.
Data
Aplikasi memproses jenis data berikut:
- Data clickstream yang dikirim oleh sistem online ke Pub/Sub.
- Data transaksi yang dikirim oleh sistem lokal atau software as a service (SaaS) ke Pub/Sub.
- Data stok yang dikirim oleh sistem lokal atau SaaS ke Pub/Sub.
Pola tugas
Aplikasi ini berisi pola tugas yang umum digunakan pada pipeline yang dibuat dengan Apache Beam SDK untuk Java:
- Skema Apache Beam untuk menangani data terstruktur
JsonToRow
untuk mengonversi data JSON- Generator kode
AutoValue
untuk menghasilkan objek Java lama (POJO) biasa - Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut
- Transformasi validasi data serial
DoFn.StartBundle
untuk melakukan panggilan batch mikro ke layanan eksternal- Pola input samping
Biaya
Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda,
gunakan kalkulator harga.
Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Buat akun layanan pekerja yang dikelola pengguna untuk pipeline baru Anda dan berikan peran yang diperlukan ke akun layanan tersebut.
Untuk membuat akun layanan, jalankan perintah
gcloud iam service-accounts create
:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Memberikan peran ke akun layanan. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Ganti
SERVICE_ACCOUNT_ROLE
dengan setiap peran individual.Memberi Akun Google Anda peran yang memungkinkan Anda membuat token akses untuk akun layanan:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Jika perlu, download dan instal Gradle.
Membuat contoh sumber dan sink
Bagian ini menjelaskan cara membuat hal berikut:
- Bucket Cloud Storage untuk digunakan sebagai lokasi penyimpanan sementara
- Streaming sumber data menggunakan Pub/Sub
- Set data untuk memuat data ke BigQuery
- Instance Bigtable
Membuat bucket Cloud Storage
Mulailah dengan membuat bucket Cloud Storage. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.
Gunakan
gcloud storage buckets create
perintah:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Ganti kode berikut:
- BUCKET_NAME: nama untuk bucket Cloud Storage yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
- LOCATION: lokasi bucket.
Membuat langganan dan topik Pub/Sub
Buat empat topik Pub/Sub, lalu buat tiga langganan.
Untuk membuat topik, jalankan
perintah gcloud pubsub topics create
sekali untuk setiap topik. Untuk informasi tentang cara memberi nama langganan, lihat
Panduan menamai topik atau langganan.
gcloud pubsub topics create TOPIC_NAME
Ganti TOPIC_NAME dengan nilai berikut, menjalankan perintah empat kali, satu kali untuk setiap topik:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Untuk membuat langganan ke topik Anda, jalankan
perintah gcloud pubsub subscriptions create
sekali untuk setiap langganan:
Buat langganan
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Buat langganan
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Buat langganan
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Membuat set data dan tabel BigQuery
Buat set data BigQuery dan tabel yang dipartisi dengan skema yang sesuai untuk topik Pub/Sub Anda.
Gunakan perintah
bq mk
untuk membuat set data pertama.bq --location=US mk \ PROJECT_ID:Retail_Store
Buat set data kedua.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Gunakan pernyataan SQL CREATE TABLE untuk membuat tabel dengan skema dan data pengujian. Data pengujian memiliki satu toko dengan nilai ID
1
. Pola input sisi update lambat menggunakan tabel ini.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Membuat instance dan tabel Bigtable
Membuat instance dan tabel Bigtable. Untuk mengetahui informasi lebih lanjut mengenai pembuatan instance Bigtable, baca Membuat instance.
Jika diperlukan, jalankan perintah berikut untuk menginstal
cbt
CLI:gcloud components install cbt
Gunakan perintah
bigtable instances create
untuk membuat instance:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Ganti CLUSTER_ZONE dengan zona tempat cluster berjalan.
Gunakan perintah
cbt createtable
untuk membuat tabel:cbt -instance=aggregate-tables createtable PageView5MinAggregates
Gunakan perintah berikut untuk menambahkan grup kolom ke tabel:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Menjalankan pipeline
Gunakan Gradle untuk menjalankan pipeline streaming. Untuk melihat kode Java yang digunakan pipeline, lihat RetailDataProcessingPipeline.java.
Gunakan perintah
git clone
untuk meng-clone repositori GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Beralih ke direktori aplikasi:
cd dataflow-sample-applications/retail/retail-java-applications
Untuk menguji pipeline, jalankan perintah berikut menggunakan Gradle di shell atau terminal Anda:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Untuk menjalankan pipeline, jalankan perintah berikut menggunakan Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Lihat kode sumber pipeline di GitHub.
Membuat dan menjalankan tugas Cloud Scheduler
Buat dan jalankan tiga tugas Cloud Scheduler, satu tugas yang memublikasikan data clickstream, satu untuk data inventaris, dan satu untuk data transaksi. Langkah ini menghasilkan data sampel untuk pipeline.
Untuk membuat tugas Cloud Scheduler untuk tutorial ini, gunakan perintah
gcloud scheduler jobs create
. Langkah ini akan membuat penayang untuk data clickstream yang memublikasikan satu pesan per menit.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Untuk memulai tugas Cloud Scheduler, gunakan perintah
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Buat dan jalankan penayang serupa lainnya untuk data inventaris yang memublikasikan satu pesan setiap dua menit.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Mulai tugas Cloud Scheduler kedua.
gcloud scheduler jobs run --location=LOCATION inventory
Membuat dan menjalankan penayang ketiga untuk data transaksi yang memublikasikan satu pesan setiap dua menit.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Mulai tugas Cloud Scheduler ketiga.
gcloud scheduler jobs run --location=LOCATION transactions
Lihat hasil Anda
Melihat data yang ditulis ke tabel BigQuery Anda. Periksa hasilnya di BigQuery dengan menjalankan kueri berikut. Saat pipeline ini berjalan, Anda dapat melihat baris baru yang ditambahkan ke tabel BigQuery setiap menit.
Anda mungkin perlu menunggu sampai tabel terisi data.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Pembersihan
Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.
Menghapus project
Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project Google Cloud yang Anda buat untuk tutorial.
- Di konsol Google Cloud, buka halaman Manage resource.
- Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
- Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.
Menghapus resource satu per satu
Jika Anda ingin menggunakan kembali project tersebut, hapus resource yang Anda buat untuk tutorial.
Membersihkan resource project Google Cloud
Untuk menghapus tugas Cloud Scheduler, gunakan perintah
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Untuk menghapus langganan dan topik Pub/Sub, gunakan perintah
gcloud pubsub subscriptions delete
dangcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Untuk menghapus tabel BigQuery, gunakan perintah
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Menghapus set data BigQuery. Set data saja tidak dikenai biaya apa pun.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Untuk menghapus instance Bigtable, gunakan perintah
cbt deleteinstance
. Bucket saja tidak dikenai biaya apa pun.cbt deleteinstance aggregate-tables
Untuk menghapus bucket Cloud Storage, gunakan perintah
gcloud storage rm
. Bucket saja tidak dikenai biaya apa pun.gcloud storage rm gs://BUCKET_NAME --recursive
Mencabut kredensial
Cabut peran yang Anda berikan ke akun layanan pekerja yang dikelola pengguna. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.
gcloud auth application-default revoke
-
Opsional: Cabut kredensial dari gcloud CLI.
gcloud auth revoke
Langkah selanjutnya
- Lihat aplikasi contoh di GitHub.
- Baca postingan blog terkait Mempelajari pola Beam dengan pemrosesan Clickstream data Google Tag Manager.
- Baca cara menggunakan Pub/Sub untuk membuat dan menggunakan topik serta Menggunakan langganan.
- Baca cara menggunakan BigQuery untuk membuat set data.
- Pelajari arsitektur referensi, diagram, dan praktik terbaik tentang Google Cloud. Lihat Cloud Architecture Center kami.