Dataflow를 사용하여 Avro 레코드를 BigQuery로 스트리밍

이 가이드에서는 Dataflow를 사용해서 테이블 스키마를 자동으로 생성하고 입력 요소를 변환하여 Avro SpecificRecord 객체를 BigQuery에 저장하는 방법을 설명합니다. 또한 이 가이드에서는 Avro에서 생성된 클래스를 사용하여 Dataflow 파이프라인에 있는 작업자 간 중간 데이터를 구체화하거나 전송하는 방법을 보여줍니다.

Apache Avro는 데이터 구조화를 위해 스키마를 사용하는 직렬화 시스템입니다. Avro 데이터를 읽거나 쓸 때 스키마가 항상 존재하므로, 직렬화가 빠르고 간단하게 수행됩니다. 성능 이점이 뛰어나서 메시지 브로커를 통해 분석 시스템으로 이벤트를 전송하는 앱과 같이 시스템 간 메시지 전달을 위해 일반적으로 사용됩니다. Avro 스키마를 사용해서 BigQuery 데이터 웨어하우스 스키마를 관리할 수 있습니다. Avro 스키마를 BigQuery 테이블 구조로 변환하기 위해서는 이 가이드에서 설명되는 커스텀 코드가 필요합니다.

이 가이드는 Avro 스키마를 사용해서 BigQuery 데이터 웨어하우스 스키마를 관리하는 데 관심이 있는 개발자 또는 설계자를 대상으로 합니다. 이 가이드에서는 사용자가 Avro 및 자바에 익숙하다고 가정합니다.

다음 다이어그램은 이 가이드에서 설명하는 아키텍처를 간단히 보여줍니다.

BigQuery 데이터 웨어하우스 스키마를 관리하는 Avro 스키마 아키텍처

이 가이드에서는 다음 단계가 포함된 간단한 주문 처리 시스템을 사용해서 아키텍처 패턴을 설명합니다.

  • 고객이 구매하면 온라인 앱이 이벤트를 생성합니다.
  • 주문 객체에 고유 식별자, 구매한 항목 목록, 타임스탬프가 포함됩니다.
  • Dataflow 파이프라인이 Pub/Sub 주제에서 OrderDetails SpecificRecord Avro 메시지를 읽습니다.
  • Dataflow 파이프라인이 레코드를 Cloud Storage에 Avro 파일로 기록합니다.
  • OrderDetails 클래스가 해당하는 BigQuery 스키마를 자동으로 생성합니다.
  • OrderDetails 객체가 일반 변환 함수를 사용해서 BigQuery에 기록됩니다.

목표

  • Dataflow를 사용하여 Pub/Sub 데이터 스트림에서 JSON 문자열을 수집합니다.
  • JSON 객체를 Avro에서 생성된 클래스의 객체로 변환합니다.
  • Avro 스키마에서 BigQuery 테이블 스키마를 생성합니다.
  • Avro 레코드를 Cloud Storage의 파일에 기록합니다.
  • Avro 레코드를 BigQuery에 기록합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 튜토리얼을 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않게 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. API BigQuery, Cloud Storage, and Dataflow 사용 설정

    API 사용 설정

  5. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  6. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  7. API BigQuery, Cloud Storage, and Dataflow 사용 설정

    API 사용 설정

  8. 콘솔에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    콘솔 하단에 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 Google Cloud CLI가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

환경 설정

  1. Cloud Shell에서 소스 저장소를 클론합니다.

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. env.sh 파일을 엽니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  3. env.sh 파일에 이 가이드에 사용할 수 있는 사전 설정된 기본값이 포함되어 있지만 해당 환경에 맞게 이 파일을 수정할 수 있습니다.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="${MY_BUCKET}/out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    BQ_TABLE="orders"
    

    다음을 바꿉니다.

    • avro-records: Pub/Sub 주제의 이름입니다.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: 클라우드 프로젝트 ID로 생성된 Cloud Storage 버킷의 이름입니다.
    • $MY_BUCKET/""out/": Avro 출력이 포함된 Cloud Storage 버킷의 경로입니다.
    • us-central1: Pub/Sub 및 Dataflow에 사용하는 리전입니다. 리전에 대한 상세 설명은 위치 및 리전을 참조하세요.
    • US: BigQuery의 리전입니다. 위치에 대한 자세한 내용은 데이터 세트 위치를 참조하세요.
    • sales: BigQuery 데이터 세트 이름입니다.
    • orders: BigQuery 테이블 이름입니다.
    • 1: Dataflow 작업자의 최대 개수입니다.
  4. 환경 변수를 설정합니다.

     . ./env.sh
    

