BigQuery에서 Dataflow에 읽기

이 문서에서는 Apache Beam BigQuery I/O 커넥터를 사용하여 BigQuery에서 Dataflow로 데이터를 읽는 방법을 설명합니다.

개요

BigQuery I/O 커넥터는 BigQuery에서 읽을 때 다음 두 가지 옵션을 지원합니다.

  • 직접 테이블 읽기. 이 옵션은 BigQuery Storage Read API를 사용하므로 가장 빠릅니다.
  • 내보내기 작업 이 옵션을 사용하면 BigQuery에서 테이블 데이터를 Cloud Storage에 쓰는 내보내기 작업을 실행합니다. 그런 다음 커넥터가 Cloud Storage에서 내보낸 데이터를 읽습니다. 이 옵션은 내보내기 단계가 필요하므로 효율성이 떨어집니다.

내보내기 작업은 기본 옵션입니다. 직접 읽기를 지정하려면 withMethod(Method.DIRECT_READ)를 호출합니다.

커넥터가 테이블 데이터를 PCollection으로 직렬화합니다. PCollection의 각 요소는 단일 테이블 행을 나타냅니다. 커넥터는 다음과 같은 직렬화 방법을 지원합니다.

동시 로드

이 커넥터의 동시 로드는 읽기 메서드에 따라 다릅니다.

  • 직접 읽기: I/O 커넥터가 내보내기 요청의 크기에 따라 동적 스트림 수를 생성합니다. BigQuery에서 바로 이러한 스트림을 동시에 읽습니다.

  • 내보내기 작업: BigQuery에서 Cloud Storage에 쓸 파일 수를 결정합니다. 파일 수는 쿼리와 데이터 양에 따라 다릅니다. I/O 커넥터가 내보낸 파일을 동시에 읽습니다.

성능

다음 표에서는 다양한 BigQuery I/O 읽기 옵션의 성능 측정항목을 보여줍니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2 작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.

레코드 1억 건 | 1KB | 열 1개 처리량(바이트) 처리량(요소)
스토리지 읽기 120MBps 초당 요소 88,000개
Avro 내보내기 105MBps 초당 요소 78,000개
JSON 내보내기 110MBps 초당 요소 81,000개

이러한 측정항목은 단순 배치 파이프라인을 기반으로 합니다. 이러한 측정항목은 I/O 커넥터 사이의 성능 비교를 위해 사용되며 반드시 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.

권장사항

  • 일반적으로 직접 테이블 읽기(Method.DIRECT_READ)를 사용하는 것이 좋습니다. Storage Read API는 데이터를 내보내는 중간 단계가 필요하지 않으므로 내보내기 작업보다 데이터 파이프라인에 더 적합합니다.

  • 직접 읽기를 사용하면 Storage Read API 사용량에 대한 요금이 청구됩니다. BigQuery 가격 책정 페이지에서 데이터 추출 가격 책정을 참고하세요.

  • 내보내기 작업에는 추가 요금이 청구되지 않습니다. 그러나 내보내기 작업에는 한도가 있습니다. 시의성이 중요하고 비용이 조정 가능한 대규모 데이터 이동의 경우 직접 읽기를 권장합니다.

  • Storage Read API에는 할당량 한도가 있습니다. Google Cloud 측정항목을 사용하여 할당량 사용량을 모니터링합니다.

  • Storage Read API를 사용하면 로그에 다음과 같은 임대 기간 만료 및 세션 제한 시간 오류가 표시될 수 있습니다.

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    이러한 오류는 작업이 제한 시간보다 오래 걸릴 때 발생할 수 있으며, 일반적으로 6시간 넘게 실행되는 파이프라인에서 발생합니다. 이 문제를 완화하려면 파일 내보내기로 전환하세요.

예시

이 섹션의 코드 예에서는 직접 테이블 읽기를 사용합니다.

내보내기 작업을 대신 사용하려면 withMethod에 대한 호출을 생략하거나 Method.EXPORT를 지정합니다. 그런 다음 --tempLocation 파이프라인 옵션을 설정하여 내보낸 파일의 Cloud Storage 버킷을 지정합니다.

이 코드 예시에서는 소스 테이블에 다음 열이 있다고 가정합니다.

  • name(문자열)
  • age(정수)

JSON 스키마 파일로 지정됩니다.

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Avro 형식 레코드 읽기

BigQuery 데이터를 Avro 형식 레코드로 읽으려면 read(SerializableFunction) 메서드를 사용하세요. 이 메서드는 SchemaAndRecord 객체를 파싱하고 맞춤 데이터 유형을 반환하는 애플리케이션 정의 함수를 사용합니다. 커넥터의 출력은 커스텀 데이터 유형의 PCollection입니다.

다음 코드는 MyData가 애플리케이션 정의 클래스인 BigQuery 테이블에서 PCollection<MyData>를 읽습니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

read 메서드는 Avro 레코드에서 커스텀 데이터 클래스로 변환하는 함수를 정의하는 SerializableFunction<SchemaAndRecord, T> 인터페이스를 사용합니다. 이전 코드 예에서 MyData.apply 메서드는 이 변환 함수를 구현합니다. 이 예시 함수는 Avro 레코드에서 nameage 필드를 파싱하고 MyData 인스턴스를 반환합니다.

읽을 BigQuery 테이블을 지정하려면 이전 예시와 같이 from 메서드를 호출합니다. 자세한 내용은 BigQuery I/O 커넥터 문서의 테이블 이름을 참고하세요.

TableRow 객체 읽기

readTableRows 메서드는 BigQuery 데이터를 TableRow 객체의 PCollection로 읽습니다. 각 TableRow는 테이블 데이터의 단일 행을 포함하는 키-값 쌍의 매핑입니다. from 메서드를 호출하여 읽을 BigQuery 테이블을 지정합니다.

다음 코드는 BigQuery 테이블에서 PCollection<TableRows>을 읽습니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

이 예시에서는 TableRow 사전에서 값에 액세스하는 방법도 보여줍니다. 정수 값은 BigQuery의 내보낸 JSON 형식과 일치하도록 문자열로 인코딩됩니다.

열 투영 및 필터링

직접 읽기(Method.DIRECT_READ)를 사용할 때는 BigQuery에서 읽어 네트워크를 통해 전송하는 데이터의 양을 줄여 읽기 작업을 더 효율적으로 만들 수 있습니다.

  • 열 투영: withSelectedFields를 호출하여 테이블에서 열의 하위 집합을 읽습니다. 이렇게 하면 테이블에 열이 많을 때 효율적으로 읽을 수 있습니다.
  • 행 필터링: withRowRestriction을 호출하여 서버 측에서 데이터를 필터링하는 조건자를 지정합니다.

필터 조건자는 결정론적이어야 하며 집계는 지원되지 않습니다.

다음 예시에서는 "user_name""age" 열을 투영하고 "age > 18" 조건자와 일치하지 않는 행을 필터링합니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

쿼리 결과에서 읽기

이전 예에서는 테이블에서 행을 읽는 방법을 보여줍니다. fromQuery를 호출하여 SQL 쿼리의 결과에서 읽을 수도 있습니다. 이 접근 방식은 일부 계산 작업을 BigQuery로 이동합니다. 이 메서드를 사용하여 뷰에 대해 쿼리를 실행하여 BigQuery 뷰 또는 구체화된 뷰에서 읽을 수도 있습니다.

다음 예에서는 BigQuery 공개 데이터 세트에 대해 쿼리를 실행하고 결과를 읽습니다. 파이프라인이 실행된 다음 BigQuery 작업 기록에서 쿼리 작업을 볼 수 있습니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

다음 단계