このチュートリアルでは、Pub/Sub トピックとサブスクリプションから e コマースデータを変換し、そのデータを BigQuery と Bigtable に出力する Dataflow ストリーミング パイプラインを作成します。このチュートリアルでは Gradle が必要です。
このチュートリアルでは、ウェブストアから BigQuery と Bigtable にデータをストリーミングするエンドツーエンドの e コマース サンプル アプリケーションを使用します。このサンプル アプリケーションは、ストリーミング データ分析とリアルタイム人工知能(AI)を実装するための一般的なユースケースとベスト プラクティスを示しています。このチュートリアルを使用して、顧客行動に動的に反応し、イベントをリアルタイムで分析して対応する方法を学習します。このチュートリアルでは、イベントデータを保存、分析、可視化して、顧客行動に関する詳細な分析情報を得る方法について説明します。
サンプル スクリプトは GitHub からダウンロードできます。Terraform を使用してこのチュートリアルを実行するには、GitHub にあるサンプル アプリケーションの手順に沿って操作します。
目標
- 受信データを検証し、可能な部分に修正を加えます。
- クリックストリーム データを分析し、一定期間での商品あたりの閲覧回数のカウントを保持します。この情報を低レイテンシのストアに保存します。これにより、アプリケーションでこのデータを使用し、商品を閲覧したユーザー数をウェブサイトの顧客に提示できます。
トランザクション データを使用して、インベントリの順位を知らせます。
- トランザクション データを分析し、一定期間での各店舗と全店舗の両方における各商品の売上合計数を計算します。
- インベントリ データを分析して、商品ごとの受信インベントリを計算します。
- このデータを継続的にインベントリ システムに渡し、インベントリ購入の判断に使用できます。
受信データを検証し、可能な部分に修正を加えます。さらなる分析と処理を行うために、修正不能なデータをデッドレター キューに書き込みます。モニタリングとアラートのため、デッドレター キューに送信された受信データの割合を示す指標を作成します。
今後の分析と可視化で使用するために、すべての受信データを標準形式に処理し、データ ウェアハウスに保存します。
店舗の売上のトランザクション データを非正規化し、店舗所在地の緯度と経度などの情報を含めることができます。店舗 ID をキーとして使用し、BigQuery での緩やかなテーブル変更を通じて店舗情報を提供します。
データ
このアプリケーションは次のタイプのデータを処理します。
- オンライン システムから Pub/Sub に送信されるクリックストリーム データ。
- オンプレミスまたは Software as a Service(SaaS)システムから Pub/Sub に送信されるトランザクション データ。
- オンプレミスまたは SaaS システムから Pub/Sub に送信されるストックデータ。
タスクパターン
このアプリケーションには、Apache Beam SDK for Java で構築されたパイプラインに共通する次のタスクパターンが含まれています。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_ID
は、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_ID
は、実際の Google Cloud プロジェクト名に置き換えます。
-
-
Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler API を有効にします。
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Google アカウントのローカル認証情報を作成します。
gcloud auth application-default login
-
Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。EMAIL_ADDRESS
は実際のメールアドレスに置き換えます。ROLE
は、個々のロールに置き換えます。
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_ID
は、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_ID
は、実際の Google Cloud プロジェクト名に置き換えます。
-
-
Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler API を有効にします。
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Google アカウントのローカル認証情報を作成します。
gcloud auth application-default login
-
Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。EMAIL_ADDRESS
は実際のメールアドレスに置き換えます。ROLE
は、個々のロールに置き換えます。
新しいパイプライン用にユーザー管理のワーカー サービス アカウントを作成し、このサービス アカウントに必要なロールを付与します。
サービス アカウントを作成するには、
gcloud iam service-accounts create
コマンドを実行します。gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
SERVICE_ACCOUNT_ROLE
は、個々のロールに置き換えます。Google アカウントに、サービス アカウントのアクセス トークンを作成できるロールを付与します。
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- 必要に応じて、Gradle をダウンロードしてインストールします。
サンプルのソースとシンクを作成する
このセクションでは、次のものを作成します。
- 一時的な格納場所として使用する Cloud Storage バケット
- Pub/Sub を使用したストリーミング データソース
- BigQuery にデータを読み込むデータセット
- Bigtable インスタンス
Cloud Storage バケットを作成する
まず、Cloud Storage バケットを作成します。このバケットは、Dataflow パイプラインによって一時ストレージの場所として使用されます。
gcloud storage buckets create
コマンドを実行します。
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
次のように置き換えます。
- BUCKET_NAME: バケットの命名要件を満たす Cloud Storage バケットの名前。Cloud Storage のバケット名は、グローバルに一意である必要があります。
- LOCATION: バケットのロケーション。
Pub/Sub トピックとサブスクリプションを作成する
4 つの Pub/Sub トピックを作成してから、3 つのサブスクリプションを作成します。
トピックを作成するには、トピックごとに gcloud pubsub topics create
コマンドを実行します。サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。
gcloud pubsub topics create TOPIC_NAME
TOPIC_NAME を次の値に置き換え、コマンドを 4 回(トピックごとに 1 回)実行します。
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
トピックへのサブスクリプションを作成するには、サブスクリプションごとに gcloud pubsub subscriptions create
コマンドを実行します。
Clickstream-inbound-sub
サブスクリプションを作成します。gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Transactions-inbound-sub
サブスクリプションを作成します。gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Inventory-inbound-sub
サブスクリプションを作成します。gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
BigQuery のデータセットとテーブルを作成する
BigQuery データセットと、Pub/Sub トピックに適したスキーマを使用してパーティション分割テーブルを作成します。
bq mk
コマンドを使用して、最初のデータセットを作成します。bq --location=US mk \ PROJECT_ID:Retail_Store
2 つ目のデータセットを作成します。
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
CREATE TABLE SQL ステートメントでスキーマとテストデータを指定してテーブルを作成します。テストデータには、ID 値が
1
の店舗が 1 つ含まれています。このテーブルは、更新の遅い副入力パターンで使用します。bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Bigtable のインスタンスとテーブルを作成する
Bigtable のインスタンスとテーブルを作成します。Bigtable インスタンスの作成の詳細については、インスタンスを作成するをご覧ください。
必要に応じて、次のコマンドを実行して
cbt
CLI をインストールします。gcloud components install cbt
bigtable instances create
コマンドを使用して、インスタンスを作成します。gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
CLUSTER_ZONE は、クラスタが実行されるゾーンに置き換えます。
cbt createtable
コマンドを使用してテーブルを作成します。cbt -instance=aggregate-tables createtable PageView5MinAggregates
テーブルに列ファミリーを追加するには、次のコマンドを使用します。
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
パイプラインを実行する
Gradle を使用して、ストリーミング パイプラインを実行します。パイプラインが使用している Java コードを表示するには、RetailDataProcessingPipeline.java をご覧ください。
git clone
コマンドを使用して GitHub リポジトリのクローンを作成します。git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
アプリケーション ディレクトリに切り替えます。
cd dataflow-sample-applications/retail/retail-java-applications
パイプラインをテストするには、シェルまたはターミナルで Gradle を使用して、次のコマンドを実行します。
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
パイプラインを実行するには、Gradle を使用して次のコマンドを実行します。
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
GitHub のパイプラインのソースコードをご覧ください。
Cloud Scheduler ジョブを作成して実行する
3 つの Cloud Scheduler ジョブを作成して実行します。1 つはクリックストリーム データ、1 つはインベントリ データ、もう 1 つはトランザクション データをパブリッシュするジョブです。このステップでは、パイプラインのサンプルデータを生成します。
このチュートリアルの Cloud Scheduler ジョブを作成するには、
gcloud scheduler jobs create
コマンドを使用します。この手順では、1 分あたり 1 件のメッセージをパブリッシュするクリックストリーム データのパブリッシャーを作成します。gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Cloud Scheduler ジョブを開始するには、
gcloud scheduler jobs run
コマンドを使用します。gcloud scheduler jobs run --location=LOCATION clickstream
インベントリ データ用に別のパブリッシャーを作成し、2 分ごとに 1 つのメッセージとしてパブリッシュします。
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
2 番目の Cloud Scheduler ジョブを開始します。
gcloud scheduler jobs run --location=LOCATION inventory
トランザクション データ用に 3 番目のパブリッシャーを作成して実行します。これは、2 分ごとに 1 つのメッセージをパブリッシュします。
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
3 番目の Cloud Scheduler ジョブを開始します。
gcloud scheduler jobs run --location=LOCATION transactions
結果を表示する
BigQuery テーブルに書き込まれたデータを表示します。次のクエリを実行して、BigQuery の結果を確認します。このパイプラインの実行中は、1 分おきに BigQuery テーブルに新しい行が追加されます。
テーブルへのデータ入力が完了するまで待たなければならない場合があります。
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
課金を停止する最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。
- Google Cloud コンソールで、[リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
個々のリソースを削除する
プロジェクトを再利用する場合は、チュートリアル用に作成したリソースを削除します。
Google Cloud プロジェクトのリソースをクリーンアップする
Cloud Scheduler のジョブを削除するには、
gcloud scheduler jobs delete
コマンドを使用します。gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Pub/Sub サブスクリプションとトピックを削除するには、
gcloud pubsub subscriptions delete
コマンドとgcloud pubsub topics delete
コマンドを使用します。gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
BigQuery テーブルを削除するには、
bq rm
コマンドを使用します。bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
BigQuery データセットを削除します。データセットだけで料金は発生しません。
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Bigtable インスタンスを削除するには、
cbt deleteinstance
コマンドを使用します。バケットだけでは料金は発生しません。cbt deleteinstance aggregate-tables
Cloud Storage バケットを削除するには、
gcloud storage rm
コマンドを使用します。バケットだけでは料金は発生しません。gcloud storage rm gs://BUCKET_NAME --recursive
認証情報を取り消す
ユーザー管理のワーカー サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。
gcloud auth application-default revoke
-
(省略可)gcloud CLI から認証情報を取り消します。
gcloud auth revoke
次のステップ
- GitHub にあるサンプル アプリケーションを確認する。
- 関連するブログ投稿である Google タグ マネージャー データのクリックストリーム処理で Beam パターンについて学習するを確認する。
- Pub/Sub を使用してトピックを作成して使用する方法と、サブスクリプションを使用する方法を確認する。
- BigQuery を使用してデータセットを作成する方法を確認する。
- Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud アーキテクチャ センター をご覧ください。