Dataflow에서 BigQuery로 쓰기

이 문서에서는 Apache Beam BigQuery I/O 커넥터를 사용하여 Dataflow에서 BigQuery로 데이터를 기록하는 방법을 설명합니다.

BigQuery I/O 커넥터는 Apache Beam SDK에서 제공됩니다. 최신 SDK 버전을 사용하는 것이 좋습니다. 자세한 내용은 Apache Beam 2.x SDK를 참조하세요.

Python용 교차 언어 지원도 제공됩니다.

개요

BigQuery I/O 커넥터는 BigQuery에 쓰기에 대해 다음 메서드를 지원합니다.

  • STORAGE_WRITE_API. 이 모드에서는 커넥터가 BigQuery Storage Write API를 사용해서 BigQuery 스토리지에 직접 쓰기를 수행합니다. Storage Write API는 스트리밍 수집과 일괄 로드를 단일 고성능 API로 결합합니다. 이 모드는 정확히 한 번의 시맨틱스를 보장합니다.
  • STORAGE_API_AT_LEAST_ONCE. 이 모드도 Storage Write API를 사용하지만 최소 한 번 이상의 시맨틱스를 제공합니다. 이 모드는 대부분의 파이프라인에서 지연 시간을 낮춰줍니다. 하지만 중복 쓰기가 발생할 수 있습니다.
  • FILE_LOADS. 이 모드에서는 커넥터가 Cloud Storage의 스테이징 파일에 입력 데이터를 씁니다. 그런 후 BigQuery 로드 작업을 실행해서 데이터를 BigQuery에 로드합니다. 이 모드는 일괄 파이프라인에서 가장 일반적으로 발견되는 바인드된 PCollections의 기본값입니다.
  • STREAMING_INSERTS. 이 모드에서는 커넥터가 레거시 스트리밍 API를 사용합니다. 이 모드는 바인드되지 않은 PCollections의 기본값이지만 새 프로젝트에서는 권장되지 않습니다.

쓰기 메서드를 선택할 때는 다음 항목을 고려하세요.

  • 스트리밍 작업의 경우 STORAGE_WRITE_API 또는 STORAGE_API_AT_LEAST_ONCE를 사용하는 것이 좋습니다. 이러한 모드는 중간 스테이징 파일을 사용하지 않고 BigQuery 스토리지에 직접 쓰기 때문입니다.
  • 적어도 한 번 스트리밍 모드를 사용하여 파이프라인을 실행하는 경우 쓰기 모드를 STORAGE_API_AT_LEAST_ONCE로 설정합니다. 이 설정은 보다 효율적이며 적어도 한 번 스트리밍 모드의 시맨틱스와 일치합니다.
  • 파일 로드와 Storage Write API는 할당량 및 한도가 서로 다릅니다.
  • 로드 작업에는 공유된 BigQuery 슬롯 풀 또는 예약된 슬롯이 사용됩니다. 예약된 슬롯을 사용하려면 PIPELINE 유형의 예약 할당을 사용해서 프로젝트에서 로드 작업을 실행합니다. 공유 BigQuery 슬롯 풀을 사용할 때는 로드 작업이 무료입니다. 하지만 BigQuery에서 공유 풀에 사용 가능한 용량이 보장되지 않습니다. 자세한 내용은 예약 소개를 참조하세요.

동시 로드

  • 스트리밍 파이프라인의 FILE_LOADSSTORAGE_WRITE_API에 대해 커넥터는 데이터를 여러 파일 또는 스트림으로 샤딩합니다. 일반적으로 자동 샤딩을 사용 설정하려면 withAutoSharding을 호출하는 것이 좋습니다.

  • 일괄 파이프라인의 FILE_LOADS에서는 커넥터가 파티션을 나눈 파일에 데이터를 쓴 다음 BigQuery에 동시에 로드합니다.

  • 일괄 파이프라인의 STORAGE_WRITE_API에서는 각 작업자가 총 샤드 수에 따라 BigQuery에 쓸 스트림을 하나 이상 만듭니다.

  • STORAGE_API_AT_LEAST_ONCE에는 기본 쓰기 스트림 하나가 있습니다. 여러 작업자가 이 스트림에 추가됩니다.

성능

다음 표에서는 다양한 BigQuery I/O 읽기 옵션의 성능 측정항목을 보여줍니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2 작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.

레코드 1억 건 | 1KB | 열 1개 처리량(바이트) 처리량(요소)
스토리지 쓰기 55Mbps 초당 요소 54,000개
Avro 로드 78MBps 초당 요소 77,000개
Json 로드 54MBps 초당 요소 53,000개

