Membuat pipeline streaming e-commerce


Dalam tutorial ini, Anda akan membuat pipeline streaming Dataflow yang mengubah data e-commerce dari topik dan langganan Pub/Sub serta menghasilkan output data ke BigQuery dan Bigtable. Tutorial ini memerlukan Gradle.

Tutorial ini menyediakan aplikasi contoh e-commerce menyeluruh yang mengalirkan data dari webstore ke BigQuery dan Bigtable. Aplikasi contoh ini menggambarkan kasus penggunaan umum dan praktik terbaik untuk menerapkan analisis data streaming dan kecerdasan buatan (AI) real-time. Gunakan tutorial ini untuk mempelajari cara merespons tindakan pelanggan secara dinamis guna menganalisis dan bereaksi terhadap peristiwa secara real time. Tutorial ini menjelaskan cara menyimpan, menganalisis, dan memvisualisasikan data peristiwa untuk mendapatkan lebih banyak insight tentang perilaku pelanggan.

Aplikasi contoh tersedia di GitHub. Untuk menjalankan tutorial ini menggunakan Terraform, ikuti langkah-langkah yang diberikan dengan aplikasi contoh di GitHub.

Tujuan

  • Validasi data yang masuk dan terapkan koreksi jika memungkinkan.
  • Analisis data clickstream untuk terus menghitung jumlah tampilan per produk dalam jangka waktu tertentu. Simpan informasi ini di penyimpanan latensi rendah. Aplikasi kemudian dapat menggunakan data tersebut untuk memberikan pesan jumlah orang yang melihat produk ini kepada pelanggan di situs.
  • Gunakan data transaksi untuk menentukan urutan inventaris:

    • Analisis data transaksi untuk menghitung jumlah total penjualan untuk setiap item, baik menurut toko maupun secara global, selama jangka waktu tertentu.
    • Analisis data inventaris untuk menghitung inventaris yang masuk untuk setiap item.
    • Teruskan data ini ke sistem inventaris secara berkelanjutan sehingga dapat digunakan untuk keputusan pembelian inventaris.
  • Validasi data yang masuk dan terapkan koreksi jika memungkinkan. Tulis data yang tidak dapat dikoreksi ke antrean dead letter untuk analisis dan pemrosesan tambahan. Buat metrik yang mewakili persentase data masuk yang dikirim ke antrean dead letter yang tersedia untuk pemantauan dan pemberitahuan.

  • Memproses semua data yang masuk ke dalam format standar dan menyimpannya di data warehouse untuk digunakan dalam analisis dan visualisasi di masa mendatang.

  • Denormalisasi data transaksi untuk penjualan di toko agar dapat menyertakan informasi seperti lintang dan bujur lokasi toko. Berikan informasi toko melalui tabel yang berubah perlahan di BigQuery, menggunakan ID toko sebagai kunci.

Data

Aplikasi memproses jenis data berikut:

  • Data klik yang dikirim oleh sistem online ke Pub/Sub.
  • Data transaksi yang dikirim oleh sistem lokal atau software as a service (SaaS) ke Pub/Sub.
  • Data saham yang dikirim oleh sistem lokal atau SaaS ke Pub/Sub.

Pola tugas

Aplikasi ini berisi pola tugas berikut yang umum untuk pipeline yang dibuat dengan Apache Beam SDK untuk Java:

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloud yang dapat ditagih berikut: Google Cloud:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Buat akun layanan pekerja yang dikelola pengguna untuk pipeline baru Anda dan berikan peran yang diperlukan ke akun layanan.

    1. Untuk membuat akun layanan, jalankan gcloud iam service-accounts create perintah:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.

    3. Memberi Akun Google Anda peran yang memungkinkan Anda membuat token akses untuk akun layanan:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. Jika perlu, download dan instal Gradle.

Membuat contoh sumber dan sink

Bagian ini menjelaskan cara membuat hal berikut:

  • Bucket Cloud Storage yang akan digunakan sebagai lokasi penyimpanan sementara
  • Streaming sumber data menggunakan Pub/Sub
  • Set data untuk memuat data ke BigQuery
  • Instance Bigtable

Membuat bucket Cloud Storage

Mulailah dengan membuat bucket Cloud Storage. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.

Gunakan perintah gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ganti kode berikut:

  • BUCKET_NAME: nama untuk bucket Cloud Storage Anda yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
  • LOCATION: lokasi untuk bucket.

Membuat topik dan langganan Pub/Sub

Buat empat topik Pub/Sub, lalu buat tiga langganan.

