Dalam tutorial ini, Anda akan membuat pipeline streaming Dataflow yang mengubah data e-commerce dari topik dan langganan Pub/Sub serta menghasilkan output data ke BigQuery dan Bigtable. Tutorial ini memerlukan Gradle.
Tutorial ini menyediakan aplikasi contoh e-commerce menyeluruh yang mengalirkan data dari webstore ke BigQuery dan Bigtable. Aplikasi contoh ini menggambarkan kasus penggunaan umum dan praktik terbaik untuk menerapkan analisis data streaming dan kecerdasan buatan (AI) real-time. Gunakan tutorial ini untuk mempelajari cara merespons tindakan pelanggan secara dinamis guna menganalisis dan bereaksi terhadap peristiwa secara real time. Tutorial ini menjelaskan cara menyimpan, menganalisis, dan memvisualisasikan data peristiwa untuk mendapatkan lebih banyak insight tentang perilaku pelanggan.
Aplikasi contoh tersedia di GitHub. Untuk menjalankan tutorial ini menggunakan Terraform, ikuti langkah-langkah yang diberikan dengan aplikasi contoh di GitHub.
Tujuan
- Validasi data yang masuk dan terapkan koreksi jika memungkinkan.
- Analisis data clickstream untuk terus menghitung jumlah tampilan per produk dalam jangka waktu tertentu. Simpan informasi ini di penyimpanan latensi rendah. Aplikasi kemudian dapat menggunakan data tersebut untuk memberikan pesan jumlah orang yang melihat produk ini kepada pelanggan di situs.
Gunakan data transaksi untuk menentukan urutan inventaris:
- Analisis data transaksi untuk menghitung jumlah total penjualan untuk setiap item, baik menurut toko maupun secara global, selama jangka waktu tertentu.
- Analisis data inventaris untuk menghitung inventaris yang masuk untuk setiap item.
- Teruskan data ini ke sistem inventaris secara berkelanjutan sehingga dapat digunakan untuk keputusan pembelian inventaris.
Validasi data yang masuk dan terapkan koreksi jika memungkinkan. Tulis data yang tidak dapat dikoreksi ke antrean dead letter untuk analisis dan pemrosesan tambahan. Buat metrik yang mewakili persentase data masuk yang dikirim ke antrean dead letter yang tersedia untuk pemantauan dan pemberitahuan.
Memproses semua data yang masuk ke dalam format standar dan menyimpannya di data warehouse untuk digunakan dalam analisis dan visualisasi di masa mendatang.
Denormalisasi data transaksi untuk penjualan di toko agar dapat menyertakan informasi seperti lintang dan bujur lokasi toko. Berikan informasi toko melalui tabel yang berubah perlahan di BigQuery, menggunakan ID toko sebagai kunci.
Data
Aplikasi memproses jenis data berikut:
- Data klik yang dikirim oleh sistem online ke Pub/Sub.
- Data transaksi yang dikirim oleh sistem lokal atau software as a service (SaaS) ke Pub/Sub.
- Data saham yang dikirim oleh sistem lokal atau SaaS ke Pub/Sub.
Pola tugas
Aplikasi ini berisi pola tugas berikut yang umum untuk pipeline yang dibuat dengan Apache Beam SDK untuk Java:
- Skema Apache Beam untuk digunakan dengan data terstruktur
JsonToRow
untuk mengonversi data JSON- Generator kode
AutoValue
untuk membuat objek Java lama (POJO) - Menambahkan data yang tidak dapat diproses ke antrean untuk analisis lebih lanjut
- Transformasi validasi data serial
DoFn.StartBundle
untuk panggilan batch mikro ke layanan eksternal- Pola input samping
Biaya
Dalam dokumen ini, Anda akan menggunakan komponen Google Cloud yang dapat ditagih berikut:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda,
gunakan kalkulator harga.
Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Buat akun layanan pekerja yang dikelola pengguna untuk pipeline baru Anda dan berikan peran yang diperlukan ke akun layanan.
Untuk membuat akun layanan, jalankan
gcloud iam service-accounts create
perintah:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Ganti
SERVICE_ACCOUNT_ROLE
dengan setiap peran individual.Memberi Akun Google Anda peran yang memungkinkan Anda membuat token akses untuk akun layanan:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Jika perlu, download dan instal Gradle.
Membuat contoh sumber dan sink
Bagian ini menjelaskan cara membuat hal berikut:
- Bucket Cloud Storage yang akan digunakan sebagai lokasi penyimpanan sementara
- Streaming sumber data menggunakan Pub/Sub
- Set data untuk memuat data ke BigQuery
- Instance Bigtable
Membuat bucket Cloud Storage
Mulailah dengan membuat bucket Cloud Storage. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.
Gunakan perintah gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Ganti kode berikut:
- BUCKET_NAME: nama untuk bucket Cloud Storage Anda yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
- LOCATION: lokasi untuk bucket.
Membuat topik dan langganan Pub/Sub
Buat empat topik Pub/Sub, lalu buat tiga langganan.
Untuk membuat topik, jalankan perintah
gcloud pubsub topics create
satu kali untuk setiap topik. Untuk informasi tentang cara memberi nama langganan, lihat
Panduan untuk memberi nama topik atau langganan.
gcloud pubsub topics create TOPIC_NAME
Ganti TOPIC_NAME dengan nilai berikut, jalankan perintah empat kali, sekali untuk setiap topik:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Untuk membuat langganan ke topik Anda, jalankan perintah gcloud pubsub subscriptions create
satu kali untuk setiap langganan:
Buat langganan
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Buat langganan
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Buat langganan
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Membuat set data dan tabel BigQuery
Buat set data BigQuery dan tabel yang dipartisi dengan skema yang sesuai untuk topik Pub/Sub Anda.
Gunakan perintah
bq mk
untuk membuat set data pertama.bq --location=US mk \ PROJECT_ID:Retail_Store
Buat set data kedua.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Gunakan pernyataan SQL CREATE TABLE untuk membuat tabel dengan skema dan data pengujian. Data pengujian memiliki satu toko dengan nilai ID
1
. Pola input sisi update lambat menggunakan tabel ini.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Membuat instance dan tabel Bigtable
Buat instance dan tabel Bigtable. Untuk mengetahui informasi selengkapnya tentang cara membuat instance Bigtable, lihat Membuat instance.
Jika diperlukan, jalankan perintah berikut untuk menginstal
cbt
CLI:gcloud components install cbt
Gunakan perintah
bigtable instances create
untuk membuat instance:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Ganti CLUSTER_ZONE dengan zona tempat cluster berjalan.
Gunakan perintah
cbt createtable
untuk membuat tabel:cbt -instance=aggregate-tables createtable PageView5MinAggregates
Gunakan perintah berikut untuk menambahkan grup kolom ke tabel:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Menjalankan pipeline
Gunakan Gradle untuk menjalankan pipeline streaming. Untuk melihat kode Java yang digunakan pipeline, lihat RetailDataProcessingPipeline.java.
Gunakan perintah
git clone
untuk meng-clone repositori GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Beralih ke direktori aplikasi:
cd dataflow-sample-applications/retail/retail-java-applications
Untuk menguji pipeline, di shell atau terminal, jalankan perintah berikut menggunakan Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Untuk menjalankan pipeline, jalankan perintah berikut menggunakan Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Lihat kode sumber pipeline di GitHub.
Membuat dan menjalankan tugas Cloud Scheduler
Buat dan jalankan tiga tugas Cloud Scheduler, satu yang memublikasikan data clickstream, satu untuk data inventaris, dan satu untuk data transaksi. Langkah ini menghasilkan data contoh untuk pipeline.
Untuk membuat tugas Cloud Scheduler untuk tutorial ini, gunakan perintah
gcloud scheduler jobs create
. Langkah ini akan membuat penayang untuk data clickstream yang memublikasikan satu pesan per menit.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Untuk memulai tugas Cloud Scheduler, gunakan perintah
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Buat dan jalankan penayang serupa lainnya untuk data inventaris yang memublikasikan satu pesan setiap dua menit.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Mulai tugas Cloud Scheduler kedua.
gcloud scheduler jobs run --location=LOCATION inventory
Buat dan jalankan penayang ketiga untuk data transaksi yang memublikasikan satu pesan setiap dua menit.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Mulai tugas Cloud Scheduler ketiga.
gcloud scheduler jobs run --location=LOCATION transactions
Melihat hasil
Melihat data yang ditulis ke tabel BigQuery Anda. Periksa hasilnya di BigQuery dengan menjalankan kueri berikut. Saat pipeline ini berjalan, Anda dapat melihat baris baru yang ditambahkan ke tabel BigQuery setiap menit.
Anda mungkin perlu menunggu hingga tabel terisi dengan data.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Pembersihan
Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.
Menghapus project
Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project Google Cloud yang Anda buat untuk tutorial.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Menghapus resource satu per satu
Jika Anda ingin menggunakan kembali project, hapus resource yang Anda buat untuk tutorial.
Menghapus resource project Google Cloud
Untuk menghapus tugas Cloud Scheduler, gunakan perintah
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Untuk menghapus langganan dan topik Pub/Sub, gunakan perintah
gcloud pubsub subscriptions delete
dangcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Untuk menghapus tabel BigQuery, gunakan perintah
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Hapus set data BigQuery. Set data saja tidak dikenai biaya apa pun.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Untuk menghapus instance Bigtable, gunakan perintah
cbt deleteinstance
. Bucket saja tidak dikenai biaya apa pun.cbt deleteinstance aggregate-tables
Untuk menghapus bucket Cloud Storage, gunakan perintah
gcloud storage rm
. Bucket saja tidak dikenai biaya apa pun.gcloud storage rm gs://BUCKET_NAME --recursive
Mencabut kredensial
Cabut peran yang Anda berikan ke akun layanan pekerja yang dikelola pengguna. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Langkah selanjutnya
- Lihat aplikasi contoh di GitHub.
- Baca postingan blog terkait Pelajari pola Beam dengan pemrosesan Clickstream data Google Tag Manager.
- Baca cara menggunakan Pub/Sub untuk membuat dan menggunakan topik serta untuk Menggunakan langganan.
- Baca cara menggunakan BigQuery untuk membuat set data.
- Pelajari arsitektur referensi, diagram, dan praktik terbaik tentang Google Cloud. Lihat Cloud Architecture Center kami.