Bigtable 변경 내역 처리


이 튜토리얼에서는 Bigtable 테이블의 변경 내역에서 가져온 데이터베이스 변경의 실시간 스트림을 위해 데이터 파이프라인을 Dataflow에 배포하는 방법을 보여줍니다. 이 파이프라인의 출력은 Cloud Storage의 일련의 파일에 기록됩니다.

음악 듣기 애플리케이션의 예시 데이터 세트가 제공됩니다. 이 튜토리얼에서는 청취한 노래를 추적하여 특정 기간 동안 상위 5개의 항목에 대해 순위를 지정합니다.

이 튜토리얼은 Google Cloud에서 코드 작성 및 데이터 파이프라인 배포에 익숙한 기술 사용자를 대상으로 합니다.

목표

이 튜토리얼에서는 다음 작업을 처리하는 방법을 보여줍니다.

  • 변경 내역이 사용 설정된 Bigtable 테이블을 만듭니다.
  • 변경 내역을 변환하고 출력하는 파이프라인을 Dataflow에 배포합니다.
  • 데이터 파이프라인의 결과를 봅니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage API를 사용 설정합니다.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Google Cloud CLI를 설치합니다.
  8. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  11. Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage API를 사용 설정합니다.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. cbt CLI를 업데이트하고 설치합니다.
    gcloud components update
    gcloud components install cbt
    

환경 준비

코드 가져오기

샘플 코드가 있는 저장소를 클론합니다. 이전에 이 저장소를 이미 다운로드한 경우 가져와서 최신 버전을 가져옵니다.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

버킷 만들기

  • Cloud Storage 버킷 생성:
    gcloud storage buckets create gs://BUCKET_NAME
    BUCKET_NAME버킷 이름 요구사항을 충족하는 버킷 이름으로 바꿉니다.
  • Bigtable 인스턴스를 만듭니다.

    이 튜토리얼의 기존 인스턴스를 사용하거나 가까운 리전의 기본 구성으로 인스턴스를 만들 수 있습니다.

    테이블 만들기

    샘플 애플리케이션은 사용자가 청취하는 노래를 추적하고 청취 이벤트를 Bigtable에 저장합니다. column family(cf) 1개와 column 1개(song)가 있고 row key에 사용자 ID를 사용하는 변경 내역이 사용 설정된 테이블을 만듭니다.

    테이블을 만듭니다.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    다음을 바꿉니다.

    • PROJECT_ID: 사용 중인 프로젝트의 ID입니다.
    • BIGTABLE_INSTANCE_ID: 새 테이블을 포함할 인스턴스의 ID입니다.

    파이프라인 시작

    이 파이프라인은 다음을 수행하여 변경 내역을 변환합니다.

    1. 변경 내역 읽기
    2. 노래 이름 가져오기
    3. 노래 청취 이벤트를 N초 단위로 그룹화
    4. 상위 5개 노래 집계
    5. 결과 출력

    파이프라인을 실행합니다.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    BIGTABLE_REGION을 Bigtable 인스턴스가 있는 리전의 ID(예: us-east5)로 바꿉니다.

    파이프라인 이해

    다음 파이프라인의 코드 스니펫은 실행 중인 코드를 이해하는 데 도움이 됩니다.

    변경 내역 읽기

    이 샘플의 코드는 특정 Bigtable 인스턴스 및 테이블의 매개변수로 소스 스트림을 구성합니다.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    노래 이름 가져오기

    노래를 들으면 노래 이름이 column family cf 및 column qualifier song에 기록되므로 코드는 변경 내역 변형에서 값을 추출하고 파이프라인의 다음 단계로 출력합니다.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    상위 5개 노래 집계

    기본 제공 Beam 함수 CountTop.of를 사용하여 현재 창에서 상위 5개 노래를 가져올 수 있습니다.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    결과 출력

    이 파이프라인은 결과를 표준 출력뿐만 아니라 파일에 기록합니다. 파일의 경우 쓰기를 10개 요소의 그룹 또는 1분 세그먼트로 분할합니다.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    파이프라인 보기

    1. Google Cloud 콘솔에서 Dataflow 페이지로 이동합니다.

      Dataflow로 이동

    2. 이름이 song-rank로 시작하는 작업을 클릭합니다.

    3. 화면 하단에서 표시를 클릭하여 로그 패널을 엽니다.

    4. 작업자 로그를 클릭하여 변경 내역의 출력 로그를 모니터링합니다.

    스트림 쓰기

    cbt CLI를 사용하여 다양한 사용자가 청취하는 곡 수를 song-rank 테이블에 씁니다. 이 기능은 시간에 따른 노래 스트리밍 듣기를 시뮬레이션하기 위해 몇 분 이상 작성되도록 설계되었습니다.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    출력 보기

    Cloud Storage의 출력을 읽고 가장 인기 있는 곡을 확인합니다.

    gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    출력 예시:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    삭제

    이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

    프로젝트 삭제

      Google Cloud 프로젝트를 삭제합니다.

      gcloud projects delete PROJECT_ID

    개별 리소스 삭제

    1. 버킷 및 파일을 삭제합니다.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. 테이블에서 변경 내역을 사용 중지합니다.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. song-rank 테이블을 삭제합니다.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. 변경 내역 파이프라인을 중지합니다.

      1. 작업을 나열하여 작업 ID를 가져옵니다.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. 작업을 취소합니다.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        JOB_ID를 이전 명령어 뒤에 표시된 작업 ID로 바꿉니다.

    다음 단계