BigQuery I/O

Dataflow SDK에는 Google BigQuery 테이블에서 데이터를 읽고 쓸 수 있는 읽기쓰기 변환이 기본 제공됩니다. 이름으로 지정한 전체 테이블을 읽거나 쿼리 문자열을 사용하여 부분 데이터를 읽을 수 있습니다.

BigQuery 테이블 이름 지정

BigQuery 테이블에서 읽거나 쓰려면 정규화된 BigQuery 테이블 이름을 제공해야 합니다. 정규화된 BigQuery 테이블 이름은 다음 세 부분으로 구성됩니다.

  • 프로젝트 ID: Google 클라우드 프로젝트의 ID입니다. 기본값을 파이프라인 옵션 객체에서 가져옵니다.
  • 데이터세트 ID: 특정 Cloud 프로젝트 내에서 고유한 BigQuery 데이터세트 ID입니다.
  • 테이블 ID: 특정 데이터세트 내에서 고유한 테이블 ID입니다.

자바

프로젝트 이름을 제공하지 않고 BigQueryIO를 사용할 수 있습니다. 생략할 경우, BigQueryIOPipelineOptions 객체의 기본 프로젝트를 사용합니다.

BigQuery 자바 Client APITableReference 유형의 객체를 받아들여 타겟 BigQuery 테이블을 식별합니다. 자바용 Dataflow SDK의 BigQueryIO 패키지에는 지원 메소드 BigQueryIO.parseTableSpec이 포함되어 있습니다. 이 메소드를 사용하면 BigQuery 테이블 이름의 세 가지 부분을 포함하는 String에서 TableReference를 생성할 수 있습니다.

대부분의 경우, TableReference 객체를 명시적으로 사용할 필요는 없습니다. BigQueryIO 변환의 정적 팩토리 메소드는 테이블 이름을 String으로 받아들인 후 parseTableSpec을 내부적으로 사용하여 제공된 String에서 TableReference 객체를 생성합니다.

테이블 이름 문자열 형식

다음 형식 중 하나를 포함하는 문자열을 사용하여 대상 BigQuery 테이블을 지정할 수 있습니다.

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

자바

project_id를 생략할 수도 있습니다. project_id를 생략하면 Cloud Dataflow는 파이프라인 옵션 객체의 기본 프로젝트 ID를 사용합니다. 자바에서는 PipelineOptions.getProject로 ID에 액세스할 수 있습니다.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

BigQuery 테이블 행 및 스키마

자바

BigQueryIO 읽기 및 쓰기 변환은 BigQuery TableRow 객체의 PCollection으로 데이터를 생성하고 소비합니다. TableRowcom.google.api.services.bigquery.model.TableRow 패키지의 BigQuery 자바 Client API의 일부입니다.

또한 BigQuery에 쓸 때 대상 테이블에 쓸 필드에 TableSchema 객체를 제공해야 합니다. BigQuery TableSchemaTableFieldSchema 클래스를 모두 사용해야 합니다. 이러한 클래스는 com.google.api.services.bigquery.model.TableSchemacom.google.api.services.bigquery.model.TableFieldSchema 패키지에 각각 정의되어 있습니다.

BigQuery에서 읽기

자바

BigQuery 테이블에서 읽으려면 BigQueryIO.Read 변환을 apply합니다. BigQueryIO.Read는 BigQuery TableRow 객체의 PCollection을 반환하며, 이때 PCollection의 각 요소는 테이블의 단일 행을 나타냅니다.

.from 연산자를 사용하여 BigQueryIO.Read에 BigQuery 테이블 이름을 제공하여 전체 BigQuery 테이블을 읽을 수 있습니다. 다음 예제 코드는 BigQueryIO.Read 변환을 적용하여 전체 BigQuery 테이블을 읽는 방법을 보여줍니다.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

전체 테이블을 읽지 않으려는 경우, .fromQuery 연산자를 사용하여 BigQueryIO.Read에 쿼리 문자열을 제공할 수 있습니다. 다음 예 코드는 쿼리 문자열을 사용하여 BigQuery 테이블에서 특정 필드를 읽는 방법을 보여줍니다.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

또는 다음 예와 같이 BigQuery의 표준 SQL 언어를 사용할 수 있습니다.

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

BigQuery에서 읽을 때 TableRow 객체의 정수 값은 BigQuery의 내보낸 JSON 형식과 일치하도록 String으로 인코딩됩니다.

일괄 모드에서 BigQueryIO.Read 변환을 적용하면 Dataflow가 BigQuery 내보내기 요청을 호출합니다. Dataflow가 이 API를 사용하면 BigQuery의 할당량가격 정책의 적용을 받습니다.

BigQuery에 쓰기

자바

BigQuery 테이블에 쓰려면 BigQueryIO.Write 변환을 apply합니다. 변환을 PCollection<TableRow>apply해야 합니다.

