Dataflow를 사용하여 Avro 레코드를 BigQuery로 스트리밍

이 가이드에서는 Dataflow를 사용해서 테이블 스키마를 자동으로 생성하고 입력 요소를 변환하여 Avro SpecificRecord 객체를 BigQuery에 저장하는 방법을 설명합니다. 또한 이 가이드에서는 Avro에서 생성된 클래스를 사용하여 Dataflow 파이프라인에 있는 작업자 간 중간 데이터를 구체화하거나 전송하는 방법을 보여줍니다.

Apache Avro는 데이터 구조화를 위해 스키마를 사용하는 직렬화 시스템입니다. Avro 데이터를 읽거나 쓸 때 스키마가 항상 존재하므로, 직렬화가 빠르고 간단하게 수행됩니다. 성능 이점이 뛰어나서 메시지 브로커를 통해 분석 시스템으로 이벤트를 전송하는 앱과 같이 시스템 간 메시지 전달을 위해 일반적으로 사용됩니다. Avro 스키마를 사용해서 BigQuery 데이터 웨어하우스 스키마를 관리할 수 있습니다. Avro 스키마를 BigQuery 테이블 구조로 변환하기 위해서는 이 가이드에서 설명되는 커스텀 코드가 필요합니다.

이 가이드는 Avro 스키마를 사용해서 BigQuery 데이터 웨어하우스 스키마를 관리하는 데 관심이 있는 개발자 또는 설계자를 대상으로 합니다. 이 가이드에서는 사용자가 Avro 및 자바에 익숙하다고 가정합니다.

다음 다이어그램은 이 가이드에서 설명하는 아키텍처를 간단히 보여줍니다.

BigQuery 데이터 웨어하우스 스키마를 관리하는 Avro 스키마 아키텍처

이 가이드에서는 다음 단계가 포함된 간단한 주문 처리 시스템을 사용해서 아키텍처 패턴을 설명합니다.

  • 고객이 구매하면 온라인 앱이 이벤트를 생성합니다.
  • 주문 객체에 고유 식별자, 구매한 항목 목록, 타임스탬프가 포함됩니다.
  • Dataflow 파이프라인이 Pub/Sub 주제에서 OrderDetails SpecificRecord Avro 메시지를 읽습니다.
  • Dataflow 파이프라인이 레코드를 Cloud Storage에 Avro 파일로 기록합니다.
  • OrderDetails 클래스가 해당하는 BigQuery 스키마를 자동으로 생성합니다.
  • OrderDetails 객체가 일반 변환 함수를 사용해서 BigQuery에 기록됩니다.

목표

  • Dataflow를 사용하여 Pub/Sub 데이터 스트림에서 JSON 문자열을 수집합니다.
  • JSON 객체를 Avro에서 생성된 클래스의 객체로 변환합니다.
  • Avro 스키마에서 BigQuery 테이블 스키마를 생성합니다.
  • Avro 레코드를 Cloud Storage의 파일에 기록합니다.
  • Avro 레코드를 BigQuery에 기록합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 가이드를 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않게 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. BigQuery, Cloud Storage, and Dataflow API를 사용 설정합니다.

    API 사용 설정

  5. Cloud Console에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    Cloud Console 하단에 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 gcloud 명령줄 도구가 포함되고 Cloud SDK가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

환경 설정

  1. Cloud Shell에서 소스 저장소를 클론합니다.

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Avro 클래스를 생성합니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn generate-sources
    

    이 명령어는 orderdetails.avsc 파일을 사용하여 OrderDetails 클래스 및 OrderItems 클래스를 생성합니다. OrderDetails 클래스에는 고유 식별자, 타임스탬프, OrderItems 목록이 포함됩니다. OrderItems 클래스에는 고유 식별자, 이름, 가격이 포함됩니다. Avro 스키마는 BigQuery 테이블로 전파됩니다. 여기에서 주문 포함 행에는 OrderItem 유형의 레코드 배열이 포함됩니다. . 자세한 내용은 중첩 및 반복 열 지정을 참조하세요.

  3. env.sh 파일을 엽니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  4. env.sh 파일에 이 가이드에 사용할 수 있는 사전 설정된 기본값이 포함되어 있지만 해당 환경에 맞게 이 파일을 수정할 수 있습니다.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="$GOOGLE_CLOUD_PROJECT""_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="$MY_BUCKET/""out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    `BQ_TABLE=`"`orders`"
    
    # Maximum number of Dataflow workers
    NUM_WORKERS=1
    

    다음을 바꿉니다.

    • avro-records: Pub/Sub 주제의 이름입니다.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: 클라우드 프로젝트 ID로 생성된 Cloud Storage 버킷의 이름입니다.
    • $MY_BUCKET/""out/": Avro 출력이 포함된 Cloud Storage 버킷의 경로입니다.
    • us-central1: Pub/Sub 및 Dataflow에 사용하는 리전입니다. 리전에 대한 상세 설명은 위치 및 리전을 참조하세요.
    • US: BigQuery의 리전입니다. 위치에 대한 자세한 내용은 데이터 세트 위치를 참조하세요.
    • sales: BigQuery 데이터 세트 이름입니다.
    • orders: BigQuery 테이블 이름입니다.
    • 1: Dataflow 작업자의 최대 개수입니다.
  5. 환경 변수를 설정합니다.

     . ./env.sh
    

