Cloud Bigtable용 Dataflow 커넥터

Cloud Bigtable용 Cloud Dataflow 커넥터를 사용하면 Cloud Dataflow 파이프라인에 Cloud Bigtable을 사용할 수 있습니다. 일괄 작업과 스트리밍 작업에 커넥터를 사용할 수 있습니다.

커넥터는 자바로 작성되며 자바용 Cloud Bigtable HBase 클라이언트에 빌드됩니다. 또한 Apache Beam 기반의 자바용 Dataflow SDK 2.x와 호환됩니다. 커넥터의 소스 코드는 GitHub 저장소 googleapis/java-bigtable-hbase에 있습니다.

이 페이지에서는 Cloud Dataflow 커넥터와 함께 Read 변환과 Write 변환을 사용하는 방법을 간략하게 설명합니다. Cloud Dataflow 커넥터에 대한 자세한 내용은 전체 API 문서를 참조하세요.

Maven 프로젝트에 커넥터 추가

Cloud Dataflow 커넥터를 Maven 프로젝트에 추가하려면 Maven 아티팩트를 pom.xml 파일에 종속 항목으로 추가합니다.

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>1.16.0</version>
</dependency>

Cloud Bigtable 구성 지정

파이프라인 실행을 위한 입력을 허용하는 옵션 인터페이스를 만듭니다.

public interface BigtableOptions extends DataflowPipelineOptions {
  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("bigtable-table")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Cloud Bigtable에서 읽거나 쓰는 경우 CloudBigtableConfiguration 구성 객체를 제공해야 합니다. 이 객체는 테이블의 프로젝트 ID, 인스턴스 ID, 이름을 제공합니다.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

읽기의 경우 읽기 결과를 제한하고 필터링하는 Apache HBase Scan 객체를 지정할 수 있는 CloudBigtableScanConfiguration 구성 객체를 제공합니다. 자세한 내용은 Cloud Bigtable에서 읽기를 참조하세요.

Cloud Bigtable에서 읽기

Cloud Bigtable 테이블에서 읽으려면 CloudBigtableIO.read 작업 결과에 Read 변환을 적용합니다. Read 변환은 HBase Result 객체의 PCollection을 반환합니다. PCollection의 각 요소는 테이블의 단일 행을 나타냅니다.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

기본적으로 CloudBigtableIO.read 작업은 테이블의 모든 행을 반환합니다. HBase Scan 객체를 사용하여 읽기를 테이블 내 특정 row key 범위로 제한하거나 읽기 결과에 필터를 적용할 수 있습니다. Scan 객체를 사용하려면 CloudBigtableScanConfiguration에 포함합니다.

예를 들어 테이블의 각 행에서 첫 번째 키-값 쌍만 반환하는 Scan을 추가할 수 있습니다. 이는 테이블의 행 수를 계산할 때 유용합니다.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Cloud Bigtable에 쓰기

Cloud Bigtable 테이블에 쓰려면 CloudBigtableIO.writeToTable 작업을 apply합니다. Put 객체와 Delete 객체가 포함될 수 있는 HBase Mutation 객체의 PCollection에서 이 작업을 수행해야 합니다.

Cloud Bigtable 테이블이 이미 있고 적합한 column family가 정의되어 있어야 합니다. Dataflow 커넥터는 테이블과 column family를 즉석에서 만들지 않습니다. cbt 명령줄 도구를 사용하여 테이블을 만들고 column family를 설정하거나 이 작업을 프로그래매틱 방식으로 수행할 수 있습니다.

Cloud Bigtable에 쓰기 전에 네트워크를 통해 Put과 Delete를 직렬화할 수 있도록 Cloud Dataflow 파이프라인을 만들어야 합니다.

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

일반적으로 출력 데이터 형식을 HBase Put 또는 Delete 객체의 컬렉션으로 지정하려면 ParDo와 같은 변환을 수행해야 합니다. 다음은 현재 값을 가져와 Put의 row key로 사용하는 간단한 DoFn 변환을 보여주는 예시입니다. 그런 다음 Cloud Bigtable에 Put 객체를 쓸 수 있습니다.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

다음은 전체 쓰기 예시입니다.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}