Bigtable HBase Beam 커넥터

Dataflow 파이프라인에서 Bigtable을 사용하려면 오픈소스 Bigtable Beam I/O 커넥터 2개를 사용하면 됩니다.

HBase에서 Bigtable로 마이그레이션하거나 애플리케이션에서 HBase API를 호출하는 경우 이 페이지에 설명된 Bigtable HBase Beam 커넥터(CloudBigtableIO)를 사용합니다.

다른 모든 경우에는 Bigtable Beam 커넥터(BigtableIO)를 Cloud Bigtable API에서 작동하는 Java용 Cloud Bigtable 클라이언트와 함께 사용해야 합니다. 이 커넥터를 사용하려면 Bigtable Beam 커넥터를 참조하세요.

Apache Beam 프로그래밍 모델에 대한 자세한 내용은 Beam 문서를 참조하세요.

HBase 시작하기

Bigtable HBase Beam 커넥터는 Java로 작성되며 Java용 Bigtable HBase 클라이언트에 빌드됩니다. 또한 Apache Beam 기반의 Java용 Dataflow SDK 2.x와 호환됩니다. 커넥터의 소스 코드는 GitHub 저장소 googleapis/java-bigtable-hbase에 있습니다.

이 페이지에서는 ReadWrite 변환을 사용하는 방법을 간략하게 설명합니다.

인증 설정

로컬 개발 환경에서 이 페이지의 Java 샘플을 사용하려면 gcloud CLI를 설치 및 초기화한 다음 사용자 인증 정보로 애플리케이션 기본 사용자 인증 정보를 설정하세요.

  1. Google Cloud CLI를 설치합니다.
  2. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  3. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login

자세한 내용은 로컬 개발 환경의 인증 설정를 참조하세요.

프로덕션 환경의 인증 설정에 대한 자세한 내용은 Google Cloud에서 실행되는 코드에 대해 애플리케이션 기본 사용자 인증 정보를 설정합니다.를 참조하세요.

Maven 프로젝트에 커넥터 추가

Bigtable HBase Beam 커넥터를 Maven 프로젝트에 추가하려면 Maven 아티팩트를 pom.xml 파일에 종속 항목으로 추가합니다.

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Bigtable 구성 지정

파이프라인 실행을 위한 입력을 허용하는 옵션 인터페이스를 만듭니다.

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Bigtable에서 읽거나 쓰는 경우 CloudBigtableConfiguration 구성 객체를 제공해야 합니다. 이 객체는 테이블의 프로젝트 ID, 인스턴스 ID, 이름을 제공합니다.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

읽기의 경우 읽기 결과를 제한하고 필터링하는 Apache HBase Scan 객체를 지정할 수 있는 CloudBigtableScanConfiguration 구성 객체를 제공합니다. 자세한 내용은 Bigtable에서 읽기를 참조하세요.

Bigtable에서 읽기

Bigtable 테이블에서 읽으려면 CloudBigtableIO.read 작업 결과에 Read 변환을 적용합니다. Read 변환은 HBase Result 객체의 PCollection을 반환합니다. PCollection의 각 요소는 테이블의 단일 행을 나타냅니다.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

기본적으로 CloudBigtableIO.read 작업은 테이블의 모든 행을 반환합니다. HBase Scan 객체를 사용하여 읽기를 테이블 내 특정 row key 범위로 제한하거나 읽기 결과에 필터를 적용할 수 있습니다. Scan 객체를 사용하려면 CloudBigtableScanConfiguration에 포함합니다.

예를 들어 테이블의 각 행에서 첫 번째 키-값 쌍만 반환하는 Scan을 추가할 수 있습니다. 이는 테이블의 행 수를 계산할 때 유용합니다.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Bigtable에 쓰기

Bigtable 테이블에 쓰려면 CloudBigtableIO.writeToTable 작업을 apply합니다. Put 객체와 Delete 객체가 포함될 수 있는 HBase Mutation 객체의 PCollection에서 이 작업을 수행해야 합니다.

Bigtable 테이블이 이미 있고 적합한 column family가 정의되어 있어야 합니다. Dataflow 커넥터는 테이블과 column family를 즉석에서 만들지 않습니다. cbt CLI를 사용하여 테이블을 만들고 column family를 설정하거나 프로그래매틱 방식으로 수행할 수 있습니다.

Bigtable에 쓰기 전에 네트워크를 통해 Put과 Delete를 직렬화할 수 있도록 Dataflow 파이프라인을 만들어야 합니다.

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

일반적으로 출력 데이터 형식을 HBase Put 또는 Delete 객체의 컬렉션으로 지정하려면 ParDo와 같은 변환을 수행해야 합니다. 다음은 현재 값을 가져와 Put의 row key로 사용하는 DoFn 변환을 보여주는 예시입니다. 그런 다음 Bigtable에 Put 객체를 쓸 수 있습니다.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

일괄 쓰기 흐름 제어를 사용 설정하려면 BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROLtrue로 설정합니다. 이 기능은 일괄 쓰기 요청의 트래픽 비율을 자동으로 제한하며 Bigtable 자동 확장에서 Dataflow 작업을 처리하기 위해 노드를 자동으로 추가하거나 삭제할 수 있게 해줍니다.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

다음은 일괄 쓰기 흐름 제어를 사용 설정하는 변형을 포함한 전체 쓰기 예시입니다.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}