Membuat pipeline streaming e-commerce


Dalam tutorial ini, Anda akan membuat pipeline streaming Dataflow yang mengubah data e-commerce dari topik Pub/Sub serta langganan dan output data ke BigQuery dan Bigtable. Tutorial ini memerlukan Gradle.

Tutorial ini menyediakan aplikasi contoh e-commerce end-to-end yang mengalirkan data dari toko web ke BigQuery dan Bigtable. Aplikasi contoh tersebut menggambarkan kasus penggunaan umum dan praktik terbaik untuk menerapkan analisis data streaming dan kecerdasan buatan (AI) real-time. Gunakan tutorial ini untuk mempelajari cara merespons tindakan pelanggan secara dinamis agar dapat menganalisis dan bereaksi terhadap peristiwa secara real-time. Tutorial ini menjelaskan cara menyimpan, menganalisis, dan memvisualisasikan data peristiwa untuk mendapatkan lebih banyak insight tentang perilaku pelanggan.

Aplikasi contoh tersedia di GitHub. Untuk menjalankan tutorial ini menggunakan Terraform, ikuti langkah-langkah yang disediakan dalam contoh aplikasi di GitHub.

Tujuan

  • Validasi data yang masuk dan perbaiki jika memungkinkan.
  • Menganalisis data clickstream untuk mempertahankan jumlah penayangan per produk dalam jangka waktu tertentu. Simpan informasi ini di penyimpanan latensi rendah. Aplikasi kemudian dapat menggunakan data tersebut untuk memberikan pesan jumlah orang yang melihat produk ini kepada pelanggan di situs.
  • Gunakan data transaksi untuk menginformasikan pemesanan inventaris:

    • Lakukan analisis data transaksi untuk menghitung total jumlah penjualan setiap item, baik menurut toko maupun secara global, selama periode tertentu.
    • Analisis data inventaris untuk menghitung inventaris yang masuk untuk setiap item.
    • Teruskan data ini ke sistem inventaris secara berkelanjutan sehingga dapat digunakan untuk keputusan pembelian inventaris.
  • Validasi data yang masuk dan perbaiki jika memungkinkan. Menuliskan data yang tidak dapat dikoreksi ke antrean mati untuk analisis dan pemrosesan tambahan. Buat metrik yang mewakili persentase data masuk yang dikirim ke antrean yang dihentikan pengirimannya dan tersedia untuk pemantauan dan pemberitahuan.

  • Proses semua data yang masuk ke dalam format standar dan simpan di data warehouse untuk digunakan di analisis dan visualisasi di masa mendatang.

  • Lakukan denormalisasi data transaksi untuk penjualan di toko agar dapat menyertakan informasi seperti lintang dan bujur lokasi toko. Berikan informasi toko melalui tabel yang berubah secara perlahan di BigQuery, menggunakan ID toko sebagai kunci.

Data

Aplikasi memproses jenis data berikut:

  • Data clickstream yang dikirim oleh sistem online ke Pub/Sub.
  • Data transaksi yang dikirim oleh sistem lokal atau software as a service (SaaS) ke Pub/Sub.
  • Data stok yang dikirim oleh sistem lokal atau SaaS ke Pub/Sub.

Pola tugas

Aplikasi ini berisi pola tugas yang umum digunakan pada pipeline yang dibuat dengan Apache Beam SDK untuk Java:

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Aktifkan API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Aktifkan API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Buat akun layanan pekerja yang dikelola pengguna untuk pipeline baru Anda dan berikan peran yang diperlukan ke akun layanan tersebut.

    1. Untuk membuat akun layanan, jalankan perintah gcloud iam service-accounts create:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Memberikan peran ke akun layanan. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.

    3. Memberi Akun Google Anda peran yang memungkinkan Anda membuat token akses untuk akun layanan:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. Jika perlu, download dan instal Gradle.

Membuat contoh sumber dan sink

Bagian ini menjelaskan cara membuat hal berikut:

  • Bucket Cloud Storage untuk digunakan sebagai lokasi penyimpanan sementara
  • Streaming sumber data menggunakan Pub/Sub
  • Set data untuk memuat data ke BigQuery
  • Instance Bigtable

Membuat bucket Cloud Storage

Mulailah dengan membuat bucket Cloud Storage. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.

Gunakan gcloud storage buckets createperintah:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ganti kode berikut:

  • BUCKET_NAME: nama untuk bucket Cloud Storage yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
  • LOCATION: lokasi bucket.

Membuat langganan dan topik Pub/Sub

Buat empat topik Pub/Sub, lalu buat tiga langganan.

Untuk membuat topik, jalankan perintah gcloud pubsub topics create sekali untuk setiap topik. Untuk informasi tentang cara memberi nama langganan, lihat Panduan menamai topik atau langganan.

