BigQuery에서 Dataflow에 읽기

이 문서에서는 Apache Beam BigQuery I/O 커넥터를 사용하여 BigQuery에서 Dataflow로 데이터를 읽는 방법을 설명합니다.

개요

BigQuery I/O 커넥터는 BigQuery에서 읽기를 위한 두 가지 옵션을 지원합니다.

  • 직접 테이블 읽기. 이 옵션은 BigQuery Storage Read API를 사용하므로 가장 빠릅니다.
  • 내보내기 작업 이 옵션을 사용하면 BigQuery가 테이블 데이터를 Cloud Storage에 쓰는 내보내기 작업을 실행합니다. 그런 다음 커넥터가 Cloud Storage에서 내보낸 데이터를 읽습니다. 이 옵션은 내보내기 단계가 필요하므로 효율적이지 않습니다.

내보내기 작업은 기본 옵션입니다. 직접 읽기를 지정하려면 withMethod(Method.DIRECT_READ)를 호출합니다.

커넥터가 테이블 데이터를 PCollection으로 직렬화합니다. PCollection의 각 요소는 단일 테이블 행을 나타냅니다. 커넥터는 다음 직렬화 방법을 지원합니다.

동시 로드

이 커넥터의 동시 로드는 읽기 메서드에 따라 다릅니다.

  • 직접 읽기: I/O 커넥터가 내보내기 요청의 크기에 따라 동적 스트림 수를 생성합니다. BigQuery에서 바로 이러한 스트림을 동시에 읽습니다.

  • 내보내기 작업: BigQuery에서 Cloud Storage에 쓸 파일 수를 결정합니다. 파일 수는 쿼리와 데이터 양에 따라 다릅니다. I/O 커넥터가 내보낸 파일을 동시에 읽습니다.

성능

다음 표는 다양한 BigQuery I/O 읽기 옵션의 성능 측정항목을 보여줍니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2 작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.

레코드 1억 건 | 1KB | 열 1개 처리량(바이트) 처리량(요소)
스토리지 읽기 120MBps 초당 요소 88,000개
Avro 내보내기 105MBps 초당 요소 78,000개
Json 내보내기 110MBps 초당 요소 81,000개

이러한 측정항목은 간단한 일괄 파이프라인을 기반으로 합니다. 이는 I/O 커넥터 간 성능을 비교하기 위한 것이며 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.

권장사항

  • 일반적으로 직접 테이블 읽기(Method.DIRECT_READ)를 사용하는 것이 좋습니다. Storage Read API는 데이터 내보내기의 중간 단계가 필요하지 않으므로 내보내기 작업보다 데이터 파이프라인에 더 적합합니다.

  • 직접 읽기를 사용하는 경우 Storage Read API 사용량에 대한 요금이 부과됩니다. BigQuery 가격 책정 페이지의 데이터 추출 가격 책정을 참조하세요.

  • 내보내기 작업에는 추가 요금이 청구되지 않습니다. 그러나 내보내기 작업에는 한도가 있습니다. 시의성이 우선순위이고 비용을 조정할 수 있는 대규모 데이터 이동의 경우 직접 읽기를 사용하는 것이 좋습니다.

  • Storage Read API에는 할당량 한도가 있습니다. Google Cloud 측정항목을 사용하여 할당량 사용량을 모니터링합니다.

  • Storage Read API를 사용할 때 다음과 같이 로그에 임대 만료 및 세션 제한 시간 오류가 표시될 수 있습니다.

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    이러한 오류는 일반적으로 6시간 넘게 실행되는 파이프라인에서 작업이 제한 시간보다 오래 걸릴 때 발생할 수 있습니다. 이 문제를 해결하려면 파일 내보내기로 전환합니다.

  • Java SDK를 사용하는 경우 BigQuery 테이블의 스키마를 나타내는 클래스를 만드는 것이 좋습니다. 그런 다음 파이프라인에서 useBeamSchema를 호출하여 Apache Beam Row과 BigQuery TableRow 유형 간에 자동으로 변환합니다. 스키마 클래스의 예시는 ExampleModel.java를 참조하세요.

예시

이 섹션의 코드 예시에서는 직접 테이블 읽기를 사용합니다.

내보내기 작업을 대신 사용하려면 withMethod에 대한 호출을 생략하거나 Method.EXPORT를 지정합니다. 그런 후 --tempLocation 파이프라인 옵션을 설정하여 내보낸 파일의 Cloud Storage 버킷을 지정합니다.

이 코드 예시에서는 소스 테이블에 다음 열이 있다고 가정합니다.

  • name(문자열)
  • age(정수)

JSON 스키마 파일로 지정됩니다.

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Avro 형식의 레코드 읽기

BigQuery 데이터를 Avro 형식의 레코드로 읽으려면 read(SerializableFunction) 메서드를 사용합니다. 이 메서드는 SchemaAndRecord 객체를 파싱하고 커스텀 데이터 유형을 반환하는 애플리케이션 정의 함수를 사용합니다. 커넥터의 출력은 커스텀 데이터 유형의 PCollection입니다.

다음 코드는 MyData가 애플리케이션 정의 클래스인 BigQuery 테이블에서 PCollection<MyData>를 읽습니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

read 메서드는 Avro 레코드에서 커스텀 데이터 클래스로 변환할 함수를 정의하는 SerializableFunction<SchemaAndRecord, T> 인터페이스를 사용합니다. 앞의 코드 예시에서 MyData.apply 메서드는 이 변환 함수를 구현합니다. 이 예시 함수는 Avro 레코드에서 nameage 필드를 파싱하고 MyData 인스턴스를 반환합니다.

읽을 BigQuery 테이블을 지정하려면 이전 예시와 같이 from 메서드를 호출합니다. 자세한 내용은 BigQuery I/O 커넥터 문서의 테이블 이름을 참조하세요.

TableRow 객체 읽기

readTableRows 메서드는 BigQuery 데이터를 TableRow 객체의 PCollection으로 읽습니다. 각 TableRow는 테이블 데이터의 단일 행을 포함하는 키-값 쌍의 매핑입니다. from 메서드를 호출하여 읽을 BigQuery 테이블을 지정합니다.

다음 코드는 BigQuery 테이블에서 PCollection<TableRows>을 읽습니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

이 예시에서는 TableRow 사전에서 값에 액세스하는 방법도 보여줍니다. 정수 값은 BigQuery의 내보낸 JSON 형식과 일치하도록 문자열로 인코딩됩니다.

열 투영 및 필터링

직접 읽기(Method.DIRECT_READ)를 사용할 때는 BigQuery에서 읽어 네트워크를 통해 전송하는 데이터의 양을 줄여 읽기 작업을 더 효율적으로 만들 수 있습니다.

  • 열 투영: withSelectedFields를 호출하여 테이블에서 열의 하위 집합을 읽습니다. 이렇게 하면 테이블에 많은 열이 포함되어 있을 때 효율적으로 읽을 수 있습니다.
  • 행 필터링: withRowRestriction을 호출하여 서버 측에서 데이터를 필터링하는 조건자를 지정합니다.

필터 조건자는 확정적이어야 하며 집계는 지원되지 않습니다.

다음 예시에서는 "user_name""age" 열을 투영하고 "age > 18" 조건자와 일치하지 않는 행을 필터링합니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

쿼리 결과에서 읽기

이전 예는 테이블에서 행을 읽는 방법을 보여줍니다. fromQuery를 호출하여 SQL 쿼리 결과를 읽을 수도 있습니다. 이 접근법은 일부 컴퓨팅 작업을 BigQuery로 옮깁니다. 뷰에 대해 쿼리를 실행하여 BigQuery 뷰 또는 구체화된 뷰로부터 읽기를 위해 이 메서드를 사용할 수도 있습니다.

다음 예시에서는 BigQuery 공개 데이터 세트에 대해 쿼리를 실행하고 결과를 읽습니다. 파이프라인이 실행된 다음 BigQuery 작업 기록에서 쿼리 작업을 볼 수 있습니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

다음 단계