이 튜토리얼에서는 Pub/Sub 주제와 구독에서 전자상거래 데이터를 변환하고 데이터를 BigQuery 및 Bigtable에 출력하는 Dataflow 스트리밍 파이프라인을 만듭니다. 이 튜토리얼에는 Gradle이 필요합니다.
이 튜토리얼에서는 웹 스토어에서 BigQuery 및 Bigtable로 데이터를 스트리밍하는 엔드 투 엔드 전자상거래 샘플 애플리케이션을 제공합니다. 이 샘플 애플리케이션은 스트리밍 데이터 분석 및 실시간 인공지능(AI) 구현을 위한 일반적인 사용 사례 및 권장사항을 보여줍니다. 이 튜토리얼을 사용하여 실시간으로 이벤트를 분석 및 응답하기 위해 고객 작업에 대해 동적으로 대응하는 방법을 알아보세요. 이 튜토리얼에서는 이벤트 데이터를 저장, 분석 및 시각화하여 고객 행동에 대한 추가 인사이트를 얻는 방법을 설명합니다.
샘플 애플리케이션은 GitHub에서 제공됩니다. Terraform을 사용하여 이 튜토리얼을 실행하려면 GitHub에서 샘플 애플리케이션에 제공된 단계를 따르세요.
목표
- 수신 데이터를 검증하고 가능한 경우 수정사항을 적용합니다.
- 클릭스트림 데이터를 분석하여 특정 기간 동안 제품별 조회수를 유지합니다. 이 정보를 지연 시간이 짧은 저장소에 저장합니다. 그런 후 애플리케이션이 이 제품을 확인한 사용자 수 메시지를 웹사이트 고객들에게 제공할 수 있습니다.
트랜잭션 데이터를 사용하여 인벤토리 주문 정보를 제공합니다.
- 트랜잭션 데이터를 분석하여 특정 기간 동안 매장별 및 전 세계에서 각 항목의 총 매출을 계산합니다.
- 인벤토리 데이터를 분석하여 각 항목의 수신 인벤토리를 계산합니다.
- 이 데이터를 지속적으로 인벤토리 시스템에 전달하여 인벤토리 구매 결정에 사용합니다.
수신 데이터를 검증하고 가능한 경우 수정사항을 적용합니다. 추가 분석 및 처리를 위해 수정할 수 없는 데이터를 데드 레터 큐에 씁니다. 모니터링 및 알림에 사용할 수 있는 데드 레터 큐로 전송되는 수신 데이터의 비율을 나타내는 측정항목을 만듭니다.
수신되는 모든 데이터를 표준 형식으로 처리하고 향후 분석 및 시각화를 위해 데이터 웨어하우스에 저장합니다.
매장 판매에 대한 트랜잭션 데이터를 비정규화하여 매장 위치의 위도 및 경도와 같은 정보를 포함합니다. 매장 ID를 키로 사용하여 변경 속도가 느린 BigQuery의 테이블을 통해 매장 정보를 제공합니다.
데이터
애플리케이션은 다음과 같은 유형의 데이터를 처리합니다.
- 온라인 시스템에서 Pub/Sub로 전송되는 클릭스트림 데이터
- 온프레미스 또는 SaaS(Software as a service) 시스템에서 Pub/Sub로 전송하는 트랜잭션 데이터
- 온프레미스 또는 SaaS 시스템에서 Pub/Sub로 전송되는 주가 데이터
태스크 패턴
애플리케이션에는 Java용 Apache Beam SDK를 사용하여 빌드된 파이프라인에 공통적인 다음 태스크 패턴이 포함됩니다.
비용
이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.
시작하기 전에
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다.
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
를 프로젝트 ID로 바꿉니다.EMAIL_ADDRESS
를 이메일 주소로 바꿉니다.ROLE
을 각 개별 역할로 바꿉니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다.
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
를 프로젝트 ID로 바꿉니다.EMAIL_ADDRESS
를 이메일 주소로 바꿉니다.ROLE
을 각 개별 역할로 바꿉니다.
새 파이프라인의 사용자 관리형 작업자 서비스 계정을 만들고 서비스 계정에 필요한 역할을 부여합니다.
서비스 계정을 만들려면
gcloud iam service-accounts create
명령어를 실행합니다.gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
SERVICE_ACCOUNT_ROLE
을 각 개별 역할로 바꿉니다.Google 계정에 서비스 계정에 대해 액세스 토큰을 만들 수 있게 해주는 역할을 부여합니다.
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- 필요한 경우 Gradle을 다운로드하고 설치합니다.
소스 및 싱크 예시 만들기
이 섹션에서는 다음을 만드는 방법을 설명합니다.
- 임시 스토리지 위치로 사용할 Cloud Storage 버킷
- Pub/Sub를 사용하여 데이터 소스 스트리밍
- BigQuery에 데이터를 로드하는 데이터 세트
- Bigtable 인스턴스
Cloud Storage 버킷 만들기
먼저 Cloud Storage 버킷을 만듭니다. 이 버킷은 Dataflow 파이프라인에서 임시 스토리지 위치로 사용됩니다.
gcloud storage buckets create
명령어를 사용합니다.
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
다음을 바꿉니다.
- BUCKET_NAME: 버킷 이름 지정 요구사항을 충족하는 Cloud Storage 버킷의 이름입니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.
- LOCATION: 버킷의 위치입니다.
Pub/Sub 주제 및 구독 만들기
Pub/Sub 주제를 4개 만든 후 구독을 3개 만듭니다.
주제를 만들려면 주제마다 gcloud pubsub topics create
명령어를 한 번 실행합니다. 구독 이름 지정 방법은 주제 또는 구독 이름 지정 안내를 참조하세요.
gcloud pubsub topics create TOPIC_NAME
TOPIC_NAME을 다음 값으로 바꾸고 명령어를 주제마다 한 번씩 총 4번 실행합니다.
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
주제에 대한 구독을 만들려면 구독마다 한 번씩 gcloud pubsub subscriptions create
명령어를 실행합니다.
Clickstream-inbound-sub
구독을 만듭니다.gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Transactions-inbound-sub
구독을 만듭니다.gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Inventory-inbound-sub
구독을 만듭니다.gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
BigQuery 데이터 세트 및 테이블 만들기
Pub/Sub 주제에 적절한 스키마로 BigQuery 데이터 세트와 파티션을 나눈 테이블을 만듭니다.
bq mk
명령어를 사용하여 첫 번째 데이터 세트를 만듭니다.bq --location=US mk \ PROJECT_ID:Retail_Store
두 번째 데이터 세트를 만듭니다.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
CREATE TABLE SQL 문을 사용하여 스키마 및 테스트 데이터로 테이블을 만듭니다. 테스트 데이터에는 ID 값이
1
인 스토어가 하나 있습니다. 업데이트 속도가 느린 부 입력 패턴은 이 테이블을 사용합니다.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Bigtable 인스턴스 및 테이블 만들기
Bigtable 인스턴스와 테이블을 만듭니다. Bigtable 인스턴스 만들기에 대한 자세한 내용은 인스턴스 만들기를 참조하세요.
필요한 경우 다음 명령어를 실행하여
cbt
CLI를 설치합니다.gcloud components install cbt
bigtable instances create
명령어를 사용하여 인스턴스를 만듭니다.gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
CLUSTER_ZONE을 클러스터가 실행되는 영역으로 바꿉니다.
cbt createtable
명령어를 사용하여 테이블을 만듭니다.cbt -instance=aggregate-tables createtable PageView5MinAggregates
테이블에 column family를 추가하려면 다음 명령어를 사용합니다.
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
파이프라인 실행
스트리밍 파이프라인을 실행하려면 Gradle을 사용합니다. 파이프라인에서 사용 중인 Java 코드를 보려면 RetailDataProcessingPipeline.java를 참조하세요.
git clone
명령어를 사용하여 GitHub 저장소를 클론합니다.git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
애플리케이션 디렉터리로 전환합니다.
cd dataflow-sample-applications/retail/retail-java-applications
파이프라인을 테스트하려면 셸 또는 터미널에서 Gradle을 사용하여 다음 명령어를 실행합니다.
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
파이프라인을 실행하려면 Gradle을 사용하여 다음 명령어를 실행합니다.
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
GitHub의 파이프라인 소스 코드를 참조하세요.
Cloud Scheduler 작업 만들기 및 실행
클릭 스트림 데이터를 게시하는 작업, 인벤토리 데이터 작업, 트랜잭션 데이터 작업 등 3가지 Cloud Scheduler 작업을 만들고 실행합니다. 이 단계에서는 파이프라인의 샘플 데이터를 생성합니다.
이 튜토리얼에서 Cloud Scheduler 작업을 만들려면
gcloud scheduler jobs create
명령어를 사용합니다. 이 단계에서는 1분마다 메시지 1개를 게시하는 클릭 스트림 데이터를 위한 게시자를 만듭니다.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Cloud Scheduler 작업을 시작하려면
gcloud scheduler jobs run
명령어를 사용합니다.gcloud scheduler jobs run --location=LOCATION clickstream
2분마다 메시지 1개를 게시하는 인벤토리 데이터를 위한 다른 유사한 게시자를 만들어 실행합니다.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
두 번째 Cloud Scheduler 작업을 시작합니다.
gcloud scheduler jobs run --location=LOCATION inventory
2분마다 메시지 1개를 게시하는 트랜잭션 데이터를 위한 세 번째 게시자를 만들고 실행합니다.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
세 번째 Cloud Scheduler 작업을 시작합니다.
gcloud scheduler jobs run --location=LOCATION transactions
결과 보기
BigQuery 테이블에 기록된 데이터를 봅니다. BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다. 이 파이프라인이 실행되는 동안 1분마다 BigQuery 테이블에 새 행이 추가되는 것을 확인할 수 있습니다.
테이블에 데이터가 채워질 때까지 기다려야 할 수 있습니다.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
삭제
이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.
프로젝트 삭제
비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 Google Cloud 프로젝트를 삭제하는 것입니다.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
개별 리소스 삭제
프로젝트를 재사용하려면 튜토리얼용으로 만든 리소스를 삭제합니다.
Google Cloud 프로젝트 리소스 삭제
Cloud Scheduler 작업을 삭제하려면
gcloud scheduler jobs delete
명령어를 사용합니다.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Pub/Sub 구독 및 주제를 삭제하려면
gcloud pubsub subscriptions delete
및gcloud pubsub topics delete
명령어를 사용합니다.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
BigQuery 테이블을 삭제하려면
bq rm
명령어를 사용합니다.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
BigQuery 데이터 세트를 삭제합니다. 데이터 세트만으로는 비용이 발생하지 않습니다.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Bigtable 인스턴스를 삭제하려면
cbt deleteinstance
명령어를 사용합니다. 버킷만으로는 비용이 발생하지 않습니다.cbt deleteinstance aggregate-tables
Cloud Storage 버킷을 삭제하려면
gcloud storage rm
명령어를 사용합니다. 버킷만으로는 비용이 발생하지 않습니다.gcloud storage rm gs://BUCKET_NAME --recursive
사용자 인증 정보 취소
사용자 관리형 작업자 서비스 계정에 부여한 역할을 취소합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
선택사항: 만든 사용자 인증 정보를 취소하고 로컬 사용자 인증 정보 파일을 삭제합니다.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
다음 단계
- GitHub의 샘플 애플리케이션 보기
- 관련 블로그 게시물 Google 태그 관리자 데이터의 클릭 스트림 처리를 통한 Beam 패턴 알아보기 읽어보기
- Pub/Sub를 사용하여 주제를 만들고 사용하고 구독을 사용하는 방법 읽어보기
- BigQuery를 사용하여 데이터 세트를 만드는 방법을 알아보기
- Google Cloud에 대한 참조 아키텍처, 다이어그램, 권장사항 살펴보기. Cloud 아키텍처 센터 살펴보기