리소스 만들기

  1. Cloud Shell에서 Pub/Sub 주제를 만듭니다.

    gcloud pubsub topics create $MY_TOPIC
    
  2. Cloud Storage 버킷을 만듭니다.

    gsutil mb -l $REGION -c regional gs://$MY_BUCKET
    

    Cloud Storage 버킷은 앱으로 생성된 원시 이벤트를 백업합니다. 이 버킷은 또한 Dataproc에서 Spark 및 Hadoop 작업을 사용하여 오프라인 분석 및 검사를 수행하기 위한 대체 소스로 사용될 수 있습니다.

  3. BigQuery 데이터 세트를 만듭니다.

    bq --location=$BQ_REGION mk --dataset $GOOGLE_CLOUD_PROJECT:$BQ_DATASET

    BigQuery 데이터 세트에는 여러 리전이 포함된 단일 리전 또는 위치에 있는 테이블과 뷰가 있습니다. 자세한 내용은 데이터 세트 만들기를 참조하세요.

Beam Dataflow 앱 시작

  1. Cloud Shell의 Dataflow 실행기에서 파이프라인을 배포하고 실행합니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.solutions.beamavro.AvroToBigQuery \
        -Dexec.cleanupDaemonThreads=false \
        -Dexec.args=" \
        --project=$GOOGLE_CLOUD_PROJECT \
        --runner=DataflowRunner \
        --stagingLocation=gs://$MY_BUCKET/stage/ \
        --tempLocation=gs://$MY_BUCKET/temp/ \
        --inputPath=projects/$GOOGLE_CLOUD_PROJECT/topics/$MY_TOPIC \
        --workerMachineType=n1-standard-1 \
        --maxNumWorkers=$NUM_WORKERS \
        --region=$REGION \
        --dataset=$BQ_DATASET \
        --bqTable=$BQ_TABLE \
        --outputPath=$AVRO_OUT"
    

    출력에는 앱 ID가 포함됩니다. 이 가이드에서 나중에 필요하므로 앱 ID를 기록해 둡니다.

  2. Cloud Console에서 Dataflow로 이동합니다.

    Dataflow로 이동

  3. 파이프라인 상태를 보기 위해 앱 ID를 클릭합니다. 파이프라인 상태가 그래프로 표시됩니다.

    파이프라인 상태의 그래프입니다.

코드 검토

AvroToBigQuery.java 파일에서 필요한 매개변수와 함께 파이프라인 옵션이 명령줄 매개변수를 통해 전달됩니다. 스트리밍 모드 옵션도 사용 설정됩니다. BigQuery 테이블 스키마가 Avro 클래스 스키마에서 생성되고 나중에 BigQuery IO 클래스에서 사용됩니다.

TableSchema ts = BigQueryAvroUtils.getTableSchema(OrderDetails.SCHEMA$);

Avro 스키마 객체의 필드에 AvroUtils 클래스가 반복되고 해당 TableFieldSchema 객체가 재귀적으로 생성됩니다. 그런 후 객체가 TableSchema 객체에 래핑되고 반환됩니다.

Avro 입력 형식의 경우 Pub/Sub에서 객체가 읽혀집니다. 입력 형식이 JSON이면 이벤트를 읽고 Avro 객체로 변환합니다.