일반적으로 출력 데이터를 BigQuery TableRow 객체의 컬렉션으로 형식을 지정하려면 ParDo와 같은 다른 변환을 사용해야 합니다.

BigQueryIO.Write 변환을 생성하면 대상 테이블에 따라 몇 가지 추가 정보를 제공해야 합니다. 테이블 이름 외에 다음을 제공해야 합니다.

  • 대상 테이블의 CreateDisposition. CreateDisposition은 대상 테이블이 존재해야 하는지 또는 쓰기 작업으로 생성할 수 있는지 여부를 지정합니다.
  • 대상 테이블의 WriteDisposition. WriteDisposition은 쓰는 데이터가 기존 테이블을 대체할 지, 행을 기존 테이블에 추가할 지 또는 테이블에만 쓸 지 여부를 지정합니다.

또한 쓰기 작업으로 새 BigQuery 테이블을 생성하는 경우, 대상 테이블의 스키마 정보를 제공해야 합니다. 이 경우, 쓰기 작업과 함께 TableSchema 객체를 포함해야 합니다.

CreateDisposition

CreateDisposition은 대상 테이블이 없는 경우에 BigQuery 쓰기 작업이 테이블을 생성할지 여부를 제어합니다. .withCreateDisposition 메소드를 호출하여 BigQueryIO.Write 변환을 생성할 때 CreateDisposition을 지정합니다.

CreateDisposition은 다음과 같은 유효한 값을 가진 enum입니다.

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: 테이블을 만들지 않도록 지정합니다. 대상 테이블이 없으면 쓰기 작업이 실패합니다.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: 대상 테이블이 없는 경우, 쓰기 작업이 새 테이블을 만들도록 지정합니다. 이 값을 사용하는 경우, .withSchema 연산자를 사용하여 테이블 스키마도 제공해야 합니다. CREATE_IF_NEEDED가 기본 동작입니다.

CREATE_IF_NEEDEDCreateDisposition으로 지정하고 TableSchema를 제공하지 않은 경우, 타겟 테이블이 없으면 java.lang.IllegalArgumentException과 함께 런타임에서 변환이 실패할 수 있습니다.

WriteDisposition

WriteDisposition은 BigQuery 쓰기 작업이 기존 테이블에 적용되는 방법을 제어합니다. WriteDisposition 메소드를 호출하여 BigQueryIO.Write 변환을 생성할 때 .withWriteDisposition을 지정합니다.

WriteDisposition은 다음과 같은 유효한 값을 가진 enum입니다.

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: 쓰기 작업이 기존 테이블을 대체하도록 지정합니다. 대상 테이블의 기존 행이 삭제되고 새 행이 테이블에 추가됩니다.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: 쓰기 작업이 기존 테이블의 끝에 행을 추가하도록 지정합니다.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: 대상 테이블이 비어 있지 않으면 런타임에 쓰기 작업이 실패하도록 지정합니다. WRITE_EMPTY가 기본 동작입니다.

WriteDispositionWRITE_EMPTY를 사용하면 실제 쓰기 작업보다 훨씬 먼저 대상 테이블이 비어 있는지 여부에 대한 검사가 수행될 수 있습니다. 또한 이러한 검사로 인해 테이블에 대한 파이프라인의 독점적인 액세스가 보장되지 않습니다. 동시에 실행되는 두 프로그램이 WRITE_EMPTYWriteDisposition인 동일한 출력 테이블에 쓰려고 시도하면 두 프로그램 모두 성공할 수 있습니다.

새 테이블에 쓰기 위해 TableSchema 만들기

BigQuery 쓰기 작업에서 새 테이블을 만드는 경우, 스키마 정보를 제공해야 합니다. TableSchema 객체를 만들어 스키마 정보를 제공합니다. BigQueryIO.Write 변환을 생성할 때 .withSchema 연산자를 사용하여 TableSchema를 전달합니다.

TableSchema 객체는 TableFieldSchema 유형의 객체를 사용하여 테이블의 각 필드의 정보를 포함합니다. 먼저 테이블에 필드 List를 작성하여 TableSchema를 생성합니다. 그런 다음 TableSchema를 생성할 때 .setFields 연산자를 사용하여 리스트를 전달합니다.

다음 예 코드에서는 String 유형의 필드가 두 개 있는 테이블에 대해 TableSchema를 생성하는 방법을 보여줍니다.

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

BigQueryIO.Write 변환 적용

다음 예 코드는 BigQueryIO.Write 변환을 apply하여 PCollection<TableRow>를 BigQuery 테이블에 쓰는 방법을 보여줍니다. 필요한 경우, 쓰기 작업은 테이블을 만듭니다. 테이블이 이미 있으면 대체됩니다.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™은 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 상표입니다.
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.