BigQuery I/O

Dataflow SDK에는 Google BigQuery 테이블에서 데이터를 읽고 쓸 수 있는 읽기쓰기 변환이 기본 제공됩니다. 이름으로 지정한 전체 테이블을 읽거나 쿼리 문자열을 사용하여 부분 데이터를 읽을 수 있습니다.

BigQuery 테이블 이름 지정

BigQuery 테이블에서 읽거나 쓰려면 정규화된 BigQuery 테이블 이름을 제공해야 합니다. 정규화된 BigQuery 테이블 이름은 다음 세 부분으로 구성됩니다.

  • 프로젝트 ID: Google 클라우드 프로젝트의 ID입니다. 기본값을 파이프라인 옵션 객체에서 가져옵니다.
  • 데이터세트 ID: 특정 Cloud 프로젝트 내에서 고유한 BigQuery 데이터세트 ID입니다.
  • 테이블 ID: 특정 데이터세트 내에서 고유한 테이블 ID입니다.

자바

프로젝트 이름을 제공하지 않고 생략된 경우 다음과 같이 BigQueryIO를 사용할 수 있습니다. BigQueryIOPipelineOptions 객체의 기본 프로젝트를 사용합니다.

BigQuery 자바 클라이언트 APITableReference 유형의 객체를 사용하여 타겟 BigQuery 테이블을 식별합니다. 자바용 Dataflow SDK의 BigQueryIO 패키지에는 도우미 메서드, BigQuery 테이블 이름의 세 부분을 포함하는 String에서 TableReference를 형성하기 위해 사용할 수 있는 도우미 메서드 BigQueryIO.parseTableSpec가 포함되어 있습니다.

대부분의 경우 TableReference를 명시적으로 사용할 필요는 없습니다. BigQueryIO 변환에 대한 정적 팩토리 메서드는 테이블 이름을 String로 사용한 후 내부적으로 parseTableSpec을 사용하여 제공된 String에서 TableReference 객체를 생성합니다.

테이블 이름 문자열 형식

다음 형식 중 하나를 포함하는 문자열을 사용하여 대상 BigQuery 테이블을 지정할 수 있습니다.

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

자바

project_id를 생략할 수도 있습니다. project_id를 생략하면 Cloud Dataflow는 파이프라인 옵션 객체의 기본 프로젝트 ID를 사용합니다. 자바에서는 PipelineOptions.getProject를 사용하여 ID에 액세스할 수 있습니다.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

BigQuery 테이블 행 및 스키마

자바

BigQueryIO 읽기 및 쓰기 변환은 다음과 같이 BigQuery TableRow 객체의 PCollection으로 데이터를 생성하고 소비합니다. TableRow는 BigQuery 자바 클라이언트 API의 일부이며 com.google.api.services.bigquery.model.TableRow 패키지에 포함되어 있습니다.

또한 BigQuery에 쓰기 작업을 수행하려면 대상 테이블에 쓰고자 하는 필드용 TableSchema 객체를 제공해야 합니다. BigQuery TableSchemaTableFieldSchema 클래스를 모두 사용해야 합니다. 이러한 클래스는 각각 com.google.api.services.bigquery.model.TableSchema 패키지와 com.google.api.services.bigquery.model.TableFieldSchema 패키지에 정의됩니다.

BigQuery에서 읽기

자바

BigQuery 테이블에서 읽으려면 apply 변환을 BigQueryIO.Read합니다. BigQueryIO.Read는 BigQuery TableRow 객체의 PCollection을 반환하며, PCollection의 각 요소는 테이블의 단일 행을 나타냅니다.

.from 작업을 사용하여 BigQuery 테이블 이름을 BigQueryIO.Read으로 제공함으로써 전체 BigQuery 테이블을 읽을 수 있습니다. 다음 예시 코드는 BigQueryIO.Read 변환을 적용하여 전체 BigQuery 테이블을 읽는 방법을 보여줍니다.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

전체 테이블을 읽지 않으려면 .fromQuery 작업을 사용하여 쿼리 문자열을 BigQueryIO.Read에 제공할 수 있습니다. 다음 예시 코드는 쿼리 문자열을 사용하여 BigQuery 테이블에서 특정 필드를 읽는 방법을 보여줍니다.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

또는 다음 예시와 같이 BigQuery의 표준 SQL 언어를 사용할 수 있습니다.

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

BigQuery에서 읽을 때는 BigQuery의 내보낸 JSON 형식과 일치하도록 TableRow 객체의 정수 값이 String으로 인코딩됩니다.

일괄 모드에서 BigQueryIO.Read BigQueryIO.Read 변환을 적용하면 Dataflow가 BigQuery 내보내기 요청을 호출합니다. Dataflow가 이 API를 사용하면 BigQuery의 할당량가격 책정 정책의 적용을 받습니다.

BigQuery에 쓰기

자바

