e コマースのストリーミング パイプラインを作成する


このチュートリアルでは、Pub/Sub トピックとサブスクリプションから e コマースデータを変換し、そのデータを BigQuery と Bigtable に出力する Dataflow ストリーミング パイプラインを作成します。このチュートリアルでは Gradle が必要です。

このチュートリアルでは、ウェブストアから BigQuery と Bigtable にデータをストリーミングするエンドツーエンドの e コマース サンプル アプリケーションを使用します。このサンプル アプリケーションは、ストリーミング データ分析とリアルタイム人工知能(AI)を実装するための一般的なユースケースとベスト プラクティスを示しています。このチュートリアルを使用して、顧客行動に動的に反応し、イベントをリアルタイムで分析して対応する方法を学習します。このチュートリアルでは、イベントデータを保存、分析、可視化して、顧客行動に関する詳細な分析情報を得る方法について説明します。

サンプル スクリプトは GitHub からダウンロードできます。Terraform を使用してこのチュートリアルを実行するには、GitHub にあるサンプル アプリケーションの手順に沿って操作します。

目標

  • 受信データを検証し、可能な部分に修正を加えます。
  • クリックストリーム データを分析し、一定期間での商品あたりの閲覧回数のカウントを保持します。この情報を低レイテンシのストアに保存します。これにより、アプリケーションでこのデータを使用し、商品を閲覧したユーザー数をウェブサイトの顧客に提示できます。
  • トランザクション データを使用して、インベントリの順位を知らせます。

    • トランザクション データを分析し、一定期間での各店舗と全店舗の両方における各商品の売上合計数を計算します。
    • インベントリ データを分析して、商品ごとの受信インベントリを計算します。
    • このデータを継続的にインベントリ システムに渡し、インベントリ購入の判断に使用できます。
  • 受信データを検証し、可能な部分に修正を加えます。さらなる分析と処理を行うために、修正不能なデータをデッドレター キューに書き込みます。モニタリングとアラートのため、デッドレター キューに送信された受信データの割合を示す指標を作成します。

  • 今後の分析と可視化で使用するために、すべての受信データを標準形式に処理し、データ ウェアハウスに保存します。

  • 店舗の売上のトランザクション データを非正規化し、店舗所在地の緯度と経度などの情報を含めることができます。店舗 ID をキーとして使用し、BigQuery での緩やかなテーブル変更を通じて店舗情報を提供します。

データ

このアプリケーションは次のタイプのデータを処理します。

  • オンライン システムから Pub/Sub に送信されるクリックストリーム データ。
  • オンプレミスまたは Software as a Service(SaaS)システムから Pub/Sub に送信されるトランザクション データ。
  • オンプレミスまたは SaaS システムから Pub/Sub に送信されるストックデータ。

タスクパターン

このアプリケーションには、Apache Beam SDK for Java で構築されたパイプラインに共通する次のタスクパターンが含まれています。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler API を有効にします。

    gcloud services enable compute.googleapis.comdataflow.googleapis.compubsub.googleapis.combigquery.googleapis.combigtable.googleapis.combigtableadmin.googleapis.com cloudscheduler.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler API を有効にします。

    gcloud services enable compute.googleapis.comdataflow.googleapis.compubsub.googleapis.combigquery.googleapis.combigtable.googleapis.combigtableadmin.googleapis.com cloudscheduler.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  16. 新しいパイプライン用にユーザー管理のワーカー サービス アカウントを作成し、このサービス アカウントに必要なロールを付与します。

    1. サービス アカウントを作成するには、gcloud iam service-accounts create コマンドを実行します。

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。

    3. Google アカウントに、サービス アカウントのアクセス トークンを作成できるロールを付与します。

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. 必要に応じて、Gradle をダウンロードしてインストールします。

サンプルのソースとシンクを作成する

このセクションでは、次のものを作成します。

  • 一時的な格納場所として使用する Cloud Storage バケット
  • Pub/Sub を使用したストリーミング データソース
  • BigQuery にデータを読み込むデータセット
  • Bigtable インスタンス

Cloud Storage バケットを作成する

まず、Cloud Storage バケットを作成します。このバケットは、Dataflow パイプラインによって一時ストレージの場所として使用されます。

gcloud storage buckets create コマンドを実行します。

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

次のように置き換えます。

  • BUCKET_NAME: バケットの命名要件を満たす Cloud Storage バケットの名前。Cloud Storage のバケット名は、グローバルに一意である必要があります。
  • LOCATION: バケットのロケーション

Pub/Sub トピックとサブスクリプションを作成する

4 つの Pub/Sub トピックを作成してから、3 つのサブスクリプションを作成します。

トピックを作成するには、トピックごとに gcloud pubsub topics create コマンドを実行します。サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

