Dataflow에서 Apache Iceberg로 쓰기

Dataflow에서 Apache Iceberg로 쓰려면 관리형 I/O 커넥터를 사용합니다.

관리형 I/O는 Apache Iceberg의 다음 기능을 지원합니다.

카탈로그
읽기 기능 일괄 읽기
쓰기 기능
  • 일괄 쓰기
  • 스트리밍 쓰기
  • 동적 대상
  • 동적 테이블 생성

Apache Iceberg용 BigQuery 테이블의 경우 BigQuery Storage API와 함께 BigQueryIO 커넥터를 사용합니다. 테이블이 이미 있어야 합니다. 동적 테이블 생성은 지원되지 않습니다.

종속 항목

다음 종속 항목을 프로젝트에 추가합니다.

자바

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

구성

관리형 I/O는 Apache Iceberg에 다음 구성 매개변수를 사용합니다.

읽기 및 쓰기 구성 데이터 유형 설명
table 문자열 Apache Iceberg 테이블의 식별자입니다. 예: "db.table1"
catalog_name 문자열 카탈로그의 이름입니다. 예: "local"
catalog_properties 지도 Apache Iceberg 카탈로그의 구성 속성 맵입니다. 필요한 속성은 카탈로그에 따라 달라집니다. 자세한 내용은 Apache Iceberg 문서의 CatalogUtil을 참조하세요.
config_properties 지도 Hadoop 구성 속성의 선택적 집합입니다. 자세한 내용은 Apache Iceberg 문서의 CatalogUtil을 참조하세요.
쓰기 구성 데이터 유형 설명
triggering_frequency_seconds 정수 스트리밍 쓰기 파이프라인의 경우 싱크에서 스냅샷을 생성하려고 시도하는 빈도(초)입니다.

동적 대상

Apache Iceberg용 관리형 I/O는 동적 대상을 지원합니다. 커넥터는 고정된 단일 테이블에 쓰는 대신 수신 레코드 내 필드 값을 기반으로 대상 테이블을 동적으로 선택할 수 있습니다.

동적 대상을 사용하려면 table 구성 매개변수의 템플릿을 제공합니다. 자세한 내용은 동적 대상을 참고하세요.

다음 예시에서는 메모리 내 JSON 데이터를 Apache Iceberg 테이블에 씁니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

다음 단계