private static PCollection<OrderDetails> getInputCollection(
    Pipeline pipeline, String inputPath, FORMAT format) {
  if (format == FORMAT.JSON) {
    // Transform JSON to Avro
    return pipeline
        .apply("Read JSON from PubSub", PubsubIO.readStrings().fromTopic(inputPath))
        .apply("To binary", ParDo.of(new JSONToAvro()));
  } else {
    // Read Avro
    return pipeline.apply(
        "Read Avro from PubSub", PubsubIO.readAvros(OrderDetails.class).fromTopic(inputPath));
  }
}

파이프라인이 분기됩니다. Write to Cloud Storage 변환은 복합 변환입니다. 창에서 10초 동안 레코드를 수집한 후 AvroIO 작성자를 사용하여 레코드를 Cloud Storage의 Avro 파일에 기록합니다.

ods.apply(
    "Write to GCS",
    new AvroWriter()
        .withOutputPath(options.getOutputPath())
        .withRecordType(OrderDetails.class));

Write to BigQuery 변환은 레코드를 BigQuery 테이블에 기록합니다.

ods.apply(
    "Write to BigQuery",
    BigQueryIO.write()
        .to(bqStr)
        .withSchema(ts)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withFormatFunction(TABLE_ROW_PARSER));

BigQueryIO 변환은 TABLE_ROW_PARSER 메서드를 사용해서 TableRow 객체로 변환하는 방식으로 Avro 객체를 BigQuery로 변환합니다. 파서는 Apache Beam 프로젝트의 테스트 클래스를 기반으로 빌드된 BigQueryAvroUtils 클래스의 convertSpecificRecordToTableRow 메서드를 호출합니다. method는 Avro 필드를 재귀적으로 파싱하고 이를 TableRow 객체에 추가합니다.

private static TableRow convertSpecificRecordToTableRow(
    SpecificRecord record, List<TableFieldSchema> fields) {
  TableRow row = new TableRow();
  for (TableFieldSchema subSchema : fields) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
    // is required, so it may not be null.
    Field field = record.getSchema().getField(subSchema.getName());
    if (field == null || field.name() == null) {
      continue;
    }
    Object convertedValue = getTypedCellValue(field.schema(), subSchema, record.get(field.pos()));
    if (convertedValue != null) {
      // To match the JSON files exported by BigQuery, do not include null values in the output.
      row.set(field.name(), convertedValue);
    }
  }

  return row;
}

다음 표에서는 BigQuery 데이터 유형과 Avro 데이터 유형 사이의 매핑을 보여줍니다. 필드의 논리적 유형으로 식별되는 DateTimestamp와 같은 유형에 유의하세요.

BigQuery Avro
STRING STRING
GEOGRAPHY STRING
BYTES BYTES
INTEGER INT
FLOAT FLOAT
FLOAT64 DOUBLE
NUMERIC BYTES
BOOLEAN BOOLEAN
INT64 LONG
TIMESTAMP LONG
DATE INT
DATETIME STRING
TIME LONG
STRUCT RECORD
REPEATED FIELD ARRAY

BigQuery에서 결과 보기

파이프라인을 테스트하기 위해 gen.py 스크립트를 시작합니다. 이 스크립트는 주문 이벤트의 생성을 시뮬레이션하고 이를 Pub/Sub 주제에 푸시합니다.

  1. Cloud Shell에서 샘플 이벤트 생성기 스크립트 디렉터리로 변경하고 스크립트를 실행합니다.

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. Cloud Console에서 BigQuery로 이동합니다.

    BigQuery로 이동

  3. 테이블 스키마를 보려면 sales 데이터 세트를 클릭한 후 orders 테이블을 선택합니다. env.sh에서 기본 환경 변수를 수정한 경우 데이터 세트 및 테이블 이름이 다를 수 있습니다.

    &#39;orders&#39; 테이블의 테이블 스키마입니다.

  4. 일부 샘플 데이터를 보려면 쿼리 편집기에서 쿼리를 실행합니다.

    SELECT * FROM sales.orders LIMIT 5
    

    샘플 데이터의 쿼리 결과입니다.

    BigQuery 테이블 스키마가 Avro 레코드에서 자동으로 생성되고 데이터가 자동으로 BigQuery 테이블 구조로 변환됩니다.

삭제

프로젝트 삭제

  1. Cloud Console에서 리소스 관리 페이지로 이동합니다.

    리소스 관리로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

개별 리소스 삭제

  1. 안내에 따라 Dataflow 작업을 중지합니다.

  2. Cloud Storage 버킷을 삭제합니다.

    gsutil rm -r gs://$MY_BUCKET
    

다음 단계