리소스 만들기

  1. Cloud Shell에서 Pub/Sub 주제를 만듭니다.

    gcloud pubsub topics create "${MY_TOPIC}"
    
  2. Cloud Storage 버킷을 만듭니다.

    gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
    

    Cloud Storage 버킷은 앱으로 생성된 원시 이벤트를 백업합니다. 이 버킷은 또한 Dataproc에서 Spark 및 Hadoop 작업을 사용하여 오프라인 분석 및 검사를 수행하기 위한 대체 소스로 사용될 수 있습니다.

  3. BigQuery 데이터 세트를 만듭니다.

    bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
    

    BigQuery 데이터 세트에는 여러 리전이 포함된 단일 리전 또는 위치에 있는 테이블과 뷰가 있습니다. 자세한 내용은 데이터 세트 만들기를 참조하세요.

Beam Dataflow 앱 시작

  1. Cloud Shell의 Dataflow 실행기에서 파이프라인을 배포하고 실행합니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    
    mvn clean package
    
    java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \
    com.google.cloud.solutions.beamavro.AvroToBigQuery \
    --runner=DataflowRunner \
    --project="${GOOGLE_CLOUD_PROJECT}" \
    --stagingLocation="gs://${MY_BUCKET}/stage/" \
    --tempLocation="gs://${MY_BUCKET}/temp/" \
    --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \
    --workerMachineType=n1-standard-1 \
    --region="${REGION}" \
    --dataset="${BQ_DATASET}" \
    --bigQueryTable="${BQ_TABLE}" \
    --outputPath="gs://${MY_BUCKET}/out/" \
    --jsonFormat=false \
    --avroSchema="$(<../orderdetails.avsc)"
    

    출력에는 앱 ID가 포함됩니다. 이 가이드에서 나중에 필요하므로 앱 ID를 기록해 둡니다.

  2. 콘솔에서 Dataflow로 이동합니다.

    Dataflow로 이동

  3. 파이프라인 상태를 보기 위해 앱 ID를 클릭합니다. 파이프라인 상태가 그래프로 표시됩니다.

    파이프라인 상태의 그래프입니다.

코드 검토

AvroToBigQuery.java 파일에서 필요한 매개변수와 함께 파이프라인 옵션이 명령줄 매개변수를 통해 전달됩니다. 스트리밍 모드 옵션도 사용 설정됩니다. BigQuery 테이블 스키마가 BigQuery IOBeam Schema를 사용하여 Avro 스키마에서 자동으로 생성됩니다.

Avro 입력 형식의 경우 Pub/Sub에서 객체가 읽혀집니다. 입력 형식이 JSON이면 이벤트를 읽고 Avro 객체로 변환합니다.

Schema avroSchema = new Schema.Parser().parse(options.getAvroSchema());

if (options.getJsonFormat()) {
  return input
      .apply("Read Json", PubsubIO.readStrings().fromTopic(options.getInputPath()))
      .apply("Make GenericRecord", MapElements.via(JsonToAvroFn.of(avroSchema)));
} else {
  return input.apply("Read GenericRecord", PubsubIO.readAvroGenericRecords(avroSchema)
      .fromTopic(options.getInputPath()));
}

파이프라인은 별도의 두 개의 쓰기를 수행하기 위해 분기됩니다.

BigQueryIO는 Beam 스키마를 사용하여 Avro 객체를 내부적으로 TableRow 객체로 변환하여 BigQuery에 씁니다. BigQuery 데이터 유형과 Avro 데이터 유형 간의 매핑을 참조하세요.

BigQuery에서 결과 보기

파이프라인을 테스트하기 위해 gen.py 스크립트를 시작합니다. 이 스크립트는 주문 이벤트의 생성을 시뮬레이션하고 이를 Pub/Sub 주제에 푸시합니다.

  1. Cloud Shell에서 샘플 이벤트 생성기 스크립트 디렉터리로 변경하고 스크립트를 실행합니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. 콘솔에서 BigQuery로 이동합니다.

    BigQuery로 이동

  3. 테이블 스키마를 보려면 sales 데이터 세트를 클릭한 후 orders 테이블을 선택합니다. env.sh에서 기본 환경 변수를 수정한 경우 데이터 세트 및 테이블 이름이 다를 수 있습니다.

    &#39;orders&#39; 테이블의 테이블 스키마입니다.

  4. 일부 샘플 데이터를 보려면 쿼리 편집기에서 쿼리를 실행합니다.

    SELECT * FROM sales.orders LIMIT 5
    

    샘플 데이터의 쿼리 결과입니다.

    BigQuery 테이블 스키마가 Avro 레코드에서 자동으로 생성되고 데이터가 자동으로 BigQuery 테이블 구조로 변환됩니다.

삭제

프로젝트 삭제

  1. 콘솔에서 리소스 관리 페이지로 이동합니다.

    리소스 관리로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

개별 리소스 삭제

  1. 안내에 따라 Dataflow 작업을 중지합니다.

  2. Cloud Storage 버킷을 삭제합니다.

    gsutil rm -r gs://$MY_BUCKET
    

다음 단계