이러한 측정항목은 단순 배치 파이프라인을 기반으로 합니다. 이러한 측정항목은 I/O 커넥터 사이의 성능 비교를 위해 사용되며 반드시 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.

권장사항

이 섹션에서는 Dataflow에서 BigQuery에 쓰기 위한 권장사항을 설명합니다.

일반적인 고려사항

  • Storage Write API에는 할당량 한도가 있습니다. 커넥터는 대부분의 파이프라인에서 이러한 한도를 처리합니다. 하지만 일부 시나리오에서는 사용 가능한 Storage Write API 스트림이 소진될 수 있습니다. 예를 들어, 대상이 많은 자동 샤딩 및 자동 확장을 사용하는 파이프라인, 특히 워크로드가 매우 가변적인 장기 실행 작업에서 이러한 문제가 발생할 수 있습니다. 이 문제가 발생하면 문제를 막아주는 STORAGE_WRITE_API_AT_LEAST_ONCE의 사용을 고려하세요.

  • Google Cloud 측정항목을 사용하여 Storage Write API 할당량 사용량을 모니텅합니다.

  • 파일 로드를 사용하는 경우 일반적으로 Avro가 JSON보다 성능이 우수합니다. Avro를 사용하려면 withAvroFormatFunction을 호출합니다.

  • 기본적으로 로드 작업은 Dataflow 작업과 동일한 프로젝트에서 실행됩니다. 다른 프로젝트를 지정하려면 withLoadJobProjectId를 호출합니다.

  • Java SDK를 사용하는 경우 BigQuery 테이블의 스키마를 나타내는 클래스를 만드는 것이 좋습니다. 그런 후 파이프라인에서 useBeamSchema를 호출하여 Apache Beam Row와 BigQuery TableRow 유형 간에 자동으로 변환을 수행합니다. 스키마 클래스 예시는 ExampleModel.java를 참조하세요.

  • 수천 개의 필드가 포함된 복잡한 스키마가 사용된 테이블을 로드하는 경우 withMaxBytesPerPartition을 호출하여 각 로드 작업의 최대 크기를 더 작게 설정하는 것이 좋습니다.

스트리밍 파이프라인

다음 권장사항은 스트리밍 파이프라인에 적용됩니다.

  • 스트리밍 파이프라인의 경우 Storage Write API(STORAGE_WRITE_API 또는 STORAGE_API_AT_LEAST_ONCE)를 사용하는 것이 좋습니다.

  • 스트리밍 파이프라인은 파일 로드를 사용할 수 있지만 이 방식에는 다음과 같은 단점이 있습니다.

    • 파일을 쓰려면 윈도잉이 필요합니다. 전역 기간을 사용할 수 없습니다.
    • BigQuery는 공유 슬롯 풀을 사용할 때 최선의 방식으로 파일을 로드합니다. 레코드가 기록되는 시간과 BigQuery에서 사용 가능한 시간 사이에는 상당한 지연이 발생할 수 있습니다.
    • 잘못된 데이터 또는 스키마 불일치로 인해 로드 작업이 실패하면 전체 파이프라인이 실패합니다.
  • 가능하면 STORAGE_WRITE_API_AT_LEAST_ONCE를 사용하는 것이 좋습니다. BigQuery에 기록되는 레코드가 중복될 수 있지만 비용이 높지 않고 STORAGE_WRITE_API보다 확장성이 뛰어납니다.

  • 일반적으로는 STREAMING_INSERTS를 사용하지 않는 것이 좋습니다. 스트리밍 삽입은 Storage Write API보다 비용이 높으며 성능이 좋지 않습니다.

  • 데이터 샤딩은 스트리밍 파이프라인의 성능을 향상시킬 수 있습니다. 대부분의 파이프라인에서는 자동 샤딩으로 시작하는 것이 좋습니다. 하지만 다음과 같이 샤딩을 조정할 수 있습니다.

  • 스트리밍 삽입을 사용하는 경우 retryTransientErrors재시도 정책으로 설정하는 것이 좋습니다.

일괄 파이프라인

