Cloud Bigtable 用の Dataflow コネクタ

Cloud Bigtable 用の Cloud Dataflow コネクタにより、Cloud Dataflow パイプライン内で Cloud Bigtable を使用できるようになります。このコネクタは、一括オペレーションとストリーミング オペレーションの両方に使用できます。

このコネクタは Java で書かれており、Java 用 Cloud Bigtable HBase クライアント上でビルドされます。これは、Apache Beam ベースの Dataflow SDK 2.x for Java と互換性があります。コネクタのソースコードは、GitHub の googleapis/java-bigtable-hbase リポジトリにあります。

このページでは、Cloud Dataflow コネクタで Read 変換と Write 変換を使用する方法について説明します。Cloud Dataflow コネクタの API の詳細ドキュメントもご覧ください。

Maven プロジェクトへのコネクタの追加

Cloud Dataflow コネクタを Maven プロジェクトに追加するには、Maven アーティファクトを pom.xml ファイルに依存ファイルとして追加します。

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>1.16.0</version>
</dependency>

Cloud Bigtable 構成の指定

パイプラインを実行するための入力を許可するオプション インターフェースを作成します。

public interface BigtableOptions extends DataflowPipelineOptions {
  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("bigtable-table")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Cloud Bigtable に対して読み書きを実行する際には、CloudBigtableConfiguration 構成オブジェクトを指定する必要があります。このオブジェクトには、使用するテーブルのプロジェクト ID とインスタンス ID、およびテーブル自体の名前が指定されます。

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

読み取り用に CloudBigtableScanConfiguration 構成オブジェクトを指定します。これにより、読み取りの結果を制限およびフィルタリングする Apache HBase Scan オブジェクトを指定できます。詳細については、Cloud Bigtable からの読み取りをご覧ください。

Cloud Bigtable からの読み取り

Cloud Bigtable テーブルから読み取るには、CloudBigtableIO.read オペレーションの結果に Read 変換を適用します。Read 変換は、HBase Result オブジェクトの PCollection を返します。PCollection 内の各要素はテーブル内の 1 行を表します。

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

デフォルトでは、CloudBigtableIO.read オペレーションはテーブル内のすべての行を返します。HBase Scan オブジェクトを使用すると、読み取りの対象をテーブル内の特定範囲の行キーに制限することや、読み取り結果へのフィルタの適用ができます。Scan オブジェクトを使用するには、そのオブジェクトを CloudBigtableScanConfiguration に含めます。

たとえば、テーブルの各行から最初の Key-Value ペアのみを返す Scan を追加できます。これは、テーブル内の行数をカウントする際に有用です。

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Cloud Bigtable への書き込み

Cloud Bigtable テーブルに書き込むには、CloudBigtableIO.writeToTable オペレーションに apply を行います。このオペレーションは、HBase Mutation オブジェクトの PCollection に行う必要があります。ここには、Put オブジェクト Delete オブジェクトを含めることができます。

Cloud Bigtable テーブルがすでに存在し、適切な列ファミリーが定義されている必要があります。Dataflow コネクタによってテーブルや列ファミリーが動的に作成されることはありません。テーブルを作成して列ファミリーを設定するには、cbt コマンドライン ツールを使用できます。また、プログラムによって行うこともできます。

Cloud Bigtable に書き込むには、事前に、Cloud Dataflow パイプラインを作成して、ネットワーク経由の put オペレーションと delete オペレーションがシリアライズされるようにしておく必要があります。

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

一般に、出力データを HBase Put または HBase Delete オブジェクトのコレクションとしてフォーマットするには、ParDo などの変換を実行する必要があります。以下に簡単な DoFn 変換の例を示します。この例では、現在値を受け取って、それを Put の行キーとして使用しています。これで、Put オブジェクトを Cloud Bigtable に書き込むことができます。

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

書き込みの例の全体は次のとおりです。

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}