이 문서에서는 Apache Beam BigQuery I/O 커넥터를 사용하여 BigQuery에서 Dataflow로 데이터를 읽는 방법을 설명합니다.
개요
BigQuery I/O 커넥터는 BigQuery에서 읽기를 수행하는 두 가지 옵션을 지원합니다.
- 직접 테이블 읽기. 이 옵션은 BigQuery Storage Read API를 사용하므로 가장 빠릅니다.
- 내보내기 작업 이 옵션을 사용하면 BigQuery에서 테이블 데이터를 Cloud Storage에 쓰는 내보내기 작업을 실행합니다. 그런 다음 커넥터가 Cloud Storage에서 내보낸 데이터를 읽습니다. 이 옵션은 내보내기 단계가 필요하므로 효율성이 떨어집니다.
내보내기 작업은 기본 옵션입니다. 직접 읽기를 지정하려면 withMethod(Method.DIRECT_READ)
를 호출합니다.
커넥터가 테이블 데이터를 PCollection
으로 직렬화합니다. PCollection
의 각 요소는 단일 테이블 행을 나타냅니다. 커넥터는 다음 직렬화 방법을 지원합니다.
- 데이터를 Avro 형식의 레코드로 읽습니다. 이 메서드를 사용하여 Avro 레코드를 커스텀 데이터 유형으로 파싱하는 함수를 제공합니다.
- 데이터를
TableRow
객체로 읽습니다. 이 방법은 커스텀 데이터 유형이 필요하지 않기 때문에 편리합니다. 하지만 일반적으로 Avro 형식 레코드를 읽을 때보다 성능이 낮습니다.
동시 로드
이 커넥터의 동시 로드는 읽기 메서드에 따라 다릅니다.
직접 읽기: I/O 커넥터가 내보내기 요청의 크기에 따라 동적 스트림 수를 생성합니다. BigQuery에서 바로 이러한 스트림을 동시에 읽습니다.
내보내기 작업: BigQuery는 Cloud Storage에 쓸 파일 수를 결정합니다. 파일 수는 쿼리와 데이터 양에 따라 다릅니다. I/O 커넥터가 내보낸 파일을 동시에 읽습니다.
성능
다음 표에서는 다양한 BigQuery I/O 읽기 옵션의 성능 측정항목을 보여줍니다. 워크로드는 자바용 Apache Beam SDK 2.49.0을 사용해 하나의 e2-standard2
작업자에서 실행되었습니다. Runner v2를 사용하지 않았습니다.
레코드 1억 건 | 1KB | 열 1개 | 처리량(바이트) | 처리량(요소) |
---|---|---|
스토리지 읽기 | 120MBps | 초당 요소 88,000개 |
Avro 내보내기 | 105MBps | 초당 요소 78,000개 |
Json 내보내기 | 110MBps | 초당 요소 81,000개 |
이러한 측정항목은 단순 배치 파이프라인을 기반으로 합니다. 이러한 측정항목은 I/O 커넥터 사이의 성능 비교를 위해 사용되며 반드시 실제 파이프라인을 나타내지는 않습니다. Dataflow 파이프라인 성능은 복잡하며 VM 유형, 처리 중인 데이터, 외부 소스 및 싱크의 성능, 사용자 코드와 상관관계가 있습니다. 측정항목은 Java SDK 실행을 기반으로 하며 다른 언어 SDK의 성능 특성을 나타내지 않습니다. 자세한 내용은 Beam IO 성능을 참조하세요.
권장사항
일반적으로 직접 테이블 읽기(
Method.DIRECT_READ
)를 사용하는 것이 좋습니다. Storage Read API는 데이터를 내보내는 중간 단계가 필요하지 않으므로 내보내기 작업보다 데이터 파이프라인에 더 적합합니다.직접 읽기를 사용하면 Storage Read API 사용량에 대한 요금이 청구됩니다. BigQuery 가격 책정 페이지에서 데이터 추출 가격 책정을 참고하세요.
내보내기 작업에는 추가 요금이 청구되지 않습니다. 그러나 내보내기 작업에는 한도가 있습니다. 시의성이 중요하고 비용을 조정할 수 있는 대규모 데이터 이동의 경우 직접 읽기를 사용하는 것이 좋습니다.
Storage Read API에는 할당량 한도가 있습니다. Google Cloud 측정항목을 사용하여 할당량 사용량을 모니터링합니다.
Storage Read API를 사용하면 로그에 다음과 같은 임대 기간 만료 및 세션 제한 시간 오류가 표시될 수 있습니다.
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
개
이러한 오류는 일반적으로 6시간 이상 실행되는 파이프라인에서 작업이 제한 시간보다 오래 걸릴 때 발생할 수 있습니다. 이 문제를 완화하려면 파일 내보내기로 전환하세요.
Java SDK를 사용하는 경우 BigQuery 테이블의 스키마를 나타내는 클래스를 만드는 것이 좋습니다. 그런 후 파이프라인에서
useBeamSchema
를 호출하여 Apache BeamRow
와 BigQueryTableRow
유형 간에 자동으로 변환을 수행합니다. 스키마 클래스의 예시는ExampleModel.java
를 참조하세요.
예시
이 섹션의 코드 예에서는 직접 테이블 읽기를 사용합니다.
내보내기 작업을 대신 사용하려면 withMethod
에 대한 호출을 생략하거나 Method.EXPORT
를 지정합니다. 그런 다음 --tempLocation
파이프라인 옵션을 설정하여 내보낸 파일에 대한 Cloud Storage 버킷을 지정합니다.
이 코드 예시에서는 소스 테이블에 다음 열이 있다고 가정합니다.
name
(문자열)age
(정수)
JSON 스키마 파일로 지정됩니다.
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
Avro 형식 레코드 읽기
BigQuery 데이터를 Avro 형식 레코드로 읽으려면 read(SerializableFunction)
메서드를 사용하세요. 이 메서드는 SchemaAndRecord
객체를 파싱하고 커스텀 데이터 유형을 반환하는 애플리케이션 정의 함수를 사용합니다. 커넥터의 출력은 커스텀 데이터 유형의 PCollection
입니다.
다음 코드는 MyData
가 애플리케이션 정의 클래스인 BigQuery 테이블에서 PCollection<MyData>
를 읽습니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
read
메서드는 Avro 레코드에서 커스텀 데이터 클래스로 변환하는 함수를 정의하는 SerializableFunction<SchemaAndRecord, T>
인터페이스를 사용합니다. 앞의 코드 예시에서는 MyData.apply
메서드가 이 변환 함수를 구현합니다. 예시 함수는 Avro 레코드에서 name
및 age
필드를 파싱하고 MyData
인스턴스를 반환합니다.
읽을 BigQuery 테이블을 지정하려면 이전 예시와 같이 from
메서드를 호출합니다. 자세한 내용은 BigQuery I/O 커넥터 문서의 테이블 이름을 참조하세요.
TableRow
객체 읽기
readTableRows
메서드는 BigQuery 데이터를 TableRow
객체의 PCollection
으로 읽습니다. 각 TableRow
는 테이블 데이터의 단일 행을 포함하는 키-값 쌍의 매핑입니다. from
메서드를 호출하여 읽을 BigQuery 테이블을 지정합니다.
다음 코드는 BigQuery 테이블에서 PCollection<TableRows>
을 읽습니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
이 예시에서는 TableRow
사전에서 값에 액세스하는 방법도 보여줍니다.
정수 값은 BigQuery의 내보낸 JSON 형식과 일치하도록 문자열로 인코딩됩니다.
열 투영 및 필터링
직접 읽기(Method.DIRECT_READ
)를 사용할 때는 BigQuery에서 읽어 네트워크를 통해 전송하는 데이터의 양을 줄여 읽기 작업을 더 효율적으로 만들 수 있습니다.
- 열 투영:
withSelectedFields
를 호출하여 테이블에서 열의 하위 집합을 읽습니다. 이렇게 하면 테이블에 열이 많은 경우 효율적으로 읽을 수 있습니다. - 행 필터링:
withRowRestriction
을 호출하여 서버 측에서 데이터를 필터링하는 조건자를 지정합니다.
필터 조건자는 확정적이어야 하며 집계는 지원되지 않습니다.
다음 예시에서는 "user_name"
및 "age"
열을 투영하고 "age > 18"
조건자와 일치하지 않는 행을 필터링합니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
쿼리 결과에서 읽기
앞의 예시는 테이블에서 행을 읽는 방법을 보여줍니다. fromQuery
를 호출하여 SQL 쿼리 결과에서 읽을 수도 있습니다. 이 접근 방식은 일부 계산 작업을 BigQuery로 이동합니다. 이 메서드를 사용하여 뷰에 대해 쿼리를 실행하여 BigQuery 뷰 또는 구체화된 뷰에서 읽을 수도 있습니다.
다음 예시에서는 BigQuery 공개 데이터 세트에 대해 쿼리를 실행하고 결과를 읽습니다. 파이프라인이 실행된 다음 BigQuery 작업 기록에서 쿼리 작업을 볼 수 있습니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
다음 단계
- BigQuery I/O 커넥터 문서 참조
- Google 제공 템플릿 목록 참조