BigQuery 테이블에 쓰려면 BigQueryIO.Write 변환을 apply합니다. PCollection<TableRow>에 변환을 apply해야 합니다.

일반적으로 출력 데이터를 BigQuery TableRow 객체의 컬렉션 형식으로 출력하려면 ParDo와 같은 다른 변환을 사용해야 합니다.

BigQueryIO.Write 변환을 생성할 때 대상 테이블을 기반으로 몇 가지 추가 정보를 제공해야 합니다. 테이블 이름 외에 다음을 제공해야 합니다.

  • 대상 테이블의 CreateDisposition. CreateDisposition은 대상 테이블이 존재하는지 아니면 쓰기 작업으로 생성될 수 있는지 여부를 지정합니다.
  • 대상 테이블의 WriteDisposition. WriteDisposition은 작성된 데이터가 기존 테이블을 대체할 수 있는지, 기존 테이블에 행을 추가할 수 있는지, 비어 있는 테이블에만 쓸 수 있는지 여부를 지정합니다.

또한 쓰기 작업으로 새 BigQuery 테이블을 생성할 때는 대상 테이블의 스키마 정보를 제공해야 합니다. 이때는 쓰기 작업으로 TableSchema 객체를 포함해야 합니다.

CreateDisposition

CreateDisposition은 대상 테이블이 없으면 BigQuery 쓰기 작업으로 테이블을 생성해야 하는지 여부를 제어합니다. BigQueryIO.Write 변환을 생성할 때 .withCreateDisposition 메서드를 호출하여 CreateDisposition을 지정합니다.

CreateDisposition은 다음과 같은 유효한 값을 포함하는 enum입니다.

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: 테이블을 만들지 않도록 지정합니다. 대상 테이블이 없으면 쓰기 작업이 실패합니다.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: 테이블이 없으면 쓰기 작업으로 새 테이블을 만들도록 지정합니다. 이 값을 사용할 때는 .withSchema 작업을 사용하여 테이블 스키마도 제공해야합니다. CREATE_IF_NEEDED가 기본 동작입니다.

CREATE_IF_NEEDEDCreateDisposition으로 지정하고 TableSchema를 제공하지 않으면 대상 테이블이 없을 경우 런타임시 변환이 실패하고 java.lang.IllegalArgumentException이 발생할 수 있습니다.

WriteDisposition

WriteDisposition은 BigQuery 쓰기 작업이 기존 테이블에 적용되는 방식을 제어합니다. BigQueryIO.Write 변환을 생성할 때 .withWriteDisposition 메서드를 호출하여 WriteDisposition을 지정합니다.

WriteDisposition은 다음과 같은 유효한 값을 포함하는 enum입니다.

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: 쓰기 작업이 기존 테이블을 대체하도록 지정합니다. 대상 테이블의 기존 행이 삭제되고 새 행이 테이블에 추가됩니다.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: 쓰기 작업이 기존 테이블의 끝에 행을 추가하도록 지정합니다.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: 대상 테이블이 비어 있지 않으면 런타임시 쓰기 작업이 실패하도록 지정합니다. WRITE_EMPTY가 기본 동작입니다.

WriteDispositionWRITE_EMPTY를 사용할 때는 대상 테이블이 비어 있는지 확인한 후 실제 쓰기 작업을 수행해야 합니다. 또한 이러한 검사로 인해 테이블에 대한 파이프라인의 독점적인 액세스가 보장되지 않습니다. 두 프로그램이 동시에 실행되고 있을 때 WRITE_EMPTYWriteDisposition로 설정된 동일한 출력 테이블에 쓰려고 하면 둘 다 성공할 수 있습니다.

새 테이블에 쓰기 위해 TableSchema 만들기

BigQuery 쓰기 작업에서 새 테이블을 만드는 경우, 스키마 정보를 제공해야 합니다. TableSchema 객체를 생성하여 스키마 정보를 제공합니다. BigQueryIO.Write 변환을 생성할 때 .withSchema 작업을 사용하여 TableSchema를 전달합니다.

TableSchema 객체는 TableFieldSchema 유형의 객체를 사용하여 테이블의 각 필드에 대한 정보를 포함합니다. 먼저 테이블에 필드 List를 생성하여 TableSchema를 구성합니다. 그런 다음 TableSchema를 구성할 때 .setFields 작업을 사용하여 목록을 전달합니다.

다음 예시 코드는 String 유형의 필드 두 개가 있는 테이블용 TableSchema를 구성하는 방법을 보여줍니다.

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

BigQueryIO.Write 변환 적용

다음 예시 코드는 BigQueryIO.Write 변환을 apply하여 BigQuery 테이블에 PCollection<TableRow>을 쓰는 방법을 보여줍니다. 쓰기 작업은 필요하면 테이블을 만들고, 테이블이 이미 있으면 기존 테이블을 대체합니다.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™은 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 상표입니다.