다음 권장사항은 일괄 파이프라인에 적용됩니다.

  • 대부분의 대규모 일괄 파이프라인의 경우 먼저 FILE_LOADS를 시도하는 것이 좋습니다. 일괄 파이프라인은 STORAGE_WRITE_API를 사용할 수 있지만 대규모(vCPU 1,000개 이상)인 경우 또는 동시 파이프라인이 실행 중인 경우 할당량 한도를 초과할 가능성이 높습니다. Apache Beam은 일괄 STORAGE_WRITE_API 작업의 최대 쓰기 스트림 수를 제한하지 않으므로 작업이 결국 BigQuery Storage API 한도에 도달합니다.

  • FILE_LOADS를 사용하면 공유 BigQuery 슬롯 풀이나 예약된 슬롯 풀이 소진될 수 있습니다. 이러한 종류의 오류가 발생하면 다음 방법을 시도해 보세요.

    • 작업의 최대 작업자 수 또는 작업자 크기를 줄입니다.
    • 예약된 슬롯을 더 많이 구입합니다.
    • STORAGE_WRITE_API 사용을 고려합니다.
  • 중소 규모의 파이프라인(vCPU 1,000개 미만)에는 STORAGE_WRITE_API가 유용할 수 있습니다. 이러한 소규모 작업의 경우 데드 레터 큐가 필요하거나 FILE_LOADS 공유 슬롯 풀이 충분하지 않으면 STORAGE_WRITE_API를 사용하는 것이 좋습니다.

  • 중복 데이터를 허용할 수 있는 경우 STORAGE_WRITE_API_AT_LEAST_ONCE를 사용하는 것이 좋습니다. 이 모드를 사용하면 BigQuery에 중복 레코드가 기록될 수 있지만 STORAGE_WRITE_API 옵션보다 비용이 저렴할 수 있습니다.

  • 쓰기 모드마다 파이프라인의 특성에 따라 다르게 작동할 수 있습니다. 실험을 통해 워크로드에 가장 적합한 쓰기 모드를 찾으세요.

행 수준 오류 처리

이 섹션에서는 잘못된 형식의 입력 데이터 또는 스키마 불일치 등으로 인해 행 수준에서 발생할 수 있는 오류를 처리하는 방법을 설명합니다.

Storage Write API의 경우 기록할 수 없는 행은 별개의 PCollection에 배치됩니다. 이 컬렉션을 가져오려면 WriteResult 객체에서 getFailedStorageApiInserts를 호출합니다. 이 접근 방식의 예시는 BigQuery로 데이터 스트리밍을 참조하세요.

나중에 처리할 수 있도록 데드 레터 큐 또는 테이블로 오류를 전송하는 것이 좋습니다. 이 패턴에 대한 자세한 내용은 BigQueryIO 데드 레터 패턴을 참조하세요.

FILE_LOADS의 경우 데이터를 로드하는 중 오류가 발생하면 로드 작업이 실패하고 파이프라인이 런타임 예외를 일으킵니다. Dataflow 로그에서 오류를 보거나 BigQuery 작업 기록을 확인할 수 있습니다. I/O 커넥터는 실패한 개별 행에 대한 정보를 반환하지 않습니다.

오류 문제 해결에 대한 자세한 내용은 BigQuery 커넥터 오류를 참조하세요.

예시

다음 예시에서는 Dataflow를 사용하여 BigQuery에 쓰는 방법을 보여줍니다.

기존 테이블에 쓰기

다음 예시에서는 BigQuery에 PCollection<MyData>를 쓰는 일괄 파이프라인을 만듭니다. 여기서 MyData는 커스텀 데이터 유형입니다.

BigQueryIO.write() 메서드는 쓰기 작업을 구성하는 데 사용되는 BigQueryIO.Write<T> 유형을 반환합니다. 자세한 내용은 Apache Beam 문서에서 테이블에 쓰기를 참조하세요. 이 코드 예시는 기존 테이블(CREATE_NEVER)에 쓰기를 수행하고 새 행을 테이블(WRITE_APPEND)에 추가합니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

신규 또는 기존 테이블에 쓰기

다음 예시에서는 생성 배치CREATE_IF_NEEDED로 설정하여 대상 테이블이 존재하지 않을 경우 새 테이블을 만듭니다. 이 옵션을 사용할 때는 테이블 스키마를 제공해야 합니다. 커넥터는 새 테이블을 만들 때 이 스키마를 사용합니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

BigQuery에 데이터 스트리밍

다음 예시는 쓰기 모드를 STORAGE_WRITE_API로 설정하여 정확히 한 번의 시맨틱스를 사용해서 데이터를 스트리밍하는 방법을 보여줍니다.

모든 스트리밍 파이프라인에 정확히 한 번의 시맨틱스가 필요한 것은 아닙니다. 예를 들어 대상 테이블에서 중복 항목을 수동으로 삭제할 수도 있습니다. 중복 레코드 가능성이 시나리오에서 허용되는 경우에는 쓰기 메서드STORAGE_API_AT_LEAST_ONCE로 설정하여 최소한 한 번 이상의 시맨틱스를 사용할 수 있습니다. 이 방법은 일반적으로 보다 효율적이고 대부분의 파이프라인에서 지연 시간을 줄여줍니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

다음 단계