Untuk membuat topik, jalankan perintah gcloud pubsub topics create satu kali untuk setiap topik. Untuk informasi tentang cara memberi nama langganan, lihat Panduan untuk memberi nama topik atau langganan.

gcloud pubsub topics create TOPIC_NAME

Ganti TOPIC_NAME dengan nilai berikut, jalankan perintah empat kali, sekali untuk setiap topik:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Untuk membuat langganan ke topik Anda, jalankan perintah gcloud pubsub subscriptions create satu kali untuk setiap langganan:

  1. Buat langganan Clickstream-inbound-sub:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Buat langganan Transactions-inbound-sub:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Buat langganan Inventory-inbound-sub:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Membuat set data dan tabel BigQuery

Buat set data BigQuery dan tabel yang dipartisi dengan skema yang sesuai untuk topik Pub/Sub Anda.

  1. Gunakan perintah bq mk untuk membuat set data pertama.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Buat set data kedua.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Gunakan pernyataan SQL CREATE TABLE untuk membuat tabel dengan skema dan data pengujian. Data pengujian memiliki satu toko dengan nilai ID 1. Pola input sisi update lambat menggunakan tabel ini.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Membuat instance dan tabel Bigtable

Buat instance dan tabel Bigtable. Untuk mengetahui informasi selengkapnya tentang cara membuat instance Bigtable, lihat Membuat instance.

  1. Jika diperlukan, jalankan perintah berikut untuk menginstal cbt CLI:

    gcloud components install cbt
    
  2. Gunakan perintah bigtable instances create untuk membuat instance:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Ganti CLUSTER_ZONE dengan zona tempat cluster berjalan.

  3. Gunakan perintah cbt createtable untuk membuat tabel:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Gunakan perintah berikut untuk menambahkan grup kolom ke tabel:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Menjalankan pipeline

Gunakan Gradle untuk menjalankan pipeline streaming. Untuk melihat kode Java yang digunakan pipeline, lihat RetailDataProcessingPipeline.java.

  1. Gunakan perintah git clone untuk meng-clone repositori GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Beralih ke direktori aplikasi:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Untuk menguji pipeline, di shell atau terminal, jalankan perintah berikut menggunakan Gradle:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Untuk menjalankan pipeline, jalankan perintah berikut menggunakan Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

Lihat kode sumber pipeline di GitHub.

Membuat dan menjalankan tugas Cloud Scheduler

Buat dan jalankan tiga tugas Cloud Scheduler, satu yang memublikasikan data clickstream, satu untuk data inventaris, dan satu untuk data transaksi. Langkah ini menghasilkan data contoh untuk pipeline.

  1. Untuk membuat tugas Cloud Scheduler untuk tutorial ini, gunakan perintah gcloud scheduler jobs create. Langkah ini akan membuat penayang untuk data clickstream yang memublikasikan satu pesan per menit.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Untuk memulai tugas Cloud Scheduler, gunakan perintah gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Buat dan jalankan penayang serupa lainnya untuk data inventaris yang memublikasikan satu pesan setiap dua menit.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Mulai tugas Cloud Scheduler kedua.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Buat dan jalankan penayang ketiga untuk data transaksi yang memublikasikan satu pesan setiap dua menit.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Mulai tugas Cloud Scheduler ketiga.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Melihat hasil

Melihat data yang ditulis ke tabel BigQuery Anda. Periksa hasilnya di BigQuery dengan menjalankan kueri berikut. Saat pipeline ini berjalan, Anda dapat melihat baris baru yang ditambahkan ke tabel BigQuery setiap menit.

Anda mungkin perlu menunggu hingga tabel terisi dengan data.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project Google Cloud yang Anda buat untuk tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Menghapus resource satu per satu

Jika Anda ingin menggunakan kembali project, hapus resource yang Anda buat untuk tutorial.

Membersihkan resource project Google Cloud

  1. Untuk menghapus tugas Cloud Scheduler, gunakan perintah gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Untuk menghapus langganan dan topik Pub/Sub, gunakan perintah gcloud pubsub subscriptions delete dan gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Untuk menghapus tabel BigQuery, gunakan perintah bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Hapus set data BigQuery. Set data saja tidak dikenai biaya apa pun.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Untuk menghapus instance Bigtable, gunakan perintah cbt deleteinstance. Bucket saja tidak dikenai biaya apa pun.

    cbt deleteinstance aggregate-tables
    
  6. Untuk menghapus bucket Cloud Storage, gunakan perintah gcloud storage rm. Bucket saja tidak dikenai biaya apa pun.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Mencabut kredensial

  1. Cabut peran yang Anda berikan ke akun layanan pekerja yang dikelola pengguna. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya