이 튜토리얼에서는 Bigtable 테이블의 변경 내역에서 가져온 데이터베이스 변경의 실시간 스트림을 위해 데이터 파이프라인을 Dataflow에 배포하는 방법을 보여줍니다. 이 파이프라인의 출력은 Cloud Storage의 일련의 파일에 기록됩니다.
음악 듣기 애플리케이션의 예시 데이터 세트가 제공됩니다. 이 튜토리얼에서는 청취한 노래를 추적하여 특정 기간 동안 상위 5개의 항목에 대해 순위를 지정합니다.
이 튜토리얼은 Google Cloud에서 코드 작성 및 데이터 파이프라인 배포에 익숙한 기술 사용자를 대상으로 합니다.
목표
이 튜토리얼에서는 다음 작업을 처리하는 방법을 보여줍니다.
- 변경 내역이 사용 설정된 Bigtable 테이블을 만듭니다.
- 변경 내역을 변환하고 출력하는 파이프라인을 Dataflow에 배포합니다.
- 데이터 파이프라인의 결과를 봅니다.
비용
이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.
시작하기 전에
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com cbt
CLI를 업데이트하고 설치합니다.gcloud components update gcloud components install cbt
환경 준비
코드 가져오기
샘플 코드가 있는 저장소를 클론합니다. 이전에 이 저장소를 이미 다운로드했으면 최신 버전을 가져옵니다.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
버킷 만들기
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
을 버킷 이름 요구사항을 충족하는 버킷 이름으로 바꿉니다.
Bigtable 인스턴스를 만듭니다.
이 튜토리얼의 기존 인스턴스를 사용하거나 가까운 리전의 기본 구성으로 인스턴스를 만들 수 있습니다.
테이블 만들기
샘플 애플리케이션은 사용자가 청취하는 노래를 추적하고 청취 이벤트를 Bigtable에 저장합니다. column family(cf) 1개와 column 1개(song)가 있고 row key에 사용자 ID를 사용하는 변경 내역이 사용 설정된 테이블을 만듭니다.
테이블을 만듭니다.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
다음을 바꿉니다.
- PROJECT_ID: 사용 중인 프로젝트의 ID입니다.
- BIGTABLE_INSTANCE_ID: 새 테이블을 포함할 인스턴스의 ID입니다.
파이프라인 시작
이 파이프라인은 다음을 수행하여 변경 내역을 변환합니다.
- 변경 내역 읽기
- 노래 이름 가져오기
- 노래 청취 이벤트를 N초 단위로 그룹화
- 상위 5개 노래 집계
- 결과 출력
파이프라인을 실행합니다.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
BIGTABLE_REGION을 Bigtable 인스턴스가 있는 리전의 ID(예: us-east5
)로 바꿉니다.
파이프라인 이해
다음 파이프라인의 코드 스니펫은 실행 중인 코드를 이해하는 데 도움이 됩니다.
변경 내역 읽기
이 샘플의 코드는 특정 Bigtable 인스턴스 및 테이블의 매개변수로 소스 스트림을 구성합니다.
노래 이름 가져오기
노래를 들으면 노래 이름이 column family cf
및 column qualifier song
에 기록되므로 코드는 변경 내역 변형에서 값을 추출하고 파이프라인의 다음 단계로 출력합니다.
상위 5개 노래 집계
기본 제공 Beam 함수 Count
및 Top.of
를 사용하여 현재 창에서 상위 5개 노래를 가져올 수 있습니다.
결과 출력
이 파이프라인은 결과를 표준 출력뿐만 아니라 파일에 기록합니다. 파일의 경우 쓰기를 10개 요소의 그룹 또는 1분 세그먼트로 분할합니다.
파이프라인 보기
Google Cloud 콘솔에서 Dataflow 페이지로 이동합니다.
이름이 song-rank로 시작하는 작업을 클릭합니다.
화면 하단에서 표시를 클릭하여 로그 패널을 엽니다.
작업자 로그를 클릭하여 변경 내역의 출력 로그를 모니터링합니다.
스트림 쓰기
cbt
CLI를 사용하여 다양한 사용자가 청취하는 곡 수를 song-rank
테이블에 씁니다. 이 기능은 시간에 따른 노래 스트리밍 듣기를 시뮬레이션하기 위해 몇 분 이상 작성되도록 설계되었습니다.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
출력 보기
Cloud Storage의 출력을 읽고 가장 인기 있는 곡을 확인합니다.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
출력 예시:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
삭제
이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.
프로젝트 삭제
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
개별 리소스 삭제
버킷 및 파일을 삭제합니다.
gcloud storage rm --recursive gs://BUCKET_NAME/
테이블에서 변경 내역을 사용 중지합니다.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
song-rank
테이블을 삭제합니다.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
변경 내역 파이프라인을 중지합니다.
작업을 나열하여 작업 ID를 가져옵니다.
gcloud dataflow jobs list --region=BIGTABLE_REGION
작업을 취소합니다.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
JOB_ID를 이전 명령어 뒤에 표시된 작업 ID로 바꿉니다.