이 문서에서는 Apache Beam BigQuery I/O 커넥터를 사용하여 Dataflow에서 BigQuery로 데이터를 기록하는 방법을 설명합니다.
BigQuery I/O 커넥터는 Apache Beam SDK에서 제공됩니다. 최신 SDK 버전을 사용하는 것이 좋습니다. 자세한 내용은 Apache Beam 2.x SDK를 참조하세요.
Python용 교차 언어 지원도 제공됩니다.
개요
BigQuery I/O 커넥터는 BigQuery에 쓰기에 대해 다음 메서드를 지원합니다.
STORAGE_WRITE_API
. 이 모드에서는 커넥터가 BigQuery Storage Write API를 사용해서 BigQuery 스토리지에 직접 쓰기를 수행합니다. Storage Write API는 스트리밍 수집과 일괄 로드를 단일 고성능 API로 결합합니다. 이 모드는 정확히 한 번의 시맨틱스를 보장합니다.STORAGE_API_AT_LEAST_ONCE
. 이 모드도 Storage Write API를 사용하지만 최소 한 번 이상의 시맨틱스를 제공합니다. 이 모드는 대부분의 파이프라인에서 지연 시간을 낮춰줍니다. 하지만 중복 쓰기가 발생할 수 있습니다.FILE_LOADS
. 이 모드에서는 커넥터가 Cloud Storage의 스테이징 파일에 입력 데이터를 씁니다. 그런 후 BigQuery 로드 작업을 실행해서 데이터를 BigQuery에 로드합니다. 이 모드는 일괄 파이프라인에서 가장 일반적으로 발견되는 바인드된PCollections
의 기본값입니다.STREAMING_INSERTS
. 이 모드에서는 커넥터가 레거시 스트리밍 API를 사용합니다. 이 모드는 바인드되지 않은PCollections
의 기본값이지만 새 프로젝트에서는 권장되지 않습니다.
쓰기 메서드를 선택할 때는 다음 항목을 고려하세요.
- 스트리밍 작업의 경우
STORAGE_WRITE_API
또는STORAGE_API_AT_LEAST_ONCE
를 사용하는 것이 좋습니다. 이러한 모드는 중간 스테이징 파일을 사용하지 않고 BigQuery 스토리지에 직접 쓰기 때문입니다. - 적어도 한 번 스트리밍 모드를 사용하여 파이프라인을 실행하는 경우 쓰기 모드를
STORAGE_API_AT_LEAST_ONCE
로 설정합니다. 이 설정은 보다 효율적이며 적어도 한 번 스트리밍 모드의 시맨틱스와 일치합니다. - 파일 로드와 Storage Write API는 할당량 및 한도가 서로 다릅니다.
- 로드 작업에는 공유된 BigQuery 슬롯 풀 또는 예약된 슬롯이 사용됩니다. 예약된 슬롯을 사용하려면
PIPELINE
유형의 예약 할당을 사용해서 프로젝트에서 로드 작업을 실행합니다. 공유 BigQuery 슬롯 풀을 사용할 때는 로드 작업이 무료입니다. 하지만 BigQuery에서 공유 풀에 사용 가능한 용량이 보장되지 않습니다. 자세한 내용은 예약 소개를 참조하세요.
동시 로드
스트리밍 파이프라인의
FILE_LOADS
및STORAGE_WRITE_API
에 대해 커넥터는 데이터를 여러 파일 또는 스트림으로 샤딩합니다. 일반적으로 자동 샤딩을 사용 설정하려면withAutoSharding
을 호출하는 것이 좋습니다.일괄 파이프라인의
FILE_LOADS
에서는 커넥터가 파티션을 나눈 파일에 데이터를 쓴 다음 BigQuery에 동시에 로드합니다.일괄 파이프라인의
STORAGE_WRITE_API
에서는 각 작업자가 총 샤드 수에 따라 BigQuery에 쓸 스트림을 하나 이상 만듭니다.STORAGE_API_AT_LEAST_ONCE
에는 기본 쓰기 스트림 하나가 있습니다. 여러 작업자가 이 스트림에 추가됩니다.
성능
다음 표에서는 다양한 BigQuery I/O 읽기 옵션의 성능 측정항목을 보여줍니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2
작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.
레코드 1억 건 | 1KB | 열 1개 | 처리량(바이트) | 처리량(요소) |
---|---|---|
스토리지 쓰기 | 55Mbps | 초당 요소 54,000개 |
Avro 로드 | 78MBps | 초당 요소 77,000개 |
Json 로드 | 54MBps | 초당 요소 53,000개 |
이러한 측정항목은 단순 배치 파이프라인을 기반으로 합니다. 이러한 측정항목은 I/O 커넥터 사이의 성능 비교를 위해 사용되며 반드시 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.
권장사항
이 섹션에서는 Dataflow에서 BigQuery에 쓰기 위한 권장사항을 설명합니다.
일반적인 고려사항
Storage Write API에는 할당량 한도가 있습니다. 커넥터는 대부분의 파이프라인에서 이러한 한도를 처리합니다. 하지만 일부 시나리오에서는 사용 가능한 Storage Write API 스트림이 소진될 수 있습니다. 예를 들어, 대상이 많은 자동 샤딩 및 자동 확장을 사용하는 파이프라인, 특히 워크로드가 매우 가변적인 장기 실행 작업에서 이러한 문제가 발생할 수 있습니다. 이 문제가 발생하면 문제를 막아주는
STORAGE_WRITE_API_AT_LEAST_ONCE
의 사용을 고려하세요.Google Cloud 측정항목을 사용하여 Storage Write API 할당량 사용량을 모니텅합니다.
파일 로드를 사용하는 경우 일반적으로 Avro가 JSON보다 성능이 우수합니다. Avro를 사용하려면
withAvroFormatFunction
을 호출합니다.기본적으로 로드 작업은 Dataflow 작업과 동일한 프로젝트에서 실행됩니다. 다른 프로젝트를 지정하려면
withLoadJobProjectId
를 호출합니다.Java SDK를 사용하는 경우 BigQuery 테이블의 스키마를 나타내는 클래스를 만드는 것이 좋습니다. 그런 후 파이프라인에서
useBeamSchema
를 호출하여 Apache BeamRow
와 BigQueryTableRow
유형 간에 자동으로 변환을 수행합니다. 스키마 클래스 예시는ExampleModel.java
를 참조하세요.수천 개의 필드가 포함된 복잡한 스키마가 사용된 테이블을 로드하는 경우
withMaxBytesPerPartition
을 호출하여 각 로드 작업의 최대 크기를 더 작게 설정하는 것이 좋습니다.
스트리밍 파이프라인
다음 권장사항은 스트리밍 파이프라인에 적용됩니다.
스트리밍 파이프라인의 경우 Storage Write API(
STORAGE_WRITE_API
또는STORAGE_API_AT_LEAST_ONCE
)를 사용하는 것이 좋습니다.스트리밍 파이프라인은 파일 로드를 사용할 수 있지만 이 방식에는 다음과 같은 단점이 있습니다.
가능하면
STORAGE_WRITE_API_AT_LEAST_ONCE
를 사용하는 것이 좋습니다. BigQuery에 기록되는 레코드가 중복될 수 있지만 비용이 높지 않고STORAGE_WRITE_API
보다 확장성이 뛰어납니다.일반적으로는
STREAMING_INSERTS
를 사용하지 않는 것이 좋습니다. 스트리밍 삽입은 Storage Write API보다 비용이 높으며 성능이 좋지 않습니다.데이터 샤딩은 스트리밍 파이프라인의 성능을 향상시킬 수 있습니다. 대부분의 파이프라인에서는 자동 샤딩으로 시작하는 것이 좋습니다. 하지만 다음과 같이 샤딩을 조정할 수 있습니다.
STORAGE_WRITE_API
의 경우withNumStorageWriteApiStreams
를 호출하여 쓰기 스트림 수를 설정합니다.FILE_LOADS
의 경우withNumFileShards
를 호출하여 파일 샤드 수를 설정합니다.
스트리밍 삽입을 사용하는 경우
retryTransientErrors
를 재시도 정책으로 설정하는 것이 좋습니다.
일괄 파이프라인
다음 권장사항은 일괄 파이프라인에 적용됩니다.
대부분의 대규모 일괄 파이프라인의 경우 먼저
FILE_LOADS
를 시도하는 것이 좋습니다. 일괄 파이프라인은STORAGE_WRITE_API
를 사용할 수 있지만 대규모(vCPU 1,000개 이상)인 경우 또는 동시 파이프라인이 실행 중인 경우 할당량 한도를 초과할 가능성이 높습니다. Apache Beam은 일괄STORAGE_WRITE_API
작업의 최대 쓰기 스트림 수를 제한하지 않으므로 작업이 결국 BigQuery Storage API 한도에 도달합니다.FILE_LOADS
를 사용하면 공유 BigQuery 슬롯 풀이나 예약된 슬롯 풀이 소진될 수 있습니다. 이러한 종류의 오류가 발생하면 다음 방법을 시도해 보세요.- 작업의 최대 작업자 수 또는 작업자 크기를 줄입니다.
- 예약된 슬롯을 더 많이 구입합니다.
STORAGE_WRITE_API
사용을 고려합니다.
중소 규모의 파이프라인(vCPU 1,000개 미만)에는
STORAGE_WRITE_API
가 유용할 수 있습니다. 이러한 소규모 작업의 경우 데드 레터 큐가 필요하거나FILE_LOADS
공유 슬롯 풀이 충분하지 않으면STORAGE_WRITE_API
를 사용하는 것이 좋습니다.중복 데이터를 허용할 수 있는 경우
STORAGE_WRITE_API_AT_LEAST_ONCE
를 사용하는 것이 좋습니다. 이 모드를 사용하면 BigQuery에 중복 레코드가 기록될 수 있지만STORAGE_WRITE_API
옵션보다 비용이 저렴할 수 있습니다.쓰기 모드마다 파이프라인의 특성에 따라 다르게 작동할 수 있습니다. 실험을 통해 워크로드에 가장 적합한 쓰기 모드를 찾으세요.
행 수준 오류 처리
이 섹션에서는 잘못된 형식의 입력 데이터 또는 스키마 불일치 등으로 인해 행 수준에서 발생할 수 있는 오류를 처리하는 방법을 설명합니다.
Storage Write API의 경우 기록할 수 없는 행은 별개의 PCollection
에 배치됩니다. 이 컬렉션을 가져오려면 WriteResult
객체에서 getFailedStorageApiInserts
를 호출합니다. 이 접근 방식의 예시는 BigQuery로 데이터 스트리밍을 참조하세요.
나중에 처리할 수 있도록 데드 레터 큐 또는 테이블로 오류를 전송하는 것이 좋습니다. 이 패턴에 대한 자세한 내용은 BigQueryIO
데드 레터 패턴을 참고하세요.
FILE_LOADS
의 경우 데이터를 로드하는 중 오류가 발생하면 로드 작업이 실패하고 파이프라인이 런타임 예외를 일으킵니다. Dataflow 로그에서 오류를 보거나 BigQuery 작업 기록을 확인할 수 있습니다.
I/O 커넥터는 실패한 개별 행에 대한 정보를 반환하지 않습니다.
오류 문제 해결에 대한 자세한 내용은 BigQuery 커넥터 오류를 참조하세요.
예시
다음 예시에서는 Dataflow를 사용하여 BigQuery에 쓰는 방법을 보여줍니다.
기존 테이블에 쓰기
다음 예시에서는 BigQuery에 PCollection<MyData>
를 쓰는 일괄 파이프라인을 만듭니다. 여기서 MyData
는 커스텀 데이터 유형입니다.
BigQueryIO.write()
메서드는 쓰기 작업을 구성하는 데 사용되는 BigQueryIO.Write<T>
유형을 반환합니다. 자세한 내용은 Apache Beam 문서에서 테이블에 쓰기를 참조하세요. 이 코드 예시는 기존 테이블(CREATE_NEVER
)에 쓰기를 수행하고 새 행을 테이블(WRITE_APPEND
)에 추가합니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
신규 또는 기존 테이블에 쓰기
다음 예시에서는 생성 배치를 CREATE_IF_NEEDED
로 설정하여 대상 테이블이 존재하지 않을 경우 새 테이블을 만듭니다. 이 옵션을 사용할 때는 테이블 스키마를 제공해야 합니다. 커넥터는 새 테이블을 만들 때 이 스키마를 사용합니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
BigQuery에 데이터 스트리밍
다음 예시는 쓰기 모드를 STORAGE_WRITE_API
로 설정하여 정확히 한 번의 시맨틱스를 사용해서 데이터를 스트리밍하는 방법을 보여줍니다.
모든 스트리밍 파이프라인에 정확히 한 번의 시맨틱스가 필요한 것은 아닙니다. 예를 들어 대상 테이블에서 중복 항목을 수동으로 삭제할 수도 있습니다. 중복 레코드 가능성이 시나리오에서 허용되는 경우에는 쓰기 메서드를 STORAGE_API_AT_LEAST_ONCE
로 설정하여 최소한 한 번 이상의 시맨틱스를 사용할 수 있습니다. 이 방법은 일반적으로 보다 효율적이고 대부분의 파이프라인에서 지연 시간을 줄여줍니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
다음 단계
- Apache Beam 문서에서 BigQuery I/O 커넥터 자세히 알아보기
- Storage Write API를 사용해서 BigQuery에 데이터 스트리밍 자세히 알아보기(블로그 게시물)