gcloud pubsub topics create TOPIC_NAME

Ganti TOPIC_NAME dengan nilai berikut, menjalankan perintah empat kali, satu kali untuk setiap topik:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Untuk membuat langganan ke topik Anda, jalankan perintah gcloud pubsub subscriptions create sekali untuk setiap langganan:

  1. Buat langganan Clickstream-inbound-sub:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Buat langganan Transactions-inbound-sub:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Buat langganan Inventory-inbound-sub:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Membuat set data dan tabel BigQuery

Buat set data BigQuery dan tabel yang dipartisi dengan skema yang sesuai untuk topik Pub/Sub Anda.

  1. Gunakan perintah bq mk untuk membuat set data pertama.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Buat set data kedua.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Gunakan pernyataan SQL CREATE TABLE untuk membuat tabel dengan skema dan data pengujian. Data pengujian memiliki satu toko dengan nilai ID 1. Pola input sisi update lambat menggunakan tabel ini.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Membuat instance dan tabel Bigtable

Membuat instance dan tabel Bigtable. Untuk mengetahui informasi lebih lanjut mengenai pembuatan instance Bigtable, baca Membuat instance.

  1. Jika diperlukan, jalankan perintah berikut untuk menginstal cbt CLI:

    gcloud components install cbt
    
  2. Gunakan perintah bigtable instances create untuk membuat instance:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Ganti CLUSTER_ZONE dengan zona tempat cluster berjalan.

  3. Gunakan perintah cbt createtable untuk membuat tabel:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Gunakan perintah berikut untuk menambahkan grup kolom ke tabel:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Menjalankan pipeline

Gunakan Gradle untuk menjalankan pipeline streaming. Untuk melihat kode Java yang digunakan pipeline, lihat RetailDataProcessingPipeline.java.

  1. Gunakan perintah git clone untuk meng-clone repositori GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Beralih ke direktori aplikasi:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Untuk menguji pipeline, jalankan perintah berikut menggunakan Gradle di shell atau terminal Anda:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Untuk menjalankan pipeline, jalankan perintah berikut menggunakan Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

Lihat kode sumber pipeline di GitHub.

Membuat dan menjalankan tugas Cloud Scheduler

Buat dan jalankan tiga tugas Cloud Scheduler, satu tugas yang memublikasikan data clickstream, satu untuk data inventaris, dan satu untuk data transaksi. Langkah ini menghasilkan data sampel untuk pipeline.

  1. Untuk membuat tugas Cloud Scheduler untuk tutorial ini, gunakan perintah gcloud scheduler jobs create. Langkah ini akan membuat penayang untuk data clickstream yang memublikasikan satu pesan per menit.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Untuk memulai tugas Cloud Scheduler, gunakan perintah gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Buat dan jalankan penayang serupa lainnya untuk data inventaris yang memublikasikan satu pesan setiap dua menit.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Mulai tugas Cloud Scheduler kedua.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Membuat dan menjalankan penayang ketiga untuk data transaksi yang memublikasikan satu pesan setiap dua menit.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Mulai tugas Cloud Scheduler ketiga.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Lihat hasil Anda

Melihat data yang ditulis ke tabel BigQuery Anda. Periksa hasilnya di BigQuery dengan menjalankan kueri berikut. Saat pipeline ini berjalan, Anda dapat melihat baris baru yang ditambahkan ke tabel BigQuery setiap menit.

Anda mungkin perlu menunggu sampai tabel terisi data.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project Google Cloud yang Anda buat untuk tutorial.

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource satu per satu

Jika Anda ingin menggunakan kembali project tersebut, hapus resource yang Anda buat untuk tutorial.

Membersihkan resource project Google Cloud

  1. Untuk menghapus tugas Cloud Scheduler, gunakan perintah gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Untuk menghapus langganan dan topik Pub/Sub, gunakan perintah gcloud pubsub subscriptions delete dan gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Untuk menghapus tabel BigQuery, gunakan perintah bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Menghapus set data BigQuery. Set data saja tidak dikenai biaya apa pun.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Untuk menghapus instance Bigtable, gunakan perintah cbt deleteinstance. Bucket saja tidak dikenai biaya apa pun.

    cbt deleteinstance aggregate-tables
    
  6. Untuk menghapus bucket Cloud Storage, gunakan perintah gcloud storage rm. Bucket saja tidak dikenai biaya apa pun.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Mencabut kredensial

  1. Cabut peran yang Anda berikan ke akun layanan pekerja yang dikelola pengguna. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.

    gcloud auth application-default revoke
  3. Opsional: Cabut kredensial dari gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya