전자상거래 스트리밍 파이프라인 만들기


이 튜토리얼에서는 Pub/Sub 주제와 구독에서 전자상거래 데이터를 변환하고 데이터를 BigQuery 및 Bigtable에 출력하는 Dataflow 스트리밍 파이프라인을 만듭니다. 이 튜토리얼에는 Gradle이 필요합니다.

이 튜토리얼에서는 웹 스토어에서 BigQuery 및 Bigtable로 데이터를 스트리밍하는 엔드 투 엔드 전자상거래 샘플 애플리케이션을 제공합니다. 이 샘플 애플리케이션은 스트리밍 데이터 분석 및 실시간 인공지능(AI) 구현을 위한 일반적인 사용 사례 및 권장사항을 보여줍니다. 이 튜토리얼을 사용하여 실시간으로 이벤트를 분석 및 응답하기 위해 고객 작업에 대해 동적으로 대응하는 방법을 알아보세요. 이 튜토리얼에서는 이벤트 데이터를 저장, 분석 및 시각화하여 고객 행동에 대한 추가 인사이트를 얻는 방법을 설명합니다.

샘플 애플리케이션은 GitHub에서 제공됩니다. Terraform을 사용하여 이 튜토리얼을 실행하려면 GitHub에서 샘플 애플리케이션에 제공된 단계를 따르세요.

목표

  • 수신 데이터를 검증하고 가능한 경우 수정사항을 적용합니다.
  • 클릭스트림 데이터를 분석하여 특정 기간 동안 제품별 조회수를 유지합니다. 이 정보를 지연 시간이 짧은 저장소에 저장합니다. 그런 후 애플리케이션이 이 제품을 확인한 사용자 수 메시지를 웹사이트 고객들에게 제공할 수 있습니다.
  • 트랜잭션 데이터를 사용하여 인벤토리 주문 정보를 제공합니다.

    • 트랜잭션 데이터를 분석하여 특정 기간 동안 매장별 및 전 세계에서 각 항목의 총 매출을 계산합니다.
    • 인벤토리 데이터를 분석하여 각 항목의 수신 인벤토리를 계산합니다.
    • 이 데이터를 지속적으로 인벤토리 시스템에 전달하여 인벤토리 구매 결정에 사용합니다.
  • 수신 데이터를 검증하고 가능한 경우 수정사항을 적용합니다. 추가 분석 및 처리를 위해 수정할 수 없는 데이터를 데드 레터 큐에 씁니다. 모니터링 및 알림에 사용할 수 있는 데드 레터 큐로 전송되는 수신 데이터의 비율을 나타내는 측정항목을 만듭니다.

  • 수신되는 모든 데이터를 표준 형식으로 처리하고 향후 분석 및 시각화를 위해 데이터 웨어하우스에 저장합니다.

  • 매장 판매에 대한 트랜잭션 데이터를 비정규화하여 매장 위치의 위도 및 경도와 같은 정보를 포함합니다. 매장 ID를 키로 사용하여 변경 속도가 느린 BigQuery의 테이블을 통해 매장 정보를 제공합니다.

데이터

애플리케이션은 다음과 같은 유형의 데이터를 처리합니다.

  • 온라인 시스템에서 Pub/Sub로 전송되는 클릭스트림 데이터
  • 온프레미스 또는 SaaS(Software as a service) 시스템에서 Pub/Sub로 전송하는 트랜잭션 데이터
  • 온프레미스 또는 SaaS 시스템에서 Pub/Sub로 전송되는 주가 데이터

태스크 패턴

애플리케이션에는 Java용 Apache Beam SDK를 사용하여 빌드된 파이프라인에 공통적인 다음 태스크 패턴이 포함됩니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. 새 파이프라인의 사용자 관리형 작업자 서비스 계정을 만들고 서비스 계정에 필요한 역할을 부여합니다.

    1. 서비스 계정을 만들려면 gcloud iam service-accounts create 명령어를 실행합니다.

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. 서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE을 각 개별 역할로 바꿉니다.

    3. Google 계정에 서비스 계정에 대해 액세스 토큰을 만들 수 있게 해주는 역할을 부여합니다.

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. 필요한 경우 Gradle을 다운로드하고 설치합니다.

소스 및 싱크 예시 만들기

이 섹션에서는 다음을 만드는 방법을 설명합니다.

  • 임시 스토리지 위치로 사용할 Cloud Storage 버킷
  • Pub/Sub를 사용하여 데이터 소스 스트리밍
  • BigQuery에 데이터를 로드하는 데이터 세트
  • Bigtable 인스턴스

Cloud Storage 버킷 만들기

먼저 Cloud Storage 버킷을 만듭니다. 이 버킷은 Dataflow 파이프라인에서 임시 스토리지 위치로 사용됩니다.

gcloud storage buckets create 명령어를 사용합니다.

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

다음을 바꿉니다.

  • BUCKET_NAME: 버킷 이름 지정 요구사항을 충족하는 Cloud Storage 버킷의 이름입니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.
  • LOCATION: 버킷의 위치입니다.

Pub/Sub 주제 및 구독 만들기

Pub/Sub 주제를 4개 만든 후 구독을 3개 만듭니다.

주제를 만들려면 주제마다 gcloud pubsub topics create 명령어를 한 번 실행합니다. 구독 이름 지정 방법은 주제 또는 구독 이름 지정 안내를 참조하세요.

gcloud pubsub topics create TOPIC_NAME

TOPIC_NAME을 다음 값으로 바꾸고 명령어를 주제마다 한 번씩 총 4번 실행합니다.

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

주제에 대한 구독을 만들려면 구독마다 한 번씩 gcloud pubsub subscriptions create 명령어를 실행합니다.

  1. Clickstream-inbound-sub 구독을 만듭니다.

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Transactions-inbound-sub 구독을 만듭니다.

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Inventory-inbound-sub 구독을 만듭니다.

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

BigQuery 데이터 세트 및 테이블 만들기

Pub/Sub 주제에 적절한 스키마로 BigQuery 데이터 세트와 파티션을 나눈 테이블을 만듭니다.

  1. bq mk 명령어를 사용하여 첫 번째 데이터 세트를 만듭니다.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. 두 번째 데이터 세트를 만듭니다.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. CREATE TABLE SQL 문을 사용하여 스키마 및 테스트 데이터로 테이블을 만듭니다. 테스트 데이터에는 ID 값이 1인 스토어가 하나 있습니다. 업데이트 속도가 느린 부 입력 패턴은 이 테이블을 사용합니다.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Bigtable 인스턴스 및 테이블 만들기

Bigtable 인스턴스와 테이블을 만듭니다. Bigtable 인스턴스 만들기에 대한 자세한 내용은 인스턴스 만들기를 참조하세요.

  1. 필요한 경우 다음 명령어를 실행하여 cbt CLI를 설치합니다.

    gcloud components install cbt
    
  2. bigtable instances create 명령어를 사용하여 인스턴스를 만듭니다.

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    CLUSTER_ZONE을 클러스터가 실행되는 영역으로 바꿉니다.

  3. cbt createtable 명령어를 사용하여 테이블을 만듭니다.

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. 테이블에 column family를 추가하려면 다음 명령어를 사용합니다.

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

파이프라인 실행

스트리밍 파이프라인을 실행하려면 Gradle을 사용합니다. 파이프라인에서 사용 중인 Java 코드를 보려면 RetailDataProcessingPipeline.java를 참조하세요.

  1. git clone 명령어를 사용하여 GitHub 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 애플리케이션 디렉터리로 전환합니다.

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. 파이프라인을 테스트하려면 셸 또는 터미널에서 Gradle을 사용하여 다음 명령어를 실행합니다.

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. 파이프라인을 실행하려면 Gradle을 사용하여 다음 명령어를 실행합니다.

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

GitHub의 파이프라인 소스 코드를 참조하세요.

Cloud Scheduler 작업 만들기 및 실행

클릭 스트림 데이터를 게시하는 작업, 인벤토리 데이터 작업, 트랜잭션 데이터 작업 등 3가지 Cloud Scheduler 작업을 만들고 실행합니다. 이 단계에서는 파이프라인의 샘플 데이터를 생성합니다.

  1. 이 튜토리얼에서 Cloud Scheduler 작업을 만들려면 gcloud scheduler jobs create 명령어를 사용합니다. 이 단계에서는 1분마다 메시지 1개를 게시하는 클릭 스트림 데이터를 위한 게시자를 만듭니다.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Cloud Scheduler 작업을 시작하려면 gcloud scheduler jobs run 명령어를 사용합니다.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. 2분마다 메시지 1개를 게시하는 인벤토리 데이터를 위한 다른 유사한 게시자를 만들어 실행합니다.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. 두 번째 Cloud Scheduler 작업을 시작합니다.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. 2분마다 메시지 1개를 게시하는 트랜잭션 데이터를 위한 세 번째 게시자를 만들고 실행합니다.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. 세 번째 Cloud Scheduler 작업을 시작합니다.

    gcloud scheduler jobs run --location=LOCATION transactions
    

결과 보기

BigQuery 테이블에 기록된 데이터를 봅니다. BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다. 이 파이프라인이 실행되는 동안 1분마다 BigQuery 테이블에 새 행이 추가되는 것을 확인할 수 있습니다.

테이블에 데이터가 채워질 때까지 기다려야 할 수 있습니다.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 Google Cloud 프로젝트를 삭제하는 것입니다.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

개별 리소스 삭제

프로젝트를 재사용하려면 튜토리얼용으로 만든 리소스를 삭제합니다.

Google Cloud 프로젝트 리소스 삭제

  1. Cloud Scheduler 작업을 삭제하려면 gcloud scheduler jobs delete 명령어를 사용합니다.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Pub/Sub 구독 및 주제를 삭제하려면 gcloud pubsub subscriptions deletegcloud pubsub topics delete 명령어를 사용합니다.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. BigQuery 테이블을 삭제하려면 bq rm 명령어를 사용합니다.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. BigQuery 데이터 세트를 삭제합니다. 데이터 세트만으로는 비용이 발생하지 않습니다.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Bigtable 인스턴스를 삭제하려면 cbt deleteinstance 명령어를 사용합니다. 버킷만으로는 비용이 발생하지 않습니다.

    cbt deleteinstance aggregate-tables
    
  6. Cloud Storage 버킷을 삭제하려면 gcloud storage rm 명령어를 사용합니다. 버킷만으로는 비용이 발생하지 않습니다.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

사용자 인증 정보 취소

  1. 사용자 관리형 작업자 서비스 계정에 부여한 역할을 취소합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

다음 단계