gcloud pubsub topics create TOPIC_NAME

TOPIC_NAME を次の値に置き換え、コマンドを 4 回(トピックごとに 1 回)実行します。

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

トピックへのサブスクリプションを作成するには、サブスクリプションごとに gcloud pubsub subscriptions create コマンドを実行します。

  1. Clickstream-inbound-sub サブスクリプションを作成します。

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Transactions-inbound-sub サブスクリプションを作成します。

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Inventory-inbound-sub サブスクリプションを作成します。

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

BigQuery のデータセットとテーブルを作成する

BigQuery データセットと、Pub/Sub トピックに適したスキーマを使用してパーティション分割テーブルを作成します。

  1. bq mk コマンドを使用して、最初のデータセットを作成します。

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. 2 つ目のデータセットを作成します。

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. CREATE TABLE SQL ステートメントでスキーマとテストデータを指定してテーブルを作成します。テストデータには、ID 値が 1 の店舗が 1 つ含まれています。このテーブルは、更新の遅い副入力パターンで使用します。

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Bigtable のインスタンスとテーブルを作成する

Bigtable のインスタンスとテーブルを作成します。Bigtable インスタンスの作成の詳細については、インスタンスを作成するをご覧ください。

  1. 必要に応じて、次のコマンドを実行して cbt CLI をインストールします。

    gcloud components install cbt
    
  2. bigtable instances create コマンドを使用して、インスタンスを作成します。

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    CLUSTER_ZONE は、クラスタが実行されるゾーンに置き換えます。

  3. cbt createtable コマンドを使用してテーブルを作成します。

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. テーブルに列ファミリーを追加するには、次のコマンドを使用します。

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

パイプラインを実行する

Gradle を使用して、ストリーミング パイプラインを実行します。パイプラインが使用している Java コードを表示するには、RetailDataProcessingPipeline.java をご覧ください。

  1. git clone コマンドを使用して GitHub リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. アプリケーション ディレクトリに切り替えます。

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. パイプラインをテストするには、シェルまたはターミナルで Gradle を使用して、次のコマンドを実行します。

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. パイプラインを実行するには、Gradle を使用して次のコマンドを実行します。

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

GitHub のパイプラインのソースコードをご覧ください。

Cloud Scheduler ジョブを作成して実行する

3 つの Cloud Scheduler ジョブを作成して実行します。1 つはクリックストリーム データ、1 つはインベントリ データ、もう 1 つはトランザクション データをパブリッシュするジョブです。このステップでは、パイプラインのサンプルデータを生成します。

  1. このチュートリアルの Cloud Scheduler ジョブを作成するには、gcloud scheduler jobs create コマンドを使用します。この手順では、1 分あたり 1 件のメッセージをパブリッシュするクリックストリーム データのパブリッシャーを作成します。

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Cloud Scheduler ジョブを開始するには、gcloud scheduler jobs run コマンドを使用します。

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. インベントリ データ用に別のパブリッシャーを作成し、2 分ごとに 1 つのメッセージとしてパブリッシュします。

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. 2 番目の Cloud Scheduler ジョブを開始します。

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. トランザクション データ用に 3 番目のパブリッシャーを作成して実行します。これは、2 分ごとに 1 つのメッセージをパブリッシュします。

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. 3 番目の Cloud Scheduler ジョブを開始します。

    gcloud scheduler jobs run --location=LOCATION transactions
    

結果を表示する

BigQuery テーブルに書き込まれたデータを表示します。次のクエリを実行して、BigQuery の結果を確認します。このパイプラインの実行中は、1 分おきに BigQuery テーブルに新しい行が追加されます。

テーブルへのデータ入力が完了するまで待たなければならない場合があります。

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

課金を停止する最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースを削除する

プロジェクトを再利用する場合は、チュートリアル用に作成したリソースを削除します。

Google Cloud プロジェクトのリソースをクリーンアップする

  1. Cloud Scheduler のジョブを削除するには、gcloud scheduler jobs delete コマンドを使用します。

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Pub/Sub サブスクリプションとトピックを削除するには、gcloud pubsub subscriptions delete コマンドと gcloud pubsub topics delete コマンドを使用します。

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. BigQuery テーブルを削除するには、bq rm コマンドを使用します。

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. BigQuery データセットを削除します。データセットだけで料金は発生しません。

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Bigtable インスタンスを削除するには、cbt deleteinstance コマンドを使用します。バケットだけでは料金は発生しません。

    cbt deleteinstance aggregate-tables
    
  6. Cloud Storage バケットを削除するには、gcloud storage rm コマンドを使用します。バケットだけでは料金は発生しません。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

認証情報を取り消す

  1. ユーザー管理のワーカー サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  3. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